This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch revert-28173-issue-72555-2
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 2a25eb457010dfd30018b5e63e9a47ef50defd0f
Author: Yi Hu <huu...@gmail.com>
AuthorDate: Wed Sep 13 11:32:54 2023 -0400

    Revert "Fix flaky test StreamingDataflowWorkerTest (#28173)"
    
    This reverts commit 505f94213874471ae4ec5fa810c73a11dc5be1a8.
---
 .../dataflow/worker/util/BoundedQueueExecutor.java | 24 ++---------
 .../worker/StreamingDataflowWorkerTest.java        | 49 ++++++++--------------
 2 files changed, 21 insertions(+), 52 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
index 1905cf3ac47..05b752f91c0 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.dataflow.worker.util;
 
-import java.time.Clock;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -49,24 +48,6 @@ public class BoundedQueueExecutor {
       int maximumElementsOutstanding,
       long maximumBytesOutstanding,
       ThreadFactory threadFactory) {
-    this(
-        maximumPoolSize,
-        keepAliveTime,
-        unit,
-        maximumElementsOutstanding,
-        maximumBytesOutstanding,
-        threadFactory,
-        Clock.systemUTC());
-  }
-
-  public BoundedQueueExecutor(
-      int maximumPoolSize,
-      long keepAliveTime,
-      TimeUnit unit,
-      int maximumElementsOutstanding,
-      long maximumBytesOutstanding,
-      ThreadFactory threadFactory,
-      Clock clock) {
     executor =
         new ThreadPoolExecutor(
             maximumPoolSize,
@@ -80,7 +61,7 @@ public class BoundedQueueExecutor {
             super.beforeExecute(t, r);
             synchronized (this) {
               if (activeCount.getAndIncrement() >= maximumPoolSize - 1) {
-                startTimeMaxActiveThreadsUsed = clock.millis();
+                startTimeMaxActiveThreadsUsed = System.currentTimeMillis();
               }
             }
           }
@@ -90,7 +71,8 @@ public class BoundedQueueExecutor {
             super.afterExecute(r, t);
             synchronized (this) {
               if (activeCount.getAndDecrement() == maximumPoolSize) {
-                totalTimeMaxActiveThreadsUsed += (clock.millis() - 
startTimeMaxActiveThreadsUsed);
+                totalTimeMaxActiveThreadsUsed +=
+                    (System.currentTimeMillis() - 
startTimeMaxActiveThreadsUsed);
                 startTimeMaxActiveThreadsUsed = 0;
               }
             }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index 8dc7f6217cd..95b3a43ebf4 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -33,7 +33,11 @@ import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import com.google.api.services.dataflow.model.CounterUpdate;
 import com.google.api.services.dataflow.model.InstructionInput;
@@ -52,7 +56,6 @@ import com.google.api.services.dataflow.model.WorkItemStatus;
 import com.google.api.services.dataflow.model.WriteInstruction;
 import java.io.IOException;
 import java.io.InputStream;
-import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -167,6 +170,7 @@ import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ErrorCollector;
@@ -2851,24 +2855,10 @@ public class StreamingDataflowWorkerTest {
   }
 
   @Test
+  @Ignore // Test is flaky on Jenkins (#27555)
   public void testMaxThreadMetric() throws Exception {
     int maxThreads = 2;
     int threadExpiration = 60;
-
-    Clock mockClock = Mockito.mock(Clock.class);
-    CountDownLatch latch = new CountDownLatch(2);
-    doAnswer(
-            invocation -> {
-              latch.countDown();
-              // Return 0 until we are called once (reach max thread count).
-              if (latch.getCount() == 1) {
-                return 0L;
-              }
-              return 1000L;
-            })
-        .when(mockClock)
-        .millis();
-
     // setting up actual implementation of executor instead of mocking to keep 
track of
     // active thread count.
     BoundedQueueExecutor executor =
@@ -2881,8 +2871,7 @@ public class StreamingDataflowWorkerTest {
             new ThreadFactoryBuilder()
                 .setNameFormat("DataflowWorkUnits-%d")
                 .setDaemon(true)
-                .build(),
-            mockClock);
+                .build());
 
     StreamingDataflowWorker.ComputationState computationState =
         new StreamingDataflowWorker.ComputationState(
@@ -2894,17 +2883,15 @@ public class StreamingDataflowWorkerTest {
 
     ShardedKey key1Shard1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 
1);
 
+    // overriding definition of MockWork to add sleep, which will help us keep 
track of how
+    // long each work item takes to process and therefore let us manipulate 
how long the time
+    // at which we're at max threads is.
     MockWork m2 =
         new MockWork(2) {
           @Override
           public void run() {
             try {
-              // Make sure we don't finish before both MockWork are executed, 
thus afterExecute must
-              // be called after
-              // beforeExecute.
-              while (latch.getCount() > 1) {
-                Thread.sleep(50);
-              }
+              Thread.sleep(1000);
             } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
             }
@@ -2916,9 +2903,7 @@ public class StreamingDataflowWorkerTest {
           @Override
           public void run() {
             try {
-              while (latch.getCount() > 1) {
-                Thread.sleep(50);
-              }
+              Thread.sleep(1000);
             } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
             }
@@ -2928,11 +2913,13 @@ public class StreamingDataflowWorkerTest {
     assertTrue(computationState.activateWork(key1Shard1, m2));
     assertTrue(computationState.activateWork(key1Shard1, m3));
     executor.execute(m2, m2.getWorkItem().getSerializedSize());
+
     executor.execute(m3, m3.getWorkItem().getSerializedSize());
-    // Wait until the afterExecute is called.
-    latch.await();
 
-    assertEquals(1000L, executor.allThreadsActiveTime());
+    // Will get close to 1000ms that both work items are processing (sleeping, 
really)
+    // give or take a few ms.
+    long i = 990L;
+    assertTrue(executor.allThreadsActiveTime() >= i);
     executor.shutdown();
   }
 

Reply via email to