This is an automated email from the ASF dual-hosted git repository.

echobravo pushed a commit to branch release/1.12.0
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/release/1.12.0 by this push:
     new 37a18a0  GEODE-7832: Remove Connection Semaphores (#4754)
37a18a0 is described below

commit 37a18a0b921c65645700efa202d388ecd0a04ccb
Author: Juan José Ramos <jujora...@users.noreply.github.com>
AuthorDate: Wed Mar 4 09:19:20 2020 +0000

    GEODE-7832: Remove Connection Semaphores (#4754)
    
    Removed the semaphores and related methods from DirectChannel and
    Connection classes. They were used to constrain messaging when some
    undocumented system properties were set.
    
    (cherry picked from commit 2a3e09f2da4793d08d1124b5d5e656285295937d)
---
 .../internal/ClusterOperationExecutors.java        |   8 -
 .../distributed/internal/direct/DirectChannel.java | 200 +++++----------------
 .../apache/geode/internal/cache/properties.html    |  34 ----
 .../org/apache/geode/internal/tcp/Connection.java  |  81 +--------
 4 files changed, 49 insertions(+), 274 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java
index ddd2aff..d16e7fd 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java
@@ -38,7 +38,6 @@ import org.apache.geode.internal.logging.log4j.LogMarker;
 import org.apache.geode.internal.monitoring.ThreadsMonitoring;
 import org.apache.geode.internal.monitoring.ThreadsMonitoringImpl;
 import org.apache.geode.internal.monitoring.ThreadsMonitoringImplDummy;
-import org.apache.geode.internal.tcp.Connection;
 import org.apache.geode.internal.tcp.ConnectionTable;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 
@@ -375,7 +374,6 @@ public class ClusterOperationExecutors implements OperationExecutors {
     FunctionExecutionPooledExecutor.setIsFunctionExecutionThread(Boolean.TRUE);
     try {
       ConnectionTable.threadWantsSharedResources();
-      Connection.makeReaderThread();
       runUntilShutdown(command);
     } finally {
       ConnectionTable.releaseThreadsSockets();
@@ -388,7 +386,6 @@ public class ClusterOperationExecutors implements OperationExecutors {
     stats.incNumProcessingThreads(1);
     try {
       ConnectionTable.threadWantsSharedResources();
-      Connection.makeReaderThread();
       runUntilShutdown(command);
     } finally {
       ConnectionTable.releaseThreadsSockets();
@@ -400,7 +397,6 @@ public class ClusterOperationExecutors implements OperationExecutors {
     stats.incHighPriorityThreads(1);
     try {
       ConnectionTable.threadWantsSharedResources();
-      Connection.makeReaderThread();
       runUntilShutdown(command);
     } finally {
       ConnectionTable.releaseThreadsSockets();
@@ -412,7 +408,6 @@ public class ClusterOperationExecutors implements OperationExecutors {
     stats.incWaitingThreads(1);
     try {
       ConnectionTable.threadWantsSharedResources();
-      Connection.makeReaderThread();
       runUntilShutdown(command);
     } finally {
       ConnectionTable.releaseThreadsSockets();
@@ -424,7 +419,6 @@ public class ClusterOperationExecutors implements OperationExecutors {
     stats.incPartitionedRegionThreads(1);
     try {
       ConnectionTable.threadWantsSharedResources();
-      Connection.makeReaderThread();
       runUntilShutdown(command);
     } finally {
       ConnectionTable.releaseThreadsSockets();
@@ -436,7 +430,6 @@ public class ClusterOperationExecutors implements OperationExecutors {
     stats.incNumSerialThreads(1);
     try {
       ConnectionTable.threadWantsSharedResources();
-      Connection.makeReaderThread();
       runUntilShutdown(command);
     } finally {
       ConnectionTable.releaseThreadsSockets();
@@ -816,7 +809,6 @@ public class ClusterOperationExecutors implements OperationExecutors {
 
     private void doSerialPooledThread(Runnable command) {
       ConnectionTable.threadWantsSharedResources();
-      Connection.makeReaderThread();
       try {
         command.run();
       } finally {
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
index 10b0108..4799270 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.Semaphore;
 
 import org.apache.logging.log4j.Logger;
 
@@ -36,7 +35,6 @@ import org.apache.geode.SystemFailure;
 import org.apache.geode.alerting.internal.spi.AlertingAction;
 import org.apache.geode.cache.TimeoutException;
 import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.distributed.DistributedSystemDisconnectedException;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DirectReplyProcessor;
@@ -58,7 +56,6 @@ import org.apache.geode.internal.tcp.ConnectionException;
 import org.apache.geode.internal.tcp.MsgStreamer;
 import org.apache.geode.internal.tcp.TCPConduit;
 import org.apache.geode.internal.util.Breadcrumbs;
-import org.apache.geode.internal.util.concurrent.ReentrantSemaphore;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 
 /**
@@ -98,8 +95,6 @@ public class DirectChannel {
     if (disconnected) {
       disconnected = false;
       disconnectCompleted = false;
-      this.groupOrderedSenderSem = new ReentrantSemaphore(MAX_GROUP_SENDERS);
-      this.groupUnorderedSenderSem = new ReentrantSemaphore(MAX_GROUP_SENDERS);
     }
   }
 
@@ -145,8 +140,6 @@ public class DirectChannel {
       this.conduit = new TCPConduit(mgr, port, address, isBindAddress, this, props);
       disconnected = false;
       disconnectCompleted = false;
-      this.groupOrderedSenderSem = new ReentrantSemaphore(MAX_GROUP_SENDERS);
-      this.groupUnorderedSenderSem = new ReentrantSemaphore(MAX_GROUP_SENDERS);
       logger.info("GemFire P2P Listener started on {}",
           conduit.getSocketId());
 
@@ -158,65 +151,6 @@ public class DirectChannel {
     }
   }
 
-
-  /**
-   * Return how many concurrent operations should be allowed by default. since 6.6, this has been
-   * raised to Integer.MAX value from the number of available processors. Setting this to a lower
-   * value raises the possibility of a deadlock when serializing a message with PDX objects, because
-   * the PDX serialization can trigger further distribution.
-   */
-  public static final int DEFAULT_CONCURRENCY_LEVEL =
-      Integer.getInteger("p2p.defaultConcurrencyLevel", Integer.MAX_VALUE / 2).intValue();
-
-  /**
-   * The maximum number of concurrent senders sending a message to a group of recipients.
-   */
-  private static final int MAX_GROUP_SENDERS =
-      Integer.getInteger("p2p.maxGroupSenders", DEFAULT_CONCURRENCY_LEVEL).intValue();
-  private Semaphore groupUnorderedSenderSem;
-  private Semaphore groupOrderedSenderSem;
-
-  private Semaphore getGroupSem(boolean ordered) {
-    if (ordered) {
-      return this.groupOrderedSenderSem;
-    } else {
-      return this.groupUnorderedSenderSem;
-    }
-  }
-
-  private void acquireGroupSendPermission(boolean ordered) {
-    if (this.disconnected) {
-      throw new org.apache.geode.distributed.DistributedSystemDisconnectedException(
-          "Direct channel has been stopped");
-    }
-    // @todo darrel: add some stats
-    final Semaphore s = getGroupSem(ordered);
-    for (;;) {
-      this.conduit.getCancelCriterion().checkCancelInProgress(null);
-      boolean interrupted = Thread.interrupted();
-      try {
-        s.acquire();
-        break;
-      } catch (InterruptedException ex) {
-        interrupted = true;
-      } finally {
-        if (interrupted) {
-          Thread.currentThread().interrupt();
-        }
-      }
-    } // for
-    if (this.disconnected) {
-      s.release();
-      throw new DistributedSystemDisconnectedException(
-          "communications disconnected");
-    }
-  }
-
-  private void releaseGroupSendPermission(boolean ordered) {
-    final Semaphore s = getGroupSem(ordered);
-    s.release();
-  }
-
   /**
    * Returns true if calling thread owns its own communication resources.
    */
@@ -352,95 +286,67 @@ public class DirectChannel {
           return bytesWritten;
         }
 
-        boolean sendingToGroup = cons.size() > 1;
-        Connection permissionCon = null;
-        if (sendingToGroup) {
-          acquireGroupSendPermission(orderedMsg);
-        } else {
-          // sending over just one connection
-          permissionCon = (Connection) cons.get(0);
-          if (permissionCon != null) {
-            try {
-              permissionCon.acquireSendPermission();
-            } catch (ConnectionException conEx) {
-              // Set retryInfo and then retry.
-              // We want to keep calling TCPConduit.getConnection until it doesn't
-              // return a connection.
-              retryInfo = new ConnectExceptions();
-              retryInfo.addFailure(permissionCon.getRemoteAddress(), conEx);
-              continue;
-            }
-          }
+        if (retry && logger.isDebugEnabled()) {
+          logger.debug("Retrying send ({}{}) to {} peers ({}) via tcp/ip",
+              msg, cons.size(), cons);
         }
+        DMStats stats = getDMStats();
+        List<?> sentCons; // used for cons we sent to this time
 
+        final BaseMsgStreamer ms =
+            MsgStreamer.create(cons, msg, directReply, stats, getConduit().getBufferPool());
         try {
-          if (retry && logger.isDebugEnabled()) {
-            logger.debug("Retrying send ({}{}) to {} peers ({}) via tcp/ip",
-                msg, cons.size(), cons);
+          startTime = 0;
+          if (ackTimeout > 0) {
+            startTime = System.currentTimeMillis();
           }
-          DMStats stats = getDMStats();
-          List<?> sentCons; // used for cons we sent to this time
+          ms.reserveConnections(startTime, ackTimeout, ackSDTimeout);
+
+          int result = ms.writeMessage();
+          if (bytesWritten == 0) {
+            // bytesWritten only needs to be set once.
+            // if we have to do a retry we don't want to count
+            // each one's bytes.
+            bytesWritten = result;
+          }
+          ce = ms.getConnectExceptions();
+          sentCons = ms.getSentConnections();
 
-          final BaseMsgStreamer ms =
-              MsgStreamer.create(cons, msg, directReply, stats, getConduit().getBufferPool());
+          totalSentCons.addAll(sentCons);
+        } catch (NotSerializableException e) {
+          throw e;
+        } catch (IOException ex) {
+          throw new InternalGemFireException(
+              "Unknown error serializing message",
+              ex);
+        } finally {
           try {
-            startTime = 0;
-            if (ackTimeout > 0) {
-              startTime = System.currentTimeMillis();
-            }
-            ms.reserveConnections(startTime, ackTimeout, ackSDTimeout);
-
-            int result = ms.writeMessage();
-            if (bytesWritten == 0) {
-              // bytesWritten only needs to be set once.
-              // if we have to do a retry we don't want to count
-              // each one's bytes.
-              bytesWritten = result;
-            }
-            ce = ms.getConnectExceptions();
-            sentCons = ms.getSentConnections();
-
-            totalSentCons.addAll(sentCons);
-          } catch (NotSerializableException e) {
-            throw e;
-          } catch (IOException ex) {
-            throw new InternalGemFireException(
-                "Unknown error serializing message",
-                ex);
-          } finally {
-            try {
-              ms.close();
-            } catch (IOException e) {
-              throw new InternalGemFireException("Unknown error serializing message", e);
-            }
+            ms.close();
+          } catch (IOException e) {
+            throw new InternalGemFireException("Unknown error serializing message", e);
           }
+        }
 
-          if (ce != null) {
-            retryInfo = ce;
-            ce = null;
-          }
+        if (ce != null) {
+          retryInfo = ce;
+          ce = null;
+        }
 
-          if (directReply && !sentCons.isEmpty()) {
-            long readAckStart = 0;
+        if (directReply && !sentCons.isEmpty()) {
+          long readAckStart = 0;
+          if (stats != null) {
+            readAckStart = stats.startReplyWait();
+          }
+          try {
+            ce = readAcks(sentCons, startTime, ackTimeout, ackSDTimeout, ce,
+                directMsg.getDirectReplyProcessor());
+          } finally {
             if (stats != null) {
-              readAckStart = stats.startReplyWait();
-            }
-            try {
-              ce = readAcks(sentCons, startTime, ackTimeout, ackSDTimeout, ce,
-                  directMsg.getDirectReplyProcessor());
-            } finally {
-              if (stats != null) {
-                stats.endReplyWait(readAckStart, startTime);
-              }
+              stats.endReplyWait(readAckStart, startTime);
             }
           }
-        } finally {
-          if (sendingToGroup) {
-            releaseGroupSendPermission(orderedMsg);
-          } else if (permissionCon != null) {
-            permissionCon.releaseSendPermission();
-          }
         }
+
         if (ce != null) {
           if (retryInfo != null) {
             retryInfo.getMembers().addAll(ce.getMembers());
@@ -734,16 +640,6 @@ public class DirectChannel {
   public synchronized void disconnect(Exception cause) {
     this.disconnected = true;
     this.disconnectCompleted = false;
-    try {
-      groupOrderedSenderSem.release();
-    } catch (Error e) {
-      // GEODE-1076 - already released
-    }
-    try {
-      groupUnorderedSenderSem.release();
-    } catch (Error e) {
-      // GEODE-1076 - already released
-    }
     this.conduit.stop(cause);
     this.disconnectCompleted = true;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/properties.html b/geode-core/src/main/java/org/apache/geode/internal/cache/properties.html
index 5e3dbcf..1f5b88d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/properties.html
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/properties.html
@@ -2657,23 +2657,6 @@ TBA
 </dd>
 
 <!-- -------------------------------------------------------  -->
-<dt><strong>p2p.defaultConcurrencyLevel</strong></dt>
-<dd>
-<em>Public:</em> false
-<p>
-<em>Integer</em> (default is the number of processors on current machine,
-but no less than 2)
-<p>
-See <code>org.apache.geode.distributed.internal.direct.DirectChannel#DEFAULT_CONCURRENCY_LEVEL</code>.
-<p>
-<pre>
-   Return how many concurrent operations should be allowed by default.
-</pre>
-<p>
-TBA
-</dd>
-
-<!-- -------------------------------------------------------  -->
 <dt><strong>p2p.defaultLogLevel</strong></dt>
 <dd>
 <em>Public:</em> false
@@ -2787,23 +2770,6 @@ TBA
 </dd>
 
 <!-- -------------------------------------------------------  -->
-<dt><strong>p2p.maxConnectionSenders</strong></dt>
-<dd>
-<em>Public:</em> false
-<p>
-<em>Integer</em> (default is p2p.defaultConcurrencyLevel)
-<p>
-See <code>org.apache.geode.internal.tcp.Connection#MAX_SENDERS</code>.
-<p>
-<pre>
-  The maximum number of concurrent senders sending a message to a single
-  recipient.
-</pre>
-<p>
-TBA
-</dd>
-
-<!-- -------------------------------------------------------  -->
 <dt><strong>p2p.maxGroupSenders</strong></dt>
 <dd>
 <em>Public:</em> false
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index e91b1fa..d266688 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -38,7 +38,6 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -70,7 +69,6 @@ import org.apache.geode.distributed.internal.ReplyException;
 import org.apache.geode.distributed.internal.ReplyMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.distributed.internal.ReplySender;
-import org.apache.geode.distributed.internal.direct.DirectChannel;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.distributed.internal.membership.api.MemberShunnedException;
 import org.apache.geode.distributed.internal.membership.api.Membership;
@@ -85,7 +83,6 @@ import org.apache.geode.internal.net.NioPlainEngine;
 import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.internal.serialization.Version;
 import org.apache.geode.internal.tcp.MsgReader.Header;
-import org.apache.geode.internal.util.concurrent.ReentrantSemaphore;
 import org.apache.geode.logging.internal.executors.LoggingThread;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 
@@ -159,8 +156,6 @@ public class Connection implements Runnable {
    */
   private SystemTimerTask idleTask;
 
-  private static final ThreadLocal<Boolean> isReaderThread = withInitial(() -> FALSE);
-
   /**
    * If true then readers for thread owned sockets will send all messages on thread owned senders.
    * Even normally unordered msgs get send on TO socks.
@@ -255,18 +250,6 @@ public class Connection implements Runnable {
   /** used for async writes */
   private Thread pusherThread;
 
-  /**
-   * The maximum number of concurrent senders sending a message to a single recipient.
-   */
-  private static final int MAX_SENDERS = Integer
-      .getInteger("p2p.maxConnectionSenders", DirectChannel.DEFAULT_CONCURRENCY_LEVEL);
-  /**
-   * This semaphore is used to throttle how many threads will try to do sends on this connection
-   * concurrently. A thread must acquire this semaphore before it is allowed to start serializing
-   * its message.
-   */
-  private final Semaphore senderSem = new ReentrantSemaphore(MAX_SENDERS);
-
   /** Set to true once the handshake has been read */
   private volatile boolean handshakeRead;
   private volatile boolean handshakeCancelled;
@@ -515,20 +498,6 @@ public class Connection implements Runnable {
     return sharedResource;
   }
 
-  public static void makeReaderThread() {
-    // mark this thread as a reader thread
-    makeReaderThread(true);
-  }
-
-  private static void makeReaderThread(boolean v) {
-    isReaderThread.set(v);
-  }
-
-  // return true if this thread is a reader thread
-  private static boolean isReaderThread() {
-    return isReaderThread.get();
-  }
-
   @VisibleForTesting
   int getP2PConnectTimeout(DistributionConfig config) {
     if (AlertingAction.isThreadAlerting()) {
@@ -1336,7 +1305,6 @@ public class Connection implements Runnable {
             }
           }
           connected = false;
-          closeSenderSem();
 
           final DMStats stats = owner.getConduit().getStats();
           if (finishedConnecting) {
@@ -1467,7 +1435,7 @@ public class Connection implements Runnable {
     readerThread = Thread.currentThread();
     readerThread.setName(p2pReaderName());
     ConnectionTable.threadWantsSharedResources();
-    makeReaderThread(isReceiver);
+
     try {
       readMessages();
     } finally {
@@ -3274,53 +3242,6 @@ public class Connection implements Runnable {
     return messagesSent;
   }
 
-  public void acquireSendPermission() throws ConnectionException {
-    if (!connected) {
-      throw new ConnectionException("connection is closed");
-    }
-    if (isReaderThread()) {
-      // reader threads send replies and we always want to permit those without waiting
-      return;
-    }
-    boolean interrupted = false;
-    try {
-      for (;;) {
-        owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
-        try {
-          senderSem.acquire();
-          break;
-        } catch (InterruptedException ex) {
-          interrupted = true;
-        }
-      }
-    } finally {
-      if (interrupted) {
-        Thread.currentThread().interrupt();
-      }
-    }
-    if (!connected) {
-      senderSem.release();
-      owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
-      throw new ConnectionException("connection is closed");
-    }
-  }
-
-  public void releaseSendPermission() {
-    if (isReaderThread()) {
-      return;
-    }
-    senderSem.release();
-  }
-
-  private void closeSenderSem() {
-    // All we need to do is increase the number of permits by one
-    // just in case 1 or more connections are currently waiting to acquire.
-    // One of them will get the permit, then find out the connection is closed
-    // and release the permit until all the connections currently waiting to acquire
-    // will complete by throwing a ConnectionException.
-    releaseSendPermission();
-  }
-
   private class BatchBufferFlusher extends Thread {
 
     private volatile boolean flushNeeded;

Reply via email to