This is an automated email from the ASF dual-hosted git repository. mpapirkovskyy pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push: new 70edbb5 AMBARI-23541. Sometimes host checks never complete. (mpapirkovskyy) 70edbb5 is described below commit 70edbb5cbfcd6c4ff83589650f715ae9c1b92790 Author: Myroslav Papirkovskyi <mpapirkovs...@apache.org> AuthorDate: Wed Apr 11 20:19:13 2018 +0300 AMBARI-23541. Sometimes host checks never complete. (mpapirkovskyy) --- .../ambari/server/agent/AgentReportsProcessor.java | 61 +++++++++++----------- 1 file changed, 31 insertions(+), 30 deletions(-) diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java index 88c2665..ad5c6aa 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java @@ -17,11 +17,11 @@ */ package org.apache.ambari.server.agent; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.configuration.Configuration; @@ -37,12 +37,15 @@ import com.google.inject.persist.UnitOfWork; public class AgentReportsProcessor { private static final Logger LOG = LoggerFactory.getLogger(AgentReportsProcessor.class); - private ScheduledExecutorService executor; + private final int poolSize; - private ConcurrentLinkedQueue<AgentReport> agentReportsQueue = new ConcurrentLinkedQueue<>(); + private final List<ExecutorService> executors; public void addAgentReport(AgentReport agentReport) { - agentReportsQueue.add(agentReport); + int hash = agentReport.getHostName().hashCode(); + hash = hash == Integer.MIN_VALUE ? 0 : hash; + int executorNumber = Math.abs(hash) % poolSize; + executors.get(executorNumber).execute(new AgentReportProcessingTask(agentReport)); } @Inject @@ -55,40 +58,38 @@ public class AgentReportsProcessor { public AgentReportsProcessor(Configuration configuration) { ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("agent-report-processor-%d").build(); - int poolSize = configuration.getAgentsReportThreadPoolSize(); - executor = Executors.newScheduledThreadPool(poolSize, threadFactory); - for (int i=0; i< poolSize; i++) { - executor.scheduleAtFixedRate(new AgentReportProcessingTask(), - configuration.getAgentsReportProcessingStartTimeout(), - configuration.getAgentsReportProcessingPeriod(), TimeUnit.SECONDS); + poolSize = configuration.getAgentsReportThreadPoolSize(); + executors = new ArrayList<>(); + for (int i = 0; i < poolSize; i++) { + executors.add(Executors.newSingleThreadExecutor(threadFactory)); } } private class AgentReportProcessingTask implements Runnable { + private final AgentReport agentReport; + + public AgentReportProcessingTask(AgentReport agentReport) { + this.agentReport = agentReport; + } + @Override public void run() { try { unitOfWork.begin(); - while (true) { - AgentReport agentReport = agentReportsQueue.poll(); - if (agentReport == null) { - break; - } - String hostName = agentReport.getHostName(); - try { - - //TODO rewrite with polymorphism usage. - if (agentReport.getCommandReports() != null) { - hh.handleCommandReportStatus(agentReport.getCommandReports(), hostName); - } else if (agentReport.getComponentStatuses() != null) { - hh.handleComponentReportStatus(agentReport.getComponentStatuses(), hostName); - } else if (agentReport.getHostStatusReport() != null) { - hh.handleHostReportStatus(agentReport.getHostStatusReport(), hostName); - } - } catch (AmbariException e) { - LOG.error("Error processing agent reports", e); + String hostName = agentReport.getHostName(); + try { + + //TODO rewrite with polymorphism usage. + if (agentReport.getCommandReports() != null) { + hh.handleCommandReportStatus(agentReport.getCommandReports(), hostName); + } else if (agentReport.getComponentStatuses() != null) { + hh.handleComponentReportStatus(agentReport.getComponentStatuses(), hostName); + } else if (agentReport.getHostStatusReport() != null) { + hh.handleHostReportStatus(agentReport.getHostStatusReport(), hostName); } + } catch (AmbariException e) { + LOG.error("Error processing agent reports", e); } } finally { unitOfWork.end(); -- To stop receiving notification emails like this one, please contact mpapirkovs...@apache.org.