apex-core git commit: APEXCORE-624 decrement unallocated containers and also released containers so that the exit condition for the shutdown check is satisfied.
Repository: apex-core Updated Branches: refs/heads/release-3.3 232eba368 -> 8b359fc58 APEXCORE-624 decrement unallocated containers and also released containers so that the exit condition for the shutdown check is satisfied. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/8b359fc5 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/8b359fc5 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/8b359fc5 Branch: refs/heads/release-3.3 Commit: 8b359fc58188012f96b1f2b827987fe67c98d74c Parents: 232eba3 Author: Sanjay Pujare Authored: Fri Jan 27 10:35:09 2017 -0800 Committer: Vlad Rozov Committed: Fri Feb 24 17:31:32 2017 -0800 -- .../stram/StreamingAppMasterService.java| 16 1 file changed, 8 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/8b359fc5/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index 3b2c4de..087d6d5 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -770,6 +770,7 @@ public class StreamingAppMasterService extends CompositeService for (Map.Entry > entry : requestedResources.entrySet()) { if ((loopCounter - entry.getValue().getKey()) > NUMBER_MISSED_HEARTBEATS) { StreamingContainerAgent.ContainerStartRequest csr = entry.getKey(); +LOG.debug("Request for container {} timed out. 
Re-requesting container", csr.container); removedContainerRequests.add(entry.getValue().getRight()); ContainerRequest cr = resourceRequestor.createContainerRequest(csr, false); entry.getValue().setLeft(loopCounter); @@ -779,7 +780,7 @@ public class StreamingAppMasterService extends CompositeService } } - /* Remove nodes from blacklist after timeout */ + /* Remove nodes from blacklist after timeout */ long currentTime = System.currentTimeMillis(); List blacklistRemovals = new ArrayList(); for (Iterator > it = blacklistedNodesQueueWithTimeStamp.iterator(); it.hasNext();) { @@ -797,7 +798,7 @@ public class StreamingAppMasterService extends CompositeService } numTotalContainers += containerRequests.size(); - numRequestedContainers += containerRequests.size(); + numRequestedContainers += containerRequests.size() - removedContainerRequests.size(); AllocateResponse amResp = sendContainerAskToRM(containerRequests, removedContainerRequests, releasedContainers); if (amResp.getAMCommand() != null) { LOG.info(" statement executed:{}", amResp.getAMCommand()); @@ -836,7 +837,7 @@ public class StreamingAppMasterService extends CompositeService LOG.info("Releasing {} as resource with priority {} was already assigned", allocatedContainer.getId(), allocatedContainer.getPriority()); releasedContainers.add(allocatedContainer.getId()); numReleasedContainers++; - numRequestedContainers++; + numRequestedContainers--; continue; } if (csr != null) { @@ -964,7 +965,8 @@ public class StreamingAppMasterService extends CompositeService appDone = true; } - LOG.debug("Current application state: loop=" + loopCounter + ", appDone=" + appDone + ", total=" + numTotalContainers + ", requested=" + numRequestedContainers + ", released=" + numReleasedContainers + ", completed=" + numCompletedContainers + ", failed=" + numFailedContainers + ", currentAllocated=" + allocatedContainers.size()); + LOG.debug("Current application state: loop={}, appDone={}, total={}, requested={}, released={}, completed={}, 
failed={}, currentAllocated={}, dnmgr.containerStartRequests={}", +loopCounter, appDone, numTotalContainers, numRequestedContainers, numReleasedContainers, numCompletedContainers, numFailedContainers, allocatedContainers.size(), dnmgr.containerStartRequests); // monitor child containers dnmgr.monitorHeartbeat(); @@ -1038,16 +1040,14 @@ public class StreamingAppMasterService extends CompositeService private AllocateResponse sendContainerAskToRM(List containerRequests, List removedContainerRequests, List releasedContainers) throws YarnException, IOException { if
apex-core git commit: APEXCORE-624 decrement unallocated containers and released containers so exit condition for shutdown check is satisfied.
Repository: apex-core Updated Branches: refs/heads/release-3.4 2f34efd3f -> de967a4b5 APEXCORE-624 decrement unallocated containers and released containers so exit condition for shutdown check is satisfied. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/de967a4b Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/de967a4b Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/de967a4b Branch: refs/heads/release-3.4 Commit: de967a4b5b57a5876703fd7ef2d5b1bfcfffe3c0 Parents: 2f34efd Author: Sanjay Pujare Authored: Sat Feb 18 12:33:31 2017 -0800 Committer: Vlad Rozov Committed: Fri Feb 24 17:21:49 2017 -0800 -- .../stram/ResourceRequestHandler.java | 1 + .../stram/StreamingAppMasterService.java| 26 +--- 2 files changed, 13 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/de967a4b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java index 6ecc7c5..e760237 100644 --- a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java +++ b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java @@ -81,6 +81,7 @@ public class ResourceRequestHandler */ if ((loopCounter - entry.getValue().getKey()) > NUMBER_MISSED_HEARTBEATS) { StreamingContainerAgent.ContainerStartRequest csr = entry.getKey(); + LOG.debug("Request for container {} timed out. 
Re-requesting container", csr.container); removedContainerRequests.add(entry.getValue().getRight()); ContainerRequest cr = resourceRequestor.createContainerRequest(csr, false); entry.getValue().setLeft(loopCounter); http://git-wip-us.apache.org/repos/asf/apex-core/blob/de967a4b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index 1c7c893..88b64d3 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -699,7 +699,7 @@ public class StreamingAppMasterService extends CompositeService int loopCounter = -1; long nodeReportUpdateTime = 0; List releasedContainers = new ArrayList<>(); -int numTotalContainers = 0; + // keep track of already requested containers to not request them again while waiting for allocation int numRequestedContainers = 0; int numReleasedContainers = 0; @@ -723,7 +723,7 @@ public class StreamingAppMasterService extends CompositeService dnmgr.shutdownDiagnosticsMessage = String.format("Application master failed due to application %s with duplicate application name \"%s\" by the same user \"%s\" is already started.", ar.getApplicationId().toString(), ar.getName(), ar.getUser()); LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage); -finishApplication(FinalApplicationStatus.FAILED, numTotalContainers); +finishApplication(FinalApplicationStatus.FAILED); return; } resourceRequestor.updateNodeReports(clientRMService.getNodeReports()); @@ -823,7 +823,7 @@ public class StreamingAppMasterService extends CompositeService resourceRequestor.reissueContainerRequests(amRmClient, requestedResources, loopCounter, resourceRequestor, containerRequests, removedContainerRequests); - /* Remove nodes from blacklist after timeout */ + /* Remove nodes 
from blacklist after timeout */ List blacklistRemovals = new ArrayList<>(); for (String hostname : failedBlackListedNodes) { Long timeDiff = currentTimeMillis - failedContainerNodesMap.get(hostname).blackListAdditionTime; @@ -838,8 +838,7 @@ public class StreamingAppMasterService extends CompositeService failedBlackListedNodes.removeAll(blacklistRemovals); } - numTotalContainers += containerRequests.size(); - numRequestedContainers += containerRequests.size(); + numRequestedContainers += containerRequests.size() - removedContainerRequests.size(); AllocateResponse amResp = sendContainerAskToRM(containerRequests, removedContainerRequests, releasedContainers); if (amResp.getAMCommand() != null) { LOG.info(" statement executed:{}",
apex-core git commit: APEXCORE-624 decrement unallocated containers and released containers so exit condition for shutdown check is satisfied.
Repository: apex-core Updated Branches: refs/heads/release-3.5 66bf590c8 -> bd8f7bade APEXCORE-624 decrement unallocated containers and released containers so exit condition for shutdown check is satisfied. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/bd8f7bad Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/bd8f7bad Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/bd8f7bad Branch: refs/heads/release-3.5 Commit: bd8f7bade65f03e7c7729da383a29cd424664f91 Parents: 66bf590 Author: Sanjay Pujare Authored: Sat Feb 18 12:33:31 2017 -0800 Committer: Vlad Rozov Committed: Fri Feb 24 17:15:58 2017 -0800 -- .../stram/ResourceRequestHandler.java | 1 + .../stram/StreamingAppMasterService.java| 26 +--- 2 files changed, 13 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/bd8f7bad/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java index c56f64f..e7f9672 100644 --- a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java +++ b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java @@ -81,6 +81,7 @@ public class ResourceRequestHandler */ if ((loopCounter - entry.getValue().getKey()) > NUMBER_MISSED_HEARTBEATS) { StreamingContainerAgent.ContainerStartRequest csr = entry.getKey(); + LOG.debug("Request for container {} timed out. 
Re-requesting container", csr.container); removedContainerRequests.add(entry.getValue().getRight()); ContainerRequest cr = resourceRequestor.createContainerRequest(csr, false); entry.getValue().setLeft(loopCounter); http://git-wip-us.apache.org/repos/asf/apex-core/blob/bd8f7bad/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index 15b6402..3898dbc 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -705,7 +705,7 @@ public class StreamingAppMasterService extends CompositeService int loopCounter = -1; long nodeReportUpdateTime = 0; List releasedContainers = new ArrayList<>(); -int numTotalContainers = 0; + // keep track of already requested containers to not request them again while waiting for allocation int numRequestedContainers = 0; int numReleasedContainers = 0; @@ -729,7 +729,7 @@ public class StreamingAppMasterService extends CompositeService dnmgr.shutdownDiagnosticsMessage = String.format("Application master failed due to application %s with duplicate application name \"%s\" by the same user \"%s\" is already started.", ar.getApplicationId().toString(), ar.getName(), ar.getUser()); LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage); -finishApplication(FinalApplicationStatus.FAILED, numTotalContainers); +finishApplication(FinalApplicationStatus.FAILED); return; } resourceRequestor.updateNodeReports(clientRMService.getNodeReports()); @@ -829,7 +829,7 @@ public class StreamingAppMasterService extends CompositeService resourceRequestor.reissueContainerRequests(amRmClient, requestedResources, loopCounter, resourceRequestor, containerRequests, removedContainerRequests); - /* Remove nodes from blacklist after timeout */ + /* Remove nodes 
from blacklist after timeout */ List blacklistRemovals = new ArrayList<>(); for (String hostname : failedBlackListedNodes) { Long timeDiff = currentTimeMillis - failedContainerNodesMap.get(hostname).blackListAdditionTime; @@ -844,8 +844,7 @@ public class StreamingAppMasterService extends CompositeService failedBlackListedNodes.removeAll(blacklistRemovals); } - numTotalContainers += containerRequests.size(); - numRequestedContainers += containerRequests.size(); + numRequestedContainers += containerRequests.size() - removedContainerRequests.size(); AllocateResponse amResp = sendContainerAskToRM(containerRequests, removedContainerRequests, releasedContainers); if (amResp.getAMCommand() != null) { LOG.info(" statement executed:{}",
[2/2] apex-core git commit: Merge branch 'APEXCORE-624.master.sanjay' of http://github.com/sanjaypujare/apex-core into APEXCORE-624
Merge branch 'APEXCORE-624.master.sanjay' of http://github.com/sanjaypujare/apex-core into APEXCORE-624 Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/a6dd73b9 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/a6dd73b9 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/a6dd73b9 Branch: refs/heads/master Commit: a6dd73b96b78f5c2509c025fca9fcc96e917f0c1 Parents: 911ccb2 de4c11f Author: Vlad Rozov Authored: Fri Feb 24 17:10:12 2017 -0800 Committer: Vlad Rozov Committed: Fri Feb 24 17:10:12 2017 -0800 -- .../stram/ResourceRequestHandler.java | 1 + .../stram/StreamingAppMasterService.java| 26 +--- 2 files changed, 13 insertions(+), 14 deletions(-) --
[2/3] apex-core git commit: Merge branch 'APEXCORE-636' of github.com:devtagare/incubator-apex-core
Merge branch 'APEXCORE-636' of github.com:devtagare/incubator-apex-core Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/a98cc938 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/a98cc938 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/a98cc938 Branch: refs/heads/master Commit: a98cc938bbfdac9cf0507ea379fdf71282d01814 Parents: 7106b76 6f1e051 Author: Pramod Immaneni Authored: Fri Feb 24 07:13:27 2017 -0800 Committer: Pramod Immaneni Committed: Fri Feb 24 07:13:27 2017 -0800 -- .../java/com/datatorrent/stram/client/StramAppLauncher.java | 5 +++-- .../java/com/datatorrent/stram/client/StramClientUtils.java | 4 +++- .../java/com/datatorrent/stram/client/StramAppLauncherTest.java | 2 +- 3 files changed, 7 insertions(+), 4 deletions(-) --
[3/3] apex-core git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/apex-core
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/apex-core Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/911ccb26 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/911ccb26 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/911ccb26 Branch: refs/heads/master Commit: 911ccb269c575f759cd91fe154b05cbf8156bc2a Parents: a98cc93 1e3d47b Author: Pramod Immaneni Authored: Fri Feb 24 07:16:17 2017 -0800 Committer: Pramod Immaneni Committed: Fri Feb 24 07:16:17 2017 -0800 -- .../stram/codec/DefaultStatefulStreamCodec.java | 22 - .../stram/codec/StatefulStreamCodec.java| 6 +-- .../codec/DefaultStatefulStreamCodecTest.java | 50 3 files changed, 43 insertions(+), 35 deletions(-) --