Yingyi Bu has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/389
Change subject: Set the priority of the following threads to be Thread.MAX_PRIORITY: 1. heartbeat thread at NC 2. IPC network thread ...................................................................... Set the priority of the following threads to be Thread.MAX_PRIORITY: 1. heartbeat thread at NC 2. IPC network thread Change-Id: I4e53a85e21a6bdee48a3ca8d004569700f911fbd --- A hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/heartbeat/HeartBeatTest.java M hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java M hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java M hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java M hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java M hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java M hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java 7 files changed, 97 insertions(+), 22 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/hyracks refs/changes/89/389/1 diff --git a/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/heartbeat/HeartBeatTest.java b/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/heartbeat/HeartBeatTest.java new file mode 100644 index 0000000..6749eab --- /dev/null +++ b/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/heartbeat/HeartBeatTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.client.heartbeat; + +import java.util.Map; + +import junit.framework.Assert; + +import org.apache.hyracks.api.client.HyracksConnection; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.client.NodeControllerInfo; +import org.apache.hyracks.client.stats.HyracksUtils; +import org.junit.Test; + +public class HeartBeatTest { + + @Test + public void test() throws Exception { + HyracksUtils.init(); + int cores = Runtime.getRuntime().availableProcessors(); + Thread[] cpuConsumers = new Thread[cores * 4]; + for (int i = 0; i < cpuConsumers.length; i++) { + cpuConsumers[i] = new Thread(new Runnable() { + private long counter = 0; + private long round = 0; + + @Override + public void run() { + while (true) { + counter = (counter + 1) % Long.MAX_VALUE - 1; + if (counter == 0) { + round++; + if (round == 8L * Integer.MAX_VALUE) { + return; + } + } + } + } + + }); + cpuConsumers[i].setPriority(Thread.MIN_PRIORITY); + cpuConsumers[i].start(); + } + for (int i = 0; i < cpuConsumers.length; i++) { + cpuConsumers[i].join(); + } + IHyracksClientConnection hcc = new HyracksConnection("localhost", 2099); + Map<String, NodeControllerInfo> ncMap = hcc.getNodeControllerInfos(); + Assert.assertEquals(ncMap.size(), 2); + HyracksUtils.deinit(); + } +} diff --git a/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java b/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java index a41ddd9..f3c12dc 100644 --- a/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java +++ b/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java @@ -57,7 +57,8 @@ ccConfig.defaultMaxJobAttempts = 0; ccConfig.jobHistorySize = 0; ccConfig.profileDumpPeriod = -1; - ccConfig.heartbeatPeriod = 50; + ccConfig.heartbeatPeriod = 60; + ccConfig.maxHeartbeatLapsePeriods = 5; // cluster controller cc = new ClusterControllerService(ccConfig); diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java index e0d0eb0..94b8d97 100644 --- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java +++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java @@ -96,7 +96,7 @@ private class WorkerThread extends Thread { WorkerThread() { setDaemon(true); - setPriority(MAX_PRIORITY); + setPriority(Thread.MAX_PRIORITY); } @Override diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index e7689d4..92eb0ae 100644 --- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -26,6 +26,7 @@ import java.lang.management.OperatingSystemMXBean; import java.lang.management.RuntimeMXBean; import java.lang.management.ThreadMXBean; +import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Hashtable; @@ -42,7 +43,6 @@ import org.apache.commons.lang3.mutable.Mutable; import org.apache.commons.lang3.mutable.MutableObject; - import org.apache.hyracks.api.application.INCApplicationEntryPoint; import org.apache.hyracks.api.client.NodeControllerInfo; import org.apache.hyracks.api.comm.NetworkAddress; @@ -170,8 +170,8 @@ throw new Exception("id not set"); } partitionManager = new PartitionManager(this); - netManager = new NetworkManager(ncConfig.dataIPAddress, ncConfig.dataPort, partitionManager, ncConfig.nNetThreads, - ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort); + netManager = new NetworkManager(ncConfig.dataIPAddress, ncConfig.dataPort, partitionManager, + ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort); lccm = new LifeCycleComponentManager(); queue = new WorkQueue(); @@ -243,11 +243,11 @@ private void init() throws Exception { ctx.getIOManager().setExecutor(executor); - datasetPartitionManager = new DatasetPartitionManager - (this, executor, ncConfig.resultManagerMemory, ncConfig.resultTTL, ncConfig.resultSweepThreshold); - datasetNetworkManager = new DatasetNetworkManager - (ncConfig.resultIPAddress, ncConfig.resultPort, datasetPartitionManager, - ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.resultPublicIPAddress, ncConfig.resultPublicPort); + datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory, + ncConfig.resultTTL, ncConfig.resultSweepThreshold); + datasetNetworkManager = new DatasetNetworkManager(ncConfig.resultIPAddress, ncConfig.resultPort, + datasetPartitionManager, ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.resultPublicIPAddress, + ncConfig.resultPublicPort); } @Override @@ -273,12 +273,11 @@ if (ncConfig.dataPublicIPAddress != null) { netAddress = new NetworkAddress(ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort); } - ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, - datasetAddress, osMXBean.getName(), osMXBean.getArch(), osMXBean - .getVersion(), osMXBean.getAvailableProcessors(), runtimeMXBean.getVmName(), runtimeMXBean - .getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(), runtimeMXBean - .getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(), - runtimeMXBean.getSystemProperties(), hbSchema)); + ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress, + osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(), + runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean + .getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(), + runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema)); synchronized (this) { while (registrationPending) { @@ -294,6 +293,11 @@ heartbeatTask = new HeartbeatTask(ccs); + // Use reflection to set the priority of the timer thread. + Field threadField = timer.getClass().getDeclaredField("thread"); + threadField.setAccessible(true); + Thread timerThread = (Thread) threadField.get(timer); // The internal timer thread of the Timer object. + timerThread.setPriority(Thread.MAX_PRIORITY); // Schedule heartbeat generator. timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod()); @@ -571,6 +575,7 @@ this.nodeControllerService = ncAppEntryPoint; } + @Override public void run() { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Shutdown hook in progress"); diff --git a/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java index dd3d2f9..e02e4f4 100644 --- a/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java +++ b/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java @@ -64,6 +64,7 @@ IPCConnectionManager(IPCSystem system, InetSocketAddress socketAddress) throws IOException { this.system = system; this.networkThread = new NetworkThread(); + this.networkThread.setPriority(Thread.MAX_PRIORITY); this.serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().setReuseAddress(true); serverSocketChannel.configureBlocking(false); @@ -114,8 +115,8 @@ } } else if (attempt < retries) { if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Connection to " + remoteAddress + - " failed (Attempt " + attempt + " of " + retries + ")"); + LOGGER.info("Connection to " + remoteAddress + " failed (Attempt " + attempt + " of " + retries + + ")"); attempt++; Thread.sleep(5000); } @@ -308,8 +309,7 @@ if (!channel.finishConnect()) { throw new Exception("Connection did not finish"); } - } - catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); handle.setState(HandleState.CONNECT_FAILED); continue; diff --git a/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java index 42e19f7..4b6c22f 100644 --- a/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java +++ b/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java @@ -106,7 +106,7 @@ public IOThread() throws IOException { super("TCPEndpoint IO Thread"); setDaemon(true); - setPriority(MAX_PRIORITY); + setPriority(Thread.NORM_PRIORITY); this.pendingConnections = new ArrayList<InetSocketAddress>(); this.workingPendingConnections = new ArrayList<InetSocketAddress>(); this.incomingConnections = new ArrayList<SocketChannel>(); diff --git a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java index 4f9d9ce..facd8c1 100644 --- a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java +++ b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java @@ -475,7 +475,7 @@ private int cleanedCount = 0; public CleanerThread() { - setPriority(MAX_PRIORITY); + setPriority(Thread.NORM_PRIORITY); setDaemon(true); } -- To view, visit https://asterix-gerrit.ics.uci.edu/389 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I4e53a85e21a6bdee48a3ca8d004569700f911fbd Gerrit-PatchSet: 1 Gerrit-Project: hyracks Gerrit-Branch: master Gerrit-Owner: Yingyi Bu <[email protected]>
