Yingyi Bu has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/389

Change subject: Set the priority of the following threads to be Thread.MAX_PRIORITY: 1. heartbeat thread at NC 2. IPC network thread
......................................................................

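Set the priority of the following threads to Thread.MAX_PRIORITY:
1. heartbeat thread at NC
2. IPC network thread

Also lower the TCPEndpoint IO thread and the BufferCache cleaner thread
from MAX_PRIORITY to NORM_PRIORITY, and add HeartBeatTest.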

Change-Id: I4e53a85e21a6bdee48a3ca8d004569700f911fbd
---
A hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/heartbeat/HeartBeatTest.java
M hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java
M hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
M hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
M hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
M hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
7 files changed, 97 insertions(+), 22 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/hyracks refs/changes/89/389/1

diff --git a/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/heartbeat/HeartBeatTest.java b/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/heartbeat/HeartBeatTest.java
new file mode 100644
index 0000000..6749eab
--- /dev/null
+++ b/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/heartbeat/HeartBeatTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.client.heartbeat;
+
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.client.stats.HyracksUtils;
+import org.junit.Test;
+
+public class HeartBeatTest {
+
+    @Test
+    public void test() throws Exception {
+        HyracksUtils.init();
+        int cores = Runtime.getRuntime().availableProcessors();
+        Thread[] cpuConsumers = new Thread[cores * 4];
+        for (int i = 0; i < cpuConsumers.length; i++) {
+            cpuConsumers[i] = new Thread(new Runnable() {
+                private long counter = 0;
+                private long round = 0;
+
+                @Override
+                public void run() {
+                    while (true) {
+                        counter = (counter + 1) % Long.MAX_VALUE - 1;
+                        if (counter == 0) {
+                            round++;
+                            if (round == 8L * Integer.MAX_VALUE) {
+                                return;
+                            }
+                        }
+                    }
+                }
+
+            });
+            cpuConsumers[i].setPriority(Thread.MIN_PRIORITY);
+            cpuConsumers[i].start();
+        }
+        for (int i = 0; i < cpuConsumers.length; i++) {
+            cpuConsumers[i].join();
+        }
+        IHyracksClientConnection hcc = new HyracksConnection("localhost", 
2099);
+        Map<String, NodeControllerInfo> ncMap = hcc.getNodeControllerInfos();
+        Assert.assertEquals(ncMap.size(), 2);
+        HyracksUtils.deinit();
+    }
+}
diff --git a/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java b/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java
index a41ddd9..f3c12dc 100644
--- a/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java
+++ b/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java
@@ -57,7 +57,8 @@
         ccConfig.defaultMaxJobAttempts = 0;
         ccConfig.jobHistorySize = 0;
         ccConfig.profileDumpPeriod = -1;
-        ccConfig.heartbeatPeriod = 50;
+        ccConfig.heartbeatPeriod = 60;
+        ccConfig.maxHeartbeatLapsePeriods = 5;
 
         // cluster controller
         cc = new ClusterControllerService(ccConfig);
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
index e0d0eb0..94b8d97 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
@@ -96,7 +96,7 @@
     private class WorkerThread extends Thread {
         WorkerThread() {
             setDaemon(true);
-            setPriority(MAX_PRIORITY);
+            setPriority(Thread.MAX_PRIORITY);
         }
 
         @Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index e7689d4..92eb0ae 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -26,6 +26,7 @@
 import java.lang.management.OperatingSystemMXBean;
 import java.lang.management.RuntimeMXBean;
 import java.lang.management.ThreadMXBean;
+import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Hashtable;
@@ -42,7 +43,6 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.api.application.INCApplicationEntryPoint;
 import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.comm.NetworkAddress;
@@ -170,8 +170,8 @@
             throw new Exception("id not set");
         }
         partitionManager = new PartitionManager(this);
-        netManager = new NetworkManager(ncConfig.dataIPAddress, 
ncConfig.dataPort, partitionManager, ncConfig.nNetThreads,
-                                        ncConfig.nNetBuffers, 
ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort);
+        netManager = new NetworkManager(ncConfig.dataIPAddress, 
ncConfig.dataPort, partitionManager,
+                ncConfig.nNetThreads, ncConfig.nNetBuffers, 
ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort);
 
         lccm = new LifeCycleComponentManager();
         queue = new WorkQueue();
@@ -243,11 +243,11 @@
 
     private void init() throws Exception {
         ctx.getIOManager().setExecutor(executor);
-        datasetPartitionManager = new DatasetPartitionManager
-            (this, executor, ncConfig.resultManagerMemory, ncConfig.resultTTL, 
ncConfig.resultSweepThreshold);
-        datasetNetworkManager = new DatasetNetworkManager
-            (ncConfig.resultIPAddress, ncConfig.resultPort, 
datasetPartitionManager,
-             ncConfig.nNetThreads, ncConfig.nNetBuffers, 
ncConfig.resultPublicIPAddress, ncConfig.resultPublicPort);
+        datasetPartitionManager = new DatasetPartitionManager(this, executor, 
ncConfig.resultManagerMemory,
+                ncConfig.resultTTL, ncConfig.resultSweepThreshold);
+        datasetNetworkManager = new 
DatasetNetworkManager(ncConfig.resultIPAddress, ncConfig.resultPort,
+                datasetPartitionManager, ncConfig.nNetThreads, 
ncConfig.nNetBuffers, ncConfig.resultPublicIPAddress,
+                ncConfig.resultPublicPort);
     }
 
     @Override
@@ -273,12 +273,11 @@
         if (ncConfig.dataPublicIPAddress != null) {
             netAddress = new NetworkAddress(ncConfig.dataPublicIPAddress, 
ncConfig.dataPublicPort);
         }
-        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, 
ncConfig, netAddress,
-                datasetAddress, osMXBean.getName(), osMXBean.getArch(), 
osMXBean
-                        .getVersion(), osMXBean.getAvailableProcessors(), 
runtimeMXBean.getVmName(), runtimeMXBean
-                        .getVmVersion(), runtimeMXBean.getVmVendor(), 
runtimeMXBean.getClassPath(), runtimeMXBean
-                        .getLibraryPath(), runtimeMXBean.getBootClassPath(), 
runtimeMXBean.getInputArguments(),
-                runtimeMXBean.getSystemProperties(), hbSchema));
+        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, 
ncConfig, netAddress, datasetAddress,
+                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), 
osMXBean.getAvailableProcessors(),
+                runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), 
runtimeMXBean.getVmVendor(), runtimeMXBean
+                        .getClassPath(), runtimeMXBean.getLibraryPath(), 
runtimeMXBean.getBootClassPath(),
+                runtimeMXBean.getInputArguments(), 
runtimeMXBean.getSystemProperties(), hbSchema));
 
         synchronized (this) {
             while (registrationPending) {
@@ -294,6 +293,11 @@
 
         heartbeatTask = new HeartbeatTask(ccs);
 
+        // Use reflection to set the priority of the timer thread.
+        Field threadField = timer.getClass().getDeclaredField("thread");
+        threadField.setAccessible(true);
+        Thread timerThread = (Thread) threadField.get(timer); // The internal 
timer thread of the Timer object.
+        timerThread.setPriority(Thread.MAX_PRIORITY);
         // Schedule heartbeat generator.
         timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod());
 
@@ -571,6 +575,7 @@
             this.nodeControllerService = ncAppEntryPoint;
         }
 
+        @Override
         public void run() {
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("Shutdown hook in progress");
diff --git a/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index dd3d2f9..e02e4f4 100644
--- a/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -64,6 +64,7 @@
     IPCConnectionManager(IPCSystem system, InetSocketAddress socketAddress) 
throws IOException {
         this.system = system;
         this.networkThread = new NetworkThread();
+        this.networkThread.setPriority(Thread.MAX_PRIORITY);
         this.serverSocketChannel = ServerSocketChannel.open();
         serverSocketChannel.socket().setReuseAddress(true);
         serverSocketChannel.configureBlocking(false);
@@ -114,8 +115,8 @@
                 }
             } else if (attempt < retries) {
                 if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Connection to " + remoteAddress +
-                            " failed (Attempt " + attempt + " of " + retries + 
")");
+                    LOGGER.info("Connection to " + remoteAddress + " failed 
(Attempt " + attempt + " of " + retries
+                            + ")");
                     attempt++;
                     Thread.sleep(5000);
                 }
@@ -308,8 +309,7 @@
                                     if (!channel.finishConnect()) {
                                         throw new Exception("Connection did 
not finish");
                                     }
-                                }
-                                catch (Exception e) {
+                                } catch (Exception e) {
                                     e.printStackTrace();
                                     
handle.setState(HandleState.CONNECT_FAILED);
                                     continue;
diff --git a/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
index 42e19f7..4b6c22f 100644
--- a/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -106,7 +106,7 @@
         public IOThread() throws IOException {
             super("TCPEndpoint IO Thread");
             setDaemon(true);
-            setPriority(MAX_PRIORITY);
+            setPriority(Thread.NORM_PRIORITY);
             this.pendingConnections = new ArrayList<InetSocketAddress>();
             this.workingPendingConnections = new 
ArrayList<InetSocketAddress>();
             this.incomingConnections = new ArrayList<SocketChannel>();
diff --git a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 4f9d9ce..facd8c1 100644
--- a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -475,7 +475,7 @@
         private int cleanedCount = 0;
 
         public CleanerThread() {
-            setPriority(MAX_PRIORITY);
+            setPriority(Thread.NORM_PRIORITY);
             setDaemon(true);
         }
 

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/389
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I4e53a85e21a6bdee48a3ca8d004569700f911fbd
Gerrit-PatchSet: 1
Gerrit-Project: hyracks
Gerrit-Branch: master
Gerrit-Owner: Yingyi Bu <[email protected]>

Reply via email to