[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16561177#comment-16561177 ]
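For orientation ahead of the raw diff: the heart of this refactor replaces StreamThread's old recordsProcessedBeforeCommit heuristic with an adaptive numIterations loop, and has the thread process only tasks whose input partitions are all buffered (with enforced processing after a bounded wait). The sketch below restates that loop shape in plain Java as a simplified approximation of the diff's logic; the class name and helpers (processOneRound and the boolean stubs) are placeholders for illustration, not the PR's actual API.

```java
// Simplified, self-contained sketch of the adaptive iteration scheme described in the
// PR's inline comment: run up to numIterations rounds of one-record-each processing,
// then check commit / punctuate / elapsed poll time; halve numIterations when commit or
// punctuation fired, grow it otherwise, and bail out well before max.poll.interval.ms.
public final class AdaptiveIterationSketch {

    private int numIterations = 1; // starts at 1, as in the PR

    void runOnce(final long lastPollMs, final long maxPollTimeMs) {
        long totalProcessed;
        long timeSinceLastPoll;

        do {
            totalProcessed = 0;
            for (int i = 0; i < numIterations; i++) {
                final long processed = processOneRound(); // one record per processable task
                if (processed == 0) {
                    break;
                }
                totalProcessed += processed;
            }

            timeSinceLastPoll = Math.max(System.currentTimeMillis() - lastPollMs, 0);

            if (timeSinceLastPoll / 2 >= maxPollTimeMs) {
                break; // stay safely inside the consumer's max.poll.interval.ms
            } else if (maybePunctuate() || maybeCommit()) {
                numIterations = numIterations > 1 ? numIterations / 2 : numIterations;
            } else {
                numIterations++;
            }
        } while (totalProcessed > 0 && timeSinceLastPoll < maxPollTimeMs);
    }

    private long processOneRound() { return 0; }       // stand-in for taskManager.process()
    private boolean maybePunctuate() { return false; } // stub
    private boolean maybeCommit() { return false; }    // stub
}
```

In the PR itself this logic lives in StreamThread#runOnce and processAndMaybeCommit, driven by taskManager.process() together with the new AssignedStreamsTasks.update() and StreamTask.isProcessable() plumbing shown in the diff below.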
ASF GitHub Bot commented on KAFKA-3514: --------------------------------------- guozhangwang closed pull request #5428: KAFKA-3514: Part III, Refactor StreamThread main loop URL: https://github.com/apache/kafka/pull/5428 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index fefeae343e0..6c504e3473f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -499,7 +499,7 @@ public ConsumerConfig(Map<String, Object> props) { super(CONFIG, props); } - ConsumerConfig(Map<?, ?> props, boolean doLog) { + protected ConsumerConfig(Map<?, ?> props, boolean doLog) { super(CONFIG, props, doLog); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index dc00b473027..52c72444091 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.internals.FutureRecordMetadata; import org.apache.kafka.clients.producer.internals.ProduceRequestResult; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; @@ -244,7 +245,12 @@ private void verifyNoTransactionInFlight() { */ @Override public synchronized Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { - verifyProducerState(); + if (this.closed) { + throw new IllegalStateException("MockProducer is already closed."); + } + if (this.producerFenced) { + throw new KafkaException("MockProducer is fenced.", new ProducerFencedException("Fenced")); + } int partition = 0; if (!this.cluster.partitionsForTopic(record.topic()).isEmpty()) partition = partition(record, this.cluster); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java index 7a8c710b76b..a6134772571 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; @@ -267,18 +268,7 @@ public void shouldThrowOnSendIfProducerGotFenced() { try { producer.send(null); fail("Should have thrown as producer is fenced off"); - } catch (ProducerFencedException e) { } - } - - @Test - public void shouldThrowOnFlushIfProducerGotFenced() { - buildMockProducer(true); - producer.initTransactions(); - producer.fenceProducer(); - try { - producer.flush(); - fail("Should have thrown as producer is 
fenced off"); - } catch (ProducerFencedException e) { } + } catch (KafkaException e) { } } @Test diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 94e4c71d9c2..8eb024cc670 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -51,9 +51,11 @@ final boolean eosEnabled; final Logger log; final LogContext logContext; + final StateDirectory stateDirectory; + boolean taskInitialized; boolean taskClosed; - final StateDirectory stateDirectory; + boolean commitNeeded; InternalProcessorContext processorContext; @@ -267,6 +269,10 @@ public boolean isClosed() { return taskClosed; } + public boolean commitNeeded() { + return commitNeeded; + } + public boolean hasStateStores() { return !topology.stateStores().isEmpty(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java index f98e6356a22..844b30b59bc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java @@ -23,14 +23,16 @@ import org.apache.kafka.streams.processor.TaskId; import org.slf4j.Logger; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.stream.Collectors; class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements RestoringTasks { private final Logger log; private final TaskAction<StreamTask> maybeCommitAction; - private int committed = 0; + private Map<TaskId, StreamTask> processable = Collections.emptyMap(); AssignedStreamsTasks(final LogContext logContext) { super(logContext, "stream task"); @@ -45,7 +47,7 @@ public String name() { @Override public void apply(final StreamTask task) { - if (task.commitNeeded()) { + if (task.commitRequested() && task.commitNeeded()) { committed++; task.commit(); log.debug("Committed active task {} per user request in", task.id()); @@ -82,14 +84,23 @@ int maybeCommit() { return recordsToDelete; } + /** + * Update the list of processable tasks + */ + void update() { + processable = running.entrySet().stream() + .filter(entry -> entry.getValue().isProcessable()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + /** * @throws TaskMigratedException if the task producer got fenced (EOS only) */ int process() { int processed = 0; - final Iterator<Map.Entry<TaskId, StreamTask>> it = running.entrySet().iterator(); - while (it.hasNext()) { - final StreamTask task = it.next().getValue(); + + for (Iterator<StreamTask> iter = processable.values().iterator(); iter.hasNext(); ) { + StreamTask task = iter.next(); try { if (task.process()) { processed++; @@ -101,13 +112,18 @@ int process() { if (fatalException != null) { throw fatalException; } - it.remove(); + running.remove(task.id()); throw e; } catch (final RuntimeException e) { log.error("Failed to process stream task {} due to the following error:", task.id(), e); throw e; } + + if (!task.allSourcePartitionsBuffered()) { + iter.remove(); + } } + return processed; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index 079d405cb50..cb4bde2e35a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -45,6 +45,9 @@ private final Map<TaskId, T> restoring = new HashMap<>(); private final Set<TopicPartition> restoredPartitions = new HashSet<>(); private final Set<TaskId> previousActiveTasks = new HashSet<>(); + + protected int committed = 0; + // IQ may access this map. final Map<TaskId, T> running = new ConcurrentHashMap<>(); private final Map<TopicPartition, T> runningByPartition = new HashMap<>(); @@ -64,7 +67,10 @@ public String name() { @Override public void apply(final T task) { - task.commit(); + if (task.commitNeeded()) { + committed++; + task.commit(); + } } }; } @@ -349,8 +355,9 @@ void clear() { * or if the task producer got fenced (EOS) */ int commit() { + committed = 0; applyToRunningTasks(commitAction); - return running.size(); + return committed; } void applyToRunningTasks(final TaskAction<T> action) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index 34252bf2b4c..80608213941 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -32,10 +32,13 @@ public class PartitionGroup { private final Map<TopicPartition, RecordQueue> partitionQueues; - private final PriorityQueue<RecordQueue> nonEmptyQueuesByTime; + private final int numQueues; + private long streamTime; private int totalBuffered; + private boolean allBuffered; + public static class RecordInfo { RecordQueue queue; @@ -53,12 +56,13 @@ RecordQueue queue() { } } - PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues) { nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::timestamp)); this.partitionQueues = partitionQueues; totalBuffered = 0; + allBuffered = false; streamTime = RecordQueue.NOT_KNOWN; + numQueues = this.partitionQueues.keySet().size(); } /** @@ -81,15 +85,14 @@ StampedRecord nextRecord(final RecordInfo info) { if (!queue.isEmpty()) { nonEmptyQueuesByTime.offer(queue); + } else { + // if a certain queue has been drained, reset the flag + allBuffered = false; } - // Since this was previously a queue with min timestamp, - // streamTime could only advance if this queue's time did. - if (queue.timestamp() > streamTime) { - computeStreamTime(); - } + // always update the stream time to the record's timestamp yet to be processed if it is larger + streamTime = record.timestamp > streamTime ? 
record.timestamp : streamTime; } - } return record; @@ -106,17 +109,22 @@ int addRawRecords(final TopicPartition partition, final Iterable<ConsumerRecord< final RecordQueue recordQueue = partitionQueues.get(partition); final int oldSize = recordQueue.size(); - final long oldTimestamp = recordQueue.timestamp(); final int newSize = recordQueue.addRawRecords(rawRecords); // add this record queue to be considered for processing in the future if it was empty before if (oldSize == 0 && newSize > 0) { nonEmptyQueuesByTime.offer(recordQueue); - } - // Adding to this queue could only advance streamTime if it was previously the queue with min timestamp (= streamTime) - if (oldTimestamp <= streamTime && recordQueue.timestamp() > streamTime) { - computeStreamTime(); + // if all partitions now are non-empty, set the flag and compute the stream time + if (nonEmptyQueuesByTime.size() == numQueues) { + allBuffered = true; + + // since we may enforce processing even if some queue is empty, it is possible that after some + // raw data has been added to that queue the new partition's timestamp is even smaller than the current + // stream time, in this case we should not update. + final long newTimestamp = nonEmptyQueuesByTime.peek().timestamp(); + streamTime = newTimestamp > streamTime ? newTimestamp : streamTime; + } } totalBuffered += newSize - oldSize; @@ -136,18 +144,6 @@ public long timestamp() { return streamTime; } - private void computeStreamTime() { - // we should always return the smallest timestamp of all partitions - // to avoid group partition time goes backward - long timestamp = Long.MAX_VALUE; - for (final RecordQueue queue : partitionQueues.values()) { - if (queue.timestamp() < timestamp) { - timestamp = queue.timestamp(); - } - } - this.streamTime = timestamp; - } - /** * @throws IllegalStateException if the record's partition does not belong to this partition group */ @@ -165,6 +161,10 @@ int numBuffered() { return totalBuffered; } + boolean allPartitionsBuffered() { + return allBuffered; + } + public void close() { partitionQueues.clear(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index d753648eede..73a242e4cd8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -207,18 +207,41 @@ public void onCompletion(final RecordMetadata metadata, "You can increase producer parameter `max.block.ms` to increase this timeout.", topic); throw new StreamsException(String.format("%sFailed to send record to topic %s due to timeout.", logPrefix, topic)); } catch (final Exception uncaughtException) { - throw new StreamsException( - String.format( - EXCEPTION_MESSAGE, - logPrefix, - "an error caught", - key, - value, - timestamp, - topic, - uncaughtException.toString() - ), - uncaughtException); + if (uncaughtException instanceof KafkaException) { + final KafkaException kafkaException = (KafkaException) uncaughtException; + + if (kafkaException.getCause() instanceof ProducerFencedException) { + // producer.send() call may throw a KafkaException which wraps a FencedException, + // in this case we should throw its wrapped inner cause so that it can be captured and re-wrapped as TaskMigrationException + throw (ProducerFencedException) kafkaException.getCause(); + } else { + throw new StreamsException( + 
String.format( + EXCEPTION_MESSAGE, + logPrefix, + "an error caught", + key, + value, + timestamp, + topic, + uncaughtException.toString() + ), + uncaughtException); + } + } else { + throw new StreamsException( + String.format( + EXCEPTION_MESSAGE, + logPrefix, + "an error caught", + key, + value, + timestamp, + topic, + uncaughtException.toString() + ), + uncaughtException); + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 72cc6295a2d..3ac64146187 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -103,6 +103,8 @@ public void commit() { flushAndCheckpointState(); // reinitialize offset limits updateOffsetLimits(); + + commitNeeded = false; } /** @@ -185,6 +187,11 @@ public void closeSuspended(final boolean clean, } stateMgr.updateStandbyStates(partition, restoreRecords, lastOffset); + + if (!restoreRecords.isEmpty()) { + commitNeeded = true; + } + return remainingRecords; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 7f121fe0df4..648c997658e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -59,6 +59,8 @@ private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null); + private static final int WAIT_ON_PARTIAL_INPUT = 5; + private final PartitionGroup partitionGroup; private final PartitionGroup.RecordInfo recordInfo; private final PunctuationQueue streamTimePunctuationQueue; @@ -69,15 +71,17 @@ private final Producer<byte[], byte[]> producer; private final int maxBufferedSize; + private boolean commitRequested = false; - private boolean commitOffsetNeeded = false; private boolean transactionInFlight = false; + private int waits = WAIT_ON_PARTIAL_INPUT; private final Time time; private final TaskMetrics taskMetrics; protected static final class TaskMetrics { final StreamsMetricsImpl metrics; final Sensor taskCommitTimeSensor; + final Sensor taskEnforcedProcessSensor; private final String taskName; @@ -108,7 +112,7 @@ // add the operation metrics with additional tags final Map<String, String> tagMap = metrics.tagMap("task-id", taskName); - taskCommitTimeSensor = metrics.taskLevelSensor("commit", taskName, Sensor.RecordingLevel.DEBUG, parent); + taskCommitTimeSensor = metrics.taskLevelSensor(taskName, "commit", Sensor.RecordingLevel.DEBUG, parent); taskCommitTimeSensor.add( new MetricName("commit-latency-avg", group, "The average latency of commit operation.", tagMap), new Avg() @@ -125,6 +129,18 @@ new MetricName("commit-total", group, "The total number of occurrence of commit operations.", tagMap), new Count() ); + + // add the metrics for enforced processing + taskEnforcedProcessSensor = metrics.taskLevelSensor(taskName, "enforced-process", Sensor.RecordingLevel.DEBUG, parent); + taskEnforcedProcessSensor.add( + new MetricName("enforced-process-rate", group, "The average number of occurrence of enforced-process per second.", tagMap), + new Rate(TimeUnit.SECONDS, new Count()) + ); + taskEnforcedProcessSensor.add( + new 
MetricName("enforced-process-total", group, "The total number of occurrence of enforced-process operations.", tagMap), + new Count() + ); + } void removeAllSensors() { @@ -263,6 +279,25 @@ public void resume() { log.debug("Resuming"); } + /** + * An active task is processable if its buffer contains data for all of its input source topic partitions + */ + public boolean isProcessable() { + if (partitionGroup.allPartitionsBuffered()) { + return true; + } else if (partitionGroup.numBuffered() > 0 && --waits < 0) { + taskMetrics.taskEnforcedProcessSensor.record(); + waits = WAIT_ON_PARTIAL_INPUT; + return true; + } else { + return false; + } + } + + public boolean allSourcePartitionsBuffered() { + return partitionGroup.allPartitionsBuffered(); + } + /** * Process one record. * @@ -293,7 +328,7 @@ public boolean process() { // update the consumed offset map after processing is done consumedOffsets.put(partition, record.offset()); - commitOffsetNeeded = true; + commitNeeded = true; // after processing this record, if its partition queue's buffered size has been // decreased to the threshold, we can then resume the consumption on this partition @@ -385,12 +420,33 @@ void commit(final boolean startNewTransaction) { stateMgr.checkpoint(activeTaskCheckpointableOffsets()); } - commitOffsets(startNewTransaction); + final Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size()); + for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) { + final TopicPartition partition = entry.getKey(); + final long offset = entry.getValue() + 1; + consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset)); + stateMgr.putOffsetLimit(partition, offset); + } - commitRequested = false; + try { + if (eosEnabled) { + producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId); + producer.commitTransaction(); + transactionInFlight = false; + if (startNewTransaction) { + producer.beginTransaction(); + transactionInFlight = true; + } + } else { + consumer.commitSync(consumedOffsetsAndMetadata); + } + } catch (final CommitFailedException | ProducerFencedException fatal) { + throw new TaskMigratedException(this, fatal); + } - taskMetrics.taskCommitTimeSensor.record(time.nanoseconds() - startNs); - } + commitNeeded = false; + commitRequested = false; + taskMetrics.taskCommitTimeSensor.record(time.nanoseconds() - startNs); } @Override protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() { @@ -413,43 +469,6 @@ protected void flushState() { } } - /** - * @throws TaskMigratedException if committing offsets failed (non-EOS) - * or if the task producer got fenced (EOS) - */ - private void commitOffsets(final boolean startNewTransaction) { - try { - if (commitOffsetNeeded) { - log.trace("Committing offsets"); - final Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size()); - for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) { - final TopicPartition partition = entry.getKey(); - final long offset = entry.getValue() + 1; - consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset)); - stateMgr.putOffsetLimit(partition, offset); - } - - if (eosEnabled) { - producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId); - } else { - consumer.commitSync(consumedOffsetsAndMetadata); - } - commitOffsetNeeded = false; - } - - if (eosEnabled) { - producer.commitTransaction(); - transactionInFlight = false; - if 
(startNewTransaction) { - producer.beginTransaction(); - transactionInFlight = true; - } - } - } catch (final CommitFailedException | ProducerFencedException fatal) { - throw new TaskMigratedException(this, fatal); - } - } - Map<TopicPartition, Long> purgableOffsets() { final Map<TopicPartition, Long> purgableConsumedOffsets = new HashMap<>(); for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) { @@ -719,7 +738,14 @@ public boolean maybePunctuateStreamTime() { if (timestamp == RecordQueue.NOT_KNOWN) { return false; } else { - return streamTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.STREAM_TIME, this); + final boolean punctuated = streamTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.STREAM_TIME, this); + + if (punctuated) { + commitNeeded = true; + return true; + } else { + return false; + } } } @@ -733,7 +759,14 @@ public boolean maybePunctuateStreamTime() { public boolean maybePunctuateSystemTime() { final long timestamp = time.milliseconds(); - return systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.WALL_CLOCK_TIME, this); + final boolean punctuated = systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.WALL_CLOCK_TIME, this); + + if (punctuated) { + commitNeeded = true; + return true; + } else { + return false; + } } /** @@ -746,7 +779,7 @@ void needCommit() { /** * Whether or not a request has been made to commit the current state */ - boolean commitNeeded() { + boolean commitRequested() { return commitRequested; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 31de839b1a1..8f89ec04734 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.metrics.stats.Total; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KafkaClientSupplier; @@ -69,7 +70,6 @@ public class StreamThread extends Thread { - private final static int UNLIMITED_RECORDS = -1; private final static AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1); /** @@ -561,18 +561,21 @@ StandbyTask createTask(final Consumer<byte[], byte[]> consumer, } private final Time time; - private final Duration pollTime; - private final long commitTimeMs; - private final Object stateLock; private final Logger log; private final String logPrefix; + private final Object stateLock; + private final Duration pollTime; + private final long commitTimeMs; + private final int maxPollTimeMs; + private final String originalReset; private final TaskManager taskManager; private final StreamsMetricsThreadImpl streamsMetrics; private final AtomicInteger assignmentErrorCode; + private long now; + private long lastPollMs; private long lastCommitMs; - private long timerStartedMs; - private final String originalReset; + private int numIterations; private Throwable rebalanceException = null; private boolean processStandbyRecords = false; private volatile State state = State.CREATED; @@ -718,11 +721,21 @@ public StreamThread(final Time time, this.assignmentErrorCode = assignmentErrorCode; this.pollTime = 
Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)); + this.maxPollTimeMs = new InternalConsumerConfig(config.getMainConsumerConfigs("dummyGroupId", "dummyClientId")) + .getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG); this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); + this.numIterations = 1; + updateThreadMetadata(Collections.emptyMap(), Collections.emptyMap()); } + private static final class InternalConsumerConfig extends ConsumerConfig { + private InternalConsumerConfig(final Map<String, Object> props) { + super(ConsumerConfig.addDeserializerToConfig(props, new ByteArrayDeserializer(), new ByteArrayDeserializer()), false); + } + } + /** * Execute the stream processors * @@ -764,12 +777,11 @@ private void setRebalanceException(final Throwable rebalanceException) { * @throws StreamsException if the store's change log does not contain the partition */ private void runLoop() { - long recordsProcessedBeforeCommit = UNLIMITED_RECORDS; consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener); while (isRunning()) { try { - recordsProcessedBeforeCommit = runOnce(recordsProcessedBeforeCommit); + runOnce(); if (assignmentErrorCode.get() == StreamsPartitionAssignor.Error.VERSION_PROBING.code()) { log.info("Version probing detected. Triggering new rebalance."); enforceRebalance(); @@ -798,12 +810,10 @@ private void enforceRebalance() { * or if the task producer got fenced (EOS) */ // Visible for testing - long runOnce(final long recordsProcessedBeforeCommit) { - long processedBeforeCommit = recordsProcessedBeforeCommit; - + void runOnce() { final ConsumerRecords<byte[], byte[]> records; - timerStartedMs = time.milliseconds(); + now = time.milliseconds(); if (state == State.PARTITIONS_ASSIGNED) { // try to fetch some records with zero poll millis @@ -831,25 +841,45 @@ long runOnce(final long recordsProcessedBeforeCommit) { } } - if (records != null && !records.isEmpty() && taskManager.hasActiveRunningTasks()) { - streamsMetrics.pollTimeSensor.record(computeLatency(), timerStartedMs); + if (records != null && !records.isEmpty()) { + streamsMetrics.pollTimeSensor.record(computeLatency(), now); addRecordsToTasks(records); - final long totalProcessed = processAndMaybeCommit(recordsProcessedBeforeCommit); - if (totalProcessed > 0) { - final long processLatency = computeLatency(); - streamsMetrics.processTimeSensor.record(processLatency / (double) totalProcessed, timerStartedMs); - processedBeforeCommit = adjustRecordsProcessedBeforeCommit( - recordsProcessedBeforeCommit, - totalProcessed, - processLatency, - commitTimeMs); - } } - punctuate(); - maybeCommit(timerStartedMs); - maybeUpdateStandbyTasks(timerStartedMs); - return processedBeforeCommit; + if (state == State.RUNNING) { + taskManager.updateProcessableTasks(); + + /* + * Within an iteration, after N (N initialized as 1 upon start up) round of processing one-record-each on the applicable tasks, check the current time: + * 1. If it is time to commit, do it; + * 2. If it is time to punctuate, do it; + * 3. If elapsed time is close to consumer's max.poll.interval.ms, end the current iteration immediately. + * 4. If none of the the above happens, increment N. + * 5. If one of the above happens, half the value of N. 
+ */ + long totalProcessed; + long timeSinceLastPoll; + + do { + totalProcessed = processAndMaybeCommit(); + timeSinceLastPoll = Math.max(now - lastPollMs, 0); + + if (timeSinceLastPoll / 2 >= maxPollTimeMs) { + break; + } else if (maybePunctuate() || maybeCommit()) { + numIterations = numIterations > 1 ? numIterations / 2 : numIterations; + } else { + numIterations++; + } + } while (totalProcessed > 0 && timeSinceLastPoll < maxPollTimeMs); + + // even if there is not data to process in this iteration, still need to check if commit / punctuate is needed + maybePunctuate(); + + maybeCommit(); + + maybeUpdateStandbyTasks(); + } } /** @@ -862,6 +892,8 @@ long runOnce(final long recordsProcessedBeforeCommit) { private ConsumerRecords<byte[], byte[]> pollRequests(final Duration pollTime) { ConsumerRecords<byte[], byte[]> records = null; + this.lastPollMs = now; + try { records = consumer.poll(pollTime); } catch (final InvalidOffsetException e) { @@ -946,82 +978,48 @@ private void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) { * Schedule the records processing by selecting which record is processed next. Commits may * happen as records are processed. * - * @param recordsProcessedBeforeCommit number of records to be processed before commit is called. - * if UNLIMITED_RECORDS, then commit is never called * @return Number of records processed since last commit. * @throws TaskMigratedException if committing offsets failed (non-EOS) * or if the task producer got fenced (EOS) */ - private long processAndMaybeCommit(final long recordsProcessedBeforeCommit) { - - long processed; - long totalProcessedSinceLastMaybeCommit = 0; - // Round-robin scheduling by taking one record from each task repeatedly - // until no task has any records left - do { - processed = taskManager.process(); + private long processAndMaybeCommit() { + long totalProcessed = 0; + + for (int i = 0; i < numIterations; i++) { + int processed = taskManager.process(); + if (processed > 0) { - streamsMetrics.processTimeSensor.record(computeLatency() / (double) processed, timerStartedMs); - } - totalProcessedSinceLastMaybeCommit += processed; + totalProcessed += processed; + streamsMetrics.processTimeSensor.record(computeLatency() / (double) processed, now); - punctuate(); + taskManager.updateProcessableTasks(); - if (recordsProcessedBeforeCommit != UNLIMITED_RECORDS && - totalProcessedSinceLastMaybeCommit >= recordsProcessedBeforeCommit) { - totalProcessedSinceLastMaybeCommit = 0; - maybeCommit(timerStartedMs); - } - // commit any tasks that have requested a commit - final int committed = taskManager.maybeCommitActiveTasks(); - if (committed > 0) { - streamsMetrics.commitTimeSensor.record(computeLatency() / (double) committed, timerStartedMs); + // commit any tasks that have requested a commit + final int committed = taskManager.maybeCommitActiveTasks(); + if (committed > 0) { + streamsMetrics.commitTimeSensor.record(computeLatency() / (double) committed, now); + } + } else { + // if there is no records to be processed, exit immediately + break; } - } while (processed != 0); + } - return totalProcessedSinceLastMaybeCommit; + return totalProcessed; } /** * @throws TaskMigratedException if the task producer got fenced (EOS only) */ - private void punctuate() { + private boolean maybePunctuate() { final int punctuated = taskManager.punctuate(); if (punctuated > 0) { - streamsMetrics.punctuateTimeSensor.record(computeLatency() / (double) punctuated, timerStartedMs); - } - } + 
streamsMetrics.punctuateTimeSensor.record(computeLatency() / (double) punctuated, now); - /** - * Adjust the number of records that should be processed by scheduler. This avoids - * scenarios where the processing time is higher than the commit time. - * - * @param prevRecordsProcessedBeforeCommit Previous number of records processed by scheduler. - * @param totalProcessed Total number of records processed in this last round. - * @param processLatency Total processing latency in ms processed in this last round. - * @param commitTime Desired commit time in ms. - * @return An adjusted number of records to be processed in the next round. - */ - private long adjustRecordsProcessedBeforeCommit(final long prevRecordsProcessedBeforeCommit, final long totalProcessed, - final long processLatency, final long commitTime) { - long recordsProcessedBeforeCommit = UNLIMITED_RECORDS; - // check if process latency larger than commit latency - // note that once we set recordsProcessedBeforeCommit, it will never be UNLIMITED_RECORDS again, so - // we will never process all records again. This might be an issue if the initial measurement - // was off due to a slow start. - if (processLatency > 0 && processLatency > commitTime) { - // push down - recordsProcessedBeforeCommit = Math.max(1, (commitTime * totalProcessed) / processLatency); - log.debug("processing latency {} > commit time {} for {} records. Adjusting down recordsProcessedBeforeCommit={}", - processLatency, commitTime, totalProcessed, recordsProcessedBeforeCommit); - } else if (prevRecordsProcessedBeforeCommit != UNLIMITED_RECORDS && processLatency > 0) { - // push up - recordsProcessedBeforeCommit = Math.max(1, (commitTime * totalProcessed) / processLatency); - log.debug("processing latency {} < commit time {} for {} records. 
Adjusting up recordsProcessedBeforeCommit={}", - processLatency, commitTime, totalProcessed, recordsProcessedBeforeCommit); + return true; + } else { + return false; } - - return recordsProcessedBeforeCommit; } /** @@ -1030,32 +1028,44 @@ private long adjustRecordsProcessedBeforeCommit(final long prevRecordsProcessedB * @throws TaskMigratedException if committing offsets failed (non-EOS) * or if the task producer got fenced (EOS) */ - void maybeCommit(final long now) { + boolean maybeCommit() { + int committed = taskManager.maybeCommitActiveTasks(); + if (committed > 0) { + streamsMetrics.commitTimeSensor.record(computeLatency() / (double) committed, now); + } + if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) { if (log.isTraceEnabled()) { log.trace("Committing all active tasks {} and standby tasks {} since {}ms has elapsed (commit interval is {}ms)", - taskManager.activeTaskIds(), taskManager.standbyTaskIds(), now - lastCommitMs, commitTimeMs); + taskManager.activeTaskIds(), taskManager.standbyTaskIds(), now - lastCommitMs, commitTimeMs); } - final int committed = taskManager.commitAll(); + committed = taskManager.commitAll(); if (committed > 0) { - streamsMetrics.commitTimeSensor.record(computeLatency() / (double) committed, timerStartedMs); + final long previous = now; + + streamsMetrics.commitTimeSensor.record(computeLatency() / (double) committed, now); // try to purge the committed records for repartition topics if possible taskManager.maybePurgeCommitedRecords(); - } - if (log.isDebugEnabled()) { - log.debug("Committed all active tasks {} and standby tasks {} in {}ms", - taskManager.activeTaskIds(), taskManager.standbyTaskIds(), timerStartedMs - now); + + if (log.isDebugEnabled()) { + log.debug("Committed all active tasks {} and standby tasks {} in {}ms", + taskManager.activeTaskIds(), taskManager.standbyTaskIds(), now - previous); + } } lastCommitMs = now; processStandbyRecords = true; + + return committed > 0; + } else { + return false; } } - private void maybeUpdateStandbyTasks(final long now) { + private void maybeUpdateStandbyTasks() { if (state == State.RUNNING && taskManager.hasStandbyRunningTasks()) { if (processStandbyRecords) { if (!standbyRecords.isEmpty()) { @@ -1084,7 +1094,9 @@ private void maybeUpdateStandbyTasks(final long now) { standbyRecords = remainingStandbyRecords; - log.debug("Updated standby tasks {} in {}ms", taskManager.standbyTaskIds(), time.milliseconds() - now); + if (log.isDebugEnabled()) { + log.debug("Updated standby tasks {} in {}ms", taskManager.standbyTaskIds(), time.milliseconds() - now); + } } processStandbyRecords = false; } @@ -1144,10 +1156,10 @@ private void maybeUpdateStandbyTasks(final long now) { * @return latency */ private long computeLatency() { - final long previousTimeMs = timerStartedMs; - timerStartedMs = time.milliseconds(); + final long previous = now; + now = time.milliseconds(); - return Math.max(timerStartedMs - previousTimeMs, 0); + return Math.max(now - previous, 0); } /** @@ -1246,6 +1258,10 @@ public String toString(final String indent) { } // the following are for testing only + void setNow(final long now) { + this.now = now; + } + TaskManager taskManager() { return taskManager; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java index 5f221e3dc02..59bcd7869f8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java @@ -34,6 +34,8 @@ */ boolean initializeStateStores(); + boolean commitNeeded(); + void initializeTopology(); void commit(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 9da27020c5f..9571ae190b7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -278,6 +278,10 @@ void shutdown(final boolean clean) { } } + void updateProcessableTasks() { + active.update(); + } + AdminClient getAdminClient() { return adminClient; } @@ -332,10 +336,6 @@ boolean updateNewAndRestoringTasks() { return false; } - boolean hasActiveRunningTasks() { - return active.hasRunningTasks(); - } - boolean hasStandbyRunningTasks() { return standby.hasRunningTasks(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index 662ded553ad..e99a5b3bb23 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -88,13 +88,13 @@ public final void removeAllThreadLevelSensors() { } public final Sensor taskLevelSensor(final String taskName, - final String sensorName, - final Sensor.RecordingLevel recordingLevel, - final Sensor... parents) { + final String sensorName, + final Sensor.RecordingLevel recordingLevel, + final Sensor... parents) { final String key = threadName + "." + taskName; synchronized (taskLevelSensors) { if (!taskLevelSensors.containsKey(key)) { - taskLevelSensors.put(key, new LinkedList<String>()); + taskLevelSensors.put(key, new LinkedList<>()); } final String fullSensorName = key + "." 
+ sensorName; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java index 77b9c1e8560..12b4cf30240 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java @@ -370,7 +370,7 @@ private NamedCacheMetrics(final StreamsMetricsImpl metrics, final String cacheNa "record-cache-id", "all", "task-id", taskName ); - final Sensor taskLevelHitRatioSensor = metrics.taskLevelSensor("hitRatio", taskName, Sensor.RecordingLevel.DEBUG); + final Sensor taskLevelHitRatioSensor = metrics.taskLevelSensor(taskName, "hitRatio", Sensor.RecordingLevel.DEBUG); taskLevelHitRatioSensor.add( new MetricName("hitRatio-avg", group, "The average cache hit ratio.", allMetricTags), new Avg() diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java index 5681d7c045f..7f6db04ec8d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java @@ -24,14 +24,13 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KeyValueMapper; @@ -40,9 +39,6 @@ import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.TestUtils; @@ -54,14 +50,9 @@ import java.io.IOException; import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.Properties; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.Is.is; - /** * Similar to KStreamAggregationIntegrationTest but with dedupping enabled * by virtue of having a large commit interval @@ -93,11 +84,9 @@ public void before() throws InterruptedException { builder = new StreamsBuilder(); createTopics(); streamsConfiguration = new Properties(); - String applicationId = "kgrouped-stream-test-" + - testNo; + String applicationId = "kgrouped-stream-test-" + testNo; streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); - streamsConfiguration - .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + 
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS); @@ -111,12 +100,7 @@ public void before() throws InterruptedException { mapper, Serialized.with(Serdes.String(), Serdes.String())); - reducer = new Reducer<String>() { - @Override - public String apply(String value1, String value2) { - return value1 + ":" + value2; - } - }; + reducer = (value1, value2) -> value1 + ":" + value2; } @After @@ -132,7 +116,7 @@ public void whenShuttingDown() throws IOException { public void shouldReduce() throws Exception { produceMessages(System.currentTimeMillis()); groupedStream - .reduce(reducer, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("reduce-by-key")) + .reduce(reducer, Materialized.as("reduce-by-key")) .toStream() .to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); @@ -140,34 +124,15 @@ public void shouldReduce() throws Exception { produceMessages(System.currentTimeMillis()); - List<KeyValue<String, String>> results = receiveMessages( - new StringDeserializer(), - new StringDeserializer(), - 5); - - Collections.sort(results, new Comparator<KeyValue<String, String>>() { - @Override - public int compare(KeyValue<String, String> o1, KeyValue<String, String> o2) { - return KStreamAggregationDedupIntegrationTest.compare(o1, o2); - } - }); - - assertThat(results, is(Arrays.asList( - KeyValue.pair("A", "A:A"), - KeyValue.pair("B", "B:B"), - KeyValue.pair("C", "C:C"), - KeyValue.pair("D", "D:D"), - KeyValue.pair("E", "E:E")))); - } - - @SuppressWarnings("unchecked") - private static <K extends Comparable, V extends Comparable> int compare(final KeyValue<K, V> o1, - final KeyValue<K, V> o2) { - final int keyComparison = o1.key.compareTo(o2.key); - if (keyComparison == 0) { - return o1.value.compareTo(o2.value); - } - return keyComparison; + validateReceivedMessages( + new StringDeserializer(), + new StringDeserializer(), + Arrays.asList( + KeyValue.pair("A", "A:A"), + KeyValue.pair("B", "B:B"), + KeyValue.pair("C", "C:C"), + KeyValue.pair("D", "D:D"), + KeyValue.pair("E", "E:E"))); } @Test @@ -180,50 +145,31 @@ public void shouldReduceWindowed() throws Exception { groupedStream .windowedBy(TimeWindows.of(500L)) - .reduce(reducer, Materialized.<String, String, WindowStore<Bytes, byte[]>>as("reduce-time-windows")) - .toStream(new KeyValueMapper<Windowed<String>, String, String>() { - @Override - public String apply(Windowed<String> windowedKey, String value) { - return windowedKey.key() + "@" + windowedKey.window().start(); - } - }) + .reduce(reducer, Materialized.as("reduce-time-windows")) + .toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start()) .to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); startStreams(); - List<KeyValue<String, String>> windowedOutput = receiveMessages( - new StringDeserializer(), - new StringDeserializer(), - 10); - - Comparator<KeyValue<String, String>> - comparator = - new Comparator<KeyValue<String, String>>() { - @Override - public int compare(final KeyValue<String, String> o1, - final KeyValue<String, String> o2) { - return KStreamAggregationDedupIntegrationTest.compare(o1, o2); - } - }; - - Collections.sort(windowedOutput, comparator); long firstBatchWindow = firstBatchTimestamp / 500 * 500; long 
secondBatchWindow = secondBatchTimestamp / 500 * 500; - assertThat(windowedOutput, is( - Arrays.asList( - new KeyValue<>("A@" + firstBatchWindow, "A"), - new KeyValue<>("A@" + secondBatchWindow, "A:A"), - new KeyValue<>("B@" + firstBatchWindow, "B"), - new KeyValue<>("B@" + secondBatchWindow, "B:B"), - new KeyValue<>("C@" + firstBatchWindow, "C"), - new KeyValue<>("C@" + secondBatchWindow, "C:C"), - new KeyValue<>("D@" + firstBatchWindow, "D"), - new KeyValue<>("D@" + secondBatchWindow, "D:D"), - new KeyValue<>("E@" + firstBatchWindow, "E"), - new KeyValue<>("E@" + secondBatchWindow, "E:E") - ) - )); + validateReceivedMessages( + new StringDeserializer(), + new StringDeserializer(), + Arrays.asList( + new KeyValue<>("A@" + firstBatchWindow, "A"), + new KeyValue<>("A@" + secondBatchWindow, "A:A"), + new KeyValue<>("B@" + firstBatchWindow, "B"), + new KeyValue<>("B@" + secondBatchWindow, "B:B"), + new KeyValue<>("C@" + firstBatchWindow, "C"), + new KeyValue<>("C@" + secondBatchWindow, "C:C"), + new KeyValue<>("D@" + firstBatchWindow, "D"), + new KeyValue<>("D@" + secondBatchWindow, "D:D"), + new KeyValue<>("E@" + firstBatchWindow, "E"), + new KeyValue<>("E@" + secondBatchWindow, "E:E") + ) + ); } @Test @@ -234,36 +180,25 @@ public void shouldGroupByKey() throws Exception { stream.groupByKey(Serialized.with(Serdes.Integer(), Serdes.String())) .windowedBy(TimeWindows.of(500L)) - .count(Materialized.<Integer, Long, WindowStore<Bytes, byte[]>>as("count-windows")) - .toStream(new KeyValueMapper<Windowed<Integer>, Long, String>() { - @Override - public String apply(final Windowed<Integer> windowedKey, final Long value) { - return windowedKey.key() + "@" + windowedKey.window().start(); - } - }).to(outputTopic, Produced.with(Serdes.String(), Serdes.Long())); + .count(Materialized.as("count-windows")) + .toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start()) + .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long())); startStreams(); - final List<KeyValue<String, Long>> results = receiveMessages( - new StringDeserializer(), - new LongDeserializer(), - 5); - Collections.sort(results, new Comparator<KeyValue<String, Long>>() { - @Override - public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) { - return KStreamAggregationDedupIntegrationTest.compare(o1, o2); - } - }); - final long window = timestamp / 500 * 500; - assertThat(results, is(Arrays.asList( - KeyValue.pair("1@" + window, 2L), - KeyValue.pair("2@" + window, 2L), - KeyValue.pair("3@" + window, 2L), - KeyValue.pair("4@" + window, 2L), - KeyValue.pair("5@" + window, 2L) - ))); + validateReceivedMessages( + new StringDeserializer(), + new LongDeserializer(), + Arrays.asList( + KeyValue.pair("1@" + window, 2L), + KeyValue.pair("2@" + window, 2L), + KeyValue.pair("3@" + window, 2L), + KeyValue.pair("4@" + window, 2L), + KeyValue.pair("5@" + window, 2L) + ) + ); } @@ -298,11 +233,9 @@ private void startStreams() { } - private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> - keyDeserializer, - final Deserializer<V> - valueDeserializer, - final int numMessages) + private <K, V> void validateReceivedMessages(final Deserializer<K> keyDeserializer, + final Deserializer<V> valueDeserializer, + final List<KeyValue<K, V>> expectedRecords) throws InterruptedException { final Properties consumerProperties = new Properties(); consumerProperties @@ -314,11 +247,11 @@ private void startStreams() { keyDeserializer.getClass().getName()); 
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName()); - return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProperties, - outputTopic, - numMessages, - 60 * 1000); + consumerProperties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(Integer.MAX_VALUE)); + IntegrationTestUtils.waitUntilExactKeyValueRecordsReceived(consumerProperties, + outputTopic, + expectedRecords); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index ff791be1316..b68695e1247 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -524,20 +524,24 @@ public boolean test(final String key, final Long value) { myFilterNotStore = kafkaStreams.store("queryFilterNot", QueryableStoreTypes.<String, Long>keyValueStore()); for (final KeyValue<String, Long> expectedEntry : expectedBatch1) { - assertEquals(myFilterStore.get(expectedEntry.key), expectedEntry.value); + TestUtils.waitForCondition(() -> expectedEntry.value.equals(myFilterStore.get(expectedEntry.key)), + "Cannot get expected result"); } for (final KeyValue<String, Long> batchEntry : batch1) { if (!expectedBatch1.contains(batchEntry)) { - assertNull(myFilterStore.get(batchEntry.key)); + TestUtils.waitForCondition(() -> myFilterStore.get(batchEntry.key) == null, + "Cannot get null result"); } } for (final KeyValue<String, Long> expectedEntry : expectedBatch1) { - assertNull(myFilterNotStore.get(expectedEntry.key)); + TestUtils.waitForCondition(() -> myFilterNotStore.get(expectedEntry.key) == null, + "Cannot get null result"); } for (final KeyValue<String, Long> batchEntry : batch1) { if (!expectedBatch1.contains(batchEntry)) { - assertEquals(myFilterNotStore.get(batchEntry.key), batchEntry.value); + TestUtils.waitForCondition(() -> batchEntry.value.equals(myFilterNotStore.get(batchEntry.key)), + "Cannot get expected result"); } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 749d74887e5..0f64b8df6fc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -350,6 +350,33 @@ public static void waitForCompletion(final KafkaStreams streams, return accumData; } + public static <K, V> List<KeyValue<K, V>> waitUntilExactKeyValueRecordsReceived(final Properties consumerConfig, + final String topic, + final List<KeyValue<K, V>> expectedRecords) throws InterruptedException { + return waitUntilExactKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, DEFAULT_TIMEOUT); + } + + public static <K, V> List<KeyValue<K, V>> waitUntilExactKeyValueRecordsReceived(final Properties consumerConfig, + final String topic, + final List<KeyValue<K, V>> expectedRecords, + final long waitTime) throws InterruptedException { + final List<KeyValue<K, V>> accumData = new ArrayList<>(); + try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) { + final TestCondition valuesRead = new TestCondition() { + @Override + public boolean conditionMet() { + final 
List<KeyValue<K, V>> readData = + readKeyValues(topic, consumer, waitTime, expectedRecords.size()); + accumData.addAll(readData); + return accumData.containsAll(expectedRecords); + } + }; + final String conditionDetails = "Did not receive all " + expectedRecords + " records from topic " + topic; + TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails); + } + return accumData; + } + public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(final Properties consumerConfig, final String topic, final int expectedNumRecords, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java index 8a8d6255c46..3787ffec205 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java @@ -253,6 +253,7 @@ private void mockTaskInitialization() { @Test public void shouldCommitRunningTasks() { mockTaskInitialization(); + EasyMock.expect(t1.commitNeeded()).andReturn(true); t1.commit(); EasyMock.expectLastCall(); EasyMock.replay(t1); @@ -266,6 +267,7 @@ public void shouldCommitRunningTasks() { @Test public void shouldCloseTaskOnCommitIfTaskMigratedException() { mockTaskInitialization(); + EasyMock.expect(t1.commitNeeded()).andReturn(true); t1.commit(); EasyMock.expectLastCall().andThrow(new TaskMigratedException()); t1.close(false, true); @@ -285,6 +287,7 @@ public void shouldCloseTaskOnCommitIfTaskMigratedException() { @Test public void shouldThrowExceptionOnCommitWhenNotCommitFailedOrProducerFenced() { mockTaskInitialization(); + EasyMock.expect(t1.commitNeeded()).andReturn(true); t1.commit(); EasyMock.expectLastCall().andThrow(new RuntimeException("")); EasyMock.replay(t1); @@ -303,6 +306,7 @@ public void shouldThrowExceptionOnCommitWhenNotCommitFailedOrProducerFenced() { @Test public void shouldCommitRunningTasksIfNeeded() { mockTaskInitialization(); + EasyMock.expect(t1.commitRequested()).andReturn(true); EasyMock.expect(t1.commitNeeded()).andReturn(true); t1.commit(); EasyMock.expectLastCall(); @@ -317,6 +321,7 @@ public void shouldCommitRunningTasksIfNeeded() { @Test public void shouldCloseTaskOnMaybeCommitIfTaskMigratedException() { mockTaskInitialization(); + EasyMock.expect(t1.commitRequested()).andReturn(true); EasyMock.expect(t1.commitNeeded()).andReturn(true); t1.commit(); EasyMock.expectLastCall().andThrow(new TaskMigratedException()); @@ -337,12 +342,14 @@ public void shouldCloseTaskOnMaybeCommitIfTaskMigratedException() { @Test public void shouldCloseTaskOnProcessesIfTaskMigratedException() { mockTaskInitialization(); + EasyMock.expect(t1.isProcessable()).andReturn(true); t1.process(); EasyMock.expectLastCall().andThrow(new TaskMigratedException()); t1.close(false, true); EasyMock.expectLastCall(); EasyMock.replay(t1); addAndInitTask(); + assignedTasks.update(); try { assignedTasks.process(); @@ -353,6 +360,35 @@ public void shouldCloseTaskOnProcessesIfTaskMigratedException() { EasyMock.verify(t1); } + @Test + public void shouldNotProcessUnprocessableTasks() { + mockTaskInitialization(); + EasyMock.expect(t1.isProcessable()).andReturn(false); + EasyMock.replay(t1); + addAndInitTask(); + assignedTasks.update(); + + assertThat(assignedTasks.process(), equalTo(0)); + + EasyMock.verify(t1); + } + + @Test + public void shouldAlwaysProcessProcessableTasks() { + 
mockTaskInitialization(); + EasyMock.expect(t1.isProcessable()).andReturn(true); + EasyMock.expect(t1.process()).andReturn(true).once(); + EasyMock.expect(t1.allSourcePartitionsBuffered()).andReturn(true); + EasyMock.replay(t1); + + addAndInitTask(); + assignedTasks.update(); + + assertThat(assignedTasks.process(), equalTo(1)); + + EasyMock.verify(t1); + } + @Test public void shouldPunctuateRunningTasks() { mockTaskInitialization(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index b3123e46343..f7765ca0780 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -108,7 +108,7 @@ public void testTimeTracking() { assertEquals(5, group.numBuffered()); assertEquals(2, group.numBuffered(partition1)); assertEquals(3, group.numBuffered(partition2)); - assertEquals(2L, group.timestamp()); + assertEquals(1L, group.timestamp()); // get one record, now the time should be advanced record = group.nextRecord(info); @@ -120,7 +120,7 @@ public void testTimeTracking() { assertEquals(4, group.numBuffered()); assertEquals(2, group.numBuffered(partition1)); assertEquals(2, group.numBuffered(partition2)); - assertEquals(3L, group.timestamp()); + assertEquals(2L, group.timestamp()); // add 2 more records with timestamp 2, 4 to partition-1 final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList( @@ -134,7 +134,7 @@ public void testTimeTracking() { assertEquals(6, group.numBuffered()); assertEquals(4, group.numBuffered(partition1)); assertEquals(2, group.numBuffered(partition2)); - assertEquals(3L, group.timestamp()); + assertEquals(2L, group.timestamp()); // get one record, time should not be advanced record = group.nextRecord(info); @@ -146,7 +146,7 @@ public void testTimeTracking() { assertEquals(5, group.numBuffered()); assertEquals(3, group.numBuffered(partition1)); assertEquals(2, group.numBuffered(partition2)); - assertEquals(4L, group.timestamp()); + assertEquals(3L, group.timestamp()); // get one record, time should not be advanced record = group.nextRecord(info); @@ -158,7 +158,7 @@ public void testTimeTracking() { assertEquals(4, group.numBuffered()); assertEquals(3, group.numBuffered(partition1)); assertEquals(1, group.numBuffered(partition2)); - assertEquals(5L, group.timestamp()); + assertEquals(4L, group.timestamp()); // get one more record, now time should be advanced record = group.nextRecord(info); @@ -206,7 +206,7 @@ public void testTimeTracking() { assertEquals(0, group.numBuffered()); assertEquals(0, group.numBuffered(partition1)); assertEquals(0, group.numBuffered(partition2)); - assertEquals(5L, group.timestamp()); + assertEquals(6L, group.timestamp()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index bfbb2a00270..cdb7410fd8a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -57,6 +57,7 @@ import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.Arrays; import java.util.Collections; @@ -111,7 +112,7 @@ public void close() { private 
final ProcessorTopology topology = ProcessorTopology.withSources( Utils.<ProcessorNode>mkList(source1, source2, processorStreamTime, processorSystemTime), - mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, (SourceNode) source2)) + mkMap(mkEntry(topic1, source1), mkEntry(topic2, source2)) ); private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); @@ -307,97 +308,6 @@ public void testPauseResume() { assertEquals(0, consumer.paused().size()); } - @SuppressWarnings("unchecked") - @Test - public void testMaybePunctuateStreamTime() { - task = createStatelessTask(createConfig(false)); - task.initializeStateStores(); - task.initializeTopology(); - - task.addRecords(partition1, Arrays.asList( - getConsumerRecord(partition1, 0), - getConsumerRecord(partition1, 20), - getConsumerRecord(partition1, 32), - getConsumerRecord(partition1, 40), - getConsumerRecord(partition1, 60) - )); - - task.addRecords(partition2, Arrays.asList( - getConsumerRecord(partition2, 25), - getConsumerRecord(partition2, 35), - getConsumerRecord(partition2, 45), - getConsumerRecord(partition2, 61) - )); - - assertTrue(task.maybePunctuateStreamTime()); - - assertTrue(task.process()); - assertEquals(8, task.numBuffered()); - assertEquals(1, source1.numReceived); - assertEquals(0, source2.numReceived); - - assertTrue(task.maybePunctuateStreamTime()); - - assertTrue(task.process()); - assertEquals(7, task.numBuffered()); - assertEquals(2, source1.numReceived); - assertEquals(0, source2.numReceived); - - assertFalse(task.maybePunctuateStreamTime()); - - assertTrue(task.process()); - assertEquals(6, task.numBuffered()); - assertEquals(2, source1.numReceived); - assertEquals(1, source2.numReceived); - - assertTrue(task.maybePunctuateStreamTime()); - - assertTrue(task.process()); - assertEquals(5, task.numBuffered()); - assertEquals(3, source1.numReceived); - assertEquals(1, source2.numReceived); - - assertFalse(task.maybePunctuateStreamTime()); - - assertTrue(task.process()); - assertEquals(4, task.numBuffered()); - assertEquals(3, source1.numReceived); - assertEquals(2, source2.numReceived); - - assertTrue(task.maybePunctuateStreamTime()); - - assertTrue(task.process()); - assertEquals(3, task.numBuffered()); - assertEquals(4, source1.numReceived); - assertEquals(2, source2.numReceived); - - assertFalse(task.maybePunctuateStreamTime()); - - assertTrue(task.process()); - assertEquals(2, task.numBuffered()); - assertEquals(4, source1.numReceived); - assertEquals(3, source2.numReceived); - - assertTrue(task.maybePunctuateStreamTime()); - - assertTrue(task.process()); - assertEquals(1, task.numBuffered()); - assertEquals(5, source1.numReceived); - assertEquals(3, source2.numReceived); - - assertFalse(task.maybePunctuateStreamTime()); - - assertTrue(task.process()); - assertEquals(0, task.numBuffered()); - assertEquals(5, source1.numReceived); - assertEquals(4, source2.numReceived); - - assertFalse(task.process()); - assertFalse(task.maybePunctuateStreamTime()); - - processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 0L, 20L, 32L, 40L, 60L); - } - @SuppressWarnings("unchecked") @Test public void shouldPunctuateOnceStreamTimeAfterGap() { @@ -419,6 +329,7 @@ public void shouldPunctuateOnceStreamTimeAfterGap() { getConsumerRecord(partition2, 161) )); + // st: 20 assertTrue(task.maybePunctuateStreamTime()); // punctuate at 20 assertTrue(task.process()); @@ -426,6 +337,7 @@ public void shouldPunctuateOnceStreamTimeAfterGap() { assertEquals(1, 
source1.numReceived); assertEquals(0, source2.numReceived); + // st: 20 assertFalse(task.maybePunctuateStreamTime()); assertTrue(task.process()); @@ -433,9 +345,7 @@ public void shouldPunctuateOnceStreamTimeAfterGap() { assertEquals(1, source1.numReceived); assertEquals(1, source2.numReceived); - assertTrue(task.maybePunctuateStreamTime()); // punctuate at 142 - - // only one punctuation after 100ms gap + // st: 25 assertFalse(task.maybePunctuateStreamTime()); assertTrue(task.process()); @@ -443,50 +353,58 @@ public void shouldPunctuateOnceStreamTimeAfterGap() { assertEquals(2, source1.numReceived); assertEquals(1, source2.numReceived); - assertFalse(task.maybePunctuateStreamTime()); + // st: 142 + assertTrue(task.maybePunctuateStreamTime()); // punctuate at 142 assertTrue(task.process()); assertEquals(4, task.numBuffered()); assertEquals(2, source1.numReceived); assertEquals(2, source2.numReceived); - assertTrue(task.maybePunctuateStreamTime()); // punctuate at 155 + // st: 145 + // only one punctuation after 100ms gap + assertFalse(task.maybePunctuateStreamTime()); assertTrue(task.process()); assertEquals(3, task.numBuffered()); assertEquals(3, source1.numReceived); assertEquals(2, source2.numReceived); - assertFalse(task.maybePunctuateStreamTime()); + // st: 155 + assertTrue(task.maybePunctuateStreamTime()); // punctuate at 155 assertTrue(task.process()); assertEquals(2, task.numBuffered()); assertEquals(3, source1.numReceived); assertEquals(3, source2.numReceived); - assertTrue(task.maybePunctuateStreamTime()); // punctuate at 160, still aligned on the initial punctuation + // st: 159 + assertFalse(task.maybePunctuateStreamTime()); assertTrue(task.process()); assertEquals(1, task.numBuffered()); assertEquals(4, source1.numReceived); assertEquals(3, source2.numReceived); - assertFalse(task.maybePunctuateStreamTime()); + // st: 160, aligned at 0 + assertTrue(task.maybePunctuateStreamTime()); assertTrue(task.process()); assertEquals(0, task.numBuffered()); assertEquals(4, source1.numReceived); assertEquals(4, source2.numReceived); + // st: 161 + assertFalse(task.maybePunctuateStreamTime()); + assertFalse(task.process()); assertFalse(task.maybePunctuateStreamTime()); processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L, 142L, 155L, 160L); } - @SuppressWarnings("unchecked") @Test - public void testCancelPunctuateStreamTime() { + public void shouldRespectPunctuateCancellationStreamTime() { task = createStatelessTask(createConfig(false)); task.initializeStateStores(); task.initializeTopology(); @@ -518,6 +436,61 @@ public void testCancelPunctuateStreamTime() { processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L); } + @Test + public void shouldRespectPunctuateCancellationSystemTime() { + task = createStatelessTask(createConfig(false)); + task.initializeStateStores(); + task.initializeTopology(); + final long now = time.milliseconds(); + time.sleep(10); + assertTrue(task.maybePunctuateSystemTime()); + processorSystemTime.mockProcessor.scheduleCancellable.cancel(); + time.sleep(10); + assertFalse(task.maybePunctuateSystemTime()); + processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10); + } + + @Test + public void shouldBeProcessableIfAllPartitionsBuffered() { + task = createStatelessTask(createConfig(false)); + task.initializeStateStores(); + task.initializeTopology(); + + assertFalse(task.isProcessable()); + + final byte[] bytes = 
ByteBuffer.allocate(4).putInt(1).array(); + + task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>(topic1, 1, 0, bytes, bytes))); + + assertFalse(task.isProcessable()); + + task.addRecords(partition2, Collections.singleton(new ConsumerRecord<>(topic2, 1, 0, bytes, bytes))); + + assertTrue(task.isProcessable()); + } + + @Test + public void shouldBeProcessableIfWaitedForTooLong() { + task = createStatelessTask(createConfig(false)); + task.initializeStateStores(); + task.initializeTopology(); + + assertFalse(task.isProcessable()); + + final byte[] bytes = ByteBuffer.allocate(4).putInt(1).array(); + + task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>(topic1, 1, 0, bytes, bytes))); + + assertFalse(task.isProcessable()); + assertFalse(task.isProcessable()); + assertFalse(task.isProcessable()); + assertFalse(task.isProcessable()); + assertFalse(task.isProcessable()); + + assertTrue(task.isProcessable()); + } + + @Test public void shouldPunctuateSystemTimeWhenIntervalElapsed() { task = createStatelessTask(createConfig(false)); @@ -575,20 +548,6 @@ public void shouldPunctuateOnceSystemTimeAfterGap() { processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 100, now + 110, now + 122, now + 130, now + 235, now + 240); } - @Test - public void testCancelPunctuateSystemTime() { - task = createStatelessTask(createConfig(false)); - task.initializeStateStores(); - task.initializeTopology(); - final long now = time.milliseconds(); - time.sleep(10); - assertTrue(task.maybePunctuateSystemTime()); - processorSystemTime.mockProcessor.scheduleCancellable.cancel(); - time.sleep(10); - assertFalse(task.maybePunctuateSystemTime()); - processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10); - } - @Test public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() { task = createTaskThatThrowsException(); @@ -1110,7 +1069,7 @@ private StreamTask createStatefulTaskThatThrowsExceptionOnClose() { private StreamTask createStatelessTask(final StreamsConfig streamsConfig) { final ProcessorTopology topology = ProcessorTopology.withSources( Utils.<ProcessorNode>mkList(source1, source2, processorStreamTime, processorSystemTime), - mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, (SourceNode) source2)) + mkMap(mkEntry(topic1, source1), mkEntry(topic2, source2)) ); source1.addChild(processorStreamTime); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 93d4e94a234..b2ae7e30df2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -177,7 +177,7 @@ public void testPartitionAssignmentChangeForSingleGroup() { mockConsumer.assign(assignedPartitions); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); rebalanceListener.onPartitionsAssigned(assignedPartitions); - thread.runOnce(-1); + thread.runOnce(); assertEquals(thread.state(), StreamThread.State.RUNNING); Assert.assertEquals(4, stateListener.numChanges); Assert.assertEquals(StreamThread.State.PARTITIONS_ASSIGNED, stateListener.oldState); @@ -307,9 +307,11 @@ public void shouldNotCommitBeforeTheCommitInterval() { new LogContext(""), new AtomicInteger() ); - thread.maybeCommit(mockTime.milliseconds()); + 
thread.setNow(mockTime.milliseconds()); + thread.maybeCommit(); mockTime.sleep(commitInterval - 10L); - thread.maybeCommit(mockTime.milliseconds()); + thread.setNow(mockTime.milliseconds()); + thread.maybeCommit(); EasyMock.verify(taskManager); } @@ -341,9 +343,11 @@ public void shouldNotCauseExceptionIfNothingCommitted() { new LogContext(""), new AtomicInteger() ); - thread.maybeCommit(mockTime.milliseconds()); + thread.setNow(mockTime.milliseconds()); + thread.maybeCommit(); mockTime.sleep(commitInterval - 10L); - thread.maybeCommit(mockTime.milliseconds()); + thread.setNow(mockTime.milliseconds()); + thread.maybeCommit(); EasyMock.verify(taskManager); } @@ -376,9 +380,11 @@ public void shouldCommitAfterTheCommitInterval() { new LogContext(""), new AtomicInteger() ); - thread.maybeCommit(mockTime.milliseconds()); + thread.setNow(mockTime.milliseconds()); + thread.maybeCommit(); mockTime.sleep(commitInterval + 1); - thread.maybeCommit(mockTime.milliseconds()); + thread.setNow(mockTime.milliseconds()); + thread.maybeCommit(); EasyMock.verify(taskManager); } @@ -456,7 +462,7 @@ public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() mockConsumer.updateBeginningOffsets(beginOffsets); thread.rebalanceListener.onPartitionsAssigned(new HashSet<>(assignedPartitions)); - thread.runOnce(-1); + thread.runOnce(); assertEquals(thread.tasks().size(), clientSupplier.producers.size()); assertSame(clientSupplier.consumer, thread.consumer); @@ -645,7 +651,7 @@ public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWh mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); - thread.runOnce(-1); + thread.runOnce(); assertThat(thread.tasks().size(), equalTo(1)); final MockProducer producer = clientSupplier.producers.get(0); @@ -656,7 +662,7 @@ public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWh consumer.addRecord(new ConsumerRecord<>(topic1, 1, 0, new byte[0], new byte[0])); mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1); - thread.runOnce(-1); + thread.runOnce(); assertThat(producer.history().size(), equalTo(1)); assertFalse(producer.transactionCommitted()); @@ -665,16 +671,16 @@ public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWh new TestCondition() { @Override public boolean conditionMet() { - return producer.commitCount() == 2; + return producer.commitCount() == 1; } }, "StreamsThread did not commit transaction."); producer.fenceProducer(); mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1L); - consumer.addRecord(new ConsumerRecord<>(topic1, 1, 0, new byte[0], new byte[0])); + consumer.addRecord(new ConsumerRecord<>(topic1, 1, 1, new byte[0], new byte[0])); try { - thread.runOnce(-1); + thread.runOnce(); fail("Should have thrown TaskMigratedException"); } catch (final TaskMigratedException expected) { /* ignore */ } TestUtils.waitForCondition( @@ -686,7 +692,7 @@ public boolean conditionMet() { }, "StreamsThread did not remove fenced zombie task."); - assertThat(producer.commitCount(), equalTo(2L)); + assertThat(producer.commitCount(), equalTo(1L)); } @Test @@ -713,7 +719,7 @@ public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedAt mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); - thread.runOnce(-1); + thread.runOnce(); 
assertThat(thread.tasks().size(), equalTo(1)); @@ -721,7 +727,7 @@ public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedAt clientSupplier.producers.get(0).fenceProducer(); thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); try { - thread.runOnce(-1); + thread.runOnce(); fail("Should have thrown TaskMigratedException"); } catch (final TaskMigratedException expected) { /* ignore */ } @@ -772,7 +778,7 @@ public void shouldReturnActiveTaskMetadataWhileRunningState() { mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); - thread.runOnce(-1); + thread.runOnce(); final ThreadMetadata threadMetadata = thread.threadMetadata(); assertEquals(StreamThread.State.RUNNING.name(), threadMetadata.threadState()); @@ -816,7 +822,7 @@ public void shouldReturnStandbyTaskMetadataWhileRunningState() { thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList()); - thread.runOnce(-1); + thread.runOnce(); final ThreadMetadata threadMetadata = thread.threadMetadata(); assertEquals(StreamThread.State.RUNNING.name(), threadMetadata.threadState()); @@ -881,7 +887,7 @@ public void shouldUpdateStandbyTask() throws IOException { thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList()); - thread.runOnce(-1); + thread.runOnce(); final StandbyTask standbyTask1 = thread.taskManager().standbyTask(partition1); final StandbyTask standbyTask2 = thread.taskManager().standbyTask(partition2); @@ -947,7 +953,7 @@ public void close() {} clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); - thread.runOnce(-1); + thread.runOnce(); assertEquals(0, punctuatedStreamTime.size()); assertEquals(0, punctuatedWallClockTime.size()); @@ -957,14 +963,14 @@ public void close() {} clientSupplier.consumer.addRecord(new ConsumerRecord<>(topic1, 1, i, i * 100L, TimestampType.CREATE_TIME, ConsumerRecord.NULL_CHECKSUM, ("K" + i).getBytes().length, ("V" + i).getBytes().length, ("K" + i).getBytes(), ("V" + i).getBytes())); } - thread.runOnce(-1); + thread.runOnce(); assertEquals(1, punctuatedStreamTime.size()); assertEquals(1, punctuatedWallClockTime.size()); mockTime.sleep(100L); - thread.runOnce(-1); + thread.runOnce(); // we should skip stream time punctuation, only trigger wall-clock time punctuation assertEquals(1, punctuatedStreamTime.size()); @@ -1155,7 +1161,7 @@ public void shouldRecordSkippedMetricForDeserializationException() { mockConsumer.assign(Collections.singleton(t1p1)); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); - thread.runOnce(-1); + thread.runOnce(); final MetricName skippedTotalMetric = metrics.metricName("skipped-records-total", "stream-metrics", Collections.singletonMap("client-id", thread.getName())); final MetricName skippedRateMetric = metrics.metricName("skipped-records-rate", "stream-metrics", Collections.singletonMap("client-id", thread.getName())); @@ -1165,7 +1171,7 @@ public void shouldRecordSkippedMetricForDeserializationException() { long offset = -1; mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], "I am not an integer.".getBytes())); mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, 
-1, -1, new byte[0], "I am not an integer.".getBytes())); - thread.runOnce(-1); + thread.runOnce(); assertEquals(2.0, metrics.metric(skippedTotalMetric).metricValue()); assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue()); @@ -1199,7 +1205,7 @@ public void shouldReportSkippedRecordsForInvalidTimestamps() { mockConsumer.assign(Collections.singleton(t1p1)); mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); - thread.runOnce(-1); + thread.runOnce(); final MetricName skippedTotalMetric = metrics.metricName("skipped-records-total", "stream-metrics", Collections.singletonMap("client-id", thread.getName())); final MetricName skippedRateMetric = metrics.metricName("skipped-records-rate", "stream-metrics", Collections.singletonMap("client-id", thread.getName())); @@ -1209,7 +1215,7 @@ public void shouldReportSkippedRecordsForInvalidTimestamps() { long offset = -1; mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0])); mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0])); - thread.runOnce(-1); + thread.runOnce(); assertEquals(2.0, metrics.metric(skippedTotalMetric).metricValue()); assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue()); @@ -1217,13 +1223,13 @@ public void shouldReportSkippedRecordsForInvalidTimestamps() { mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0])); mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0])); mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0])); - thread.runOnce(-1); + thread.runOnce(); assertEquals(6.0, metrics.metric(skippedTotalMetric).metricValue()); assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue()); mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0])); mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0])); - thread.runOnce(-1); + thread.runOnce(); assertEquals(6.0, metrics.metric(skippedTotalMetric).metricValue()); assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue()); diff --git a/streams/src/test/resources/log4j.properties b/streams/src/test/resources/log4j.properties index be36f90299a..0c0caf162c9 100644 --- a/streams/src/test/resources/log4j.properties +++ b/streams/src/test/resources/log4j.properties @@ -12,10 +12,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-log4j.rootLogger=INFO, stdout +log4j.rootLogger=WARN, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n -log4j.logger.org.apache.kafka=INFO +log4j.logger.org.apache.kafka=WARN diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala index 32ad793bc84..d43c1970a14 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala @@ -24,11 +24,7 @@ import org.apache.kafka.common.serialization._ import org.apache.kafka.common.utils.MockTime import org.apache.kafka.streams._ import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils} -import org.apache.kafka.streams.processor.internals.StreamThread -import org.apache.kafka.streams.scala.ImplicitConversions._ -import org.apache.kafka.streams.scala.kstream._ import org.apache.kafka.test.TestUtils -import org.junit.Assert._ import org.junit._ import org.junit.rules.TemporaryFolder import org.scalatest.junit.JUnitSuite @@ -129,9 +125,9 @@ class StreamToTableJoinScalaIntegrationTestBase extends JUnitSuite with StreamTo // consume and verify result val consumerConfig = getConsumerConfig() - IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, - outputTopic, - expectedClicksPerRegion.size) + IntegrationTestUtils.waitUntilExactKeyValueRecordsReceived(consumerConfig, + outputTopic, + expectedClicksPerRegion.asJava) } else { java.util.Collections.emptyList() } diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala index e5253f95d45..3d1bab5d086 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala @@ -82,9 +82,6 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] = produceNConsume(userClicksTopic, userRegionsTopic, outputTopic) streams.close() - - import collection.JavaConverters._ - assertEquals(actualClicksPerRegion.asScala.sortBy(_.key), expectedClicksPerRegion.sortBy(_.key)) } @Test def testShouldCountClicksPerRegionJava(): Unit = { @@ -149,6 +146,5 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ produceNConsume(userClicksTopicJ, userRegionsTopicJ, outputTopicJ) streams.close() - assertEquals(actualClicksPerRegion.asScala.sortBy(_.key), expectedClicksPerRegion.sortBy(_.key)) } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Stream timestamp computation needs some further thoughts
> --------------------------------------------------------
>
>                 Key: KAFKA-3514
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3514
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Major
>              Labels: architecture
>
> Our current stream task's timestamp is used for the punctuate function as well as for selecting which stream to process next (i.e. best-effort stream synchronization). It is defined as the smallest timestamp over all partitions in the task's partition group. This results in two unintuitive corner cases:
> 1) Observing a late-arrived record keeps that stream's timestamp low for a period of time, so that stream keeps being selected for processing until the late record itself is processed. For example, take two partitions within the same task, annotated by their timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late-arrived record with timestamp "1" causes stream A to be selected continuously in the thread loop, i.e. the messages with timestamps 5, 6, 7, 8, 9, until that record itself is dequeued and processed; only then is stream B selected, starting with timestamp 2.
> 2) An empty buffered partition prevents its timestamp from advancing, and hence the task timestamp as well, since the task timestamp is the smallest among all partitions. This may not be as severe a problem as 1) above, though.
> *Update*
> There is one more thing to consider (full discussion found here: http://search-hadoop.com/m/Kafka/uyzND1iKZJN1yz0E5?subj=Order+of+punctuate+and+process+in+a+stream+processor)
> {quote}
> Let's assume the following case:
> - a stream processor that uses the Processor API
> - context.schedule(1000) is called in init()
> - the processor reads only one topic that has one partition
> - a custom timestamp extractor is used, but the extracted timestamp is just a wall-clock time
> Imagine the following events:
> 1. For 10 seconds I send 5 messages / second.
> 2. I do not send any messages for 3 seconds.
> 3. I start sending 5 messages / second again.
> I see that punctuate() is not called during the 3 seconds when I do not send any messages. This is ok according to the documentation, because there are no new messages to trigger the punctuate() call. When the first few messages arrive after I restart sending (point 3 above), I see the following sequence of method calls:
> 1. process() on the 1st message
> 2. punctuate() is called 3 times
> 3. process() on the 2nd message
> 4. process() on each following message
> What I would expect instead is that punctuate() is called first and then process() is called on the messages, because the first message's timestamp is already 3 seconds older than the time of the last punctuate() call, so the first message belongs after the 3 punctuate() calls.
> {quote}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
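As a concrete illustration of corner case 1), here is a small, self-contained Java sketch. This is hypothetical demo code, not taken from the Kafka code base; the class and method names (LateRecordStarvationDemo, partitionTime) are invented. It simulates a scheduler that always dequeues from the partition whose smallest buffered timestamp is lowest, which is the selection behaviour the issue describes. Fed the two example streams, it processes stream A's records 5, 6, 7, 8, 9 and the late record 1 before any record of stream B.

{code}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;

// Hypothetical demo (not Kafka code): simulates corner case 1) above. If a
// partition's "stream time" is the smallest timestamp among its buffered
// records, a buffered late record keeps that partition's time low, so the
// scheduler keeps selecting it until the late record itself is dequeued.
public class LateRecordStarvationDemo {

    // One buffered partition; the queue holds record timestamps in arrival order.
    static final class Partition {
        final String name;
        final Deque<Long> buffered = new ArrayDeque<>();

        Partition(String name, long... timestamps) {
            this.name = name;
            for (long ts : timestamps) buffered.add(ts);
        }

        // Partition time = minimum timestamp over the records still buffered.
        long partitionTime() {
            return buffered.stream().mapToLong(Long::longValue).min().orElse(Long.MAX_VALUE);
        }
    }

    public static void main(String[] args) {
        // The two streams from the issue description; "1" is the late record in stream A.
        Partition a = new Partition("A", 5, 6, 7, 8, 9, 1, 10);
        Partition b = new Partition("B", 2, 3, 4, 5);
        List<Partition> partitions = new ArrayList<>(Arrays.asList(a, b));

        // Always process the non-empty partition whose buffered minimum timestamp is smallest.
        while (partitions.stream().anyMatch(p -> !p.buffered.isEmpty())) {
            Partition next = partitions.stream()
                .filter(p -> !p.buffered.isEmpty())
                .min(Comparator.comparingLong(Partition::partitionTime))
                .get();
            long ts = next.buffered.poll(); // records are consumed in arrival (offset) order
            System.out.println("process " + next.name + " record with timestamp " + ts);
        }
        // Output order: A's 5, 6, 7, 8, 9, 1 are all processed before any record of B,
        // because the buffered late record "1" pins A's partition time below B's.
    }
}
{code}

Judging from the new tests in the diff above (shouldBeProcessableIfAllPartitionsBuffered and shouldBeProcessableIfWaitedForTooLong in StreamTaskTest), the refactored loop mitigates issues of this kind by only treating a task as processable once every source partition has data buffered, or after the task has been passed over enough times.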