[ 
https://issues.apache.org/jira/browse/KAFKA-7162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695335#comment-16695335
 ] 

ASF GitHub Bot commented on KAFKA-7162:
---------------------------------------

gitlw closed pull request #5367: KAFKA-7162: Fixing flaky tests
URL: https://github.com/apache/kafka/pull/5367
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala 
b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 2a367e0d242..f1a41004419 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -223,7 +223,7 @@ class LogValidatorTest {
     val validatingResults = 
LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(0),
       time = time,
-      now = System.currentTimeMillis(),
+      now = now,
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
       compactedTopic = false,
@@ -289,7 +289,7 @@ class LogValidatorTest {
     val validatingResults = 
LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(0),
       time = time,
-      now = System.currentTimeMillis(),
+      now = now,
       sourceCodec = NoCompressionCodec,
       targetCodec = GZIPCompressionCodec,
       compactedTopic = false,
@@ -434,7 +434,7 @@ class LogValidatorTest {
     val validatedResults = 
LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(0),
       time = time,
-      now = System.currentTimeMillis(),
+      now = now,
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
       magic = magic,
@@ -484,7 +484,7 @@ class LogValidatorTest {
       records,
       offsetCounter = new LongRef(0),
       time = time,
-      now = System.currentTimeMillis(),
+      now = now,
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
       compactedTopic = false,
@@ -504,7 +504,7 @@ class LogValidatorTest {
       records,
       offsetCounter = new LongRef(0),
       time = time,
-      now = System.currentTimeMillis(),
+      now = now,
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
       compactedTopic = false,
@@ -524,7 +524,7 @@ class LogValidatorTest {
       records,
       offsetCounter = new LongRef(0),
       time = time,
-      now = System.currentTimeMillis(),
+      now = now,
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
       magic = RecordBatch.MAGIC_VALUE_V1,
@@ -544,7 +544,7 @@ class LogValidatorTest {
       records,
       offsetCounter = new LongRef(0),
       time = time,
-      now = System.currentTimeMillis(),
+      now = now,
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
       magic = RecordBatch.MAGIC_VALUE_V1,
@@ -602,7 +602,7 @@ class LogValidatorTest {
     val messageWithOffset = 
LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
       time = time,
-      now = System.currentTimeMillis(),
+      now = now,
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
       compactedTopic = false,
@@ -623,7 +623,7 @@ class LogValidatorTest {
     val messageWithOffset = 
LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
       time = time,
-      now = System.currentTimeMillis(),
+      now = now,
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
       compactedTopic = false,
@@ -645,7 +645,7 @@ class LogValidatorTest {
       records,
       offsetCounter = new LongRef(offset),
       time = time,
-      now = System.currentTimeMillis(),
+      now = now,
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
       compactedTopic = false,
@@ -667,7 +667,7 @@ class LogValidatorTest {
       records,
       offsetCounter = new LongRef(offset),
       time = time,
-      now = System.currentTimeMillis(),
+      now = now,
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
       compactedTopic = false,
@@ -780,7 +780,7 @@ class LogValidatorTest {
       targetCodec = NoCompressionCodec,
       compactedTopic = false,
       magic = RecordBatch.CURRENT_MAGIC_VALUE,
-      timestampType = TimestampType.CREATE_TIME,
+      timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       isFromClient = true)
@@ -799,7 +799,7 @@ class LogValidatorTest {
       targetCodec = SnappyCompressionCodec,
       compactedTopic = false,
       magic = RecordBatch.CURRENT_MAGIC_VALUE,
-      timestampType = TimestampType.CREATE_TIME,
+      timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 5000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
       isFromClient = false)
@@ -818,7 +818,7 @@ class LogValidatorTest {
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
       time = time,
-      now = System.currentTimeMillis(),
+      now = now,
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
       compactedTopic = false,
@@ -838,7 +838,7 @@ class LogValidatorTest {
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
       time = time,
-      now = System.currentTimeMillis(),
+      now = now,
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
       compactedTopic = false,
@@ -896,7 +896,7 @@ class LogValidatorTest {
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
       time = time,
-      now = System.currentTimeMillis(),
+      now = now,
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
       compactedTopic = false,
@@ -916,7 +916,7 @@ class LogValidatorTest {
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
       time = time,
-      now = System.currentTimeMillis(),
+      now = now,
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
       compactedTopic = false,
@@ -980,7 +980,7 @@ class LogValidatorTest {
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
       time = time,
-      now = System.currentTimeMillis(),
+      now = now,
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
       compactedTopic = false,
@@ -1000,7 +1000,7 @@ class LogValidatorTest {
     checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
       time = time,
-      now = System.currentTimeMillis(),
+      now = now,
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
       compactedTopic = false,
@@ -1049,15 +1049,16 @@ class LogValidatorTest {
     val (producerId, producerEpoch, baseSequence, isTransactional, 
partitionLeaderEpoch) =
       (1324L, 10.toShort, 984, true, 40)
     val buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD)
+    val now = System.currentTimeMillis()
     DefaultRecordBatch.writeEmptyHeader(buffer, 
RecordBatch.CURRENT_MAGIC_VALUE, producerId, producerEpoch,
-      baseSequence, 0L, 5L, partitionLeaderEpoch, TimestampType.CREATE_TIME, 
System.currentTimeMillis(),
+      baseSequence, 0L, 5L, partitionLeaderEpoch, TimestampType.CREATE_TIME, 
now,
       isTransactional, false)
     buffer.flip()
     val records = MemoryRecords.readableRecords(buffer)
     LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
       time = time,
-      now = System.currentTimeMillis(),
+      now = now,
       sourceCodec = sourceCodec,
       targetCodec = targetCodec,
       compactedTopic = false,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Flaky unit tests caused by record creation timestamps differing from 
> validation time by more than timestampDiffMaxMs
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7162
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7162
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Lucas Wang
>            Assignee: Lucas Wang
>            Priority: Minor
>
> While running gradle unit tests, we found the test method 
> LogValidatorTest.testCompressedV1 can fail sometimes. Upon investigation, it 
> turns out the test method uses one set of timestamps, say t0, t1 and t2, for 
> the records, while using a separate timestamp, say t3, for the "now" 
> parameter when invoking the LogValidator.validateMessagesAndAssignOffsets 
> method. The validateMessagesAndAssignOffsets validation method also takes a 
> parameter timestampDiffMaxMs=1 second, which specifies the maximum allowed 
> time difference between t3 and the timestamps in records, i.e. t0, t1, and t2. 
> While running unit tests, especially when multiple tests are run 
> simultaneously, there is no guarantee that the time difference between t3 and 
> t0 is within 1 second, causing the test method to be flaky sometimes. Many other 
> test methods in the LogValidatorTest can suffer from the same problem.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to