Yingyi Bu has submitted this change and it was merged. Change subject: Set the priority of the following threads to be Thread.MAX_PRIORITY: 1. heartbeat thread at NC 2. IPC network thread 3. work queue thread in CC ......................................................................
Set the priority of the following threads to be Thread.MAX_PRIORITY: 1. heartbeat thread at NC 2. IPC network thread 3. work queue thread in CC Change-Id: I4e53a85e21a6bdee48a3ca8d004569700f911fbd Reviewed-on: https://asterix-gerrit.ics.uci.edu/389 Tested-by: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> --- M hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java M hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java M hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java M hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java M hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java M hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java 6 files changed, 41 insertions(+), 29 deletions(-) Approvals: Till Westmann: Looks good to me, approved Jenkins: Verified diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java index 9ae7f77..f2f450d 100644 --- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java +++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java @@ -35,8 +35,6 @@ import java.util.logging.Level; import java.util.logging.Logger; -import org.xml.sax.InputSource; - import org.apache.hyracks.api.application.ICCApplicationEntryPoint; import org.apache.hyracks.api.client.ClusterControllerInfo; import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions; @@ -104,6 +102,7 @@ import org.apache.hyracks.ipc.exceptions.IPCException; import org.apache.hyracks.ipc.impl.IPCSystem; import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer; +import org.xml.sax.InputSource; public class ClusterControllerService extends AbstractRemoteService { private static Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName()); @@ -172,6 +171,7 @@ runMapArchive = new LinkedHashMap<JobId, JobRun>() { private static final long serialVersionUID = 1L; + @Override protected boolean removeEldestEntry(Map.Entry<JobId, JobRun> eldest) { return size() > ccConfig.jobHistorySize; } @@ -181,11 +181,12 @@ /** history size + 1 is for the case when history size = 0 */ private int allowedSize = 100 * (ccConfig.jobHistorySize + 1); + @Override protected boolean removeEldestEntry(Map.Entry<JobId, List<Exception>> eldest) { return size() > allowedSize; } }; - workQueue = new WorkQueue(); + workQueue = new WorkQueue(Thread.MAX_PRIORITY); // WorkQueue is in charge of heartbeat as well as other events. this.timer = new Timer(true); final ClusterTopology topology = computeClusterTopology(ccConfig); ccContext = new ICCContext() { @@ -604,7 +605,7 @@ /** * Add a deployment run - * + * * @param deploymentKey * @param nodeControllerIds */ @@ -614,7 +615,7 @@ /** * Get a deployment run - * + * * @param deploymentKey */ public synchronized DeploymentRun getDeploymentRun(DeploymentId deploymentKey) { @@ -623,7 +624,7 @@ /** * Remove a deployment run - * + * * @param deploymentKey */ public synchronized void removeDeploymentRun(DeploymentId deploymentKey) { diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java index e0d0eb0..da341a7 100644 --- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java +++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java @@ -36,8 +36,14 @@ private boolean stopped; private AtomicInteger enqueueCount; private AtomicInteger dequeueCount; + private int threadPriority = Thread.MAX_PRIORITY; - public WorkQueue() { + public WorkQueue(int threadPriority) { + if (threadPriority != Thread.MAX_PRIORITY && threadPriority != Thread.NORM_PRIORITY + && threadPriority != Thread.MIN_PRIORITY) { + throw new IllegalArgumentException("Illegal thread priority number."); + } + this.threadPriority = threadPriority; queue = new LinkedBlockingQueue<AbstractWork>(); thread = new WorkerThread(); stopSemaphore = new Semaphore(1); @@ -96,7 +102,7 @@ private class WorkerThread extends Thread { WorkerThread() { setDaemon(true); - setPriority(MAX_PRIORITY); + setPriority(threadPriority); } @Override diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index e7689d4..ab0f16b 100644 --- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -26,6 +26,7 @@ import java.lang.management.OperatingSystemMXBean; import java.lang.management.RuntimeMXBean; import java.lang.management.ThreadMXBean; +import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Hashtable; @@ -42,7 +43,6 @@ import org.apache.commons.lang3.mutable.Mutable; import org.apache.commons.lang3.mutable.MutableObject; - import org.apache.hyracks.api.application.INCApplicationEntryPoint; import org.apache.hyracks.api.client.NodeControllerInfo; import org.apache.hyracks.api.comm.NetworkAddress; @@ -170,11 +170,11 @@ throw new Exception("id not set"); } partitionManager = new PartitionManager(this); - netManager = new NetworkManager(ncConfig.dataIPAddress, ncConfig.dataPort, partitionManager, ncConfig.nNetThreads, - ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort); + netManager = new NetworkManager(ncConfig.dataIPAddress, ncConfig.dataPort, partitionManager, + ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort); lccm = new LifeCycleComponentManager(); - queue = new WorkQueue(); + queue = new WorkQueue(Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread. jobletMap = new Hashtable<JobId, Joblet>(); timer = new Timer(true); serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File( @@ -243,11 +243,11 @@ private void init() throws Exception { ctx.getIOManager().setExecutor(executor); - datasetPartitionManager = new DatasetPartitionManager - (this, executor, ncConfig.resultManagerMemory, ncConfig.resultTTL, ncConfig.resultSweepThreshold); - datasetNetworkManager = new DatasetNetworkManager - (ncConfig.resultIPAddress, ncConfig.resultPort, datasetPartitionManager, - ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.resultPublicIPAddress, ncConfig.resultPublicPort); + datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory, + ncConfig.resultTTL, ncConfig.resultSweepThreshold); + datasetNetworkManager = new DatasetNetworkManager(ncConfig.resultIPAddress, ncConfig.resultPort, + datasetPartitionManager, ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.resultPublicIPAddress, + ncConfig.resultPublicPort); } @Override @@ -273,12 +273,11 @@ if (ncConfig.dataPublicIPAddress != null) { netAddress = new NetworkAddress(ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort); } - ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, - datasetAddress, osMXBean.getName(), osMXBean.getArch(), osMXBean - .getVersion(), osMXBean.getAvailableProcessors(), runtimeMXBean.getVmName(), runtimeMXBean - .getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(), runtimeMXBean - .getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(), - runtimeMXBean.getSystemProperties(), hbSchema)); + ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress, + osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(), + runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean + .getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(), + runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema)); synchronized (this) { while (registrationPending) { @@ -294,6 +293,11 @@ heartbeatTask = new HeartbeatTask(ccs); + // Use reflection to set the priority of the timer thread. + Field threadField = timer.getClass().getDeclaredField("thread"); + threadField.setAccessible(true); + Thread timerThread = (Thread) threadField.get(timer); // The internal timer thread of the Timer object. + timerThread.setPriority(Thread.MAX_PRIORITY); // Schedule heartbeat generator. timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod()); @@ -571,6 +575,7 @@ this.nodeControllerService = ncAppEntryPoint; } + @Override public void run() { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Shutdown hook in progress"); diff --git a/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java index dd3d2f9..e02e4f4 100644 --- a/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java +++ b/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java @@ -64,6 +64,7 @@ IPCConnectionManager(IPCSystem system, InetSocketAddress socketAddress) throws IOException { this.system = system; this.networkThread = new NetworkThread(); + this.networkThread.setPriority(Thread.MAX_PRIORITY); this.serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().setReuseAddress(true); serverSocketChannel.configureBlocking(false); @@ -114,8 +115,8 @@ } } else if (attempt < retries) { if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Connection to " + remoteAddress + - " failed (Attempt " + attempt + " of " + retries + ")"); + LOGGER.info("Connection to " + remoteAddress + " failed (Attempt " + attempt + " of " + retries + + ")"); attempt++; Thread.sleep(5000); } @@ -308,8 +309,7 @@ if (!channel.finishConnect()) { throw new Exception("Connection did not finish"); } - } - catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); handle.setState(HandleState.CONNECT_FAILED); continue; diff --git a/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java index 42e19f7..4b6c22f 100644 --- a/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java +++ b/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java @@ -106,7 +106,7 @@ public IOThread() throws IOException { super("TCPEndpoint IO Thread"); setDaemon(true); - setPriority(MAX_PRIORITY); + setPriority(Thread.NORM_PRIORITY); this.pendingConnections = new ArrayList<InetSocketAddress>(); this.workingPendingConnections = new ArrayList<InetSocketAddress>(); this.incomingConnections = new ArrayList<SocketChannel>(); diff --git a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java index 4f9d9ce..facd8c1 100644 --- a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java +++ b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java @@ -475,7 +475,7 @@ private int cleanedCount = 0; public CleanerThread() { - setPriority(MAX_PRIORITY); + setPriority(Thread.NORM_PRIORITY); setDaemon(true); } -- To view, visit https://asterix-gerrit.ics.uci.edu/389 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I4e53a85e21a6bdee48a3ca8d004569700f911fbd Gerrit-PatchSet: 7 Gerrit-Project: hyracks Gerrit-Branch: master Gerrit-Owner: Yingyi Bu <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]>
