apex-core git commit: APEXCORE-624 decrement unallocated containers and also released containers so that the exit condition for the shutdown check is satisfied.

2017-02-24 Thread vrozov
Repository: apex-core
Updated Branches:
  refs/heads/release-3.3 232eba368 -> 8b359fc58


APEXCORE-624 decrement unallocated containers and also released containers so 
that the exit condition for the shutdown check is satisfied.


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/8b359fc5
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/8b359fc5
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/8b359fc5

Branch: refs/heads/release-3.3
Commit: 8b359fc58188012f96b1f2b827987fe67c98d74c
Parents: 232eba3
Author: Sanjay Pujare 
Authored: Fri Jan 27 10:35:09 2017 -0800
Committer: Vlad Rozov 
Committed: Fri Feb 24 17:31:32 2017 -0800

--
 .../stram/StreamingAppMasterService.java| 16 
 1 file changed, 8 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/apex-core/blob/8b359fc5/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
--
diff --git 
a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java 
b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 3b2c4de..087d6d5 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -770,6 +770,7 @@ public class StreamingAppMasterService extends 
CompositeService
 for (Map.Entry> entry : requestedResources.entrySet()) {
   if ((loopCounter - entry.getValue().getKey()) > 
NUMBER_MISSED_HEARTBEATS) {
 StreamingContainerAgent.ContainerStartRequest csr = entry.getKey();
+LOG.debug("Request for container {} timed out. Re-requesting 
container", csr.container);
 removedContainerRequests.add(entry.getValue().getRight());
 ContainerRequest cr = 
resourceRequestor.createContainerRequest(csr, false);
 entry.getValue().setLeft(loopCounter);
@@ -779,7 +780,7 @@ public class StreamingAppMasterService extends 
CompositeService
 }
   }
 
- /* Remove nodes from blacklist after timeout */
+  /* Remove nodes from blacklist after timeout */
   long currentTime = System.currentTimeMillis();
   List blacklistRemovals = new ArrayList();
   for (Iterator> it = 
blacklistedNodesQueueWithTimeStamp.iterator(); it.hasNext();) {
@@ -797,7 +798,7 @@ public class StreamingAppMasterService extends 
CompositeService
   }
 
   numTotalContainers += containerRequests.size();
-  numRequestedContainers += containerRequests.size();
+  numRequestedContainers += containerRequests.size() - 
removedContainerRequests.size();
   AllocateResponse amResp = sendContainerAskToRM(containerRequests, 
removedContainerRequests, releasedContainers);
   if (amResp.getAMCommand() != null) {
 LOG.info(" statement executed:{}", amResp.getAMCommand());
@@ -836,7 +837,7 @@ public class StreamingAppMasterService extends 
CompositeService
   LOG.info("Releasing {} as resource with priority {} was already 
assigned", allocatedContainer.getId(), allocatedContainer.getPriority());
   releasedContainers.add(allocatedContainer.getId());
   numReleasedContainers++;
-  numRequestedContainers++;
+  numRequestedContainers--;
   continue;
 }
 if (csr != null) {
@@ -964,7 +965,8 @@ public class StreamingAppMasterService extends 
CompositeService
 appDone = true;
   }
 
-  LOG.debug("Current application state: loop=" + loopCounter + ", 
appDone=" + appDone + ", total=" + numTotalContainers + ", requested=" + 
numRequestedContainers + ", released=" + numReleasedContainers + ", completed=" 
+ numCompletedContainers + ", failed=" + numFailedContainers + ", 
currentAllocated=" + allocatedContainers.size());
+  LOG.debug("Current application state: loop={}, appDone={}, total={}, 
requested={}, released={}, completed={}, failed={}, currentAllocated={}, 
dnmgr.containerStartRequests={}",
+loopCounter, appDone, numTotalContainers, numRequestedContainers, 
numReleasedContainers, numCompletedContainers, numFailedContainers, 
allocatedContainers.size(), dnmgr.containerStartRequests);
 
   // monitor child containers
   dnmgr.monitorHeartbeat();
@@ -1038,16 +1040,14 @@ public class StreamingAppMasterService extends 
CompositeService
   private AllocateResponse sendContainerAskToRM(List 
containerRequests, List removedContainerRequests, 
List releasedContainers) throws YarnException, IOException
   {
 if 

apex-core git commit: APEXCORE-624 decrement unallocated containers and released containers so exit condition for shutdown check is satisfied.

2017-02-24 Thread vrozov
Repository: apex-core
Updated Branches:
  refs/heads/release-3.4 2f34efd3f -> de967a4b5


APEXCORE-624 decrement unallocated containers and released containers so exit 
condition for shutdown check is satisfied.


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/de967a4b
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/de967a4b
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/de967a4b

Branch: refs/heads/release-3.4
Commit: de967a4b5b57a5876703fd7ef2d5b1bfcfffe3c0
Parents: 2f34efd
Author: Sanjay Pujare 
Authored: Sat Feb 18 12:33:31 2017 -0800
Committer: Vlad Rozov 
Committed: Fri Feb 24 17:21:49 2017 -0800

--
 .../stram/ResourceRequestHandler.java   |  1 +
 .../stram/StreamingAppMasterService.java| 26 +---
 2 files changed, 13 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/apex-core/blob/de967a4b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
--
diff --git 
a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java 
b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
index 6ecc7c5..e760237 100644
--- a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
+++ b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
@@ -81,6 +81,7 @@ public class ResourceRequestHandler
  */
 if ((loopCounter - entry.getValue().getKey()) > 
NUMBER_MISSED_HEARTBEATS) {
   StreamingContainerAgent.ContainerStartRequest csr = entry.getKey();
+  LOG.debug("Request for container {} timed out. Re-requesting 
container", csr.container);
   removedContainerRequests.add(entry.getValue().getRight());
   ContainerRequest cr = resourceRequestor.createContainerRequest(csr, 
false);
   entry.getValue().setLeft(loopCounter);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/de967a4b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
--
diff --git 
a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java 
b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 1c7c893..88b64d3 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -699,7 +699,7 @@ public class StreamingAppMasterService extends 
CompositeService
 int loopCounter = -1;
 long nodeReportUpdateTime = 0;
 List releasedContainers = new ArrayList<>();
-int numTotalContainers = 0;
+
 // keep track of already requested containers to not request them again 
while waiting for allocation
 int numRequestedContainers = 0;
 int numReleasedContainers = 0;
@@ -723,7 +723,7 @@ public class StreamingAppMasterService extends 
CompositeService
 dnmgr.shutdownDiagnosticsMessage = String.format("Application master 
failed due to application %s with duplicate application name \"%s\" by the same 
user \"%s\" is already started.",
 ar.getApplicationId().toString(), ar.getName(), ar.getUser());
 LOG.info("Forced shutdown due to {}", 
dnmgr.shutdownDiagnosticsMessage);
-finishApplication(FinalApplicationStatus.FAILED, numTotalContainers);
+finishApplication(FinalApplicationStatus.FAILED);
 return;
   }
   resourceRequestor.updateNodeReports(clientRMService.getNodeReports());
@@ -823,7 +823,7 @@ public class StreamingAppMasterService extends 
CompositeService
 
   resourceRequestor.reissueContainerRequests(amRmClient, 
requestedResources, loopCounter, resourceRequestor, containerRequests, 
removedContainerRequests);
 
- /* Remove nodes from blacklist after timeout */
+  /* Remove nodes from blacklist after timeout */
   List blacklistRemovals = new ArrayList<>();
   for (String hostname : failedBlackListedNodes) {
 Long timeDiff = currentTimeMillis - 
failedContainerNodesMap.get(hostname).blackListAdditionTime;
@@ -838,8 +838,7 @@ public class StreamingAppMasterService extends 
CompositeService
 failedBlackListedNodes.removeAll(blacklistRemovals);
   }
 
-  numTotalContainers += containerRequests.size();
-  numRequestedContainers += containerRequests.size();
+  numRequestedContainers += containerRequests.size() - 
removedContainerRequests.size();
   AllocateResponse amResp = sendContainerAskToRM(containerRequests, 
removedContainerRequests, releasedContainers);
   if (amResp.getAMCommand() != null) {
 LOG.info(" statement executed:{}", 

apex-core git commit: APEXCORE-624 decrement unallocated containers and released containers so exit condition for shutdown check is satisfied.

2017-02-24 Thread vrozov
Repository: apex-core
Updated Branches:
  refs/heads/release-3.5 66bf590c8 -> bd8f7bade


APEXCORE-624 decrement unallocated containers and released containers so exit 
condition for shutdown check is satisfied.


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/bd8f7bad
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/bd8f7bad
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/bd8f7bad

Branch: refs/heads/release-3.5
Commit: bd8f7bade65f03e7c7729da383a29cd424664f91
Parents: 66bf590
Author: Sanjay Pujare 
Authored: Sat Feb 18 12:33:31 2017 -0800
Committer: Vlad Rozov 
Committed: Fri Feb 24 17:15:58 2017 -0800

--
 .../stram/ResourceRequestHandler.java   |  1 +
 .../stram/StreamingAppMasterService.java| 26 +---
 2 files changed, 13 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/apex-core/blob/bd8f7bad/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
--
diff --git 
a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java 
b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
index c56f64f..e7f9672 100644
--- a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
+++ b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java
@@ -81,6 +81,7 @@ public class ResourceRequestHandler
  */
 if ((loopCounter - entry.getValue().getKey()) > 
NUMBER_MISSED_HEARTBEATS) {
   StreamingContainerAgent.ContainerStartRequest csr = entry.getKey();
+  LOG.debug("Request for container {} timed out. Re-requesting 
container", csr.container);
   removedContainerRequests.add(entry.getValue().getRight());
   ContainerRequest cr = resourceRequestor.createContainerRequest(csr, 
false);
   entry.getValue().setLeft(loopCounter);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/bd8f7bad/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
--
diff --git 
a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java 
b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 15b6402..3898dbc 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -705,7 +705,7 @@ public class StreamingAppMasterService extends 
CompositeService
 int loopCounter = -1;
 long nodeReportUpdateTime = 0;
 List releasedContainers = new ArrayList<>();
-int numTotalContainers = 0;
+
 // keep track of already requested containers to not request them again 
while waiting for allocation
 int numRequestedContainers = 0;
 int numReleasedContainers = 0;
@@ -729,7 +729,7 @@ public class StreamingAppMasterService extends 
CompositeService
 dnmgr.shutdownDiagnosticsMessage = String.format("Application master 
failed due to application %s with duplicate application name \"%s\" by the same 
user \"%s\" is already started.",
 ar.getApplicationId().toString(), ar.getName(), ar.getUser());
 LOG.info("Forced shutdown due to {}", 
dnmgr.shutdownDiagnosticsMessage);
-finishApplication(FinalApplicationStatus.FAILED, numTotalContainers);
+finishApplication(FinalApplicationStatus.FAILED);
 return;
   }
   resourceRequestor.updateNodeReports(clientRMService.getNodeReports());
@@ -829,7 +829,7 @@ public class StreamingAppMasterService extends 
CompositeService
 
   resourceRequestor.reissueContainerRequests(amRmClient, 
requestedResources, loopCounter, resourceRequestor, containerRequests, 
removedContainerRequests);
 
- /* Remove nodes from blacklist after timeout */
+  /* Remove nodes from blacklist after timeout */
   List blacklistRemovals = new ArrayList<>();
   for (String hostname : failedBlackListedNodes) {
 Long timeDiff = currentTimeMillis - 
failedContainerNodesMap.get(hostname).blackListAdditionTime;
@@ -844,8 +844,7 @@ public class StreamingAppMasterService extends 
CompositeService
 failedBlackListedNodes.removeAll(blacklistRemovals);
   }
 
-  numTotalContainers += containerRequests.size();
-  numRequestedContainers += containerRequests.size();
+  numRequestedContainers += containerRequests.size() - 
removedContainerRequests.size();
   AllocateResponse amResp = sendContainerAskToRM(containerRequests, 
removedContainerRequests, releasedContainers);
   if (amResp.getAMCommand() != null) {
 LOG.info(" statement executed:{}", 

[2/2] apex-core git commit: Merge branch 'APEXCORE-624.master.sanjay' of http://github.com/sanjaypujare/apex-core into APEXCORE-624

2017-02-24 Thread vrozov
Merge branch 'APEXCORE-624.master.sanjay' of 
http://github.com/sanjaypujare/apex-core into APEXCORE-624


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/a6dd73b9
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/a6dd73b9
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/a6dd73b9

Branch: refs/heads/master
Commit: a6dd73b96b78f5c2509c025fca9fcc96e917f0c1
Parents: 911ccb2 de4c11f
Author: Vlad Rozov 
Authored: Fri Feb 24 17:10:12 2017 -0800
Committer: Vlad Rozov 
Committed: Fri Feb 24 17:10:12 2017 -0800

--
 .../stram/ResourceRequestHandler.java   |  1 +
 .../stram/StreamingAppMasterService.java| 26 +---
 2 files changed, 13 insertions(+), 14 deletions(-)
--




[2/3] apex-core git commit: Merge branch 'APEXCORE-636' of github.com:devtagare/incubator-apex-core

2017-02-24 Thread pramod
Merge branch 'APEXCORE-636' of github.com:devtagare/incubator-apex-core


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/a98cc938
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/a98cc938
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/a98cc938

Branch: refs/heads/master
Commit: a98cc938bbfdac9cf0507ea379fdf71282d01814
Parents: 7106b76 6f1e051
Author: Pramod Immaneni 
Authored: Fri Feb 24 07:13:27 2017 -0800
Committer: Pramod Immaneni 
Committed: Fri Feb 24 07:13:27 2017 -0800

--
 .../java/com/datatorrent/stram/client/StramAppLauncher.java | 5 +++--
 .../java/com/datatorrent/stram/client/StramClientUtils.java | 4 +++-
 .../java/com/datatorrent/stram/client/StramAppLauncherTest.java | 2 +-
 3 files changed, 7 insertions(+), 4 deletions(-)
--




[3/3] apex-core git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/apex-core

2017-02-24 Thread pramod
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/apex-core


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/911ccb26
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/911ccb26
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/911ccb26

Branch: refs/heads/master
Commit: 911ccb269c575f759cd91fe154b05cbf8156bc2a
Parents: a98cc93 1e3d47b
Author: Pramod Immaneni 
Authored: Fri Feb 24 07:16:17 2017 -0800
Committer: Pramod Immaneni 
Committed: Fri Feb 24 07:16:17 2017 -0800

--
 .../stram/codec/DefaultStatefulStreamCodec.java | 22 -
 .../stram/codec/StatefulStreamCodec.java|  6 +--
 .../codec/DefaultStatefulStreamCodecTest.java   | 50 
 3 files changed, 43 insertions(+), 35 deletions(-)
--