[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16611366#comment-16611366 ]
ASF GitHub Bot commented on KAFKA-3514: --------------------------------------- guozhangwang closed pull request #5428: KAFKA-3514: Part III, Refactor StreamThread main loop URL: https://github.com/apache/kafka/pull/5428 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 54d065ff3c2..1fcabca134c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -499,7 +499,7 @@ public ConsumerConfig(Map<String, Object> props) { super(CONFIG, props); } - ConsumerConfig(Map<?, ?> props, boolean doLog) { + protected ConsumerConfig(Map<?, ?> props, boolean doLog) { super(CONFIG, props, doLog); } diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index b9eaaa68f67..736e9cbd34f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -278,7 +278,7 @@ /** {@code buffered.records.per.partition} */ @SuppressWarnings("WeakerAccess") public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition"; - private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition."; + private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "Maximum number of records to buffer per partition."; /** {@code cache.max.bytes.buffering} */ @SuppressWarnings("WeakerAccess") @@ -298,6 +298,11 @@ " (Note, if <code>processing.guarantee</code> is set to <code>" + EXACTLY_ONCE + "</code>, the default value is <code>" + EOS_DEFAULT_COMMIT_INTERVAL_MS + "</code>," + " otherwise the default value is <code>" + DEFAULT_COMMIT_INTERVAL_MS + "</code>."; + /** {@code max.task.idle.ms} */ + public static final String MAX_TASK_IDLE_MS_CONFIG = "max.task.idle.ms"; + private static final String MAX_TASK_IDLE_MS_DOC = "Maximum amount of time a stream task will stay idle when not all of its partition buffers contain records," + + " to avoid potential out-of-order record processing across multiple input streams."; + /** {@code connections.max.idle.ms} */ @SuppressWarnings("WeakerAccess") public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG; @@ -538,6 +543,11 @@ 1, Importance.MEDIUM, NUM_STREAM_THREADS_DOC) + .define(MAX_TASK_IDLE_MS_CONFIG, + Type.LONG, + 0L, + Importance.MEDIUM, + MAX_TASK_IDLE_MS_DOC) .define(PROCESSING_GUARANTEE_CONFIG, Type.STRING, AT_LEAST_ONCE, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 94e4c71d9c2..8eb024cc670 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -51,9 +51,11 @@ final boolean eosEnabled; final Logger log; final LogContext logContext; + final StateDirectory stateDirectory; + boolean
taskInitialized; boolean taskClosed; - final StateDirectory stateDirectory; + boolean commitNeeded; InternalProcessorContext processorContext; @@ -267,6 +269,10 @@ public boolean isClosed() { return taskClosed; } + public boolean commitNeeded() { return commitNeeded; } + public boolean hasStateStores() { return !topology.stateStores().isEmpty(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java index 0a839655748..1eb3ab97bf8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java @@ -21,37 +21,14 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.TaskId; -import org.slf4j.Logger; import java.util.HashMap; import java.util.Iterator; import java.util.Map; class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements RestoringTasks { - private final Logger log; - private final TaskAction<StreamTask> maybeCommitAction; - private int committed = 0; - AssignedStreamsTasks(final LogContext logContext) { super(logContext, "stream task"); - - this.log = logContext.logger(getClass()); - - maybeCommitAction = new TaskAction<StreamTask>() { - @Override - public String name() { - return "maybeCommit"; - } - - @Override - public void apply(final StreamTask task) { - if (task.commitNeeded()) { - committed++; - task.commit(); - log.debug("Committed active task {} per user request in", task.id()); - } - } - }; } @Override @@ -63,9 +40,41 @@ public StreamTask restoringTaskFor(final TopicPartition partition) { * @throws TaskMigratedException if committing offsets failed (non-EOS) * or if the task producer got fenced (EOS) */ - int maybeCommit() { - committed = 0; - applyToRunningTasks(maybeCommitAction); + int maybeCommitPerUserRequested() { + int committed = 0; + RuntimeException firstException = null; + + for (final Iterator<StreamTask> it = running().iterator(); it.hasNext(); ) { + final StreamTask task = it.next(); + try { + if (task.commitRequested() && task.commitNeeded()) { + task.commit(); + committed++; + log.debug("Committed active task {} per user request", task.id()); + } + } catch (final TaskMigratedException e) { + log.info("Failed to commit {} since it got migrated to another thread already. 
" + + "Closing it as zombie before triggering a new rebalance.", task.id()); + final RuntimeException fatalException = closeZombieTask(task); + if (fatalException != null) { + throw fatalException; + } + it.remove(); + throw e; + } catch (final RuntimeException t) { + log.error("Failed to commit StreamTask {} due to the following error:", + task.id(), + t); + if (firstException == null) { + firstException = t; + } + } + } + + if (firstException != null) { + throw firstException; + } + return committed; } @@ -85,15 +94,14 @@ int maybeCommit() { /** * @throws TaskMigratedException if the task producer got fenced (EOS only) */ - int process() { + int process(final long now) { int processed = 0; final Iterator<Map.Entry<TaskId, StreamTask>> it = running.entrySet().iterator(); while (it.hasNext()) { final StreamTask task = it.next().getValue(); - try { - if (task.isProcessable() && task.process()) { + if (task.isProcessable(now) && task.process()) { processed++; } } catch (final TaskMigratedException e) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index 079d405cb50..3cc396df8f3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -37,14 +37,14 @@ import java.util.concurrent.atomic.AtomicReference; abstract class AssignedTasks<T extends Task> { - private final Logger log; + final Logger log; private final String taskTypeName; - private final TaskAction<T> commitAction; private final Map<TaskId, T> created = new HashMap<>(); private final Map<TaskId, T> suspended = new HashMap<>(); private final Map<TaskId, T> restoring = new HashMap<>(); private final Set<TopicPartition> restoredPartitions = new HashSet<>(); private final Set<TaskId> previousActiveTasks = new HashSet<>(); + // IQ may access this map. final Map<TaskId, T> running = new ConcurrentHashMap<>(); private final Map<TopicPartition, T> runningByPartition = new HashMap<>(); @@ -53,20 +53,7 @@ AssignedTasks(final LogContext logContext, final String taskTypeName) { this.taskTypeName = taskTypeName; - this.log = logContext.logger(getClass()); - - commitAction = new TaskAction<T>() { - @Override - public String name() { - return "commit"; - } - - @Override - public void apply(final T task) { - task.commit(); - } - }; } void addNewTask(final T task) { @@ -349,17 +336,16 @@ void clear() { * or if the task producer got fenced (EOS) */ int commit() { - applyToRunningTasks(commitAction); - return running.size(); - } - - void applyToRunningTasks(final TaskAction<T> action) { + int committed = 0; RuntimeException firstException = null; for (final Iterator<T> it = running().iterator(); it.hasNext(); ) { final T task = it.next(); try { - action.apply(task); + if (task.commitNeeded()) { + task.commit(); + committed++; + } } catch (final TaskMigratedException e) { log.info("Failed to commit {} {} since it got migrated to another thread already. 
" + "Closing it as zombie before triggering a new rebalance.", taskTypeName, task.id()); @@ -370,11 +356,10 @@ void applyToRunningTasks(final TaskAction<T> action) { it.remove(); throw e; } catch (final RuntimeException t) { - log.error("Failed to {} {} {} due to the following error:", - action.name(), - taskTypeName, - task.id(), - t); + log.error("Failed to commit {} {} due to the following error:", + taskTypeName, + task.id(), + t); if (firstException == null) { firstException = t; } @@ -384,6 +369,8 @@ void applyToRunningTasks(final TaskAction<T> action) { if (firstException != null) { throw firstException; } + + return committed; } void closeNonAssignedSuspendedTasks(final Map<TaskId, Set<TopicPartition>> newAssignment) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index f17c63acd2f..70202534d69 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -66,7 +66,7 @@ RecordQueue queue() { this.partitionQueues = partitionQueues; totalBuffered = 0; allBuffered = false; - streamTime = RecordQueue.NOT_KNOWN; + streamTime = RecordQueue.UNKNOWN; } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index beab35f61c5..ee21379c79f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -146,7 +146,7 @@ public StateStore getStateStore(final String name) { @Override public void commit() { - task.needCommit(); + task.requestCommit(); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 76d1a0c8f4d..15a5c212bfb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -318,6 +318,7 @@ private BatchingStateRestoreCallback getBatchingRestoreCallback(final StateResto return (BatchingStateRestoreCallback) callback; } + // TODO: avoid creating a new object for each update call? 
return new WrappedBatchingStateRestoreCallback(callback); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index 86340bb82c1..d06d7f3bad6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -36,7 +36,7 @@ */ public class RecordQueue { - static final long NOT_KNOWN = -1L; + static final long UNKNOWN = -1L; private final Logger log; private final SourceNode source; @@ -46,7 +46,7 @@ private final RecordDeserializer recordDeserializer; private final ArrayDeque<ConsumerRecord<byte[], byte[]>> fifoQueue; - private long partitionTime = NOT_KNOWN; + private long partitionTime = UNKNOWN; private StampedRecord headRecord = null; RecordQueue(final TopicPartition partition, @@ -151,7 +151,7 @@ public long timestamp() { public void clear() { fifoQueue.clear(); headRecord = null; - partitionTime = NOT_KNOWN; + partitionTime = UNKNOWN; } private void maybeUpdateTimestamp() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java index 67834d7a7cd..e8631aaf85a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java @@ -74,7 +74,7 @@ public void close() {} } }; - private long streamTime = RecordQueue.NOT_KNOWN; + private long streamTime = RecordQueue.UNKNOWN; StandbyContextImpl(final TaskId id, final StreamsConfig config, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 72cc6295a2d..3ac64146187 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -103,6 +103,8 @@ public void commit() { flushAndCheckpointState(); // reinitialize offset limits updateOffsetLimits(); + + commitNeeded = false; } /** @@ -185,6 +187,11 @@ public void closeSuspended(final boolean clean, } stateMgr.updateStandbyStates(partition, restoreRecords, lastOffset); + + if (!restoreRecords.isEmpty()) { + commitNeeded = true; + } + return remainingRecords; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 7f3d31fd776..2f97b7f27ba 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -60,26 +60,23 @@ private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null); - private static final int WAIT_ON_PARTIAL_INPUT = 5; - + private final Time time; + private final long maxTaskIdleMs; + private final int maxBufferedSize; + private final TaskMetrics taskMetrics; private final PartitionGroup partitionGroup; + private final RecordCollector recordCollector; private final PartitionGroup.RecordInfo recordInfo; + private final Map<TopicPartition, Long> consumedOffsets; private final 
PunctuationQueue streamTimePunctuationQueue; private final PunctuationQueue systemTimePunctuationQueue; - - private final Map<TopicPartition, Long> consumedOffsets; - private final RecordCollector recordCollector; private final ProducerSupplier producerSupplier; - private Producer<byte[], byte[]> producer; - private final int maxBufferedSize; + private Sensor closeSensor; + private long idleStartTime; + private Producer<byte[], byte[]> producer; private boolean commitRequested = false; - private boolean commitOffsetNeeded = false; private boolean transactionInFlight = false; - private int waits = WAIT_ON_PARTIAL_INPUT; - private final Time time; - private final TaskMetrics taskMetrics; - private Sensor closeSensor; protected static final class TaskMetrics { final StreamsMetricsImpl metrics; @@ -87,7 +84,6 @@ final Sensor taskEnforcedProcessSensor; private final String taskName; - TaskMetrics(final TaskId id, final StreamsMetricsImpl metrics) { taskName = id.toString(); this.metrics = metrics; @@ -134,13 +130,13 @@ ); // add the metrics for enforced processing - taskEnforcedProcessSensor = metrics.taskLevelSensor(taskName, "enforced-process", Sensor.RecordingLevel.DEBUG, parent); + taskEnforcedProcessSensor = metrics.taskLevelSensor(taskName, "enforced-processing", Sensor.RecordingLevel.DEBUG, parent); taskEnforcedProcessSensor.add( - new MetricName("enforced-process-rate", group, "The average number of occurrence of enforced-process per second.", tagMap), + new MetricName("enforced-processing-rate", group, "The average number of enforced-processing operations per second.", tagMap), new Rate(TimeUnit.SECONDS, new Count()) ); taskEnforcedProcessSensor.add( - new MetricName("enforced-process-total", group, "The total number of occurrence of enforced-process operations.", tagMap), + new MetricName("enforced-processing-total", group, "The total number of enforced-processing operations.", tagMap), new CumulativeCount() ); @@ -207,6 +203,7 @@ public StreamTask(final TaskId id, streamTimePunctuationQueue = new PunctuationQueue(); systemTimePunctuationQueue = new PunctuationQueue(); + maxTaskIdleMs = config.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG); maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG); // initialize the consumed and committed offset cache @@ -253,6 +250,7 @@ public StreamTask(final TaskId id, public boolean initializeStateStores() { log.trace("Initializing state stores"); registerStateStores(); + return changelogPartitions().isEmpty(); } @@ -277,7 +275,10 @@ public void initializeTopology() { } processorContext.initialized(); + taskInitialized = true; + + idleStartTime = RecordQueue.UNKNOWN; } /** @@ -299,18 +300,27 @@ public void resume() { } /** - * An active task is processable if its buffer contains data for all of its input source topic partitions + * An active task is processable if its buffer contains data for all of its input + * source topic partitions, or if processing is enforced after the task has idled beyond the max task idle time */ - public boolean isProcessable() { + boolean isProcessable(final long now) { if (partitionGroup.allPartitionsBuffered()) { + idleStartTime = RecordQueue.UNKNOWN; return true; - } else if (partitionGroup.numBuffered() > 0 && --waits < 0) { - taskMetrics.taskEnforcedProcessSensor.record(); - waits = WAIT_ON_PARTIAL_INPUT; - return true; - } + } else if (partitionGroup.numBuffered() > 0) { + if (idleStartTime == RecordQueue.UNKNOWN) { + idleStartTime = now; + } - return false; + if (now - idleStartTime >= maxTaskIdleMs) {
+ taskMetrics.taskEnforcedProcessSensor.record(); + return true; + } else { + return false; + } + } else { + return false; + } } /** @@ -343,7 +353,7 @@ public boolean process() { // update the consumed offset map after processing is done consumedOffsets.put(partition, record.offset()); - commitOffsetNeeded = true; + commitNeeded = true; // after processing this record, if its partition queue's buffered size has been // decreased to the threshold, we can then resume the consumption on this partition @@ -435,10 +445,32 @@ void commit(final boolean startNewTransaction) { stateMgr.checkpoint(activeTaskCheckpointableOffsets()); } - commitOffsets(startNewTransaction); + final Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size()); + for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) { + final TopicPartition partition = entry.getKey(); + final long offset = entry.getValue() + 1; + consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset)); + stateMgr.putOffsetLimit(partition, offset); + } - commitRequested = false; + try { + if (eosEnabled) { + producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId); + producer.commitTransaction(); + transactionInFlight = false; + if (startNewTransaction) { + producer.beginTransaction(); + transactionInFlight = true; + } + } else { + consumer.commitSync(consumedOffsetsAndMetadata); + } + } catch (final CommitFailedException | ProducerFencedException error) { + throw new TaskMigratedException(this, error); + } + commitNeeded = false; + commitRequested = false; taskMetrics.taskCommitTimeSensor.record(time.nanoseconds() - startNs); } @@ -463,43 +495,6 @@ protected void flushState() { } } - /** - * @throws TaskMigratedException if committing offsets failed (non-EOS) - * or if the task producer got fenced (EOS) - */ - private void commitOffsets(final boolean startNewTransaction) { - try { - if (commitOffsetNeeded) { - log.trace("Committing offsets"); - final Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size()); - for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) { - final TopicPartition partition = entry.getKey(); - final long offset = entry.getValue() + 1; - consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset)); - stateMgr.putOffsetLimit(partition, offset); - } - - if (eosEnabled) { - producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId); - } else { - consumer.commitSync(consumedOffsetsAndMetadata); - } - commitOffsetNeeded = false; - } - - if (eosEnabled) { - producer.commitTransaction(); - transactionInFlight = false; - if (startNewTransaction) { - producer.beginTransaction(); - transactionInFlight = true; - } - } - } catch (final CommitFailedException | ProducerFencedException fatal) { - throw new TaskMigratedException(this, fatal); - } - } - Map<TopicPartition, Long> purgableOffsets() { final Map<TopicPartition, Long> purgableConsumedOffsets = new HashMap<>(); for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) { @@ -645,13 +640,12 @@ private void closeTopology() { // helper to avoid calling suspend() twice if a suspended task is not reassigned and closed @Override - public void closeSuspended(boolean clean, + public void closeSuspended(final boolean clean, final boolean isZombie, RuntimeException firstException) { try { closeStateManager(clean); } catch (final RuntimeException e) { - clean = false; if 
(firstException == null) { firstException = e; } @@ -793,10 +787,16 @@ public boolean maybePunctuateStreamTime() { // if the timestamp is not known yet, meaning there is not enough data accumulated // to reason stream partition time, then skip. - if (timestamp == RecordQueue.NOT_KNOWN) { + if (timestamp == RecordQueue.UNKNOWN) { return false; } else { - return streamTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.STREAM_TIME, this); + final boolean punctuated = streamTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.STREAM_TIME, this); + + if (punctuated) { + commitNeeded = true; + } + + return punctuated; } } @@ -810,20 +810,26 @@ public boolean maybePunctuateStreamTime() { public boolean maybePunctuateSystemTime() { final long timestamp = time.milliseconds(); - return systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.WALL_CLOCK_TIME, this); + final boolean punctuated = systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.WALL_CLOCK_TIME, this); + + if (punctuated) { + commitNeeded = true; + } + + return punctuated; } /** * Request committing the current task's state */ - void needCommit() { + void requestCommit() { commitRequested = true; } /** * Whether or not a request has been made to commit the current state */ - boolean commitNeeded() { + boolean commitRequested() { return commitRequested; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 28cedbe4197..b43177d9d4e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.metrics.stats.Count; import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.metrics.stats.Total; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KafkaClientSupplier; @@ -68,7 +69,6 @@ public class StreamThread extends Thread { - private final static int UNLIMITED_RECORDS = -1; private final static AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1); /** @@ -554,18 +554,21 @@ StandbyTask createTask(final Consumer<byte[], byte[]> consumer, } private final Time time; - private final Duration pollTime; - private final long commitTimeMs; - private final Object stateLock; private final Logger log; private final String logPrefix; + private final Object stateLock; + private final Duration pollTime; + private final long commitTimeMs; + private final int maxPollTimeMs; + private final String originalReset; private final TaskManager taskManager; private final StreamsMetricsThreadImpl streamsMetrics; private final AtomicInteger assignmentErrorCode; + private long now; + private long lastPollMs; private long lastCommitMs; - private long timerStartedMs; - private final String originalReset; + private int numIterations; private Throwable rebalanceException = null; private boolean processStandbyRecords = false; private volatile State state = State.CREATED; @@ -711,11 +714,21 @@ public StreamThread(final Time time, this.assignmentErrorCode = assignmentErrorCode; this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)); + this.maxPollTimeMs = new 
InternalConsumerConfig(config.getMainConsumerConfigs("dummyGroupId", "dummyClientId")) + .getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG); this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); + this.numIterations = 1; + updateThreadMetadata(Collections.emptyMap(), Collections.emptyMap()); } + private static final class InternalConsumerConfig extends ConsumerConfig { + private InternalConsumerConfig(final Map<String, Object> props) { + super(ConsumerConfig.addDeserializerToConfig(props, new ByteArrayDeserializer(), new ByteArrayDeserializer()), false); + } + } + /** * Execute the stream processors * @@ -757,12 +770,11 @@ private void setRebalanceException(final Throwable rebalanceException) { * @throws StreamsException if the store's change log does not contain the partition */ private void runLoop() { - long recordsProcessedBeforeCommit = UNLIMITED_RECORDS; consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener); while (isRunning()) { try { - recordsProcessedBeforeCommit = runOnce(recordsProcessedBeforeCommit); + runOnce(); if (assignmentErrorCode.get() == StreamsPartitionAssignor.Error.VERSION_PROBING.code()) { log.info("Version probing detected. Triggering new rebalance."); enforceRebalance(); @@ -791,12 +803,10 @@ private void enforceRebalance() { * or if the task producer got fenced (EOS) */ // Visible for testing - long runOnce(final long recordsProcessedBeforeCommit) { - long processedBeforeCommit = recordsProcessedBeforeCommit; - + void runOnce() { final ConsumerRecords<byte[], byte[]> records; - timerStartedMs = time.milliseconds(); + now = time.milliseconds(); if (state == State.PARTITIONS_ASSIGNED) { // try to fetch some records with zero poll millis @@ -816,6 +826,13 @@ long runOnce(final long recordsProcessedBeforeCommit) { throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration"); } + final long pollLatency = advanceNowAndComputeLatency(); + + if (records != null && !records.isEmpty()) { + streamsMetrics.pollTimeSensor.record(pollLatency, now); + addRecordsToTasks(records); + } + // only try to initialize the assigned tasks // if the state is still in PARTITION_ASSIGNED after the poll call if (state == State.PARTITIONS_ASSIGNED) { @@ -824,28 +841,60 @@ long runOnce(final long recordsProcessedBeforeCommit) { } } - if (records != null && !records.isEmpty()) { - streamsMetrics.pollTimeSensor.record(computeLatency(), timerStartedMs); - addRecordsToTasks(records); - } + advanceNowAndComputeLatency(); + // TODO: we will process some tasks even if the state is not RUNNING, i.e. some other + // tasks are still being restored. if (taskManager.hasActiveRunningTasks()) { - final long totalProcessed = processAndMaybeCommit(recordsProcessedBeforeCommit); - if (totalProcessed > 0) { - final long processLatency = computeLatency(); - streamsMetrics.processTimeSensor.record(processLatency / (double) totalProcessed, timerStartedMs); - processedBeforeCommit = adjustRecordsProcessedBeforeCommit( - recordsProcessedBeforeCommit, - totalProcessed, - processLatency, - commitTimeMs); - } + /* + * Within an iteration, after N (N initialized as 1 upon startup) rounds of processing one record from each of the applicable tasks, check the current time: + * 1. If it is time to commit, do it; + * 2. If it is time to punctuate, do it; + * 3. If the elapsed time is close to the consumer's max.poll.interval.ms, end the current iteration immediately. + * 4. If none of the above happens, increment N. + * 5.
If one of the above happens, halve the value of N. */ + int processed = 0; + long timeSinceLastPoll = 0L; + + do { + for (int i = 0; i < numIterations; i++) { + processed = taskManager.process(now); + + if (processed > 0) { + final long processLatency = advanceNowAndComputeLatency(); + streamsMetrics.processTimeSensor.record(processLatency / (double) processed, now); + + // commit any tasks that have requested a commit + final int committed = taskManager.maybeCommitActiveTasksPerUserRequested(); + + if (committed > 0) { + final long commitLatency = advanceNowAndComputeLatency(); + streamsMetrics.commitTimeSensor.record(commitLatency / (double) committed, now); + } + } else { + // if there are no records to be processed, exit immediately + break; + } + } + + timeSinceLastPoll = Math.max(now - lastPollMs, 0); + + if (maybePunctuate() || maybeCommit()) { + numIterations = numIterations > 1 ? numIterations / 2 : numIterations; + } else if (timeSinceLastPoll > maxPollTimeMs / 2) { + numIterations = numIterations > 1 ? numIterations / 2 : numIterations; + break; + } else if (processed > 0) { + numIterations++; + } + } while (processed > 0); } - punctuate(); - maybeCommit(timerStartedMs); - maybeUpdateStandbyTasks(timerStartedMs); - return processedBeforeCommit; + // update standby tasks and maybe commit the standby tasks as well + maybeUpdateStandbyTasks(); + + maybeCommit(); } /** @@ -858,6 +907,8 @@ long runOnce(final long recordsProcessedBeforeCommit) { private ConsumerRecords<byte[], byte[]> pollRequests(final Duration pollTime) { ConsumerRecords<byte[], byte[]> records = null; + lastPollMs = now; + try { records = consumer.poll(pollTime); } catch (final InvalidOffsetException e) { @@ -938,120 +989,65 @@ private void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) { } } - /** - * Schedule the records processing by selecting which record is processed next. Commits may - * happen as records are processed. - * - * @param recordsProcessedBeforeCommit number of records to be processed before commit is called. - * if UNLIMITED_RECORDS, then commit is never called - * @return Number of records processed since last commit. 
- * @throws TaskMigratedException if committing offsets failed (non-EOS) - * or if the task producer got fenced (EOS) - */ - private long processAndMaybeCommit(final long recordsProcessedBeforeCommit) { - - long processed; - long totalProcessedSinceLastMaybeCommit = 0; - // Round-robin scheduling by taking one record from each task repeatedly - // until no task has any records left - do { - processed = taskManager.process(); - if (processed > 0) { - streamsMetrics.processTimeSensor.record(computeLatency() / (double) processed, timerStartedMs); - } - totalProcessedSinceLastMaybeCommit += processed; - - punctuate(); - - if (recordsProcessedBeforeCommit != UNLIMITED_RECORDS && - totalProcessedSinceLastMaybeCommit >= recordsProcessedBeforeCommit) { - totalProcessedSinceLastMaybeCommit = 0; - maybeCommit(timerStartedMs); - } - // commit any tasks that have requested a commit - final int committed = taskManager.maybeCommitActiveTasks(); - if (committed > 0) { - streamsMetrics.commitTimeSensor.record(computeLatency() / (double) committed, timerStartedMs); - } - } while (processed != 0); - - return totalProcessedSinceLastMaybeCommit; - } - /** * @throws TaskMigratedException if the task producer got fenced (EOS only) */ - private void punctuate() { + private boolean maybePunctuate() { final int punctuated = taskManager.punctuate(); if (punctuated > 0) { - streamsMetrics.punctuateTimeSensor.record(computeLatency() / (double) punctuated, timerStartedMs); + final long punctuateLatency = advanceNowAndComputeLatency(); + streamsMetrics.punctuateTimeSensor.record(punctuateLatency / (double) punctuated, now); } - } - /** - * Adjust the number of records that should be processed by scheduler. This avoids - * scenarios where the processing time is higher than the commit time. - * - * @param prevRecordsProcessedBeforeCommit Previous number of records processed by scheduler. - * @param totalProcessed Total number of records processed in this last round. - * @param processLatency Total processing latency in ms processed in this last round. - * @param commitTime Desired commit time in ms. - * @return An adjusted number of records to be processed in the next round. - */ - private long adjustRecordsProcessedBeforeCommit(final long prevRecordsProcessedBeforeCommit, final long totalProcessed, - final long processLatency, final long commitTime) { - long recordsProcessedBeforeCommit = UNLIMITED_RECORDS; - // check if process latency larger than commit latency - // note that once we set recordsProcessedBeforeCommit, it will never be UNLIMITED_RECORDS again, so - // we will never process all records again. This might be an issue if the initial measurement - // was off due to a slow start. - if (processLatency > 0 && processLatency > commitTime) { - // push down - recordsProcessedBeforeCommit = Math.max(1, (commitTime * totalProcessed) / processLatency); - log.debug("processing latency {} > commit time {} for {} records. Adjusting down recordsProcessedBeforeCommit={}", - processLatency, commitTime, totalProcessed, recordsProcessedBeforeCommit); - } else if (prevRecordsProcessedBeforeCommit != UNLIMITED_RECORDS && processLatency > 0) { - // push up - recordsProcessedBeforeCommit = Math.max(1, (commitTime * totalProcessed) / processLatency); - log.debug("processing latency {} < commit time {} for {} records. 
Adjusting up recordsProcessedBeforeCommit={}", - processLatency, commitTime, totalProcessed, recordsProcessedBeforeCommit); - } - - return recordsProcessedBeforeCommit; + return punctuated > 0; } /** - * Commit all tasks owned by this thread if specified interval time has elapsed + * Try to commit all active tasks owned by this thread. + * + * Visible for testing. * * @throws TaskMigratedException if committing offsets failed (non-EOS) * or if the task producer got fenced (EOS) */ - void maybeCommit(final long now) { - if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) { + boolean maybeCommit() { + int committed = 0; + + if (commitTimeMs >= 0 && now - lastCommitMs > commitTimeMs) { if (log.isTraceEnabled()) { log.trace("Committing all active tasks {} and standby tasks {} since {}ms has elapsed (commit interval is {}ms)", taskManager.activeTaskIds(), taskManager.standbyTaskIds(), now - lastCommitMs, commitTimeMs); } - final int committed = taskManager.commitAll(); + committed += taskManager.commitAll(); if (committed > 0) { - streamsMetrics.commitTimeSensor.record(computeLatency() / (double) committed, timerStartedMs); + final long intervalCommitLatency = advanceNowAndComputeLatency(); + streamsMetrics.commitTimeSensor.record(intervalCommitLatency / (double) committed, now); // try to purge the committed records for repartition topics if possible taskManager.maybePurgeCommitedRecords(); - } - if (log.isDebugEnabled()) { - log.debug("Committed all active tasks {} and standby tasks {} in {}ms", - taskManager.activeTaskIds(), taskManager.standbyTaskIds(), timerStartedMs - now); + + if (log.isDebugEnabled()) { + log.debug("Committed all active tasks {} and standby tasks {} in {}ms", + taskManager.activeTaskIds(), taskManager.standbyTaskIds(), intervalCommitLatency); + } } lastCommitMs = now; - processStandbyRecords = true; + } else { + final int commitPerRequested = taskManager.maybeCommitActiveTasksPerUserRequested(); + if (commitPerRequested > 0) { + final long requestCommitLatency = advanceNowAndComputeLatency(); + streamsMetrics.commitTimeSensor.record(requestCommitLatency / (double) commitPerRequested, now); + committed += commitPerRequested; + } } + + return committed > 0; } - private void maybeUpdateStandbyTasks(final long now) { + private void maybeUpdateStandbyTasks() { if (state == State.RUNNING && taskManager.hasStandbyRunningTasks()) { if (processStandbyRecords) { if (!standbyRecords.isEmpty()) { @@ -1080,7 +1076,9 @@ private void maybeUpdateStandbyTasks(final long now) { standbyRecords = remainingStandbyRecords; - log.debug("Updated standby tasks {} in {}ms", taskManager.standbyTaskIds(), time.milliseconds() - now); + if (log.isDebugEnabled()) { + log.debug("Updated standby tasks {} in {}ms", taskManager.standbyTaskIds(), time.milliseconds() - now); + } } processStandbyRecords = false; } @@ -1130,6 +1128,9 @@ private void maybeUpdateStandbyTasks(final long now) { } restoreConsumer.seekToBeginning(partitions); } + + // update now if the standby restoration indeed executed + advanceNowAndComputeLatency(); } } @@ -1139,11 +1140,11 @@ private void maybeUpdateStandbyTasks(final long now) { * * @return latency */ - private long computeLatency() { - final long previousTimeMs = timerStartedMs; - timerStartedMs = time.milliseconds(); + private long advanceNowAndComputeLatency() { + final long previous = now; + now = time.milliseconds(); - return Math.max(now - previous, 0); + return Math.max(now - previous, 0); } /** @@ -1242,6 +1243,10 @@ public String toString(final
String indent) { } // the following are for testing only + void setNow(final long now) { + this.now = now; + } + TaskManager taskManager() { return taskManager; } @@ -1250,6 +1255,10 @@ TaskManager taskManager() { return standbyRecords; } + int currentNumIterations() { + return numIterations; + } + public Map<MetricName, Metric> producerMetrics() { final LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<>(); if (producer != null) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java index 2b43640f00e..812e7e1131c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java @@ -34,6 +34,8 @@ */ boolean initializeStateStores(); + boolean commitNeeded(); + void initializeTopology(); void commit(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index e25133737bd..9cc5a19e017 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -409,8 +409,8 @@ int commitAll() { /** * @throws TaskMigratedException if the task producer got fenced (EOS only) */ - int process() { - return active.process(); + int process(final long now) { + return active.process(now); } /** @@ -424,8 +424,8 @@ int punctuate() { * @throws TaskMigratedException if committing offsets failed (non-EOS) * or if the task producer got fenced (EOS) */ - int maybeCommitActiveTasks() { - return active.maybeCommit(); + int maybeCommitActiveTasksPerUserRequested() { + return active.maybeCommitPerUserRequested(); } void maybePurgeCommitedRecords() { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java index 4593e59a54d..b51511e19e9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java @@ -24,14 +24,13 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KeyValueMapper; @@ -40,9 +39,6 @@ import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.state.KeyValueStore; -import 
org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.TestUtils; @@ -54,14 +50,9 @@ import java.io.IOException; import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.Properties; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.Is.is; - /** * Similar to KStreamAggregationIntegrationTest but with dedupping enabled * by virtue of having a large commit interval @@ -93,11 +84,9 @@ public void before() throws InterruptedException { builder = new StreamsBuilder(); createTopics(); streamsConfiguration = new Properties(); - final String applicationId = "kgrouped-stream-test-" + - testNo; + final String applicationId = "kgrouped-stream-test-" + testNo; streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); - streamsConfiguration - .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS); @@ -111,12 +100,7 @@ public void before() throws InterruptedException { mapper, Serialized.with(Serdes.String(), Serdes.String())); - reducer = new Reducer<String>() { - @Override - public String apply(final String value1, final String value2) { - return value1 + ":" + value2; - } - }; + reducer = (value1, value2) -> value1 + ":" + value2; } @After @@ -132,7 +116,7 @@ public void whenShuttingDown() throws IOException { public void shouldReduce() throws Exception { produceMessages(System.currentTimeMillis()); groupedStream - .reduce(reducer, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("reduce-by-key")) + .reduce(reducer, Materialized.as("reduce-by-key")) .toStream() .to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); @@ -140,34 +124,15 @@ public void shouldReduce() throws Exception { produceMessages(System.currentTimeMillis()); - final List<KeyValue<String, String>> results = receiveMessages( - new StringDeserializer(), - new StringDeserializer(), - 5); - - Collections.sort(results, new Comparator<KeyValue<String, String>>() { - @Override - public int compare(final KeyValue<String, String> o1, final KeyValue<String, String> o2) { - return KStreamAggregationDedupIntegrationTest.compare(o1, o2); - } - }); - - assertThat(results, is(Arrays.asList( - KeyValue.pair("A", "A:A"), - KeyValue.pair("B", "B:B"), - KeyValue.pair("C", "C:C"), - KeyValue.pair("D", "D:D"), - KeyValue.pair("E", "E:E")))); - } - - @SuppressWarnings("unchecked") - private static <K extends Comparable, V extends Comparable> int compare(final KeyValue<K, V> o1, - final KeyValue<K, V> o2) { - final int keyComparison = o1.key.compareTo(o2.key); - if (keyComparison == 0) { - return o1.value.compareTo(o2.value); - } - return keyComparison; + validateReceivedMessages( + new StringDeserializer(), + new StringDeserializer(), + Arrays.asList( + KeyValue.pair("A", "A:A"), + KeyValue.pair("B", "B:B"), + KeyValue.pair("C", "C:C"), + KeyValue.pair("D", "D:D"), + KeyValue.pair("E", "E:E"))); } @Test @@ -180,50 +145,31 @@ public void shouldReduceWindowed() throws Exception { groupedStream .windowedBy(TimeWindows.of(500L)) - 
.reduce(reducer, Materialized.<String, String, WindowStore<Bytes, byte[]>>as("reduce-time-windows")) - .toStream(new KeyValueMapper<Windowed<String>, String, String>() { - @Override - public String apply(final Windowed<String> windowedKey, final String value) { - return windowedKey.key() + "@" + windowedKey.window().start(); - } - }) + .reduce(reducer, Materialized.as("reduce-time-windows")) + .toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start()) .to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); startStreams(); - final List<KeyValue<String, String>> windowedOutput = receiveMessages( - new StringDeserializer(), - new StringDeserializer(), - 10); - - final Comparator<KeyValue<String, String>> - comparator = - new Comparator<KeyValue<String, String>>() { - @Override - public int compare(final KeyValue<String, String> o1, - final KeyValue<String, String> o2) { - return KStreamAggregationDedupIntegrationTest.compare(o1, o2); - } - }; - - Collections.sort(windowedOutput, comparator); final long firstBatchWindow = firstBatchTimestamp / 500 * 500; final long secondBatchWindow = secondBatchTimestamp / 500 * 500; - assertThat(windowedOutput, is( - Arrays.asList( - new KeyValue<>("A@" + firstBatchWindow, "A"), - new KeyValue<>("A@" + secondBatchWindow, "A:A"), - new KeyValue<>("B@" + firstBatchWindow, "B"), - new KeyValue<>("B@" + secondBatchWindow, "B:B"), - new KeyValue<>("C@" + firstBatchWindow, "C"), - new KeyValue<>("C@" + secondBatchWindow, "C:C"), - new KeyValue<>("D@" + firstBatchWindow, "D"), - new KeyValue<>("D@" + secondBatchWindow, "D:D"), - new KeyValue<>("E@" + firstBatchWindow, "E"), - new KeyValue<>("E@" + secondBatchWindow, "E:E") - ) - )); + validateReceivedMessages( + new StringDeserializer(), + new StringDeserializer(), + Arrays.asList( + new KeyValue<>("A@" + firstBatchWindow, "A"), + new KeyValue<>("A@" + secondBatchWindow, "A:A"), + new KeyValue<>("B@" + firstBatchWindow, "B"), + new KeyValue<>("B@" + secondBatchWindow, "B:B"), + new KeyValue<>("C@" + firstBatchWindow, "C"), + new KeyValue<>("C@" + secondBatchWindow, "C:C"), + new KeyValue<>("D@" + firstBatchWindow, "D"), + new KeyValue<>("D@" + secondBatchWindow, "D:D"), + new KeyValue<>("E@" + firstBatchWindow, "E"), + new KeyValue<>("E@" + secondBatchWindow, "E:E") + ) + ); } @Test @@ -234,36 +180,25 @@ public void shouldGroupByKey() throws Exception { stream.groupByKey(Serialized.with(Serdes.Integer(), Serdes.String())) .windowedBy(TimeWindows.of(500L)) - .count(Materialized.<Integer, Long, WindowStore<Bytes, byte[]>>as("count-windows")) - .toStream(new KeyValueMapper<Windowed<Integer>, Long, String>() { - @Override - public String apply(final Windowed<Integer> windowedKey, final Long value) { - return windowedKey.key() + "@" + windowedKey.window().start(); - } - }).to(outputTopic, Produced.with(Serdes.String(), Serdes.Long())); + .count(Materialized.as("count-windows")) + .toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start()) + .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long())); startStreams(); - final List<KeyValue<String, Long>> results = receiveMessages( - new StringDeserializer(), - new LongDeserializer(), - 5); - Collections.sort(results, new Comparator<KeyValue<String, Long>>() { - @Override - public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) { - return KStreamAggregationDedupIntegrationTest.compare(o1, o2); - } - }); - final long window = timestamp / 500 * 500; - assertThat(results, 
is(Arrays.asList( - KeyValue.pair("1@" + window, 2L), - KeyValue.pair("2@" + window, 2L), - KeyValue.pair("3@" + window, 2L), - KeyValue.pair("4@" + window, 2L), - KeyValue.pair("5@" + window, 2L) - ))); + validateReceivedMessages( + new StringDeserializer(), + new LongDeserializer(), + Arrays.asList( + KeyValue.pair("1@" + window, 2L), + KeyValue.pair("2@" + window, 2L), + KeyValue.pair("3@" + window, 2L), + KeyValue.pair("4@" + window, 2L), + KeyValue.pair("5@" + window, 2L) + ) + ); } @@ -298,11 +233,9 @@ private void startStreams() { } - private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> - keyDeserializer, - final Deserializer<V> - valueDeserializer, - final int numMessages) + private <K, V> void validateReceivedMessages(final Deserializer<K> keyDeserializer, + final Deserializer<V> valueDeserializer, + final List<KeyValue<K, V>> expectedRecords) throws InterruptedException { final Properties consumerProperties = new Properties(); consumerProperties @@ -314,11 +247,11 @@ private void startStreams() { keyDeserializer.getClass().getName()); consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName()); - return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProperties, - outputTopic, - numMessages, - 60 * 1000); + IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived( + consumerProperties, + outputTopic, + expectedRecords); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index 496ba5842a9..97d1071aaa2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -524,20 +524,24 @@ public boolean test(final String key, final Long value) { myFilterNotStore = kafkaStreams.store("queryFilterNot", QueryableStoreTypes.<String, Long>keyValueStore()); for (final KeyValue<String, Long> expectedEntry : expectedBatch1) { - assertEquals(myFilterStore.get(expectedEntry.key), expectedEntry.value); + TestUtils.waitForCondition(() -> expectedEntry.value.equals(myFilterStore.get(expectedEntry.key)), + "Cannot get expected result"); } for (final KeyValue<String, Long> batchEntry : batch1) { if (!expectedBatch1.contains(batchEntry)) { - assertNull(myFilterStore.get(batchEntry.key)); + TestUtils.waitForCondition(() -> myFilterStore.get(batchEntry.key) == null, + "Cannot get null result"); } } for (final KeyValue<String, Long> expectedEntry : expectedBatch1) { - assertNull(myFilterNotStore.get(expectedEntry.key)); + TestUtils.waitForCondition(() -> myFilterNotStore.get(expectedEntry.key) == null, + "Cannot get null result"); } for (final KeyValue<String, Long> batchEntry : batch1) { if (!expectedBatch1.contains(batchEntry)) { - assertEquals(myFilterNotStore.get(batchEntry.key), batchEntry.value); + TestUtils.waitForCondition(() -> batchEntry.value.equals(myFilterNotStore.get(batchEntry.key)), + "Cannot get expected result"); } } } @@ -568,24 +572,25 @@ public void shouldBeAbleToQueryMapValuesState() throws Exception { mockTime); final KTable<String, String> t1 = builder.table(streamOne); - final KTable<String, Long> t2 = t1.mapValues(new ValueMapper<String, Long>() { + t1.mapValues(new ValueMapper<String, Long>() { @Override public Long apply(final String value) { return Long.valueOf(value); } - }, 
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long())); - t2.toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long())); + }, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long())) + .toStream() + .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long())); kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); kafkaStreams.start(); - waitUntilAtLeastNumRecordProcessed(outputTopic, 1); + waitUntilAtLeastNumRecordProcessed(outputTopic, 5); final ReadOnlyKeyValueStore<String, Long> myMapStore = kafkaStreams.store("queryMapValues", QueryableStoreTypes.<String, Long>keyValueStore()); for (final KeyValue<String, String> batchEntry : batch1) { - assertEquals(myMapStore.get(batchEntry.key), Long.valueOf(batchEntry.value)); + assertEquals(Long.valueOf(batchEntry.value), myMapStore.get(batchEntry.key)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index d9602f35304..985b57f4cd5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -46,7 +46,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -369,19 +371,22 @@ public static void waitForCompletion(final KafkaStreams streams, readKeyValues(topic, consumer, waitTime, expectedRecords.size()); accumData.addAll(readData); - final int accumLastIndex = accumData.size() - 1; - final int expectedLastIndex = expectedRecords.size() - 1; - // filter out all intermediate records we don't want final List<KeyValue<K, V>> accumulatedActual = accumData.stream().filter(expectedRecords::contains).collect(Collectors.toList()); - // need this check as filtering above could have removed the last record from accumData, but it did not - // equal the last expected record - final boolean lastRecordsMatch = accumData.get(accumLastIndex).equals(expectedRecords.get(expectedLastIndex)); + // still need to check that for each key, the ordering is expected + final Map<K, List<KeyValue<K, V>>> finalAccumData = new HashMap<>(); + for (final KeyValue<K, V> kv : accumulatedActual) { + finalAccumData.computeIfAbsent(kv.key, key -> new ArrayList<>()).add(kv); + } + final Map<K, List<KeyValue<K, V>>> finalExpected = new HashMap<>(); + for (final KeyValue<K, V> kv : expectedRecords) { + finalExpected.computeIfAbsent(kv.key, key -> new ArrayList<>()).add(kv); + } // returns true only if the remaining records in both lists are the same and in the same order // and the last record received matches the last expected record - return accumulatedActual.equals(expectedRecords) && lastRecordsMatch; + return finalAccumData.equals(finalExpected); }; final String conditionDetails = "Did not receive all " + expectedRecords + " records from topic " + topic; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java index a3c47b5bdc2..fe71135f223 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java @@ -61,7 +61,7 @@ public void before() { public void shouldInitializeNewTasks() { EasyMock.expect(t1.initializeStateStores()).andReturn(false); EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)); - EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet()); + EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()); EasyMock.replay(t1); addAndInitTask(); @@ -75,13 +75,13 @@ public void shouldMoveInitializedTasksNeedingRestoreToRestoring() { t1.initializeTopology(); EasyMock.expectLastCall().once(); EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)); - EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet()); + EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()); EasyMock.expect(t2.initializeStateStores()).andReturn(true); t2.initializeTopology(); EasyMock.expectLastCall().once(); final Set<TopicPartition> t2partitions = Collections.singleton(tp2); EasyMock.expect(t2.partitions()).andReturn(t2partitions); - EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList()); + EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.emptyList()); EasyMock.replay(t1, t2); @@ -101,7 +101,7 @@ public void shouldMoveInitializedTasksThatDontNeedRestoringToRunning() { t2.initializeTopology(); EasyMock.expectLastCall().once(); EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2)); - EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList()); + EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.emptyList()); EasyMock.replay(t2); @@ -145,7 +145,7 @@ public void shouldSuspendRunningTasks() { public void shouldCloseRestoringTasks() { EasyMock.expect(t1.initializeStateStores()).andReturn(false); EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)); - EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet()); + EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptySet()); t1.close(false, false); EasyMock.expectLastCall(); EasyMock.replay(t1); @@ -247,12 +247,13 @@ private void mockTaskInitialization() { t1.initializeTopology(); EasyMock.expectLastCall().once(); EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)); - EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList()); + EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptyList()); } @Test public void shouldCommitRunningTasks() { mockTaskInitialization(); + EasyMock.expect(t1.commitNeeded()).andReturn(true); t1.commit(); EasyMock.expectLastCall(); EasyMock.replay(t1); @@ -266,6 +267,7 @@ public void shouldCommitRunningTasks() { @Test public void shouldCloseTaskOnCommitIfTaskMigratedException() { mockTaskInitialization(); + EasyMock.expect(t1.commitNeeded()).andReturn(true); t1.commit(); EasyMock.expectLastCall().andThrow(new TaskMigratedException()); t1.close(false, true); @@ -285,6 +287,7 @@ public void shouldCloseTaskOnCommitIfTaskMigratedException() { @Test public void shouldThrowExceptionOnCommitWhenNotCommitFailedOrProducerFenced() { mockTaskInitialization(); + EasyMock.expect(t1.commitNeeded()).andReturn(true); t1.commit(); EasyMock.expectLastCall().andThrow(new 
RuntimeException("")); EasyMock.replay(t1); @@ -303,6 +306,7 @@ public void shouldThrowExceptionOnCommitWhenNotCommitFailedOrProducerFenced() { @Test public void shouldCommitRunningTasksIfNeeded() { mockTaskInitialization(); + EasyMock.expect(t1.commitRequested()).andReturn(true); EasyMock.expect(t1.commitNeeded()).andReturn(true); t1.commit(); EasyMock.expectLastCall(); @@ -310,13 +314,14 @@ public void shouldCommitRunningTasksIfNeeded() { addAndInitTask(); - assertThat(assignedTasks.maybeCommit(), equalTo(1)); + assertThat(assignedTasks.maybeCommitPerUserRequested(), equalTo(1)); EasyMock.verify(t1); } @Test public void shouldCloseTaskOnMaybeCommitIfTaskMigratedException() { mockTaskInitialization(); + EasyMock.expect(t1.commitRequested()).andReturn(true); EasyMock.expect(t1.commitNeeded()).andReturn(true); t1.commit(); EasyMock.expectLastCall().andThrow(new TaskMigratedException()); @@ -326,7 +331,7 @@ public void shouldCloseTaskOnMaybeCommitIfTaskMigratedException() { addAndInitTask(); try { - assignedTasks.maybeCommit(); + assignedTasks.maybeCommitPerUserRequested(); fail("Should have thrown TaskMigratedException."); } catch (final TaskMigratedException expected) { /* ignore */ } @@ -337,7 +342,7 @@ public void shouldCloseTaskOnMaybeCommitIfTaskMigratedException() { @Test public void shouldCloseTaskOnProcessesIfTaskMigratedException() { mockTaskInitialization(); - EasyMock.expect(t1.isProcessable()).andReturn(true); + EasyMock.expect(t1.isProcessable(0L)).andReturn(true); t1.process(); EasyMock.expectLastCall().andThrow(new TaskMigratedException()); t1.close(false, true); @@ -346,7 +351,7 @@ public void shouldCloseTaskOnProcessesIfTaskMigratedException() { addAndInitTask(); try { - assignedTasks.process(); + assignedTasks.process(0L); fail("Should have thrown TaskMigratedException."); } catch (final TaskMigratedException expected) { /* ignore */ } @@ -357,11 +362,11 @@ public void shouldCloseTaskOnProcessesIfTaskMigratedException() { @Test public void shouldNotProcessUnprocessableTasks() { mockTaskInitialization(); - EasyMock.expect(t1.isProcessable()).andReturn(false); + EasyMock.expect(t1.isProcessable(0L)).andReturn(false); EasyMock.replay(t1); addAndInitTask(); - assertThat(assignedTasks.process(), equalTo(0)); + assertThat(assignedTasks.process(0L), equalTo(0)); EasyMock.verify(t1); } @@ -369,13 +374,14 @@ public void shouldNotProcessUnprocessableTasks() { @Test public void shouldAlwaysProcessProcessableTasks() { mockTaskInitialization(); - EasyMock.expect(t1.isProcessable()).andReturn(true); + EasyMock.expect(t1.isProcessable(0L)).andReturn(true); EasyMock.expect(t1.process()).andReturn(true).once(); + EasyMock.replay(t1); addAndInitTask(); - assertThat(assignedTasks.process(), equalTo(1)); + assertThat(assignedTasks.process(0L), equalTo(1)); EasyMock.verify(t1); } @@ -459,7 +465,7 @@ private void mockRunningTaskSuspension() { EasyMock.expectLastCall().once(); EasyMock.expect(t1.hasStateStores()).andReturn(false).anyTimes(); EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes(); - EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList()).anyTimes(); + EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.emptyList()).anyTimes(); t1.suspend(); EasyMock.expectLastCall(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java index a8cd2c8f357..b91aba5fdd6 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java @@ -101,7 +101,7 @@ public void testTimeTracking() { assertTrue(queue.isEmpty()); assertEquals(0, queue.size()); - assertEquals(RecordQueue.NOT_KNOWN, queue.timestamp()); + assertEquals(RecordQueue.UNKNOWN, queue.timestamp()); // add three out-of-order records with timestamps 2, 1, 3 final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList( @@ -173,7 +173,7 @@ public void testTimeTracking() { queue.clear(); assertTrue(queue.isEmpty()); assertEquals(0, queue.size()); - assertEquals(RecordQueue.NOT_KNOWN, queue.timestamp()); + assertEquals(RecordQueue.UNKNOWN, queue.timestamp()); // re-insert the three records with 4, 5, 6 queue.addRawRecords(list3); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 848cd4f0bf8..834ab3e1ae2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -21,16 +21,17 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.record.TimestampType; -import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; @@ -82,9 +83,9 @@ public class StreamTaskTest { - private final Serializer<Integer> intSerializer = new IntegerSerializer(); - private final Deserializer<Integer> intDeserializer = new IntegerDeserializer(); - private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer(); + private final Serializer<Integer> intSerializer = Serdes.Integer().serializer(); + private final Serializer<byte[]> bytesSerializer = Serdes.ByteArray().serializer(); + private final Deserializer<Integer> intDeserializer = Serdes.Integer().deserializer(); private final String topic1 = "topic1"; private final String topic2 = "topic2"; private final TopicPartition partition1 = new TopicPartition(topic1, 1); @@ -113,7 +114,7 @@ public void close() { private final Long offset = 543L; private final ProcessorTopology topology = ProcessorTopology.withSources( - Utils.<ProcessorNode>mkList(source1, source2, processorStreamTime, processorSystemTime), + Utils.mkList(source1, source2, processorStreamTime, processorSystemTime), mkMap(mkEntry(topic1, source1), mkEntry(topic2, source2)) ); @@ -129,8 +130,7 @@ public void close() { }; private final byte[] recordValue = intSerializer.serialize(null, 10); private final byte[] recordKey =
intSerializer.serialize(null, 1); - private final Metrics metrics = new Metrics(); - private final Sensor skippedRecordsSensor = metrics.sensor("skipped-records"); + private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG)); private final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(metrics); private final TaskId taskId00 = new TaskId(0, 0); private final MockTime time = new MockTime(); @@ -159,7 +159,8 @@ private StreamsConfig createConfig(final boolean enableEoS) { mkEntry(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"), mkEntry(StreamsConfig.STATE_DIR_CONFIG, canonicalPath), mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()), - mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, enableEoS ? StreamsConfig.EXACTLY_ONCE : StreamsConfig.AT_LEAST_ONCE) + mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, enableEoS ? StreamsConfig.EXACTLY_ONCE : StreamsConfig.AT_LEAST_ONCE), + mkEntry(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, "100") ))); } @@ -393,9 +394,6 @@ public void shouldPunctuateOnceStreamTimeAfterGap() { assertEquals(4, source2.numReceived); assertFalse(task.maybePunctuateStreamTime()); - assertFalse(task.process()); - assertFalse(task.maybePunctuateStreamTime()); - processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L, 142L, 155L, 160L); } @@ -453,23 +451,62 @@ public void shouldRespectPunctuateCancellationSystemTime() { processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10); } + @Test + public void shouldRespectCommitNeeded() { + task = createStatelessTask(createConfig(false)); + task.initializeStateStores(); + task.initializeTopology(); + + assertFalse(task.commitNeeded()); + + task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0))); + assertTrue(task.process()); + assertTrue(task.commitNeeded()); + + task.commit(); + assertFalse(task.commitNeeded()); + + assertTrue(task.maybePunctuateStreamTime()); + assertTrue(task.commitNeeded()); + + task.commit(); + assertFalse(task.commitNeeded()); + + time.sleep(10); + assertTrue(task.maybePunctuateSystemTime()); + assertTrue(task.commitNeeded()); + + task.commit(); + assertFalse(task.commitNeeded()); + } + + @Test + public void shouldRespectCommitRequested() { + task = createStatelessTask(createConfig(false)); + task.initializeStateStores(); + task.initializeTopology(); + + task.requestCommit(); + assertTrue(task.commitRequested()); + } + @Test public void shouldBeProcessableIfAllPartitionsBuffered() { task = createStatelessTask(createConfig(false)); task.initializeStateStores(); task.initializeTopology(); - assertFalse(task.isProcessable()); + assertFalse(task.isProcessable(0L)); final byte[] bytes = ByteBuffer.allocate(4).putInt(1).array(); task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>(topic1, 1, 0, bytes, bytes))); - assertFalse(task.isProcessable()); + assertFalse(task.isProcessable(0L)); task.addRecords(partition2, Collections.singleton(new ConsumerRecord<>(topic2, 1, 0, bytes, bytes))); - assertTrue(task.isProcessable()); + assertTrue(task.isProcessable(0L)); } @Test @@ -478,19 +515,42 @@ public void shouldBeProcessableIfWaitedForTooLong() { task.initializeStateStores(); task.initializeTopology(); - assertFalse(task.isProcessable()); + final MetricName enforcedProcessMetric = metrics.metricName("enforced-processing-total", "stream-task-metrics", mkMap(mkEntry("client-id", "test"), 
mkEntry("task-id", taskId00.toString()))); + + assertFalse(task.isProcessable(0L)); + assertEquals(0.0, metrics.metric(enforcedProcessMetric).metricValue()); final byte[] bytes = ByteBuffer.allocate(4).putInt(1).array(); task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>(topic1, 1, 0, bytes, bytes))); - assertFalse(task.isProcessable()); - assertFalse(task.isProcessable()); - assertFalse(task.isProcessable()); - assertFalse(task.isProcessable()); - assertFalse(task.isProcessable()); + assertFalse(task.isProcessable(time.milliseconds())); + + assertFalse(task.isProcessable(time.milliseconds() + 50L)); + + assertTrue(task.isProcessable(time.milliseconds() + 100L)); + assertEquals(1.0, metrics.metric(enforcedProcessMetric).metricValue()); + + // once decided to enforce, continue doing that + assertTrue(task.isProcessable(time.milliseconds() + 101L)); + assertEquals(2.0, metrics.metric(enforcedProcessMetric).metricValue()); + + task.addRecords(partition2, Collections.singleton(new ConsumerRecord<>(topic2, 1, 0, bytes, bytes))); + + assertTrue(task.isProcessable(time.milliseconds() + 130L)); + assertEquals(2.0, metrics.metric(enforcedProcessMetric).metricValue()); + + // one resumed to normal processing, the timer should be reset + task.process(); + + assertFalse(task.isProcessable(time.milliseconds() + 150L)); + assertEquals(2.0, metrics.metric(enforcedProcessMetric).metricValue()); + + assertFalse(task.isProcessable(time.milliseconds() + 249L)); + assertEquals(2.0, metrics.metric(enforcedProcessMetric).metricValue()); - assertTrue(task.isProcessable()); + assertTrue(task.isProcessable(time.milliseconds() + 250L)); + assertEquals(3.0, metrics.metric(enforcedProcessMetric).metricValue()); } @@ -1155,8 +1215,8 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() { final TopicPartition repartition = new TopicPartition("repartition", 1); final ProcessorTopology topology = ProcessorTopology.withRepartitionTopics( - Utils.<ProcessorNode>mkList(source1, source2), - mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(repartition.topic(), (SourceNode) source2)), + Utils.mkList(source1, source2), + mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), source2)), Collections.singleton(repartition.topic()) ); consumer.assign(Arrays.asList(partition1, repartition)); @@ -1227,10 +1287,10 @@ public void punctuate(final long timestamp) { private StreamTask createStatefulTask(final StreamsConfig config, final boolean logged) { final ProcessorTopology topology = ProcessorTopology.with( - Utils.<ProcessorNode>mkList(source1, source2), - mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, (SourceNode) source2)), + Utils.mkList(source1, source2), + mkMap(mkEntry(topic1, source1), mkEntry(topic2, source2)), singletonList(stateStore), - logged ? Collections.singletonMap(storeName, storeName + "-changelog") : Collections.<String, String>emptyMap()); + logged ? 
Collections.singletonMap(storeName, storeName + "-changelog") : Collections.emptyMap()); return new StreamTask( taskId00, @@ -1249,10 +1309,10 @@ private StreamTask createStatefulTask(final StreamsConfig config, final boolean private StreamTask createStatefulTaskThatThrowsExceptionOnClose() { final ProcessorTopology topology = ProcessorTopology.with( - Utils.<ProcessorNode>mkList(source1, source3), - mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, (SourceNode) source3)), + Utils.mkList(source1, source3), + mkMap(mkEntry(topic1, source1), mkEntry(topic2, source3)), singletonList(stateStore), - Collections.<String, String>emptyMap()); + Collections.emptyMap()); return new StreamTask( taskId00, @@ -1271,7 +1331,7 @@ private StreamTask createStatefulTaskThatThrowsExceptionOnClose() { private StreamTask createStatelessTask(final StreamsConfig streamsConfig) { final ProcessorTopology topology = ProcessorTopology.withSources( - Utils.<ProcessorNode>mkList(source1, source2, processorStreamTime, processorSystemTime), + Utils.mkList(source1, source2, processorStreamTime, processorSystemTime), mkMap(mkEntry(topic1, source1), mkEntry(topic2, source2)) ); @@ -1298,8 +1358,8 @@ private StreamTask createStatelessTask(final StreamsConfig streamsConfig) { // this task will throw exception when processing (on partition2), flushing, suspending and closing private StreamTask createTaskThatThrowsException(final boolean enableEos) { final ProcessorTopology topology = ProcessorTopology.withSources( - Utils.<ProcessorNode>mkList(source1, source3, processorStreamTime, processorSystemTime), - mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, (SourceNode) source3)) + Utils.mkList(source1, source3, processorStreamTime, processorSystemTime), + mkMap(mkEntry(topic1, source1), mkEntry(topic2, source3)) ); source1.addChild(processorStreamTime); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index c1485fb8056..e691c54d8eb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -61,8 +61,10 @@ import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.test.MockClientSupplier; +import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockStateRestoreListener; import org.apache.kafka.test.MockTimestampExtractor; +import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.easymock.EasyMock; @@ -106,15 +108,16 @@ private final MockTime mockTime = new MockTime(); private final Metrics metrics = new Metrics(); private final MockClientSupplier clientSupplier = new MockClientSupplier(); - private UUID processId = UUID.randomUUID(); private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder()); - private InternalTopologyBuilder internalTopologyBuilder; private final StreamsConfig config = new StreamsConfig(configProps(false)); private final String stateDir = TestUtils.tempDirectory().getPath(); private final StateDirectory stateDirectory = new StateDirectory(config, mockTime); - private StreamsMetadataState streamsMetadataState; private final ConsumedInternal<Object, Object> consumed = 
new ConsumedInternal<>(); + private UUID processId = UUID.randomUUID(); + private InternalTopologyBuilder internalTopologyBuilder; + private StreamsMetadataState streamsMetadataState; + @Before public void setUp() { processId = UUID.randomUUID(); @@ -177,7 +180,7 @@ public void testPartitionAssignmentChangeForSingleGroup() { mockConsumer.assign(assignedPartitions); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); rebalanceListener.onPartitionsAssigned(assignedPartitions); - thread.runOnce(-1); + thread.runOnce(); assertEquals(thread.state(), StreamThread.State.RUNNING); Assert.assertEquals(4, stateListener.numChanges); Assert.assertEquals(StreamThread.State.PARTITIONS_ASSIGNED, stateListener.oldState); @@ -307,13 +310,106 @@ public void shouldNotCommitBeforeTheCommitInterval() { new LogContext(""), new AtomicInteger() ); - thread.maybeCommit(mockTime.milliseconds()); + thread.setNow(mockTime.milliseconds()); + thread.maybeCommit(); mockTime.sleep(commitInterval - 10L); - thread.maybeCommit(mockTime.milliseconds()); + thread.setNow(mockTime.milliseconds()); + thread.maybeCommit(); EasyMock.verify(taskManager); } + @Test + public void shouldRespectNumIterationsInMainLoop() { + final MockProcessor mockProcessor = new MockProcessor(PunctuationType.WALL_CLOCK_TIME, 10L); + internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); + internalTopologyBuilder.addProcessor("processor1", () -> mockProcessor, "source1"); + internalTopologyBuilder.addProcessor("processor2", () -> new MockProcessor(PunctuationType.STREAM_TIME, 10L), "source1"); + + final Properties properties = new Properties(); + properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); + final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig(applicationId, + "localhost:2171", + Serdes.ByteArraySerde.class.getName(), + Serdes.ByteArraySerde.class.getName(), + properties)); + final StreamThread thread = createStreamThread(clientId, config, false); + + thread.setState(StreamThread.State.RUNNING); + thread.setState(StreamThread.State.PARTITIONS_REVOKED); + + final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1); + thread.taskManager().setAssignmentMetadata( + Collections.singletonMap( + new TaskId(0, t1p1.partition()), + assignedPartitions), + Collections.<TaskId, Set<TopicPartition>>emptyMap()); + + final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer; + mockConsumer.assign(Collections.singleton(t1p1)); + mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); + thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.runOnce(); + + // processed one record, punctuated after the first record, and hence num.iterations is still 1 + long offset = -1; + mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 0, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0])); + thread.runOnce(); + + assertThat(thread.currentNumIterations(), equalTo(1)); + + // processed one more record without punctuation, and bump num.iterations to 2 + mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0])); + thread.runOnce(); + + assertThat(thread.currentNumIterations(), equalTo(2)); + + // processed zero records, early exit and iterations stays as 2 + thread.runOnce(); + assertThat(thread.currentNumIterations(), equalTo(2)); + + // system time based 
punctuation halves to 1 + mockTime.sleep(11L); + + thread.runOnce(); + assertThat(thread.currentNumIterations(), equalTo(1)); + + // processed two records, bumping up iterations to 2 + mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 5, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0])); + mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 6, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0])); + thread.runOnce(); + + assertThat(thread.currentNumIterations(), equalTo(2)); + + // stream time based punctuation halves to 1 + mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 11, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0])); + thread.runOnce(); + + assertThat(thread.currentNumIterations(), equalTo(1)); + + // processed three records, bumping up iterations to 3 (1 + 2) + mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 12, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0])); + mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 13, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0])); + mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 14, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0])); + thread.runOnce(); + + assertThat(thread.currentNumIterations(), equalTo(3)); + + mockProcessor.requestCommit(); + mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 15, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0])); + thread.runOnce(); + + // a user-requested commit should not impact the iteration adjustment + assertThat(thread.currentNumIterations(), equalTo(3)); + + // time based commit, halves iterations to 3 / 2 = 1 + mockTime.sleep(90L); + thread.runOnce(); + + assertThat(thread.currentNumIterations(), equalTo(1)); + + } + @SuppressWarnings({"unchecked", "ThrowableNotThrown"}) @Test public void shouldNotCauseExceptionIfNothingCommitted() { @@ -341,9 +437,11 @@ public void shouldNotCauseExceptionIfNothingCommitted() { new LogContext(""), new AtomicInteger() ); - thread.maybeCommit(mockTime.milliseconds()); + thread.setNow(mockTime.milliseconds()); + thread.maybeCommit(); mockTime.sleep(commitInterval - 10L); - thread.maybeCommit(mockTime.milliseconds()); + thread.setNow(mockTime.milliseconds()); + thread.maybeCommit(); EasyMock.verify(taskManager); } @@ -376,9 +474,11 @@ public void shouldCommitAfterTheCommitInterval() { new LogContext(""), new AtomicInteger() ); - thread.maybeCommit(mockTime.milliseconds()); + thread.setNow(mockTime.milliseconds()); + thread.maybeCommit(); mockTime.sleep(commitInterval + 1); - thread.maybeCommit(mockTime.milliseconds()); + thread.setNow(mockTime.milliseconds()); + thread.maybeCommit(); EasyMock.verify(taskManager); } @@ -457,7 +557,7 @@ public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() mockConsumer.updateBeginningOffsets(beginOffsets); thread.rebalanceListener.onPartitionsAssigned(new HashSet<>(assignedPartitions)); - thread.runOnce(-1); + thread.runOnce(); assertEquals(thread.tasks().size(), clientSupplier.producers.size()); assertSame(clientSupplier.consumer, thread.consumer); @@ -646,7 +746,7 @@ public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWh mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L))
thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); - thread.runOnce(-1); + thread.runOnce(); assertThat(thread.tasks().size(), equalTo(1)); final MockProducer producer = clientSupplier.producers.get(0); @@ -657,7 +757,7 @@ public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWh consumer.addRecord(new ConsumerRecord<>(topic1, 1, 0, new byte[0], new byte[0])); mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1); - thread.runOnce(-1); + thread.runOnce(); assertThat(producer.history().size(), equalTo(1)); assertFalse(producer.transactionCommitted()); @@ -666,16 +766,16 @@ public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWh new TestCondition() { @Override public boolean conditionMet() { - return producer.commitCount() == 2; + return producer.commitCount() == 1; } }, "StreamsThread did not commit transaction."); producer.fenceProducer(); mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1L); - consumer.addRecord(new ConsumerRecord<>(topic1, 1, 0, new byte[0], new byte[0])); + consumer.addRecord(new ConsumerRecord<>(topic1, 1, 1, new byte[0], new byte[0])); try { - thread.runOnce(-1); + thread.runOnce(); fail("Should have thrown TaskMigratedException"); } catch (final TaskMigratedException expected) { /* ignore */ } TestUtils.waitForCondition( @@ -687,15 +787,16 @@ public boolean conditionMet() { }, "StreamsThread did not remove fenced zombie task."); - assertThat(producer.commitCount(), equalTo(2L)); + assertThat(producer.commitCount(), equalTo(1L)); } - private StreamThread setupStreamThread() { + @Test + public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedInCommitTransactionWhenSuspendingTaks() { + final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true); + internalTopologyBuilder.addSource(null, "name", null, null, null, topic1); internalTopologyBuilder.addSink("out", "output", null, null, null, "name"); - final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true); - thread.setState(StreamThread.State.RUNNING); thread.rebalanceListener.onPartitionsRevoked(null); @@ -713,19 +814,12 @@ private StreamThread setupStreamThread() { mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); - thread.runOnce(-1); + thread.runOnce(); assertThat(thread.tasks().size(), equalTo(1)); - return thread; - } - - @Test - public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedInCommitTransactionWhenSuspendingTaks() { - final StreamThread thread = setupStreamThread(); clientSupplier.producers.get(0).fenceProducer(); thread.rebalanceListener.onPartitionsRevoked(null); - assertTrue(clientSupplier.producers.get(0).transactionInFlight()); assertFalse(clientSupplier.producers.get(0).transactionCommitted()); assertTrue(clientSupplier.producers.get(0).closed()); @@ -733,8 +827,32 @@ public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedIn } @Test - public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedInCloseTransactionWhenSuspendingTaks() { - final StreamThread thread = setupStreamThread(); + public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedInCloseTransactionWhenSuspendingTasks() { + final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true); + + 
internalTopologyBuilder.addSource(null, "name", null, null, null, topic1); + internalTopologyBuilder.addSink("out", "output", null, null, null, "name"); + + thread.setState(StreamThread.State.RUNNING); + thread.rebalanceListener.onPartitionsRevoked(null); + + final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); + final List<TopicPartition> assignedPartitions = new ArrayList<>(); + + // assign single partition + assignedPartitions.add(t1p1); + activeTasks.put(task1, Collections.singleton(t1p1)); + + thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap()); + + final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer; + mockConsumer.assign(assignedPartitions); + mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); + thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + + thread.runOnce(); + + assertThat(thread.tasks().size(), equalTo(1)); clientSupplier.producers.get(0).fenceProducerOnClose(); thread.rebalanceListener.onPartitionsRevoked(null); @@ -789,7 +907,7 @@ public void shouldReturnActiveTaskMetadataWhileRunningState() { mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); - thread.runOnce(-1); + thread.runOnce(); final ThreadMetadata threadMetadata = thread.threadMetadata(); assertEquals(StreamThread.State.RUNNING.name(), threadMetadata.threadState()); @@ -834,7 +952,7 @@ public void shouldReturnStandbyTaskMetadataWhileRunningState() { thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList()); - thread.runOnce(-1); + thread.runOnce(); final ThreadMetadata threadMetadata = thread.threadMetadata(); assertEquals(StreamThread.State.RUNNING.name(), threadMetadata.threadState()); @@ -900,7 +1018,7 @@ public void shouldUpdateStandbyTask() throws IOException { thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList()); - thread.runOnce(-1); + thread.runOnce(); final StandbyTask standbyTask1 = thread.taskManager().standbyTask(partition1); final StandbyTask standbyTask2 = thread.taskManager().standbyTask(partition2); @@ -967,7 +1085,7 @@ public void close() {} clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); - thread.runOnce(-1); + thread.runOnce(); assertEquals(0, punctuatedStreamTime.size()); assertEquals(0, punctuatedWallClockTime.size()); @@ -977,14 +1095,14 @@ public void close() {} clientSupplier.consumer.addRecord(new ConsumerRecord<>(topic1, 1, i, i * 100L, TimestampType.CREATE_TIME, ConsumerRecord.NULL_CHECKSUM, ("K" + i).getBytes().length, ("V" + i).getBytes().length, ("K" + i).getBytes(), ("V" + i).getBytes())); } - thread.runOnce(-1); + thread.runOnce(); assertEquals(1, punctuatedStreamTime.size()); assertEquals(1, punctuatedWallClockTime.size()); mockTime.sleep(100L); - thread.runOnce(-1); + thread.runOnce(); // we should skip stream time punctuation, only trigger wall-clock time punctuation assertEquals(1, punctuatedStreamTime.size()); @@ -1177,7 +1295,7 @@ public void shouldRecordSkippedMetricForDeserializationException() { mockConsumer.assign(Collections.singleton(t1p1)); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); - thread.runOnce(-1); + thread.runOnce(); final MetricName skippedTotalMetric = 
metrics.metricName("skipped-records-total", "stream-metrics", Collections.singletonMap("client-id", thread.getName())); final MetricName skippedRateMetric = metrics.metricName("skipped-records-rate", "stream-metrics", Collections.singletonMap("client-id", thread.getName())); @@ -1187,7 +1305,7 @@ public void shouldRecordSkippedMetricForDeserializationException() { long offset = -1; mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], "I am not an integer.".getBytes())); mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], "I am not an integer.".getBytes())); - thread.runOnce(-1); + thread.runOnce(); assertEquals(2.0, metrics.metric(skippedTotalMetric).metricValue()); assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue()); @@ -1221,7 +1339,7 @@ public void shouldReportSkippedRecordsForInvalidTimestamps() { mockConsumer.assign(Collections.singleton(t1p1)); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); - thread.runOnce(-1); + thread.runOnce(); final MetricName skippedTotalMetric = metrics.metricName("skipped-records-total", "stream-metrics", Collections.singletonMap("client-id", thread.getName())); final MetricName skippedRateMetric = metrics.metricName("skipped-records-rate", "stream-metrics", Collections.singletonMap("client-id", thread.getName())); @@ -1231,7 +1349,7 @@ public void shouldReportSkippedRecordsForInvalidTimestamps() { long offset = -1; mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0])); mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0])); - thread.runOnce(-1); + thread.runOnce(); assertEquals(2.0, metrics.metric(skippedTotalMetric).metricValue()); assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue()); @@ -1239,13 +1357,13 @@ public void shouldReportSkippedRecordsForInvalidTimestamps() { mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0])); mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0])); mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0])); - thread.runOnce(-1); + thread.runOnce(); assertEquals(6.0, metrics.metric(skippedTotalMetric).metricValue()); assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue()); mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0])); mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0])); - thread.runOnce(-1); + thread.runOnce(); assertEquals(6.0, metrics.metric(skippedTotalMetric).metricValue()); assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 508f2ee6dd7..b0e7fce2bbe 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -590,19 +590,19 @@ public void shouldIgnorePurgeDataErrors() { @Test public void shouldMaybeCommitActiveTasks() { - EasyMock.expect(active.maybeCommit()).andReturn(5); + EasyMock.expect(active.maybeCommitPerUserRequested()).andReturn(5); replay(); - assertThat(taskManager.maybeCommitActiveTasks(), equalTo(5)); + assertThat(taskManager.maybeCommitActiveTasksPerUserRequested(), equalTo(5)); verify(active); } @Test public void shouldProcessActiveTasks() { - EasyMock.expect(active.process()).andReturn(10); + EasyMock.expect(active.process(0L)).andReturn(10); replay(); - assertThat(taskManager.process(0L), equalTo(10)); + assertThat(taskManager.process(0L), equalTo(10)); verify(active); } diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java index 927be0b4911..c95f4086ef1 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java @@ -40,6 +40,8 @@ private final PunctuationType punctuationType; private final long scheduleInterval; + private boolean commitRequested = false; + public MockProcessor(final PunctuationType punctuationType, final long scheduleInterval) { this.punctuationType = punctuationType; this.scheduleInterval = scheduleInterval; @@ -76,6 +78,10 @@ public void process(final K key, final V value) { processed.add((key == null ? "null" : key) + ":" + (value == null ? "null" : value)); + if (commitRequested) { + context().commit(); + commitRequested = false; + } } public void checkAndClearProcessResult(final String... expected) { @@ -87,6 +93,10 @@ public void checkAndClearProcessResult(final String... expected) { processed.clear(); } + public void requestCommit() { + commitRequested = true; + } + public void checkEmptyAndClearProcessResult() { assertEquals("the number of outputs:", 0, processed.size()); processed.clear(); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Stream timestamp computation needs some further thoughts > -------------------------------------------------------- > > Key: KAFKA-3514 > URL: https://issues.apache.org/jira/browse/KAFKA-3514 > Project: Kafka > Issue Type: Bug > Components: streams > Reporter: Guozhang Wang > Assignee: Guozhang Wang > Priority: Major > Labels: architecture, kip > Fix For: 2.1.0 > > > KIP-353: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization] > Our current stream task's timestamp is used for the punctuate function as well as > for selecting which stream to process next (i.e. best-effort stream > synchronization). And it is defined as the smallest timestamp over all > partitions in the task's partition group. This results in two unintuitive > corner cases: > 1) observing a late-arrived record would keep that stream's timestamp low for > a period of time, and hence that stream keeps being processed until the late record itself is dequeued.
For > example take two partitions within the same task annotated by their > timestamps: > {code:java} > Stream A: 5, 6, 7, 8, 9, 1, 10 > {code} > {code:java} > Stream B: 2, 3, 4, 5 > {code} > The late-arrived record with timestamp "1" will cause stream A to be selected > continuously in the thread loop, i.e. messages with timestamps 5, 6, 7, 8, 9 > until the record itself is dequeued and processed, then stream B will be > selected starting with timestamp 2. > 2) an empty buffered partition will cause its timestamp not to advance, > and hence the task timestamp as well, since it is the smallest among all > partitions. This may not be a severe problem compared with 1) above though. > *Update* > There is one more thing to consider (full discussion found here: > [http://search-hadoop.com/m/Kafka/uyzND1iKZJN1yz0E5?subj=Order+of+punctuate+and+process+in+a+stream+processor]) > {quote}Let's assume the following case. > - a stream processor that uses the Processor API > - context.schedule(1000) is called in the init() > - the processor reads only one topic that has one partition > - using a custom timestamp extractor, but the timestamp is just a wall > clock time > Imagine the following events: > 1., for 10 seconds I send in 5 messages / second > 2., I do not send any messages for 3 seconds > 3., I start the 5 messages / second again > I see that punctuate() is not called during the 3 seconds when I do not > send any messages. This is ok according to the documentation, because > there are no new messages to trigger the punctuate() call. When the > first few messages arrive after I restart the sending (point 3. above) I > see the following sequence of method calls: > 1., process() on the 1st message > 2., punctuate() is called 3 times > 3., process() on the 2nd message > 4., process() on each following message > What I would expect instead is that punctuate() is called first and then > process() is called on the messages, because the first message's timestamp > is already 3 seconds older than when the last punctuate() was called, so the > first message belongs after the 3 punctuate() calls. > {quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
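To make the max.task.idle.ms behavior exercised by the merged tests concrete, here is a minimal sketch of the idling rule that the new StreamTask.isProcessable(now) assertions describe: a task processes normally while every input partition has buffered data, and falls back to enforced processing only after waiting max.task.idle.ms with a partially filled buffer. All names below (TaskIdlingSketch, enforcedProcessingCount, and so on) are illustrative stand-ins, not the actual StreamTask or PartitionGroup internals; the counter merely stands in for the enforced-processing-total sensor checked in shouldBeProcessableIfWaitedForTooLong.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only -- hypothetical names, not the real StreamTask implementation.
class TaskIdlingSketch {

    private final long maxTaskIdleMs;                              // max.task.idle.ms (default 0L)
    private final Map<String, Integer> buffered = new HashMap<>(); // partition -> buffered record count
    private long idleStartTimeMs = -1L;                            // -1 means the idle timer is not running
    private boolean enforcedProcessing = false;                    // sticky until all buffers fill up again
    int enforcedProcessingCount = 0;                               // stands in for the enforced-processing sensor

    TaskIdlingSketch(final long maxTaskIdleMs, final Set<String> partitions) {
        this.maxTaskIdleMs = maxTaskIdleMs;
        for (final String partition : partitions) {
            buffered.put(partition, 0);
        }
    }

    void addRecords(final String partition, final int count) {
        buffered.merge(partition, count, Integer::sum);
    }

    boolean isProcessable(final long now) {
        final boolean allBuffered = buffered.values().stream().allMatch(count -> count > 0);
        final boolean anyBuffered = buffered.values().stream().anyMatch(count -> count > 0);

        if (allBuffered) {
            idleStartTimeMs = -1L;        // normal processing: reset the idle timer
            enforcedProcessing = false;
            return true;
        }
        if (!anyBuffered) {
            return false;                 // nothing buffered, nothing to process or enforce
        }
        if (enforcedProcessing) {
            enforcedProcessingCount++;    // once decided to enforce, continue doing that
            return true;
        }
        if (idleStartTimeMs == -1L) {
            idleStartTimeMs = now;        // some-but-not-all partitions buffered: start waiting
            return false;
        }
        if (now - idleStartTimeMs >= maxTaskIdleMs) {
            enforcedProcessing = true;    // waited max.task.idle.ms: process what is buffered
            enforcedProcessingCount++;
            return true;
        }
        return false;
    }
}
{code}

With maxTaskIdleMs = 100 and records buffered on only one of two partitions, the sketch reproduces the trace asserted in shouldBeProcessableIfWaitedForTooLong: not processable at +0 and +50, enforced at +100 and +101, back to normal once both partitions have data, and a fresh 100 ms wait after processing drains one buffer again. With the default of 0, enforcement in this sketch kicks in on the second partial-buffer check, which keeps the old low-latency behavior and is why the test config raises the value to 100 explicitly.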