[apex-core] branch master updated: APEXCORE-714 Adding a new recovery mode where the operator instance before a failure event can be reused when recovering from an upstream operator failure
This is an automated email from the ASF dual-hosted git repository. vrozov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apex-core.git The following commit(s) were added to refs/heads/master by this push: new d17f464 APEXCORE-714 Adding a new recovery mode where the operator instance before a failure event can be reused when recovering from an upstream operator failure d17f464 is described below commit d17f464fcaf19778e2f8edbe2b03419151558068 Author: Pramod Immaneni AuthorDate: Thu Sep 14 17:23:54 2017 -0700 APEXCORE-714 Adding a new recovery mode where the operator instance before a failure event can be reused when recovering from an upstream operator failure --- api/src/main/java/com/datatorrent/api/Context.java | 9 ++ api/src/main/java/com/datatorrent/api/DAG.java | 3 ++ .../main/java/com/datatorrent/api/Operator.java| 24 ++- .../api/annotation/OperatorAnnotation.java | 11 +++ .../java/com/datatorrent/stram/engine/Node.java| 13 ++-- .../stram/engine/StreamingContainer.java | 36 +++--- .../stram/plan/logical/LogicalPlan.java| 6 7 files changed, 95 insertions(+), 7 deletions(-) diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java index 9fe0c46..0da930d 100644 --- a/api/src/main/java/com/datatorrent/api/Context.java +++ b/api/src/main/java/com/datatorrent/api/Context.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving; import com.datatorrent.api.Attribute.AttributeMap; import com.datatorrent.api.Operator.ProcessingMode; +import com.datatorrent.api.Operator.RecoveryMode; import com.datatorrent.api.StringCodec.Class2String; import com.datatorrent.api.StringCodec.Collection2String; import com.datatorrent.api.StringCodec.Integer2String; @@ -317,6 +318,14 @@ public interface Context Attribute METRICS_DIMENSIONS_SCHEME = new Attribute<>(Object2String.getInstance()); /** + * Specify how to recover the operator in cases of a failure 
event. The default is to load from checkpoint. However, + * in some cases reusing same instance of the operator from before the failure event may be desired. See + * {@link RecoveryMode} The latter is only applicable in cases where the recovery is due to a failure of the + * upstream operator and not the operator itself. + */ +Attribute RECOVERY_MODE = new Attribute(RecoveryMode.CHECKPOINT); + +/** * Return the operator runtime id. * * @return The id diff --git a/api/src/main/java/com/datatorrent/api/DAG.java b/api/src/main/java/com/datatorrent/api/DAG.java index 93936d7..471950b 100644 --- a/api/src/main/java/com/datatorrent/api/DAG.java +++ b/api/src/main/java/com/datatorrent/api/DAG.java @@ -27,6 +27,7 @@ import javax.annotation.Nonnull; import org.apache.hadoop.classification.InterfaceStability; import com.datatorrent.api.Context.DAGContext; +import com.datatorrent.api.annotation.OperatorAnnotation; /** * DAG contains the logical declarations of operators and streams. @@ -190,6 +191,8 @@ public interface DAG extends DAGContext, Serializable OutputPortMeta getMeta(Operator.OutputPort port); +OperatorAnnotation getOperatorAnnotation(); + /** * Return collection of stream which are connected to this operator's * input ports. 
diff --git a/api/src/main/java/com/datatorrent/api/Operator.java b/api/src/main/java/com/datatorrent/api/Operator.java index dd694d0..f0357e5 100644 --- a/api/src/main/java/com/datatorrent/api/Operator.java +++ b/api/src/main/java/com/datatorrent/api/Operator.java @@ -18,6 +18,8 @@ */ package com.datatorrent.api; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.DAG.GenericOperator; @@ -71,6 +73,25 @@ public interface Operator extends Component, GenericOperator } + @Evolving + enum RecoveryMode + { +/** + * Recover the operator from checkpoint + */ +CHECKPOINT, +/** + * Reuse the same instance of the operator from before the failure event. + * + * This applies to scenarios where the failure is in an upstream operator and the not the operator itself. + * Reusing the same instance may not be applicable in all cases as it can lead to incorrect results because the + * operator state will not be consistent with the processing position in the stream. This should be used only for + * operators that are either invariant to reusing the same state with the stream processing position modified + * according to the processing mode or tolerant to it. + */ +REUSE_INSTANCE + } + /** * This method gets called at the beginning of each window. * @@ -227,7 +248,8 @@ public int
[apex-core] branch master updated: APEXCORE-817 Specifying full path for the java command line programs
This is an automated email from the ASF dual-hosted git repository. vrozov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apex-core.git The following commit(s) were added to refs/heads/master by this push: new 819026e APEXCORE-817 Specifying full path for the java command line programs 819026e is described below commit 819026eca63806b774fee85f7d2ebd9784a015d9 Author: Pramod Immaneni AuthorDate: Mon Jun 18 13:38:02 2018 -0700 APEXCORE-817 Specifying full path for the java command line programs --- .../com/datatorrent/stram/StramLocalClusterTest.java | 16 +--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java index 56641f8..fd475c8 100644 --- a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java @@ -60,7 +60,7 @@ import com.datatorrent.stram.plan.logical.LogicalPlan; import com.datatorrent.stram.plan.physical.PTOperator; import com.datatorrent.stram.support.ManualScheduledExecutorService; import com.datatorrent.stram.support.StramTestSupport; - +import com.datatorrent.stram.util.VersionInfo; public class StramLocalClusterTest { @@ -381,14 +381,18 @@ public class StramLocalClusterTest String sourceDir = "src/test/resources/dynamicJar/"; String destDir = testMeta.getPath(); +// The compiled java class should be loadable by the current java runtime hence setting the compile target version +// to be the same +String binLocation = getJavaBinLocation(); + Process p = Runtime.getRuntime() -.exec(new String[] {"javac", "-d", destDir, sourceDir + pojoClassName + ".java"}, null, null); +.exec(new String[] {binLocation + "javac", "-d", destDir, sourceDir + pojoClassName + ".java"}, null, null); IOUtils.copy(p.getInputStream(), System.out); IOUtils.copy(p.getErrorStream(), System.err); 
Assert.assertEquals(0, p.waitFor()); p = Runtime.getRuntime() -.exec(new String[] {"jar", "-cf", pojoClassName + ".jar", pojoClassName + ".class"}, null, new File(destDir)); +.exec(new String[] {binLocation + "jar", "-cf", pojoClassName + ".jar", pojoClassName + ".class"}, null, new File(destDir)); IOUtils.copy(p.getInputStream(), System.out); IOUtils.copy(p.getErrorStream(), System.err); Assert.assertEquals(0, p.waitFor()); @@ -396,6 +400,12 @@ public class StramLocalClusterTest return new File(destDir, pojoClassName + ".jar").getAbsolutePath(); } + private String getJavaBinLocation() + { +String javaHome = System.getProperty("java.home"); +return VersionInfo.compare(System.getProperty("java.version"), "1.9") < 0 ? javaHome + "/../bin/" : javaHome + "/bin/"; + } + @Test public void testAppPath() throws Exception {
[apex-core] branch master updated: APEXCORE-810 Fixing race condition between publisher and subscriber teardowns
This is an automated email from the ASF dual-hosted git repository. vrozov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apex-core.git The following commit(s) were added to refs/heads/master by this push: new 436785b APEXCORE-810 Fixing race condition between publisher and subscriber teardowns 436785b is described below commit 436785bd63be0e90265cf8f8f18882647b8ecab0 Author: Pramod Immaneni AuthorDate: Wed Jan 17 13:48:32 2018 -0800 APEXCORE-810 Fixing race condition between publisher and subscriber teardowns --- .../bufferserver/internal/LogicalNode.java | 7 +-- .../datatorrent/bufferserver/server/Server.java| 62 +- 2 files changed, 27 insertions(+), 42 deletions(-) diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java index 3e8846d..af5db09 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java @@ -115,12 +115,7 @@ public class LogicalNode implements DataListener */ public void removeChannel(WriteOnlyClient client) { -for (PhysicalNode pn : physicalNodes) { - if (pn.getClient() == client) { -physicalNodes.remove(pn); -break; - } -} +physicalNodes.removeIf(node -> (node.getClient().equals(client))); } /** diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index c5700f2..6332a18 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -24,9 +24,6 @@ import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; -import java.util.ArrayList; -import java.util.Iterator; 
-import java.util.Map.Entry; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -117,9 +114,11 @@ public class Server extends AbstractServer @Override public void unregistered(SelectionKey key) { +logger.debug("Unregistered {}", this); for (LogicalNode ln : subscriberGroups.values()) { ln.boot(); } +super.unregistered(key); /* * There may be un-register tasks scheduled to run on the event loop that use serverHelperExecutor. */ @@ -860,41 +859,32 @@ public class Server extends AbstractServer } torndown = true; - /* - * if the publisher unregistered, all the downstream guys are going to be unregistered anyways - * in our world. So it makes sense to kick them out proactively. Otherwise these clients since - * are not being written to, just stick around till the next publisher shows up and eat into - * the data it's publishing for the new subscribers. - */ - - /** - * since the publisher server died, the queue which it was using would stop pumping the data unless - * a new publisher comes up with the same name. We leave it to the stream to decide when to bring up a new node - * with the same identifier as the one which just died. - */ - if (publisherChannels.containsValue(this)) { -final Iterator> i = publisherChannels.entrySet().iterator(); -while (i.hasNext()) { - if (i.next().getValue() == this) { -i.remove(); -break; - } -} - } - - ArrayList list = new ArrayList<>(); - String publisherIdentifier = datalist.getIdentifier(); - Iterator iterator = subscriberGroups.values().iterator(); - while (iterator.hasNext()) { -LogicalNode ln = iterator.next(); -if (publisherIdentifier.equals(ln.getUpstream())) { - list.add(ln); + serverHelperExecutor.submit(() -> + { +/* + * if the publisher unregistered, all the downstream guys are going to be unregistered anyways + * in our world. So it makes sense to kick them out proactively. 
Otherwise these clients since + * are not being written to, just stick around till the next publisher shows up and eat into + * the data it's publishing for the new subscribers. + */ + +/** + * since the publisher server died, the queue which it was using would stop pumping the data unless + * a new publisher comes up with the same name. We leave it to the stream to decide when to bring up a new node + * with the same identifier as the one which just died. + */ +String publisherIdentifier = datalist.getIdentifier(); +if (
svn commit: r26359 - in /dev/apex/apache-apex-core-3.7.0-RC1: apache-apex-core-3.7.0-source-release.tar.gz.md5 apache-apex-core-3.7.0-source-release.zip.md5
Author: vrozov Date: Mon Apr 16 15:29:13 2018 New Revision: 26359 Log: removed .md5 Removed: dev/apex/apache-apex-core-3.7.0-RC1/apache-apex-core-3.7.0-source-release.tar.gz.md5 dev/apex/apache-apex-core-3.7.0-RC1/apache-apex-core-3.7.0-source-release.zip.md5
[apex-core] branch master updated: APEXCORE-806 Upgrade org.owasp:dependency-check-maven
This is an automated email from the ASF dual-hosted git repository. vrozov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apex-core.git The following commit(s) were added to refs/heads/master by this push: new eb9e904 APEXCORE-806 Upgrade org.owasp:dependency-check-maven eb9e904 is described below commit eb9e904368c1d6170c0a0011ace2b2c9f198916d Author: Vlad Rozov <vro...@apache.org> AuthorDate: Mon Apr 2 17:56:01 2018 -0700 APEXCORE-806 Upgrade org.owasp:dependency-check-maven --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index a5ddc4b..acd3c9d 100644 --- a/pom.xml +++ b/pom.xml @@ -467,7 +467,7 @@ org.owasp dependency-check-maven - 3.0.2 + 3.1.2 ${dependency.check.failBuildOnCVSS} ${dependency.check.showSummary} -- To stop receiving notification emails like this one, please contact vro...@apache.org.
[apex-core] branch master updated: APEXCORE-803 Replace outdated archetype version number.
This is an automated email from the ASF dual-hosted git repository. vrozov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apex-core.git The following commit(s) were added to refs/heads/master by this push: new 4fb580f APEXCORE-803 Replace outdated archetype version number. 4fb580f is described below commit 4fb580fdfb4abf4c29f85a2662a2982b64ee6ae8 Author: Thomas Weise <t...@apache.org> AuthorDate: Wed Jan 17 22:47:47 2018 -0800 APEXCORE-803 Replace outdated archetype version number. --- docs/apex_development_setup.md | 9 +++-- 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/docs/apex_development_setup.md b/docs/apex_development_setup.md index 7e9b257..f520ba7 100644 --- a/docs/apex_development_setup.md +++ b/docs/apex_development_setup.md @@ -62,7 +62,7 @@ Confirm by running the following commands and comparing with output that show in Creating New Apex Project --- -After development tools are configured, you can now use the maven archetype to create a basic Apache Apex project. **Note:** When executing the commands below, replace `3.4.0` by [latest available version](http://apex.apache.org/downloads.html) of Apache Apex. +After development tools are configured, you can now use the maven archetype to create a basic Apache Apex project. **Note:** When executing the commands below, you can optionally replace `RELEASE` with a [specific version](http://apex.apache.org/downloads.html) of Apache Apex. * **Windows** - Create a new Windows command file called `newapp.cmd` by copying the lines below, and execute it. When you run this file, the properties will be displayed and you will be prompted with `` Y: :``; just press **Enter** to complete the project generation. The caret (^) at the end of some lines indicates that a continuation line follows. 
@@ -72,7 +72,7 @@ After development tools are configured, you can now use the maven archetype to c setlocal mvn archetype:generate ^ -DarchetypeGroupId=org.apache.apex ^ - -DarchetypeArtifactId=apex-app-archetype -DarchetypeVersion=3.4.0 ^ + -DarchetypeArtifactId=apex-app-archetype -DarchetypeVersion=RELEASE ^ -DgroupId=com.example -Dpackage=com.example.myapexapp -DartifactId=myapexapp ^ -Dversion=1.0-SNAPSHOT endlocal @@ -82,7 +82,7 @@ After development tools are configured, you can now use the maven archetype to c mvn archetype:generate \ -DarchetypeGroupId=org.apache.apex \ - -DarchetypeArtifactId=apex-app-archetype -DarchetypeVersion=3.4.0 \ + -DarchetypeArtifactId=apex-app-archetype -DarchetypeVersion=RELEASE \ -DgroupId=com.example -Dpackage=com.example.myapexapp -DartifactId=myapexapp \ -Dversion=1.0-SNAPSHOT @@ -159,6 +159,3 @@ Sandbox --- To jump-start development with Apex, please refer to the [Downloads](https://apex.apache.org/downloads.html) section of the Apache Apex website, which provides a list of 3rd party Apex binary packages and sandbox environments. - - - -- To stop receiving notification emails like this one, please contact vro...@apache.org.
[apex-core] branch master updated: APEXCORE-802 bump Malhar version to 3.8.0
This is an automated email from the ASF dual-hosted git repository. vrozov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apex-core.git The following commit(s) were added to refs/heads/master by this push: new 1a826aa APEXCORE-802 bump Malhar version to 3.8.0 1a826aa is described below commit 1a826aaf8fea1e971fc97fa65516f35a0c0faf0e Author: Thomas Weise <t...@apache.org> AuthorDate: Wed Jan 17 07:35:24 2018 -0800 APEXCORE-802 bump Malhar version to 3.8.0 --- .../src/main/resources/META-INF/maven/archetype-metadata.xml| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apex-app-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml b/apex-app-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml index 46439df..4aca435 100644 --- a/apex-app-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml +++ b/apex-app-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml @@ -28,7 +28,7 @@ ${project.version} - 3.7.0 + 3.8.0 -- To stop receiving notification emails like this one, please contact vro...@apache.org.
[apex-core] 02/02: APEXCORE-800 Disable the disk health checker service for StramMiniClusterTest
This is an automated email from the ASF dual-hosted git repository. vrozov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apex-core.git commit 2e4a7728a9ba5ac98fa93c7460bb48bd5d1d0b98 Author: Vlad Rozov <vro...@apache.org> AuthorDate: Fri Dec 15 19:10:44 2017 -0800 APEXCORE-800 Disable the disk health checker service for StramMiniClusterTest --- engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java index bbb7c1c..fb02593 100644 --- a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java @@ -129,6 +129,7 @@ public class StramMiniClusterTest conf.set("yarn.scheduler.capacity.root.queues", "default"); conf.set("yarn.scheduler.capacity.root.default.capacity", "100"); conf.setBoolean(YarnConfiguration.NM_DISK_HEALTH_CHECK_ENABLE, false); +conf.setFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 100.0F); conf.set(YarnConfiguration.NM_ADMIN_USER_ENV, String.format("JAVA_HOME=%s,CLASSPATH=%s", System.getProperty("java.home"), getTestRuntimeClasspath())); conf.set(YarnConfiguration.NM_ENV_WHITELIST, YarnConfiguration.DEFAULT_NM_ENV_WHITELIST.replaceAll("JAVA_HOME,*", "")); -- To stop receiving notification emails like this one, please contact "commits@apex.apache.org" <commits@apex.apache.org>.
[apex-core] branch master updated (646674b -> 2e4a772)
This is an automated email from the ASF dual-hosted git repository. vrozov pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/apex-core.git. from 646674b APEXCORE-800 Disable the disk health checker service for StramMiniClusterTest new d40033a APEXCORE-798 Exclude log4j.properties from engine-test.jar new 2e4a772 APEXCORE-800 Disable the disk health checker service for StramMiniClusterTest The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: engine/pom.xml | 2 +- engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) -- To stop receiving notification emails like this one, please contact ['"commits@apex.apache.org" <commits@apex.apache.org>'].
[apex-core] 01/02: APEXCORE-798 Exclude log4j.properties from engine-test.jar
This is an automated email from the ASF dual-hosted git repository. vrozov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apex-core.git commit d40033a9bf160cd484ad696b8c7d9d3c7fa96acb Author: Vlad Rozov <vro...@apache.org> AuthorDate: Fri Dec 15 09:24:06 2017 -0800 APEXCORE-798 Exclude log4j.properties from engine-test.jar --- engine/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/pom.xml b/engine/pom.xml index 4294c8a..93e7d97 100644 --- a/engine/pom.xml +++ b/engine/pom.xml @@ -49,7 +49,7 @@ - + **/yarn-site.xml -- To stop receiving notification emails like this one, please contact "commits@apex.apache.org" <commits@apex.apache.org>.
svn commit: r22710 - /release/apex/KEYS
Author: vrozov Date: Sat Oct 28 02:00:11 2017 New Revision: 22710 Log: Adding key for Ananth Gundabattula <anan...@apache.org> Modified: release/apex/KEYS Modified: release/apex/KEYS == --- release/apex/KEYS (original) +++ release/apex/KEYS Sat Oct 28 02:00:11 2017 @@ -435,3 +435,62 @@ dp+EYxpEhneKfQYX3F64rlJcGi7HqJl8VChxSG9Y twETNTJFVsvuAcpNAr4tcGku1jqs =lX2d -END PGP PUBLIC KEY BLOCK- + +pub rsa4096 2017-10-25 [SC] + 14E1 CC6C 8F2C 5DC6 4CB9 AE89 C421 3087 9F19 1691 +uid [ unknown] Ananth Gundabattula <anan...@apache.org> +sub rsa4096 2017-10-25 [E] + +-BEGIN PGP PUBLIC KEY BLOCK- + +mQINBFnw4nsBEADD74YJqq51mKnIxkMwt5YBgHigc8xfOypeW8ZoBXhJZGgY3Ll9 +JT18tlig1X1lVyWuLe9UQFg+mEKipceokcnnkpTugZelaUBS0IbXtDfkqAohvZ0I +PJfMCIi0NLWcN8b1DpbUKgaFYIsNegAXNwboST59WECu9QQXtHQ1Qb1QnIvhH/P0 +TnbiZLJhKU73eCDSSEIihvcS7Ucr6XAtdMhuY+x5UmejlzcadM6o2ws2xoBH8kxu +XRNR8Nsu5BgrJIFPgPaHSc9X8TjN7VsIUEIxcc7Vtsp6/oZfibBhOzT2N2EB7rVh +hD29xDko/C63/v4HbXn0tKpOzWkeQD39+5aZoiPT35H0C5sZN2PJqlEiraigsJPq +hSmBEM812xtK+Da3dWKXU+jbcRaqPEoRHdTJTK3t7YJ4HxW+nwvBoeV81PkKiR9t +a5MOYr429BT/ZA9J9wJtQtpnEU0fv/za1JpkDdb4oNtNS7xX/n4LMmNn/MIQbgzW +zpxiW38l0HsMOiM1AD+7Es7ecZztIQSq4XEzKLG5kPlZU1iMa0evd8r/qgUvIR/J +hoDpC1gw1yqE/OEy0sCpASPr+rIUG0XG4/3qAZBiujzVb6DVQ0t4tK3+j7/BWZ2w +l2S138UxQHBL4oKj5CqUT91y5E8V+4YRa/48kT7ahkFTjEkhfzdusBWruwARAQAB +tChBbmFudGggR3VuZGFiYXR0dWxhIDxhbmFudGhnQGFwYWNoZS5vcmc+iQJOBBMB +CAA4FiEEFOHMbI8sXcZMua6JxCEwh58ZFpEFAlnw4nsCGwMFCwkIBwIGFQgJCgsC +BBYCAwECHgECF4AACgkQxCEwh58ZFpEH7BAAqQ0qb/lc/KnaphSckXrxl720ZS0I +ogKlCjgnKCeGwlfjdxT6hRZp467xJGu3mOLc7VDsgkMjvoYCf+Fng8BhQCCRR/9p +B29GmkT78BfC/FRQlbUxG/HPemQiH5HuUA6h5LV7MQIcr1XVeF9+SP2u3Ix9f+4r +GIEOFP7SSopIRrYW2Abzsh62wPI9DpS+uySurZt4CNnO1UWJFviVS5RjMSYS3mcw +ioGFclvXKI5qbG3170IJGxwWiy2Nv+9ElqofsEMb45sZ9P9k0FmYjW4Bv/NRQ4zS +KahEzmhQhvm0GhlvNrVljwQW1WI/h/W2vxDbVJRhOyvYrp/BLkKVh3X0z77fpXPb +lPBQGTLlxl6MMbrto08qGIDEMIs/zWgnStr293pA4KNF0umJmtr+9uUGE2DmUvBY +L32Io9wGAfXbqjGyT5ufd9cFhA1E30Wz400YnuJYa8ZOdXFEObVKeM5PlQgsg9ZQ 
+UFClMG3aGLebA1Zv3qJVV+/Mfb28E1PmM1iaVcS9AxD9YPKCSEP8IkECF1V0l1XL +NvS32olw1VZYbnqsijY12CIWXyOs5V8trVeeX1qJ71qx+b9Sq9d6BeU37kKEQkOb +zzBsi0rNoHFXnqQwz/uX5hhZuzrdwOAslKJTCmoBuarorJZqP7gN/+0hpIO/Scdl +hSr496/GQn4NKta5Ag0EWfDiewEQAJUEOVjlze9I8zlYmynleXh5MLbzHL6EnOvS +WoiO+Z5ORonO/ptr3tXl/RWOIfkDdseVbH2JfNf6R4euD0dQIdJTVLGSImqQlXw/ +woMz5v3VbqE2gGWUZeblT+BOtlGr+BeJoigKTNvxHg1ZqgI0EuRqgQMhgg7EXcke +rPYjpaAgGRL8OmU/lF7VvkR133z7AGSnpR6v7voE0nJoMCbw304R335GfcfmMp7J +hLAhLluZCbi1mop1D5BkQAHXZWLx2bTwjQv0KEjT/v/8wCNNqc/tAOZd64j/aqAV +thQwpw6GuhoK/r0No8gOjUZrFStSzX27G+H0D8vLNdo3XwSPeXXrTX79LCwI+5Is +MYALNj6U3WPgpN1ol05HeY1Eua+4OBcHsquWRpJEt81vPKJLLZ9tMN7Rtb2kRMWO +Tgp7PigLEcJgcXhcFpbZXWCBfeHcvpkuHuLXvrVR+vIKbcIwlJcuGPgUmc6fqjBA +DW0uhkl8rIrVPhVU3RjQ6Xqf/gOk1QMzTCjrx9BmuItBu9Z1CSjAqMxi/KyUlQre +QUNPiT1DG7OjSiGB+6dPvQtQMZN6Wznh0iRB1S8STup+OKsq2ysln8fayLsGHnhD +lus5rrsKCQ+JYRRWCVJJe9X5EDuqDCdJJrW0/Y13TJa90T0srtxZyqXwJ1FnSVak +Cyc5jshNABEBAAGJAjYEGAEIACAWIQQU4cxsjyxdxky5ronEITCHnxkWkQUCWfDi +ewIbDAAKCRDEITCHnxkWkdqND/0XaSn8yMWGxbIHga5PkDI8GOVL4P/Cs8r5vZwK +POFi/rT7riKQv80Njus7Rk44OBbxcmkaUj9YRyyDsSPRqsJMK9IsgGYZdOFW+IdL +fnOzf8QlbtiSPDcUuLjoiablGY2jiTndbSejyEO3BN3PE1Y6ouik7cuJgWXydMmK +sXZammbKLeETeC+WrIedJ2kujNpeY6N/OZiBB7b+E/y8KSBqFDQQW8lm2l42l5lm +7HYv5DPMA0rrIsEp/WcCWgBFn2eLQSosEPqrXh9VHvoO+3PUW1Q4Iu7lxum2yCJE +2TSJ7AF5K3tmvsQ1Nr5jmVJ7UytOx28d9h9E5R29bQdM8feYuYtL+UsjgFBnxICj +CTi37/JqOlNk+8AMmMa5sWuSVlyDKumQsQ42sqF6D20IhsIEt6YeWdfhPtqxwPd4 +/68QPmxtw3Gs+DjyiCqRDIrpnPkiCFdMdiLmUNLz72eGgpp94UEMRGqayF+ZHYpl +QH6/0TtTNPzuTIe0KrcR5wHT38nuwoDVmHnyYgViYceirniaFHazzTNQmEeRdQPR +FUcQqYFFO5ipxHLRslooygn2sq15Po42WpxPPa3RRa47Xt3sevFG1c9QlfNT1NGz +fDciDL0V5Xu1m1KImSkwSSQ4lgnqoeeL/XyAOdsiLaw4vkXKYXTy/GsyDKbf+YPh +/7C58A== +=m1H5 +-END PGP PUBLIC KEY BLOCK- +
[apex-malhar] branch master updated: APEXMALHAR-2435 specify KeyedWindowedOperatorBenchmarkApp application name
This is an automated email from the ASF dual-hosted git repository. vrozov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apex-malhar.git The following commit(s) were added to refs/heads/master by this push: new be57b9f APEXMALHAR-2435 specify KeyedWindowedOperatorBenchmarkApp application name be57b9f is described below commit be57b9f63e9b8efae986e05bce2862de30fe59b6 Author: brightchen <bri...@datatorrent.com> AuthorDate: Sat Oct 7 06:04:57 2017 +1100 APEXMALHAR-2435 specify KeyedWindowedOperatorBenchmarkApp application name --- .../datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java index fccc67a..7e38584 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java @@ -38,12 +38,14 @@ import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.lib.fileaccess.TFileImpl; import com.datatorrent.lib.util.KeyValPair; /** * @since 3.7.0 */ +@ApplicationAnnotation(name = "KeyedWindowedOperatorBenchmark") public class KeyedWindowedOperatorBenchmarkApp extends AbstractWindowedOperatorBenchmarkApp< KeyedWindowedOperatorBenchmarkApp.KeyedWindowedGenerator, KeyedWindowedOperatorBenchmarkApp.MyKeyedWindowedOperator> { -- To stop receiving notification emails like this one, please contact ['"commits@apex.apache.org" <commits@apex.apache.org>'].
[apex-core] branch master updated: APEXCORE-722 Made data members for Default Port classes as private with protected access methods
This is an automated email from the ASF dual-hosted git repository. vrozov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apex-core.git The following commit(s) were added to refs/heads/master by this push: new e2a0a69 APEXCORE-722 Made data members for Default Port classes as private with protected access methods e2a0a69 is described below commit e2a0a69fb1a627bb7c45fc09d1d59243324083cc Author: bhupeshchawda <bhup...@apache.org> AuthorDate: Mon May 15 11:46:15 2017 +0530 APEXCORE-722 Made data members for Default Port classes as private with protected access methods --- api/src/main/java/com/datatorrent/api/DefaultInputPort.java | 11 +-- api/src/main/java/com/datatorrent/api/DefaultOutputPort.java | 2 +- .../org/apache/apex/api/ControlAwareDefaultInputPort.java | 2 +- .../org/apache/apex/api/ControlAwareDefaultOutputPort.java| 8 pom.xml | 3 +++ 5 files changed, 18 insertions(+), 8 deletions(-) diff --git a/api/src/main/java/com/datatorrent/api/DefaultInputPort.java b/api/src/main/java/com/datatorrent/api/DefaultInputPort.java index dc8705c..6957105 100644 --- a/api/src/main/java/com/datatorrent/api/DefaultInputPort.java +++ b/api/src/main/java/com/datatorrent/api/DefaultInputPort.java @@ -18,6 +18,8 @@ */ package com.datatorrent.api; +import org.apache.hadoop.classification.InterfaceStability; + import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.Operator.InputPort; @@ -31,8 +33,8 @@ import com.datatorrent.api.Operator.InputPort; */ public abstract class DefaultInputPort implements InputPort, Sink { - protected int count; - protected boolean connected = false; + private int count; + private boolean connected = false; /** * Constructor for DefaultInputPort. 
@@ -109,4 +111,9 @@ public abstract class DefaultInputPort implements InputPort, Sink */ public abstract void process(T tuple); + @InterfaceStability.Evolving + protected int incrementCount() + { +return ++count; + } } diff --git a/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java b/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java index acd562f..7892fdb 100644 --- a/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java +++ b/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java @@ -37,7 +37,7 @@ public class DefaultOutputPort implements Operator.OutputPort public static final String THREAD_AFFINITY_DISABLE_CHECK = "com.datatorrent.api.DefaultOutputPort.thread.check.disable"; private static final Logger logger = LoggerFactory.getLogger(DefaultOutputPort.class); - protected transient Sink sink; + private transient Sink sink; private transient Thread operatorThread; /** diff --git a/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java index f17d540..5f965bc 100644 --- a/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java +++ b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java @@ -36,7 +36,7 @@ public abstract class ControlAwareDefaultInputPort extends DefaultInputPort extends DefaultOutputPort { public ControlAwareDefaultOutputPort() { -sink = ControlTupleEnabledSink.BLACKHOLE; +setSink(ControlTupleEnabledSink.BLACKHOLE); } /** @@ -46,18 +46,18 @@ public class ControlAwareDefaultOutputPort extends DefaultOutputPort public void emitControl(ControlTuple tuple) { verifyOperatorThread(); -((ControlTupleEnabledSink)sink).putControl(tuple); +((ControlTupleEnabledSink)getSink()).putControl(tuple); } public boolean isConnected() { -return sink != ControlTupleEnabledSink.BLACKHOLE; +return getSink() != ControlTupleEnabledSink.BLACKHOLE; } @Override public void setSink(Sink s) { -this.sink = (s == null ? 
ControlTupleEnabledSink.BLACKHOLE : s); +super.setSink(s == null ? ControlTupleEnabledSink.BLACKHOLE : s); } } diff --git a/pom.xml b/pom.xml index 5774bb7..ff91946 100644 --- a/pom.xml +++ b/pom.xml @@ -440,6 +440,9 @@ @org.apache.hadoop.classification.InterfaceStability$Evolving @org.apache.hadoop.classification.InterfaceStability$Unstable + +com.datatorrent.api.DefaultInputPort +com.datatorrent.api.DefaultOutputPort -- To stop receiving notification emails like this one, please contact ['"commits@apex.apache.org" <commits@apex.apache.org>'].
[apex-core] branch master updated: APEXCORE-778 Refactor DelayOperatorTest
This is an automated email from the ASF dual-hosted git repository. vrozov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apex-core.git The following commit(s) were added to refs/heads/master by this push: new 59c1a44 APEXCORE-778 Refactor DelayOperatorTest 59c1a44 is described below commit 59c1a44ca6089aed16a85a50204e8317af32dd55 Author: Vlad Rozov <vro...@apache.org> AuthorDate: Tue Aug 22 10:16:14 2017 -0700 APEXCORE-778 Refactor DelayOperatorTest --- .../com/datatorrent/stram/StramLocalCluster.java | 12 + .../stram/plan/logical/DelayOperatorTest.java | 277 - 2 files changed, 175 insertions(+), 114 deletions(-) diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java index 41e358e..eab908c 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java +++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java @@ -453,6 +453,17 @@ public class StramLocalCluster implements Runnable, Controller this.exitCondition = exitCondition; } + public void run(Callable exitCondition) + { +run(exitCondition, 0); + } + + public void run(Callable exitCondition, long runMillis) + { +setExitCondition(exitCondition); +run(runMillis); + } + @Override public void run() { @@ -519,6 +530,7 @@ public class StramLocalCluster implements Runnable, Controller try { if (exitCondition != null && exitCondition.call()) { +LOG.info("Stopping on exit condition"); appDone = true; } } catch (Exception ex) { diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java index 285aba3..bf4e292 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java @@ -18,14 +18,13 @@ */ package com.datatorrent.stram.plan.logical; +import 
java.math.BigInteger; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.Stack; -import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -40,6 +39,7 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; @@ -50,6 +50,7 @@ import com.datatorrent.api.DAG; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.Operator; +import com.datatorrent.bufferserver.util.Codec; import com.datatorrent.common.util.BaseOperator; import com.datatorrent.common.util.DefaultDelayOperator; import com.datatorrent.stram.StramLocalCluster; @@ -65,7 +66,11 @@ import com.datatorrent.stram.support.StramTestSupport; import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent; import com.datatorrent.stram.support.StramTestSupport.TestMeta; +import static com.datatorrent.stram.plan.logical.DelayOperatorTest.FibonacciOperator.assertFibonacci; + import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** @@ -76,7 +81,7 @@ public class DelayOperatorTest @Rule public TestMeta testMeta = new TestMeta(); - private static Lock sequential = new ReentrantLock(); + private static final Lock sequential = new ReentrantLock(); @Before public void setup() @@ -98,7 +103,7 @@ public class DelayOperatorTest GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class); GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class); GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class); -DefaultDelayOperator 
opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class); +DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class); dag.addStream("BtoC", opB.outport1, opC.inport1); dag.addStream("CtoD", opC.outport1, opD.inport1); @@ -177,136 +182,209 @@ public class DelayOperatorTest dag.validate(); } - public static final Long[] FIBONACCI_NUMBERS = new Long[]{ - 1L, 1L, 2L, 3L, 5L, 8L, 13L, 21L, 34L, 55L, 89L, 144L, 233L, 377L, 610L, 987L, 1597L, 2584L, 4181L, 6765L, - 10946L, 17711L, 28657L, 46368L, 75025L, 121393L, 196418L, 317811L, 514229L, 832040L, 1346269L, 2178309L, - 3524578L, 5702887L, 9
[apex-malhar] branch master updated: APEXMALHAR-2489 Change algorithm for running average
This is an automated email from the ASF dual-hosted git repository. vrozov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apex-malhar.git The following commit(s) were added to refs/heads/master by this push: new 8321395 APEXMALHAR-2489 Change algorithm for running average 8321395 is described below commit 83213956ad7e208b2487868608c5223ecc840a21 Author: Florian Schmidt <florian.schmidt.1...@icloud.com> AuthorDate: Fri Jul 28 10:46:27 2017 -0700 APEXMALHAR-2489 Change algorithm for running average The current algorithm for calculating the running average was subject to a potential overflow, because part of the formula required the average value (average) to be multiplied by the number of processed tuples (count). For example, average * count would overflow when average > Double.MAX_VALUE / 2 and count >= 2. This commit changes the formula used to the one described on http://www.heikohoffmann.de/htmlthesis/node134.html, where such a multiplication is not necessary anymore. 
It also adds a unit test which checks that no overflow occurs anymore --- .../main/java/com/datatorrent/lib/math/RunningAverage.java| 7 --- .../java/com/datatorrent/lib/math/RunningAverageTest.java | 11 ++- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java b/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java index e3b0edf..63389ea 100644 --- a/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java +++ b/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java @@ -64,9 +64,10 @@ public class RunningAverage extends BaseOperator @Override public void process(Number tuple) { - double sum = (RunningAverage.this.count * average) + tuple.doubleValue(); - RunningAverage.this.count++; - average = sum / RunningAverage.this.count; + // Floating mean as explained on http://www.heikohoffmann.de/htmlthesis/node134.html + double firstPart = (1.0 / (++RunningAverage.this.count)); + double secondPart = (tuple.doubleValue() - average); + average += (firstPart * secondPart); } }; diff --git a/library/src/test/java/com/datatorrent/lib/math/RunningAverageTest.java b/library/src/test/java/com/datatorrent/lib/math/RunningAverageTest.java index 52da631..3c5e026 100644 --- a/library/src/test/java/com/datatorrent/lib/math/RunningAverageTest.java +++ b/library/src/test/java/com/datatorrent/lib/math/RunningAverageTest.java @@ -29,8 +29,17 @@ import static org.junit.Assert.assertEquals; */ public class RunningAverageTest { - public RunningAverageTest() + + @Test + public void testDoesNotOverflow() { +RunningAverage instance = new RunningAverage(); +instance.input.process(Double.MAX_VALUE); +assertEquals("first average", Double.MAX_VALUE, instance.average, 0); + +instance.input.process(Double.MAX_VALUE); + +assertEquals("second average", Double.MAX_VALUE, instance.average, 0); } @Test -- To stop receiving notification emails like this one, please contact 
['"commits@apex.apache.org" <commits@apex.apache.org>'].
[apex-core] branch master updated: APEXCORE-737 Increment numRequestedContainers instead of decrementing for an already allocated container
This is an automated email from the ASF dual-hosted git repository. vrozov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apex-core.git The following commit(s) were added to refs/heads/master by this push: new 70fa21c APEXCORE-737 Increment numRequestedContainers instead of decrementing for an already allocated container 70fa21c is described below commit 70fa21c9a66d50b1b4d10a5845f7f67aebbb7bd6 Author: Sanjay Pujare <sanjaypuj...@users.noreply.github.com> AuthorDate: Thu Jun 1 21:57:24 2017 -0700 APEXCORE-737 Increment numRequestedContainers instead of decrementing for an already allocated container --- .../src/main/java/com/datatorrent/stram/StreamingAppMasterService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index fb39d6c..61a31bd 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -907,7 +907,7 @@ public class StreamingAppMasterService extends CompositeService LOG.info("Releasing {} as resource with priority {} was already assigned", allocatedContainer.getId(), allocatedContainer.getPriority()); releasedContainers.add(allocatedContainer.getId()); numReleasedContainers++; -numRequestedContainers--; +numRequestedContainers++;// undo the decrement above for this allocated container continue; } if (csr != null) { -- To stop receiving notification emails like this one, please contact ['"commits@apex.apache.org" <commits@apex.apache.org>'].
[apex-core] branch master updated: APEXCORE-742 Using a common utility method for creation of yarn client instances and fixing scenarios where the client is not being initialized correctly.
This is an automated email from the ASF dual-hosted git repository. vrozov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apex-core.git The following commit(s) were added to refs/heads/master by this push: new 0413f9b APEXCORE-742 Using a common utility method for creaton of yarn client instances and fixing scenarios where the client is not being initialized correctly. 0413f9b is described below commit 0413f9b5bd19a2e5a46c565a7815f1e5c2f7a57f Author: Pramod Immaneni <pra...@datatorrent.com> AuthorDate: Mon Jun 5 17:24:03 2017 -0700 APEXCORE-742 Using a common utility method for creaton of yarn client instances and fixing scenarios where the client is not being initialized correctly. --- .../stram/StreamingAppMasterService.java | 539 ++--- .../java/com/datatorrent/stram/cli/ApexCli.java| 6 +- .../com/datatorrent/stram/client/StramAgent.java | 8 +- .../datatorrent/stram/client/StramClientUtils.java | 8 + .../datatorrent/stram/security/StramUserLogin.java | 14 +- .../apache/apex/engine/YarnAppLauncherImpl.java| 30 +- 6 files changed, 300 insertions(+), 305 deletions(-) diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index 5030a32..63080bb 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -732,269 +732,265 @@ public class StreamingAppMasterService extends CompositeService ResourceRequestHandler resourceRequestor = System.getenv().containsKey("CDH_HADOOP_BIN") ? 
new BlacklistBasedResourceRequestHandler() : new ResourceRequestHandler(); List pendingContainerStartRequests = new LinkedList<>(); -YarnClient clientRMService = YarnClient.createYarnClient(); +try (YarnClient clientRMService = StramClientUtils.createYarnClient(conf)) { -try { - // YARN-435 - // we need getClusterNodes to populate the initial node list, - // subsequent updates come through the heartbeat response - clientRMService.init(conf); - clientRMService.start(); - - ApplicationReport ar = StramClientUtils.getStartedAppInstanceByName(clientRMService, dag.getAttributes().get(DAG.APPLICATION_NAME), UserGroupInformation.getLoginUser().getUserName(), dag.getAttributes().get(DAG.APPLICATION_ID)); - if (ar != null) { -appDone = true; -dnmgr.shutdownDiagnosticsMessage = String.format("Application master failed due to application %s with duplicate application name \"%s\" by the same user \"%s\" is already started.", -ar.getApplicationId().toString(), ar.getName(), ar.getUser()); -LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage); -finishApplication(FinalApplicationStatus.FAILED); -return; + try { +// YARN-435 +// we need getClusterNodes to populate the initial node list, +// subsequent updates come through the heartbeat response + +ApplicationReport ar = StramClientUtils.getStartedAppInstanceByName(clientRMService, dag.getAttributes().get(DAG.APPLICATION_NAME), UserGroupInformation.getLoginUser().getUserName(), dag.getAttributes().get(DAG.APPLICATION_ID)); +if (ar != null) { + appDone = true; + dnmgr.shutdownDiagnosticsMessage = String.format("Application master failed due to application %s with duplicate application name \"%s\" by the same user \"%s\" is already started.", + ar.getApplicationId().toString(), ar.getName(), ar.getUser()); + LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage); + finishApplication(FinalApplicationStatus.FAILED); + return; +} +resourceRequestor.updateNodeReports(clientRMService.getNodeReports()); 
+nodeReportUpdateTime = System.currentTimeMillis() + UPDATE_NODE_REPORTS_INTERVAL; + } catch (Exception e) { +throw new RuntimeException("Failed to retrieve cluster nodes report.", e); } - resourceRequestor.updateNodeReports(clientRMService.getNodeReports()); - nodeReportUpdateTime = System.currentTimeMillis() + UPDATE_NODE_REPORTS_INTERVAL; -} catch (Exception e) { - throw new RuntimeException("Failed to retrieve cluster nodes report.", e); -} finally { - clientRMService.stop(); -} -List containers = response.getContainersFromPreviousAttempts(); + List containers = response.getContainersFromPreviousAttempts(); -// Running containers might take a while to register with the new app master and send the heartbeat signal. -int waitForRecovery = containers.size() > 0 ? dag.getValue(LogicalPlan.HEARTB
[apex-core] branch master updated: APEXCORE-754 Add plugin dependency jar-files to application package
This is an automated email from the ASF dual-hosted git repository. vrozov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apex-core.git The following commit(s) were added to refs/heads/master by this push: new 5994a0b APEXCORE-754 Add plugin dependency jar-files to application package 5994a0b is described below commit 5994a0b0fa9d2b60ca7595e7c6a1ebaab168060c Author: Sergey Golovko <ser...@datatorrent.com> AuthorDate: Thu Jul 6 11:25:08 2017 -0700 APEXCORE-754 Add plugin dependency jar-files to application package Included plugin jar-files into the application package and added names of the plugin jar-files to the application classpath. --- .../org/apache/apex/common/util/JarHelper.java | 76 +- .../java/com/datatorrent/stram/StramClient.java| 22 +-- .../stram/StreamingAppMasterService.java | 2 + 3 files changed, 92 insertions(+), 8 deletions(-) diff --git a/common/src/main/java/org/apache/apex/common/util/JarHelper.java b/common/src/main/java/org/apache/apex/common/util/JarHelper.java index d40cec8..bd75b44 100644 --- a/common/src/main/java/org/apache/apex/common/util/JarHelper.java +++ b/common/src/main/java/org/apache/apex/common/util/JarHelper.java @@ -27,11 +27,14 @@ import java.net.URL; import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; import java.security.CodeSource; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.jar.JarEntry; import java.util.jar.JarFile; import java.util.jar.JarOutputStream; @@ -41,6 +44,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -52,6 +56,7 @@ import 
org.apache.hadoop.classification.InterfaceStability; public class JarHelper { private static final Logger logger = LoggerFactory.getLogger(JarHelper.class); + private static final String APEX_DEPENDENCIES = "apex-dependencies"; private final Map<URL, String> sourceToJar = new HashMap<>(); @@ -68,7 +73,7 @@ public class JarHelper return temp.getAbsolutePath(); } - public String getJar(Class jarClass) + public String getJar(Class jarClass, boolean makeJarFromFolder) { String jar = null; final CodeSource codeSource = jarClass.getProtectionDomain().getCodeSource(); @@ -88,6 +93,9 @@ public class JarHelper jar = location.getFile(); final File dir = new File(jar); if (dir.isDirectory()) { +if (!makeJarFromFolder) { + throw new AssertionError("Cannot resolve jar file for " + jarClass + ". URL " + location); +} try { jar = createJar("apex-", dir, false); } catch (IOException e) { @@ -107,6 +115,72 @@ public class JarHelper return jar; } + public String getJar(Class jarClass) + { +return getJar(jarClass, true); + } + + /** + * Returns a full path to the jar-file that contains the given class and all full paths to dependent jar-files + * that are defined in the property "apex-dependencies" of the manifest of the root jar-file. 
+ * If the class is an independent file the method makes jar file from the folder that contains the class + * @param jarClass Class + * @param makeJarFromFolder True if the method should make jar from folder that contains the independent class + * @param addJarDependencies True if the method should include dependent jar files + * @return Set of names of the jar-files + */ + public Set getJars(Class jarClass, boolean makeJarFromFolder, boolean addJarDependencies) + { +String jar = getJar(jarClass, makeJarFromFolder); +Set set = new HashSet<>(); +if (jar != null) { + set.add(jar); + if (addJarDependencies) { +try { + getDependentJarsFromManifest(new JarFile(jar), set); +} catch (IOException ex) { + logger.warn("Cannot open Jar-file {}", jar); +} + } +} +return set; + } + + /** + * Returns a full path to the jar-file that contains the given class and all full paths to dependent jar-files + * that are defined in the property "apex-dependencies" of the manifest of the root jar-file. + * If the class is an independent file the method makes jar file from the folder that contains the class + * @param jarClass Class + * @return Set of names of the jar-files + */ + public Set getJars(Class jarClass) + { +return getJars(jarClass, true, true); + } + + /** + * Adds d
apex-core git commit: APEXCORE-705 Backpressure when spooling is disabled. The publisher is suspended if ahead of subscriber by maximum number of blocks.
Repository: apex-core Updated Branches: refs/heads/master f6e6672fa -> d55a3c592 APEXCORE-705 Backpressure when spooling is disabled. The publisher is suspended if ahead of subscriber by maximum number of blocks. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/d55a3c59 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/d55a3c59 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/d55a3c59 Branch: refs/heads/master Commit: d55a3c592afa1221ab88dd043e414895a00c6413 Parents: f6e6672 Author: Pramod ImmaneniAuthored: Mon Apr 10 11:48:31 2017 -0700 Committer: Pramod Immaneni Committed: Mon Jun 26 18:54:42 2017 -0700 -- .../bufferserver/internal/DataList.java | 122 ++- .../datatorrent/bufferserver/server/Server.java | 2 +- 2 files changed, 93 insertions(+), 31 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/d55a3c59/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java -- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java index d08b9fc..5813b56 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java @@ -390,12 +390,7 @@ public class DataList { boolean resumedSuspendedClients = false; if (numberOfInMemBlockPermits > 0) { - synchronized (suspendedClients) { -for (AbstractClient client : suspendedClients) { - resumedSuspendedClients |= client.resumeReadIfSuspended(); -} -suspendedClients.clear(); - } + resumedSuspendedClients = resumeSuspendedClients(); } else { logger.debug("Keeping clients: {} suspended, numberOfInMemBlockPermits={}, Listeners: {}", suspendedClients, numberOfInMemBlockPermits, all_listeners); @@ -403,9 +398,46 @@ public class DataList return resumedSuspendedClients; } + private 
boolean resumeSuspendedClients() + { +boolean resumedSuspendedClients = false; +synchronized (suspendedClients) { + for (AbstractClient client : suspendedClients) { +resumedSuspendedClients |= client.resumeReadIfSuspended(); + } + suspendedClients.clear(); +} +return resumedSuspendedClients; + } + public boolean isMemoryBlockAvailable() { -return (storage == null) || (numberOfInMemBlockPermits.get() > 0); +return (numberOfInMemBlockPermits.get() > 0); + } + + public boolean areSubscribersBehindByMax() + { +boolean behind = false; +if (backPressureEnabled) { + // Seek to max blocks and see if any block is in use beyond that + int count = 0; + synchronized (this) { +Block curr = last.prev; +// go back the max number of blocks +while ((curr != null) && (++count < (MAX_COUNT_OF_INMEM_BLOCKS - 2))) { + curr = curr.prev; +} +// check if any block is in use +while (!behind && (curr != null)) { + // Since acquire happens before release, because of concurrency, in a corner case scenario we might still count a + // subscriber as being max behind when it is transitioning over to the next block but that is ok as it will only + // result in publisher blocking for some time and resuming + behind = (curr.refCount.get() != 0); + curr = curr.prev; +} + } +} +return behind; } public byte[] newBuffer(final int size) @@ -821,37 +853,47 @@ public class DataList final int refCount = this.refCount.decrementAndGet(); if (canEvict(refCount, writer)) { assert (next != null); -final Runnable storer = getStorer(data, readingOffset, writingOffset, storage); -if (future != null && future.cancel(false)) { - logger.debug("Block {} future is cancelled", this); -} -final int numberOfInMemBlockPermits = DataList.this.numberOfInMemBlockPermits.get(); -if (wait && numberOfInMemBlockPermits == 0) { - future = null; - storer.run(); -} else if (numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS / 2) { - future = storageExecutor.submit(storer); +if (storage != null) { + evictBlock(wait); } else { - 
future = null; + if (!isPublisherAheadByMax()) { +resumeSuspendedClients(); + } } } } +private void evictBlock(boolean wait) +{ + final Runnable storer = getStorer(data, readingOffset,
apex-core git commit: APEXCORE-744 Add setting of predefined static logger appender properties
Repository: apex-core Updated Branches: refs/heads/master 5e23fb44c -> f9c1701a2 APEXCORE-744 Add setting of predefined static logger appender properties Added setting of the static logger appender properties: application name, container id, user name, service name, node name. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/f9c1701a Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/f9c1701a Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/f9c1701a Branch: refs/heads/master Commit: f9c1701a2f22bf7405496c3ee4d6a71185762070 Parents: 5e23fb4 Author: Sergey GolovkoAuthored: Wed Jun 7 11:05:22 2017 -0700 Committer: Sergey Golovko Committed: Fri Jun 23 21:18:47 2017 -0700 -- .../datatorrent/stram/StreamingAppMaster.java | 2 ++ .../java/com/datatorrent/stram/cli/ApexCli.java | 2 +- .../stram/engine/StreamingContainer.java| 1 + .../com/datatorrent/stram/util/LoggerUtil.java | 37 .../datatorrent/stram/util/LoggerUtilTest.java | 32 + 5 files changed, 73 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/f9c1701a/engine/src/main/java/com/datatorrent/stram/StreamingAppMaster.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMaster.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMaster.java index 7598b4f..de8ffa7 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMaster.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMaster.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import com.datatorrent.stram.debug.StdOutErrLog; +import com.datatorrent.stram.util.LoggerUtil; import com.datatorrent.stram.util.VersionInfo; /** @@ -56,6 +57,7 @@ public class StreamingAppMaster extends StramUtils.YarnContainerMain */ public static void main(final String[] args) throws Throwable { +LoggerUtil.setupMDC("master"); 
StdOutErrLog.tieSystemOutAndErrToLog(); LOG.info("Master starting with classpath: {}", System.getProperty("java.class.path")); http://git-wip-us.apache.org/repos/asf/apex-core/blob/f9c1701a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java index 787f20b..152e6f1 100644 --- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java +++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java @@ -4129,7 +4129,7 @@ public class ApexCli public static void main(final String[] args) throws Exception { -LoggerUtil.addAppenders(); +LoggerUtil.setupMDC("client"); final ApexCli shell = new ApexCli(); shell.preImpersonationInit(args); String hadoopUserName = System.getenv("HADOOP_USER_NAME"); http://git-wip-us.apache.org/repos/asf/apex-core/blob/f9c1701a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java index dd215a9..f5aaf35 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java @@ -286,6 +286,7 @@ public class StreamingContainer extends YarnContainerMain */ public static void main(String[] args) throws Throwable { +LoggerUtil.setupMDC("worker"); StdOutErrLog.tieSystemOutAndErrToLog(); logger.debug("PID: " + System.getenv().get("JVM_PID")); logger.info("Child starting with classpath: {}", System.getProperty("java.class.path")); http://git-wip-us.apache.org/repos/asf/apex-core/blob/f9c1701a/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java b/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java index 14662e1..c862634 100644 
--- a/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java +++ b/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java @@ -35,12 +35,17 @@ import javax.annotation.Nullable; import org.apache.apex.log.LogFileInformation; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.api.records.ApplicationId;
apex-core git commit: APEXCORE-748 Upgrade netlet dependency to 1.3.2
Repository: apex-core Updated Branches: refs/heads/master 34da9dce2 -> 3ddd2389f APEXCORE-748 Upgrade netlet dependency to 1.3.2 Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/3ddd2389 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/3ddd2389 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/3ddd2389 Branch: refs/heads/master Commit: 3ddd2389f7e52cbca82370b1f77377b083f4aa44 Parents: 34da9dc Author: Vlad Rozov Authored: Wed Jun 21 15:37:56 2017 -0700 Committer: Vlad Rozov Committed: Wed Jun 21 15:37:56 2017 -0700 -- api/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/3ddd2389/api/pom.xml -- diff --git a/api/pom.xml b/api/pom.xml index 168ceee..24e366b 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -98,7 +98,7 @@ com.datatorrent netlet - 1.3.1 + 1.3.2
apex-core git commit: APEXCORE-736 Using YARN client api to fetch the application master container report, this closes #534
Repository: apex-core Updated Branches: refs/heads/master a0dd30d8f -> 91eb45078 APEXCORE-736 Using YARN client api to fetch the application master container report, this closes #534 Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/91eb4507 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/91eb4507 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/91eb4507 Branch: refs/heads/master Commit: 91eb45078b27d27dfcda35d124dce90f57f39efb Parents: a0dd30d Author: devtagare Authored: Wed May 31 14:09:00 2017 -0700 Committer: Pramod Immaneni Committed: Thu Jun 8 18:55:47 2017 -0700 -- .../datatorrent/stram/StreamingContainerManager.java | 15 +-- 1 file changed, 9 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/91eb4507/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index c4e76a5..510a146 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -84,6 +84,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; @@ -168,7 +171,6 @@ import com.datatorrent.stram.util.ConfigUtils; import com.datatorrent.stram.util.FSJsonLineFile; import com.datatorrent.stram.util.MovingAverage.MovingAverageLong; 
import com.datatorrent.stram.util.SharedPubSubWebSocketClient; -import com.datatorrent.stram.util.WebServicesClient; import com.datatorrent.stram.webapp.ContainerInfo; import com.datatorrent.stram.webapp.LogicalOperatorInfo; import com.datatorrent.stram.webapp.OperatorAggregationInfo; @@ -485,11 +487,12 @@ public class StreamingContainerManager implements PlanContext String nodeHttpAddress = nmHost + ":" + nmHttpPort; if (allocatedMemoryMB == 0) { String url = ConfigUtils.getSchemePrefix(conf) + nodeHttpAddress + "/ws/v1/node/containers/" + ci.id; - WebServicesClient webServicesClient = new WebServicesClient(); - try { -String content = webServicesClient.process(url, String.class, new WebServicesClient.GetWebServicesHandler()); -JSONObject json = new JSONObject(content); -int totalMemoryNeededMB = json.getJSONObject("container").getInt("totalMemoryNeededMB"); + try (YarnClient rmClient = YarnClient.createYarnClient()) { +rmClient.init(conf); +rmClient.start(); +ContainerReport content = rmClient.getContainerReport(ContainerId.fromString(ci.id)); +int totalMemoryNeededMB = content.getAllocatedResource().getMemory(); +LOG.debug("App Master allocated memory is {}", totalMemoryNeededMB); if (totalMemoryNeededMB > 0) { allocatedMemoryMB = totalMemoryNeededMB; } else {
[apex-core] Git Push Summary
Repository: apex-core Updated Branches: refs/heads/APEXCORE-732 [deleted] a0dd30d8f
apex-malhar git commit: APEXMALHAR-2471 Upgrading APEXCORE dependency to version 3.6.0
Repository: apex-malhar Updated Branches: refs/heads/master 2493bcbf5 -> 8e44a9cab APEXMALHAR-2471 Upgrading APEXCORE dependency to version 3.6.0 Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/8e44a9ca Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/8e44a9ca Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/8e44a9ca Branch: refs/heads/master Commit: 8e44a9cab014aa40e25d06ca324ef5fd70aec152 Parents: 2493bcb Author: ajaygit158Authored: Tue Apr 25 11:28:01 2017 +0530 Committer: ajaygit158 Committed: Thu May 25 11:31:45 2017 +0530 -- .../benchmark/WordCountOperator.java| 2 +- .../HBaseTransactionalPutOperatorTest.java | 131 ++- library/pom.xml | 31 + .../datatorrent/lib/math/RunningAverage.java| 6 +- .../com/datatorrent/lib/stream/Counter.java | 2 +- .../lib/io/fs/FileSplitterBaseTest.java | 5 +- .../malhar/lib/dedup/DeduperOrderingTest.java | 6 +- pom.xml | 4 +- .../stream/api/impl/ApexStreamImplTest.java | 8 +- 9 files changed, 61 insertions(+), 134 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java -- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java index 6e91482..8c55404 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java @@ -44,7 +44,7 @@ public class WordCountOperator implements Operator @Override public void process(T tuple) { - count++; + WordCountOperator.this.count++; } }; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java -- diff --git 
a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java index 3cdc1bf..665cd40 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java @@ -19,19 +19,19 @@ package com.datatorrent.contrib.hbase; import java.io.IOException; -import java.util.Collection; -import org.apache.hadoop.hbase.client.Put; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.api.Attribute; +import org.apache.hadoop.hbase.client.Put; + import com.datatorrent.api.Attribute.AttributeMap; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.Operator.ProcessingMode; +import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext; /** * */ @@ -55,44 +55,9 @@ public class HBaseTransactionalPutOperatorTest { t1.setColFamily("colfam0");t1.setColName("street");t1.setRow("row1");t1.setColValue("ts"); HBaseTuple t2=new HBaseTuple(); t2.setColFamily("colfam0");t2.setColName("city");t2.setRow("row2");t2.setColValue("tc"); - thop.setup(new OperatorContext() { - -@Override -public T getValue(Attribute key) { - if(key.equals(PROCESSING_MODE)){ -return (T) ProcessingMode.AT_LEAST_ONCE; - } - return key.defaultValue; -} - -@Override -public AttributeMap getAttributes() { - return null; -} - -@Override -public int getId() { - // TODO Auto-generated method stub - return 0; -} - -@Override -public void setCounters(Object counters) { - // TODO Auto-generated method stub - -} - -@Override -public void sendMetrics(Collection collection) -{ -} - -@Override -public int getWindowsFromCheckpoint() -{ - return 0; -} - }); + AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap(); + 
attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_LEAST_ONCE); + thop.setup(mockOperatorContext(0, attributeMap)); thop.beginWindow(0); thop.input.process(t1); thop.input.process(t2); @@ -125,44 +90,9 @@ public class
apex-malhar git commit: APEXMALHAR-2492 Correct usage of empty Slice in Malhar Library
Repository: apex-malhar Updated Branches: refs/heads/master 1c902dfb2 -> 2493bcbf5 APEXMALHAR-2492 Correct usage of empty Slice in Malhar Library Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/2493bcbf Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/2493bcbf Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/2493bcbf Branch: refs/heads/master Commit: 2493bcbf508fcc9570c6e5798217e7feb55ccb7e Parents: 1c902df Author: ajaygit158Authored: Wed May 17 11:25:33 2017 +0530 Committer: ajaygit158 Committed: Thu May 25 01:22:11 2017 +0530 -- .../org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java| 3 +-- .../org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2493bcbf/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java -- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java index e2a1297..0552532 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java @@ -156,8 +156,7 @@ public class BoundedDedupOperator extends AbstractDeduper protected void putManagedState(Object tuple) { Slice key = getKey(tuple); -((ManagedTimeStateImpl)managedState).put(getBucketId(key), DEFAULT_CONSTANT_TIME, -key, new Slice(new byte[0])); +((ManagedTimeStateImpl)managedState).put(getBucketId(key), DEFAULT_CONSTANT_TIME, key, new Slice(null, 0, 0)); } protected int getBucketId(Slice key) http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2493bcbf/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java -- diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java index 3b5f5e2..9f83441 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java @@ -186,7 +186,7 @@ public class TimeBasedDedupOperator extends AbstractDeduper implements A @Override protected void putManagedState(Object tuple) { -((ManagedTimeUnifiedStateImpl)managedState).put(getTime(tuple), getKey(tuple), new Slice(new byte[0])); +((ManagedTimeUnifiedStateImpl)managedState).put(getTime(tuple), getKey(tuple), new Slice(null, 0, 0)); }
apex-malhar git commit: APEXMALHAR-2491 Changes to shutdown the controller after completion of waittime in GenericFileOutputOperatorTest
Repository: apex-malhar Updated Branches: refs/heads/master 2f70751e7 -> 1c902dfb2 APEXMALHAR-2491 Changes to shutdown the controller after completion of waittime in GenericFileOutputOperatorTest Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/1c902dfb Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/1c902dfb Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/1c902dfb Branch: refs/heads/master Commit: 1c902dfb22b5f815f9b5ec5441e646beec424ced Parents: 2f70751 Author: ajaygit158 Authored: Tue May 16 17:06:00 2017 +0530 Committer: ajaygit158 Committed: Wed May 24 23:52:58 2017 +0530 -- .../apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java| 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1c902dfb/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java -- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java index f248266..6bbbd3a 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java @@ -166,6 +166,7 @@ public class GenericFileOutputOperatorTest extends AbstractFileOutputOperatorTes for (int i = 0; i < MAX && (!outputFile.exists()); ++i) { Thread.sleep(1000); } +lc.shutdown(); if (!outputFile.exists()) { String msg = String.format("Error: output file not found after %d seconds%n", MAX); throw new RuntimeException(msg);
apex-core git commit: APEXCORE-719 Pass an application name from stram client to application master and container via command line properties
Repository: apex-core Updated Branches: refs/heads/master 899f4cb0a -> dca51d99e APEXCORE-719 Pass an application name from stram client to application master and container via command line properties Added passing of the application name Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/dca51d99 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/dca51d99 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/dca51d99 Branch: refs/heads/master Commit: dca51d99e6246f99d3150f7b74d2f8368285ddd0 Parents: 899f4cb Author: Sergey GolovkoAuthored: Thu May 11 10:56:28 2017 -0700 Committer: Sergey Golovko Committed: Wed May 24 07:24:13 2017 -0700 -- .../java/com/datatorrent/stram/LaunchContainerRunnable.java | 2 +- engine/src/main/java/com/datatorrent/stram/StramClient.java | 1 + .../java/com/datatorrent/stram/client/StramClientUtils.java | 8 3 files changed, 10 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/dca51d99/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java index dce648b..75acdf7 100644 --- a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java +++ b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java @@ -253,6 +253,7 @@ public class LaunchContainerRunnable implements Runnable vargs.add(String.format("-D%scid=%s", StreamingApplication.DT_PREFIX, jvmID)); vargs.add("-Dhadoop.root.logger=" + (dag.isDebug() ? 
"DEBUG" : "INFO") + ",RFA"); vargs.add("-Dhadoop.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR); +StramClientUtils.addAttributeToArgs(LogicalPlan.APPLICATION_NAME, dag, vargs); String loggersLevel = System.getProperty(StramUtils.DT_LOGGERS_LEVEL); if (loggersLevel != null) { @@ -333,5 +334,4 @@ public class LaunchContainerRunnable implements Runnable throw new RuntimeException("Error generating delegation token", e); } } - } http://git-wip-us.apache.org/repos/asf/apex-core/blob/dca51d99/engine/src/main/java/com/datatorrent/stram/StramClient.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java index 22a1c63..51d52c6 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramClient.java +++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java @@ -572,6 +572,7 @@ public class StramClient vargs.add("-Dhadoop.root.logger=" + (dag.isDebug() ? "DEBUG" : "INFO") + ",RFA"); vargs.add("-Dhadoop.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR); vargs.add(String.format("-D%s=%s", StreamingContainer.PROP_APP_PATH, dag.assertAppPath())); + StramClientUtils.addAttributeToArgs(LogicalPlan.APPLICATION_NAME, dag, vargs); if (dag.isDebug()) { vargs.add("-Dlog4j.debug=true"); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/dca51d99/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java index 15adab4..eead871 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java +++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java @@ -80,6 +80,8 @@ import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; import 
com.datatorrent.api.StreamingApplication; import com.datatorrent.stram.StramClient; import com.datatorrent.stram.StramUtils; @@ -870,4 +872,10 @@ public class StramClientUtils return appInfo; } + public static void addAttributeToArgs(Attribute attribute, Context context, List vargs) + { +if (context.getValue(attribute) != null) { + vargs.add(String.format("-D%s=$'%s'", attribute.getLongName(), context.getValue(attribute).replaceAll("['\"$]", "$0"))); +} + } }
apex-core git commit: APEXCORE-704 Add supporting of programmatic logger appender
Repository: apex-core Updated Branches: refs/heads/master 1c2d66adc -> e92741474 APEXCORE-704 Add supporting of programmatic logger appender Implemented supporting of a programmatic logger appender that can be added to Apex Application Master and Containers and be configurable programmatically. The new programmatic appender can be defined in Java code or via a value of the new Apex attribute "LOGGER_APPENDER". The syntax of the attribute value: {appender-names};{logger-properties} Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/e9274147 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/e9274147 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/e9274147 Branch: refs/heads/master Commit: e927414743c1c9a03cbecf97416385345c54e551 Parents: 1c2d66a Author: Sergey GolovkoAuthored: Fri Apr 14 12:18:55 2017 -0700 Committer: Sergey Golovko Committed: Mon May 15 17:04:52 2017 -0700 -- .../java/com/datatorrent/api/Attribute.java | 7 ++ .../main/java/com/datatorrent/api/Context.java | 4 + .../stram/LaunchContainerRunnable.java | 5 + .../java/com/datatorrent/stram/StramClient.java | 5 + .../java/com/datatorrent/stram/cli/ApexCli.java | 2 + .../datatorrent/stram/debug/StdOutErrLog.java | 3 + .../com/datatorrent/stram/util/LoggerUtil.java | 125 +-- .../datatorrent/stram/util/LoggerUtilTest.java | 68 ++ 8 files changed, 211 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/e9274147/api/src/main/java/com/datatorrent/api/Attribute.java -- diff --git a/api/src/main/java/com/datatorrent/api/Attribute.java b/api/src/main/java/com/datatorrent/api/Attribute.java index a3b2f97..1d7b7b1 100644 --- a/api/src/main/java/com/datatorrent/api/Attribute.java +++ b/api/src/main/java/com/datatorrent/api/Attribute.java @@ -29,6 +29,8 @@ import java.util.Set; import com.google.common.base.Throwables; +import static com.datatorrent.api.StreamingApplication.APEX_PREFIX; 
+ /** * Attribute represents the attribute which can be set on various components in the system. * @@ -88,6 +90,11 @@ public class Attribute implements Serializable return "attr" + name.substring(name.lastIndexOf('.')); } + public String getLongName() + { +return APEX_PREFIX + getSimpleName().replaceAll("_",".").toLowerCase(); + } + public String getSimpleName() { return name.substring(name.lastIndexOf('.') + 1); http://git-wip-us.apache.org/repos/asf/apex-core/blob/e9274147/api/src/main/java/com/datatorrent/api/Context.java -- diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java index 743f0f1..ff1a2d4 100644 --- a/api/src/main/java/com/datatorrent/api/Context.java +++ b/api/src/main/java/com/datatorrent/api/Context.java @@ -393,6 +393,10 @@ public interface Context */ Attribute CONTAINER_JVM_OPTIONS = new Attribute<>(String2String.getInstance()); /** + * The options of dynamic apex logger appender + */ +Attribute LOGGER_APPENDER = new Attribute<>(String2String.getInstance()); +/** * The amount of memory to be requested for the application master. Not used in local mode. * Default value is 1GB. 
*/ http://git-wip-us.apache.org/repos/asf/apex-core/blob/e9274147/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java index 76c1407..dce648b 100644 --- a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java +++ b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java @@ -230,6 +230,11 @@ public class LaunchContainerRunnable implements Runnable } } +String loggerAppender = dag.getValue(LogicalPlan.LOGGER_APPENDER); +if (loggerAppender != null) { + vargs.add(String.format("-D%s=\"%s\"", LogicalPlan.LOGGER_APPENDER.getLongName(), loggerAppender)); +} + List operatorMetaList = Lists.newArrayList(); int bufferServerMemory = 0; for (PTOperator operator : sca.getContainer().getOperators()) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/e9274147/engine/src/main/java/com/datatorrent/stram/StramClient.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java
apex-core git commit: APEXCORE-716 Javadoc for engine api packages warning that there are no backwards compatibility guarantees
Repository: apex-core Updated Branches: refs/heads/master 0de7be9b3 -> 0824a4baf APEXCORE-716 Javadoc for engine api packages warning that there are no backwards compatibility guarantees Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/0824a4ba Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/0824a4ba Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/0824a4ba Branch: refs/heads/master Commit: 0824a4baf688c73dc6159ff6d76c80688d881946 Parents: 0de7be9 Author: Pramod ImmaneniAuthored: Sun May 7 08:21:33 2017 -0700 Committer: Pramod Immaneni Committed: Wed May 10 10:49:56 2017 -0700 -- .../com/datatorrent/stram/api/package-info.java | 24 .../apache/apex/engine/api/package-info.java| 24 2 files changed, 48 insertions(+) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/0824a4ba/engine/src/main/java/com/datatorrent/stram/api/package-info.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/api/package-info.java b/engine/src/main/java/com/datatorrent/stram/api/package-info.java new file mode 100644 index 000..b9c43f2 --- /dev/null +++ b/engine/src/main/java/com/datatorrent/stram/api/package-info.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * This is internal api of the engine and is subject to change. No guarantees are made on the backwards compatibility + * of this api. + */ +package com.datatorrent.stram.api; http://git-wip-us.apache.org/repos/asf/apex-core/blob/0824a4ba/engine/src/main/java/org/apache/apex/engine/api/package-info.java -- diff --git a/engine/src/main/java/org/apache/apex/engine/api/package-info.java b/engine/src/main/java/org/apache/apex/engine/api/package-info.java new file mode 100644 index 000..34ef2c1 --- /dev/null +++ b/engine/src/main/java/org/apache/apex/engine/api/package-info.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * This is internal api of the engine and is subject to change. No guarantees are made on the backwards compatibility + * of this api. + */ +package org.apache.apex.engine.api;
apex-core git commit: APEXCORE-717 Remove unnecessary archetypeVersion property
Repository: apex-core Updated Branches: refs/heads/master 90528017f -> 0de7be9b3 APEXCORE-717 Remove unnecessary archetypeVersion property Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/0de7be9b Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/0de7be9b Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/0de7be9b Branch: refs/heads/master Commit: 0de7be9b36a7b8ee0edb823e8c8eb376d4fdb277 Parents: 9052801 Author: Vlad Rozov Authored: Sun May 7 14:02:14 2017 -0700 Committer: Vlad Rozov Committed: Wed May 10 07:12:23 2017 -0700 -- .../src/main/resources/META-INF/maven/archetype-metadata.xml| 3 --- .../src/main/resources/META-INF/maven/archetype-metadata.xml| 5 - 2 files changed, 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/0de7be9b/apex-app-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml -- diff --git a/apex-app-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml b/apex-app-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml index c35b3af..46439df 100644 --- a/apex-app-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml +++ b/apex-app-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml @@ -23,9 +23,6 @@ xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;> - - ${archetypeVersion} - ${project.version} http://git-wip-us.apache.org/repos/asf/apex-core/blob/0de7be9b/apex-conf-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml -- diff --git a/apex-conf-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml b/apex-conf-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml index d739c53..300b356 100644 --- a/apex-conf-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml +++
b/apex-conf-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml @@ -22,11 +22,6 @@ http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd; name="apex-app-archetype" xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;> - - - ${archetypeVersion} - - src/main/resources/META-INF
svn commit: r19287 - /release/apex/KEYS
Author: vrozov Date: Thu Apr 27 05:16:23 2017 New Revision: 19287 Log: Adding key for tus...@apache.org Modified: release/apex/KEYS Modified: release/apex/KEYS == --- release/apex/KEYS (original) +++ release/apex/KEYS Thu Apr 27 05:16:23 2017 @@ -378,3 +378,60 @@ skZiO9pfbVUm31Igu1tce0BFI4Luo5SyS6tm98ob H2M/dMurfJKxANzpUA== =lT9W -END PGP PUBLIC KEY BLOCK- + +pub rsa4096 2017-04-17 [SC] + AEB1 7087 336E 676B D03A 7E1B FC68 1885 6177 F743 +uid [ultimate] Tushar R. Gosavi (CODE SIGNING KEY) <tus...@apache.org> +sub rsa4096 2017-04-17 [E] + +-BEGIN PGP PUBLIC KEY BLOCK- + +mQINBFj0jEIBEADjOtvt2Rpgx00XPVC3rCX+EtNh9GAHCznHdXEkPIq5QfFURz+0 +Je8dUt0SU8FcRiVTs3leUOu40JdBVCNB7SqXD2ratdSe5Lw1LxBriy1XVDywjuxK +e2jsfcMr1heRD8XbhjfL8iFeirNT9FIS38FNSMFHUu79PLH1RUpZtHqpdzUHebFn +T7kwConp2MtNJpF+/59XJySDtBAA77ecI9Xc2ZBcP1pDj3S0gws7HoWZSUZfABND +plI7DjKs3S7eFXePOy5NYylvFkMU0g8cJIaQ5nmuDyzCYu8wzIlIoOJv7GmJJieB +hhI0hDxZsfq96TusDGoGXoD33VLTMsC6AwG7//lmXqSop4qwJ0jFvC3p0dJI1hD8 +gG58N6IqsfPB/CoENyX8Ps08aELHrJEsjjFQTKQJWRYfOFij/0CAvPifAP8aq4EJ +9gr++f1oR/lodAHbG9o3S6CshPpqNJRBKrOR2/AyvyMt4mex+7R7D1DXnDUukm2R +ilqoTH8ZaU3b9MbO1Hl2Xp8VKc5J3TSSrGDY9czuxyosnEKhHYEkK8UYMdSG1Pwe ++3VRhhCiFEDzuq8E6atNBdeRe9JSH+Qh6FRKlOrtojck5FdGmO5DEXYdTqvUIV1s +6KqHg/JyGqqDYB6lwjJ9ShLEu3Bb2M6P5hqEY4q6BvLYH+MZoWlCti4bVwARAQAB +tDdUdXNoYXIgUi4gR29zYXZpIChDT0RFIFNJR05JTkcgS0VZKSA8dHVzaGFyQGFw +YWNoZS5vcmc+iQI3BBMBCgAhBQJY9IxCAhsDBQsJCAcDBRUKCQgLBRYCAwEAAh4B +AheAAAoJEPxoGIVhd/dDG4wQALjtLuLuYwaNPeg5MlhqxVcGJEmKNlJ4ANNiUXk2 ++Si7TvgT4cr9d0O7JHkbzx3Bka5J1Yi6jsQCSO3eG2G/KlTrrquqB1sSHaxBa73T ++HiJlPJwjlQT5VFE1NeKtntMh3Xv6Wiew0jN9FqG4vbnUBO4Z+I7U8Kv3hW52KuB +2Mg5zREH283mJ2i+uI/ygifLmG/pWZClL3OzfJviDo+2AOmRIeu6HMBs3MeFt/6h +ces/H2pUjFKSaYCgBfGzAT/eJXNYk4yAGi02npU+Fvr6BlBTvKGLBQCMCLu+RqCJ +eFwIOVK4VK2gwb4YI1GU7MkehRxcn0ry4oMQWhAsKKkGISw7Mn8R1ptmRQ0LOD4u +bb/5EueaWW2g7sqAGX+KTb64wIl6jdxWjeelGreu2v6cw5NKTMNb0SD9XNw3Gawm +C4ITUE3jc3Y7nDtDjv914k1BQO24ZrTEriNO/9JUf0j0Oy9D2Rl8j9AQmUYbnuJa 
+eJ5p+tR2EYfDS+hgSBNpTSEfBpfZ6yXcIX37VI7z1C44qpT2iGt0XmFWvaBzapj0 +TD1MGq/pnvQx8BaJ4BIu5NaBvJcXTeSe7CAS7tGRyxcLPE4vCgZM290n6OEf5jXL +dd8ajfOwbrJ6bImvnSrFD1IJFvu0k2kEqrjWRltprqGah+1hDVTFYTW6NnZKMYfZ +kXu+uQINBFj0jEIBEAC/pIZSNK3ihqokM5tmU6kK42gPZnhum9tlh0+KYIkUsHtQ +cWIaqSb3EEzXN3nreFty62M7FOOrvHjok7joTblE7auynpsv8ZuhAOC6ZF+nDceV +hN6eLtfNg7RfByDGCJOr8yGJlYXlxqKa7Q7Q/FjJqXfJlqmqBcKuLhjtj25vrpSJ +T9/iad6v4LOLt+SEPChPe5QAyI5UOmbTI5Df0ZZYxkiPCzelw3QmSvA7s/1TngfK +N2Sj3Y+oM2BZ+Vwp6UrJ5rI2e+lSyWXqWmoGUrwg9dpVNxWSGDZTe2Qw91zAl6BA +Lv2DS1BAniW4097EIGyLz0a1sG+D4bTtY1Xdo0JRtoFq1djuip3L4LrsVpW6k4vA +mVxBsRWZgd0CfSLLq2lkHQqGGUCjs8+ZX/w1iP7+BdVkLcybcLZpjrB46OnMp512 +0P4ofytRRgCZaJI4Mh8W70XVpe+Pz+hfYsMXwdgM9yTWPEttB+tCQupqaRwj6MAD +QPtdRZl8RJa09VM/xtRHyS51rIJYTkUkyD97JyCh7knQ5xbm9ghaCKGeyfipQ2Kc +8UI5dezhVIbJ0bQ4PaPe4pvXrBHUcXV8puLn6IBQ/ml6vTInIPkgbnIBewcJ6JTl +6H1TMcezZT9Fkg8XjhiKqifUxGBqeX/DNze/JEhBkG2k+1sk7XVikvqoxVMdCwAR +AQABiQIfBBgBCgAJBQJY9IxCAhsMAAoJEPxoGIVhd/dDzuYQAKn9LAkabHeX9vrz +i3ij2WStpQIrpWmJYe1Xz6GmHB8/YRinULY/XL30dHGQrprqi1fnuCkRyovwLqlv +ouV31ljrKGV37kMBIqlAfbdwDBoHtQeYDqdABVhjNngMP7KugjYA1OpzCrgsjWY7 +OG95dAVR4KlSCRtSuz4vN2lXylxtmrShWZittawVoYlCNs8eG3oJ+L8ltfVY9wOl +XIyVX2y/Jr1OyIPHTseA10PDgrWpro6Cda4NyNa57PP6/WHX2k1nS9b7Nc1NDeRH ++ymq1VSytfAxjnQRtJjLTkNIfRUEhPsB3kSKyx5NbsTeRL/1WKwQSZMO1a2gUU79 +ZMzZn1bTvFAxINBi+LiHl89AsHENDghFU20kBppcv+aAmvthIM6n5JfcNUjTWX/z ++Zj4mh5pUZnVKy8fbd83vBFmssrxOc52/YMfAlM5jcmQQnI3klF/qFETgtkVDsIF +NVTIQUnFQXO4GIVoivvLWFpbBwZn/sU//farY/tpQJ5d7WFwbxjkNzX5n0otEfrX +dp+EYxpEhneKfQYX3F64rlJcGi7HqJl8VChxSG9YWsLHMPWQzX5EzeIz8CfD0JkS +8la+kwB/Pa7rt0UVEX20IWiFTp7Rrq3uoS3FJ7+zpTQgY15hzfMtKgKvAeRl3tg6 +twETNTJFVsvuAcpNAr4tcGku1jqs +=lX2d +-END PGP PUBLIC KEY BLOCK-
apex-core git commit: APEXCORE-703 Window processing timeout for finished/undeployed container. During an operator shutdown mark it as INACTIVE to exclude it from the blocked operators check.
Repository: apex-core Updated Branches: refs/heads/master cfe9cefed -> 8829286d1 APEXCORE-703 Window processing timeout for finished/undeployed container. During an operator shutdown mark it as INACTIVE to exclude it from the blocked operators check. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/8829286d Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/8829286d Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/8829286d Branch: refs/heads/master Commit: 8829286d12c755b9678498183b2ba052519a73c2 Parents: cfe9cef Author: Vlad RozovAuthored: Sun Apr 16 09:34:09 2017 -0700 Committer: Vlad Rozov Committed: Sat Apr 22 08:03:22 2017 -0700 -- .../stram/StreamingContainerManager.java| 1 + .../stram/plan/physical/PTOperator.java | 2 +- .../stram/plan/physical/PhysicalPlan.java | 2 +- .../stram/StreamingContainerManagerTest.java| 108 ++- 4 files changed, 109 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/8829286d/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index d029b16..8d99dc1 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -1378,6 +1378,7 @@ public class StreamingContainerManager implements PlanContext } deactivatedOpers.add(oper); } + oper.setState(State.INACTIVE); sca.undeployOpers.add(oper.getId()); slowestUpstreamOp.remove(oper); // record operator stop event http://git-wip-us.apache.org/repos/asf/apex-core/blob/8829286d/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java 
b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java index 471dca2..84f6a5a 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java @@ -497,7 +497,7 @@ public class PTOperator implements java.io.Serializable @Override public String toString() { -return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("id", id).append("name", name).toString(); +return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("id", id).append("name", name).append("state", state).toString(); } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/8829286d/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java index ecc010c..ab2a3ae 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java @@ -1440,7 +1440,7 @@ public class PhysicalPlan implements Serializable void removePTOperator(PTOperator oper) { -LOG.debug("Removing operator " + oper); +LOG.debug("Removing operator {}", oper); // per partition merge operators if (!oper.upstreamMerge.isEmpty()) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/8829286d/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java -- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java index 53f18f9..3f2c20b 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java @@ -369,6 +369,103 @@ public class StreamingContainerManagerTest 
Assert.assertEquals("sourcePortName " + node3DI, mergeNodeDI.outputs.get(0).portName, node3In.sourcePortName); } + private static void shutdownOperator(StreamingContainerManager scm, PTOperator p1, PTOperator p2) + { +assignContainer(scm, "c1"); +assignContainer(scm, "c2"); + +ContainerHeartbeat c1hb = new ContainerHeartbeat(); +
apex-core git commit: APEXCORE-702 Mark plugin interfaces as Evolving, and add them to plugin subpackage.
Repository: apex-core Updated Branches: refs/heads/master e4022674e -> 25e4c4c51 APEXCORE-702 Mark plugin interfaces as Evolving, and add them to plugin subpackage. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/25e4c4c5 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/25e4c4c5 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/25e4c4c5 Branch: refs/heads/master Commit: 25e4c4c51b599e63d746ca207d7557c616162381 Parents: e402267 Author: Tushar R. GosaviAuthored: Fri Apr 14 22:50:52 2017 +0530 Committer: Tushar R. Gosavi Committed: Fri Apr 21 09:12:05 2017 +0530 -- .../java/org/apache/apex/api/ApexPlugin.java| 29 .../org/apache/apex/api/ApexPluginContext.java | 29 .../org/apache/apex/api/DAGSetupPlugin.java | 135 --- .../apache/apex/api/plugin/DAGSetupPlugin.java | 134 ++ .../java/org/apache/apex/api/plugin/Plugin.java | 32 + .../apache/apex/api/plugin/PluginContext.java | 32 + .../common/util/BaseDAGSetupPlugin.java | 76 --- .../apex/common/util/BaseDAGSetupPlugin.java| 78 +++ .../stram/StreamingAppMasterService.java| 4 +- .../stram/StreamingContainerManager.java| 6 +- .../plan/logical/DAGSetupPluginManager.java | 2 +- .../plan/logical/LogicalPlanConfiguration.java | 2 +- .../apex/engine/api/DAGExecutionPlugin.java | 44 -- .../engine/api/DAGExecutionPluginContext.java | 89 .../apache/apex/engine/api/PluginLocator.java | 39 -- .../engine/api/plugin/DAGExecutionPlugin.java | 43 ++ .../api/plugin/DAGExecutionPluginContext.java | 92 + .../apex/engine/api/plugin/PluginLocator.java | 39 ++ .../AbstractDAGExecutionPluginContext.java | 9 +- .../engine/plugin/ApexPluginDispatcher.java | 2 +- .../apex/engine/plugin/ApexPluginManager.java | 8 +- .../plugin/DefaultApexPluginDispatcher.java | 6 +- .../engine/plugin/NoOpApexPluginDispatcher.java | 2 +- .../plugin/loaders/ChainedPluginLocator.java| 2 +- .../loaders/PropertyBasedPluginLocator.java | 2 +- 
.../ServiceLoaderBasedPluginLocator.java| 2 +- .../plugin/loaders/StaticPluginLocator.java | 2 +- .../plan/logical/PropertyInjectorVisitor.java | 2 +- .../apache/apex/engine/plugin/DebugPlugin.java | 12 +- .../apache/apex/engine/plugin/NoOpPlugin.java | 4 +- .../apache/apex/engine/plugin/PluginTests.java | 8 +- ...rg.apache.apex.engine.api.DAGExecutionPlugin | 19 --- ...he.apex.engine.api.plugin.DAGExecutionPlugin | 19 +++ 33 files changed, 510 insertions(+), 494 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/25e4c4c5/api/src/main/java/org/apache/apex/api/ApexPlugin.java -- diff --git a/api/src/main/java/org/apache/apex/api/ApexPlugin.java b/api/src/main/java/org/apache/apex/api/ApexPlugin.java deleted file mode 100644 index b9a8b78..000 --- a/api/src/main/java/org/apache/apex/api/ApexPlugin.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.api; - -import com.datatorrent.api.Component; - -/** - * Marker interface for ApexPlugins. 
- * @param - */ -public interface ApexPlugin extends Component -{ -} http://git-wip-us.apache.org/repos/asf/apex-core/blob/25e4c4c5/api/src/main/java/org/apache/apex/api/ApexPluginContext.java -- diff --git a/api/src/main/java/org/apache/apex/api/ApexPluginContext.java b/api/src/main/java/org/apache/apex/api/ApexPluginContext.java deleted file mode 100644 index 1b72f63..000 --- a/api/src/main/java/org/apache/apex/api/ApexPluginContext.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the
[2/2] apex-core git commit: Merge branch 'APEXCORE-511.emptyNameChecksInLogicalPlan' of http://github.com/oliverwnk/apex-core into APEXCORE-511
Merge branch 'APEXCORE-511.emptyNameChecksInLogicalPlan' of http://github.com/oliverwnk/apex-core into APEXCORE-511 Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/f63e01d1 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/f63e01d1 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/f63e01d1 Branch: refs/heads/master Commit: f63e01d142f16dad34e2420dccbcc8addbbd404a Parents: ca1a375 2ce4ae5 Author: Vlad Rozov Authored: Thu Apr 13 09:04:48 2017 -0700 Committer: Vlad Rozov Committed: Thu Apr 13 09:04:48 2017 -0700 -- api/src/main/java/com/datatorrent/api/DAG.java | 18 +++-- .../stram/plan/logical/LogicalPlan.java | 29 --- .../stram/plan/logical/LogicalPlanTest.java | 82 3 files changed, 79 insertions(+), 50 deletions(-) --
[1/2] apex-core git commit: APEXCORE-511 add null and empty checks for addOperator, addStream and addModule
Repository: apex-core Updated Branches: refs/heads/master ca1a375f9 -> f63e01d14 APEXCORE-511 add null and empty checks for addOperator, addStream and addModule Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/2ce4ae51 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/2ce4ae51 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/2ce4ae51 Branch: refs/heads/master Commit: 2ce4ae515ddfadddace260839b634cf122653a29 Parents: 077009e Author: Oliver WinkeAuthored: Tue Apr 11 17:27:35 2017 -0700 Committer: Oliver Winke Committed: Wed Apr 12 10:25:17 2017 -0700 -- api/src/main/java/com/datatorrent/api/DAG.java | 18 +++-- .../stram/plan/logical/LogicalPlan.java | 29 --- .../stram/plan/logical/LogicalPlanTest.java | 82 3 files changed, 79 insertions(+), 50 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ce4ae51/api/src/main/java/com/datatorrent/api/DAG.java -- diff --git a/api/src/main/java/com/datatorrent/api/DAG.java b/api/src/main/java/com/datatorrent/api/DAG.java index 96420a3..93936d7 100644 --- a/api/src/main/java/com/datatorrent/api/DAG.java +++ b/api/src/main/java/com/datatorrent/api/DAG.java @@ -22,6 +22,8 @@ import java.io.Serializable; import java.util.Collection; import java.util.Map; +import javax.annotation.Nonnull; + import org.apache.hadoop.classification.InterfaceStability; import com.datatorrent.api.Context.DAGContext; @@ -216,7 +218,7 @@ public interface DAG extends DAGContext, Serializable * @param clazz Concrete class with default constructor so that instance of it can be initialized and added to the DAG. * @return Instance of the operator that has been added to the DAG. */ - T addOperator(String name, Class clazz); + T addOperator(@Nonnull String name, Class clazz); /** * addOperator. 
@@ -225,20 +227,20 @@ public interface DAG extends DAGContext, Serializable * @param operator Instance of the operator that needs to be added to the DAG * @return Instance of the operator that has been added to the DAG. */ - T addOperator(String name, T operator); + T addOperator(@Nonnull String name, T operator); @InterfaceStability.Evolving - T addModule(String name, Class moduleClass); + T addModule(@Nonnull String name, Class moduleClass); @InterfaceStability.Evolving - T addModule(String name, T module); + T addModule(@Nonnull String name, T module); /** * addStream. * @param id Identifier of the stream that will be used to identify stream in DAG * @return */ - StreamMeta addStream(String id); + StreamMeta addStream(@Nonnull String id); /** * Add identified stream for given source and sinks. Multiple sinks can be @@ -256,7 +258,7 @@ public interface DAG extends DAGContext, Serializable * @param sinks * @return StreamMeta */ - StreamMeta addStream(String id, Operator.OutputPort source, Operator.InputPort... sinks); + StreamMeta addStream(@Nonnull String id, Operator.OutputPort source, Operator.InputPort... sinks); /** * Overload varargs version to avoid generic array type safety warnings in calling code. @@ -269,12 +271,12 @@ public interface DAG extends DAGContext, Serializable * @param sink1 * @return StreamMeta */ - StreamMeta addStream(String id, Operator.OutputPort source, Operator.InputPort sink1); + StreamMeta addStream(@Nonnull String id, Operator.OutputPort source, Operator.InputPort sink1); /** * addStream. */ - StreamMeta addStream(String id, Operator.OutputPort source, Operator.InputPort sink1, Operator.InputPort sink2); + StreamMeta addStream(@Nonnull String id, Operator.OutputPort source, Operator.InputPort sink1, Operator.InputPort sink2); /** * setAttribute. 
http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ce4ae51/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index 62c4fd8..bf4b2cb 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -51,6 +51,7 @@ import java.util.Stack; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; +import javax.annotation.Nonnull; import javax.validation.ConstraintViolation; import javax.validation.ConstraintViolationException; import javax.validation.Validation; @@
apex-core git commit: APEXCORE-695 Remove unnecessary interface modifiers
Repository: apex-core Updated Branches: refs/heads/master 88bf33627 -> 077009e4c APEXCORE-695 Remove unnecessary interface modifiers Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/077009e4 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/077009e4 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/077009e4 Branch: refs/heads/master Commit: 077009e4cc80d1988bfcec50ceb8b4e27ec577bd Parents: 88bf336 Author: Apex DevAuthored: Sun Apr 9 17:22:11 2017 -0700 Committer: Vlad Rozov Committed: Tue Apr 11 08:54:51 2017 -0700 -- .../java/com/datatorrent/api/Attribute.java | 6 +- .../java/com/datatorrent/api/AutoMetric.java| 6 +- .../java/com/datatorrent/api/Component.java | 10 +-- .../main/java/com/datatorrent/api/Context.java | 10 +-- .../api/ControlTupleEnabledSink.java| 4 +- api/src/main/java/com/datatorrent/api/DAG.java | 78 ++-- .../java/com/datatorrent/api/InputOperator.java | 2 +- .../java/com/datatorrent/api/LocalMode.java | 10 +-- .../main/java/com/datatorrent/api/Operator.java | 18 ++--- .../java/com/datatorrent/api/Partitioner.java | 18 ++--- api/src/main/java/com/datatorrent/api/Sink.java | 8 +- .../main/java/com/datatorrent/api/Stats.java| 4 +- .../java/com/datatorrent/api/StatsListener.java | 20 ++--- .../java/com/datatorrent/api/StorageAgent.java | 12 +-- .../api/annotation/ApplicationAnnotation.java | 2 +- .../annotation/InputPortFieldAnnotation.java| 4 +- .../api/annotation/OperatorAnnotation.java | 4 +- .../annotation/OutputPortFieldAnnotation.java | 6 +- .../datatorrent/api/annotation/RecordField.java | 4 +- .../org/apache/apex/api/DAGSetupPlugin.java | 2 +- .../bufferserver/storage/Storage.java | 8 +- .../bufferserver/support/Subscriber.java| 6 +- .../common/experimental/AppData.java| 18 ++--- .../common/util/NumberAggregate.java| 14 ++-- .../common/util/ScheduledExecutorService.java | 2 +- .../apex/common/util/AsyncStorageAgent.java | 4 +- 
.../com/datatorrent/stram/EventRecorder.java| 2 +- .../datatorrent/stram/LicensingProtocol.java| 4 +- .../com/datatorrent/stram/StatsRecorder.java| 4 +- .../datatorrent/stram/api/ContainerContext.java | 8 +- .../stram/api/NodeActivationListener.java | 4 +- .../StreamingContainerUmbilicalProtocol.java| 2 +- .../stram/client/StramClientUtils.java | 2 +- .../stram/codec/StatefulStreamCodec.java| 6 +- .../stram/engine/ByteCounterStream.java | 2 +- .../com/datatorrent/stram/engine/Stream.java| 4 +- .../stram/util/SharedPubSubWebSocketClient.java | 4 +- .../engine/api/DAGExecutionPluginContext.java | 18 ++--- .../stram/webapp/TypeDiscoveryTest.java | 2 +- 39 files changed, 171 insertions(+), 171 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/077009e4/api/src/main/java/com/datatorrent/api/Attribute.java -- diff --git a/api/src/main/java/com/datatorrent/api/Attribute.java b/api/src/main/java/com/datatorrent/api/Attribute.java index 821ecb2..a3b2f97 100644 --- a/api/src/main/java/com/datatorrent/api/Attribute.java +++ b/api/src/main/java/com/datatorrent/api/Attribute.java @@ -107,7 +107,7 @@ public class Attribute implements Serializable * * @since 0.3.2 */ - public static interface AttributeMap extends Cloneable + public interface AttributeMap extends Cloneable { /** * Return the attribute value for the given key. If the map does not have an @@ -150,7 +150,7 @@ public class Attribute implements Serializable /** * DefaultAttributeMap is the default implementation of AttributeMap. It's backed by a map internally. */ -public static class DefaultAttributeMap implements AttributeMap, Serializable +class DefaultAttributeMap implements AttributeMap, Serializable { private HashMap map; @@ -234,7 +234,7 @@ public class Attribute implements Serializable * * Engine uses it internally to initialize the Interfaces that may have Attributes defined in them. 
*/ -public static class AttributeInitializer +class AttributeInitializer { static final HashMap > map = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/apex-core/blob/077009e4/api/src/main/java/com/datatorrent/api/AutoMetric.java -- diff --git a/api/src/main/java/com/datatorrent/api/AutoMetric.java b/api/src/main/java/com/datatorrent/api/AutoMetric.java index b487e04..8369b87 100644
[2/2] apex-core git commit: APEXCORE-658 Enable apex. prefix for configuration keys.
APEXCORE-658 Enable apex. prefix for configuration keys. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/412a3bd8 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/412a3bd8 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/412a3bd8 Branch: refs/heads/master Commit: 412a3bd81fbfc973858f36172533c0d5ab83e39f Parents: aa81bea Author: Thomas WeiseAuthored: Mon Mar 27 20:54:51 2017 -0700 Committer: Thomas Weise Committed: Sun Apr 9 13:42:38 2017 -0700 -- .../src/main/resources/META-INF/properties.xml | 8 +- .../src/site/conf/my-app-conf1.xml | 4 +- .../src/main/resources/META-INF/properties.xml | 10 +- .../main/java/com/datatorrent/api/Context.java | 2 +- .../datatorrent/api/StreamingApplication.java | 10 + docs/application_development.md | 14 +- docs/application_packages.md| 56 ++--- .../java/com/datatorrent/stram/cli/ApexCli.java | 16 +- .../plan/logical/LogicalPlanConfiguration.java | 40 ++-- .../datatorrent/stram/StramMiniClusterTest.java | 40 ++-- .../logical/LogicalPlanConfigurationTest.java | 238 +++ .../logical/module/TestModuleExpansion.java | 2 +- .../logical/module/TestModuleProperties.java| 10 +- .../test/resources/clusterTest.app.properties | 44 engine/src/test/resources/dt-site.xml | 28 +-- .../resources/testModuleTopology.properties | 62 ++--- .../src/test/resources/testTopology.properties | 36 +-- .../testTopologyLegacyPrefix.properties | 27 +++ 18 files changed, 341 insertions(+), 306 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/412a3bd8/apex-app-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml -- diff --git a/apex-app-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml b/apex-app-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml index 876c39a..34679b6 100644 --- 
a/apex-app-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml +++ b/apex-app-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml @@ -2,22 +2,22 @@ - dt.application.MyFirstApplication.operator.randomGenerator.prop.numTuples + apex.application.MyFirstApplication.operator.randomGenerator.prop.numTuples 1000 - dt.application.MyFirstApplication.operator.console.prop.stringFormat + apex.application.MyFirstApplication.operator.console.prop.stringFormat hello world: %s http://git-wip-us.apache.org/repos/asf/apex-core/blob/412a3bd8/apex-app-archetype/src/main/resources/archetype-resources/src/site/conf/my-app-conf1.xml -- diff --git a/apex-app-archetype/src/main/resources/archetype-resources/src/site/conf/my-app-conf1.xml b/apex-app-archetype/src/main/resources/archetype-resources/src/site/conf/my-app-conf1.xml index ccb2b66..7ceba7c 100644 --- a/apex-app-archetype/src/main/resources/archetype-resources/src/site/conf/my-app-conf1.xml +++ b/apex-app-archetype/src/main/resources/archetype-resources/src/site/conf/my-app-conf1.xml @@ -1,11 +1,11 @@ -dt.attr.MASTER_MEMORY_MB +apex.attr.MASTER_MEMORY_MB 1024 - dt.application.MyFirstApplication.operator.randomGenerator.prop.numTuples + apex.application.MyFirstApplication.operator.randomGenerator.prop.numTuples 1000 http://git-wip-us.apache.org/repos/asf/apex-core/blob/412a3bd8/apex-conf-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml -- diff --git a/apex-conf-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml b/apex-conf-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml index 9044325..0ee7dc2 100644 --- a/apex-conf-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml +++ b/apex-conf-archetype/src/main/resources/archetype-resources/src/main/resources/META-INF/properties.xml @@ -2,24 +2,24 
@@ -dt.attr.MASTER_MEMORY_MB +apex.attr.MASTER_MEMORY_MB 1024 - dt.application.MyFirstApplication.operator.seedGen.prop.seedStart + apex.application.MyFirstApplication.operator.seedGen.prop.seedStart 1 - dt.application.MyFirstApplication.operator.seedGen.prop.seedEnd +
[1/2] apex-core git commit: APEXCORE-658 Enable apex. prefix for configuration keys.
Repository: apex-core Updated Branches: refs/heads/master aa81bea30 -> 412a3bd81 http://git-wip-us.apache.org/repos/asf/apex-core/blob/412a3bd8/engine/src/test/resources/testTopologyLegacyPrefix.properties -- diff --git a/engine/src/test/resources/testTopologyLegacyPrefix.properties b/engine/src/test/resources/testTopologyLegacyPrefix.properties new file mode 100644 index 000..012c9cb --- /dev/null +++ b/engine/src/test/resources/testTopologyLegacyPrefix.properties @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +dt.operator.o1.classname=com.datatorrent.stram.engine.TestGeneratorInputOperator + +dt.operator.o2.classname=com.datatorrent.stram.engine.GenericTestOperator +dt.operator.o2.myStringProperty=myStringPropertyValue + +dt.stream.s1.source=o1.outport +dt.stream.s1.sinks=o2.inport1 +dt.stream.s1.locality=CONTAINER_LOCAL
apex-core git commit: APEXCORE-677 Avoid starting StramLocalCluster in StreamingContainerManagerTest.testAppDataSources
Repository: apex-core Updated Branches: refs/heads/master 869e166e1 -> 938361374 APEXCORE-677 Avoid starting StramLocalCluster in StreamingContainerManagerTest.testAppDataSources Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/93836137 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/93836137 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/93836137 Branch: refs/heads/master Commit: 9383613741d6614a613bcbf9a41f4648cf22c147 Parents: 869e166 Author: Vlad Rozov Authored: Tue Mar 21 19:17:12 2017 -0700 Committer: Vlad Rozov Committed: Fri Apr 7 13:42:47 2017 -0700 -- .../java/com/datatorrent/stram/StreamingContainerManagerTest.java | 2 -- 1 file changed, 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/93836137/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java -- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java index 84622c4..c606f47 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java @@ -875,7 +875,6 @@ public class StreamingContainerManagerTest private void testAppDataSources(boolean appendQIDToTopic) throws Exception { StramLocalCluster lc = new StramLocalCluster(dag); -lc.runAsync(); StreamingContainerManager dnmgr = lc.dnmgr; List appDataSources = dnmgr.getAppDataSources(); Assert.assertEquals("There should be exactly one data source", 1, appDataSources.size()); @@ -890,7 +889,6 @@ public class StreamingContainerManagerTest Assert.assertEquals("Result topic verification", "xyz.result", result.topic); Assert.assertEquals("Result URL verification", "ws://123.123.123.124:9090/pubsub", result.url); Assert.assertEquals("Result QID append verification", appendQIDToTopic, 
result.appendQIDToTopic); -lc.shutdown(); } @Test
apex-malhar git commit: APEXMALHAR-2469 Upgrade org.apache.httpcomponents.httpclient (this closes #553)
Repository: apex-malhar Updated Branches: refs/heads/master 8e926949e -> 513e9e2d4 APEXMALHAR-2469 Upgrade org.apache.httpcomponents.httpclient (this closes #553) Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/513e9e2d Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/513e9e2d Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/513e9e2d Branch: refs/heads/master Commit: 513e9e2d471a792c72f8e2f115fa86f450c267e2 Parents: 8e92694 Author: Vlad Rozov Authored: Sat Apr 1 08:38:13 2017 -0700 Committer: Vlad Rozov Committed: Sat Apr 1 08:43:32 2017 -0700 -- contrib/pom.xml | 3 +-- examples/mrmonitor/pom.xml | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/513e9e2d/contrib/pom.xml -- diff --git a/contrib/pom.xml b/contrib/pom.xml index 6afc90a..893ec2d 100755 --- a/contrib/pom.xml +++ b/contrib/pom.xml @@ -604,10 +604,9 @@ true - org.apache.httpcomponents httpclient - 4.3.5 + 4.3.6 true http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/513e9e2d/examples/mrmonitor/pom.xml -- diff --git a/examples/mrmonitor/pom.xml b/examples/mrmonitor/pom.xml index d6982c0..7dc3080 100644 --- a/examples/mrmonitor/pom.xml +++ b/examples/mrmonitor/pom.xml @@ -56,7 +56,7 @@ org.apache.httpcomponents httpclient - 4.3.5 + 4.3.6 jar
apex-core git commit: APEXCORE-663 Restart of the App was failing because the containers allocation was not handled. There are 2 schenarios 2 handle 1. AppMaster was restarted 2. App was restarted
Repository: apex-core Updated Branches: refs/heads/master 04a352b3e -> 9d6408ea4 APEXCORE-663 Restart of the App was failing because the containers allocation was not handled. There are 2 schenarios 2 handle 1. AppMaster was restarted 2. App was restarted In case of 1: Compare the list of running containers returned by the YARN and Streaming Container Manager containers and take the appropriate actions. In case of 2 (Also when YARN returns empty list of running containers) : Redeploy the whole App. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/9d6408ea Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/9d6408ea Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/9d6408ea Branch: refs/heads/master Commit: 9d6408ea4c6e26df489a5e5dd1ecfdb9afca6d42 Parents: 04a352b Author: Sandesh HegdeAuthored: Tue Mar 7 23:43:24 2017 -0800 Committer: Sandesh Hegde Committed: Mon Mar 27 19:16:00 2017 -0700 -- .../stram/StreamingAppMasterService.java| 53 +--- .../stram/StreamingContainerManager.java| 24 + 2 files changed, 70 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/9d6408ea/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index a885a49..b15c98f 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -27,6 +27,7 @@ import java.net.InetSocketAddress; import java.net.URI; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -720,7 +721,6 @@ public class StreamingAppMasterService extends CompositeService int loopCounter = -1; long 
nodeReportUpdateTime = 0; -List releasedContainers = new ArrayList<>(); // keep track of already requested containers to not request them again while waiting for allocation int numRequestedContainers = 0; @@ -761,7 +761,7 @@ public class StreamingAppMasterService extends CompositeService // Running containers might take a while to register with the new app master and send the heartbeat signal. int waitForRecovery = containers.size() > 0 ? dag.getValue(LogicalPlan.HEARTBEAT_TIMEOUT_MILLIS) / 1000 : 0; -previouslyAllocatedContainers(containers); +List releasedContainers = previouslyAllocatedContainers(containers); FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED; final InetSocketAddress rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS, @@ -1089,13 +1089,52 @@ public class StreamingAppMasterService extends CompositeService * Check for containers that were allocated in a previous attempt. * If the containers are still alive, wait for them to check in via heartbeat. 
*/ - private void previouslyAllocatedContainers(List containers) + private List previouslyAllocatedContainers(List containersListByYarn) { -for (Container container : containers) { - this.allocatedContainers.put(container.getId().toString(), new AllocatedContainer(container)); - //check the status - nmClient.getContainerStatusAsync(container.getId(), container.getNodeId()); +List containersToRelease = new ArrayList<>(); + +if (containersListByYarn.size() != 0) { + + LOG.debug("Containers list by YARN - {}", containersListByYarn); + LOG.debug("Containers list by Streaming Container Manger - {}", dnmgr.getPhysicalPlan().getContainers()); + + Map fromYarn = new HashMap<>(); + + for (Container container : containersListByYarn) { +fromYarn.put(container.getId().toString(), container); + } + + for (PTContainer ptContainer : dnmgr.getPhysicalPlan().getContainers()) { + +String containerId = ptContainer.getExternalId(); + +// SCM starts the container without external ID. +if (containerId == null) { + continue; +} + +Container container = fromYarn.get(containerId); + +if (container != null) { + allocatedContainers.put(containerId, new AllocatedContainer(container)); + fromYarn.remove(containerId); +} else { + dnmgr.scheduleContainerRestart(containerId); +} + } + + for (Container container : fromYarn.values()) { +
[2/2] apex-core git commit: Merge branch 'APEXCORE-680_LogLevel' of http://github.com/Hitesh-Scorpio/apex-core into APEXCORE-680
Merge branch 'APEXCORE-680_LogLevel' of http://github.com/Hitesh-Scorpio/apex-core into APEXCORE-680 Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/04a352b3 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/04a352b3 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/04a352b3 Branch: refs/heads/master Commit: 04a352b3ed6a602b52fdfa618c0823b9f1fc6389 Parents: c42f26e 9d2707a Author: Vlad Rozov Authored: Mon Mar 27 12:36:29 2017 -0700 Committer: Vlad Rozov Committed: Mon Mar 27 12:36:29 2017 -0700 -- .../com/datatorrent/stram/StreamingContainerManager.java | 10 +- 1 file changed, 5 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/04a352b3/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java --
[1/2] apex-core git commit: APEXCORE-680 setting appropriate log level for heartbeat timeout
Repository: apex-core Updated Branches: refs/heads/master c42f26e01 -> 04a352b3e APEXCORE-680 setting appropriate log level for heartbeat timeout Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/9d2707a2 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/9d2707a2 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/9d2707a2 Branch: refs/heads/master Commit: 9d2707a263a6a4c7247f08a423a9e2ba6b33a8da Parents: 16d1bf6 Author: Hitesh-ScorpioAuthored: Fri Mar 24 11:06:00 2017 +0530 Committer: Hitesh-Scorpio Committed: Mon Mar 27 01:37:02 2017 +0530 -- .../com/datatorrent/stram/StreamingContainerManager.java | 10 +- 1 file changed, 5 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/9d2707a2/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index ee07af1..a989342 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -780,7 +780,7 @@ public class StreamingContainerManager implements PlanContext //LOG.debug("{} {} {}", c.getExternalId(), currentTms - sca.createdMillis, this.vars.heartbeatTimeoutMillis); // container allocated but process was either not launched or is not able to phone home if (currentTms - sca.createdMillis > 2 * this.vars.heartbeatTimeoutMillis) { -LOG.error("Container {}@{} startup timeout ({} ms).", c.getExternalId(), c.host, currentTms - sca.createdMillis); +LOG.warn("Container {}@{} startup timeout ({} ms).", c.getExternalId(), c.host, currentTms - sca.createdMillis); containerStopRequests.put(c.getExternalId(), c.getExternalId()); } } else { @@ -788,10 +788,10 @@ public class StreamingContainerManager implements 
PlanContext if (!isApplicationIdle()) { // Check if the heartbeat for this agent has already been missed to raise the StramEvent only once if (sca.lastHeartbeatMillis != -1) { -String info = String.format("Container %s@%s heartbeat timeout (%d%n ms).", c.getExternalId(), c.host, currentTms - sca.lastHeartbeatMillis); -LOG.error(info); -StramEvent stramEvent = new StramEvent.ContainerErrorEvent(c.getExternalId(), info, null); -stramEvent.setReason(info); +String msg = String.format("Container %s@%s heartbeat timeout (%d%n ms).", c.getExternalId(), c.host, currentTms - sca.lastHeartbeatMillis); +LOG.warn(msg); +StramEvent stramEvent = new StramEvent.ContainerErrorEvent(c.getExternalId(), msg, null); +stramEvent.setReason(msg); recordEventAsync(stramEvent); sca.lastHeartbeatMillis = -1; }
apex-core git commit: APEXCORE-504 - Possible race condition in StreamingContainerAgent.getStreamCodec()
Repository: apex-core Updated Branches: refs/heads/release-3.5 bd8f7bade -> fdebb1954 APEXCORE-504 - Possible race condition in StreamingContainerAgent.getStreamCodec() Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/fdebb195 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/fdebb195 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/fdebb195 Branch: refs/heads/release-3.5 Commit: fdebb1954fb87cadbef18c6d1e8c4e2b6b9752e0 Parents: bd8f7ba Author: Vlad RozovAuthored: Mon Jan 30 17:24:45 2017 -0800 Committer: Vlad Rozov Committed: Sat Mar 25 17:09:01 2017 -0700 -- .../stram/StreamingContainerAgent.java | 53 +--- .../stram/StreamingContainerManager.java| 4 +- .../stram/plan/logical/LogicalPlan.java | 39 +- .../stram/plan/physical/PhysicalPlan.java | 16 +++--- .../stram/plan/physical/StreamMapping.java | 13 ++--- .../com/datatorrent/stram/StreamCodecTest.java | 8 +-- .../stram/plan/StreamPersistanceTests.java | 2 +- 7 files changed, 69 insertions(+), 66 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/fdebb195/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java index 2ea37f4..1effd15 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java @@ -201,7 +201,11 @@ public class StreamingContainerAgent if (ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) { // input attributes of the downstream operator for (InputPortMeta sink : streamMeta.getSinks()) { -portInfo.contextAttributes = sink.getAttributes(); +try { + portInfo.contextAttributes = sink.getAttributes().clone(); +} catch (CloneNotSupportedException e) { + throw new RuntimeException("Cannot clone attributes", e); +} 
break; } } @@ -214,11 +218,11 @@ public class StreamingContainerAgent for (PTOperator.PTInput input : out.sinks) { // Create mappings for all non-inline operators if (input.target.getContainer() != out.source.getContainer()) { - InputPortMeta inputPortMeta = getIdentifyingInputPortMeta(input); - StreamCodec streamCodecInfo = getStreamCodec(inputPortMeta); - Integer id = physicalPlan.getStreamCodecIdentifier(streamCodecInfo); + final StreamCodec streamCodec = getIdentifyingInputPortMeta(input).getStreamCodec(); + final Integer id = physicalPlan.getStreamCodecIdentifier(streamCodec); + // TODO: replace with inputInfo.streamCodecs.putIfAbsent() after support for JDK 1.7 is dropped. if (!portInfo.streamCodecs.containsKey(id)) { -portInfo.streamCodecs.put(id, streamCodecInfo); +portInfo.streamCodecs.put(id, streamCodec); } } } @@ -247,11 +251,19 @@ public class StreamingContainerAgent InputPortMeta inputPortMeta = getInputPortMeta(oper.getOperatorMeta(), streamMeta); if (inputPortMeta != null) { - inputInfo.contextAttributes = inputPortMeta.getAttributes(); + try { +inputInfo.contextAttributes = inputPortMeta.getAttributes().clone(); + } catch (CloneNotSupportedException e) { +throw new RuntimeException("Cannot clone attributes", e); + } } if (inputInfo.contextAttributes == null && ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) { - inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes(); + try { +inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes().clone(); + } catch (CloneNotSupportedException e) { +throw new RuntimeException("Cannot clone attributes", e); + } } inputInfo.sourceNodeId = sourceOutput.source.getId(); @@ -287,10 +299,12 @@ public class StreamingContainerAgent // On the input side there is a unlikely scenario of partitions even for inline stream that is being // handled. Always specifying a stream codec configuration in case that scenario happens. 
-InputPortMeta idInputPortMeta = getIdentifyingInputPortMeta(in); -StreamCodec streamCodecInfo = getStreamCodec(idInputPortMeta); -
[1/2] apex-core git commit: APEXCORE-593 apex cli get-app-package-info could not retrieve properties defined in properties.xml
Repository: apex-core Updated Branches: refs/heads/master 5f95ee0e9 -> b4a4e0517 APEXCORE-593 apex cli get-app-package-info could not retrieve properties defined in properties.xml Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/82f2761d Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/82f2761d Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/82f2761d Branch: refs/heads/master Commit: 82f2761dd5c31968e95d85b51a2103b7ebe2f293 Parents: 16d1bf6 Author: vikramAuthored: Fri Mar 24 10:58:20 2017 +0530 Committer: vikram Committed: Fri Mar 24 12:17:58 2017 +0530 -- .../datatorrent/stram/client/AppPackage.java| 4 ++ .../stram/client/AppPackageTest.java| 71 +--- .../java/com/example/mydtapp/Application.java | 6 +- .../java/com/example/mydtapp/Application2.java | 2 +- .../com/example/mydtapp/StdoutOperator.java | 12 .../java/com/example/mydtapp/TestModule.java| 57 .../com/example/mydtapp/TestModuleOperator.java | 48 + .../src/main/resources/META-INF/properties.xml | 8 +++ 8 files changed, 195 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/82f2761d/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java index fd95649..238b646 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java +++ b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java @@ -380,6 +380,10 @@ public class AppPackage extends JarFile Configuration config = new Configuration(); +for (Map.Entry entry : defaultProperties.entrySet()) { + config.set(entry.getKey(), entry.getValue().getValue()); +} + List absClassPath = new ArrayList<>(classPath); for (int i = 0; i < absClassPath.size(); i++) { String path = absClassPath.get(i); 
http://git-wip-us.apache.org/repos/asf/apex-core/blob/82f2761d/engine/src/test/java/com/datatorrent/stram/client/AppPackageTest.java -- diff --git a/engine/src/test/java/com/datatorrent/stram/client/AppPackageTest.java b/engine/src/test/java/com/datatorrent/stram/client/AppPackageTest.java index aae3913..20550a9 100644 --- a/engine/src/test/java/com/datatorrent/stram/client/AppPackageTest.java +++ b/engine/src/test/java/com/datatorrent/stram/client/AppPackageTest.java @@ -27,6 +27,7 @@ import java.util.Set; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.junit.AfterClass; @@ -95,22 +96,57 @@ public class AppPackageTest Assert.assertEquals(System.getProperty("apex.version", "3.4.0"), json.getString("dtEngineVersion")); Assert.assertEquals("lib/*.jar", json.getJSONArray("classPath").getString(0)); -JSONObject application1 = json.getJSONArray("applications").getJSONObject(0); -JSONObject application2 = json.getJSONArray("applications").getJSONObject(1); +// Test if there are fixed number of applications +Assert.assertEquals("Number of applications", 2, json.getJSONArray("applications").length()); +JSONArray applications = json.getJSONArray("applications"); Map apps = new HashMap<>(); -apps.put(application1.getString("name"), application1); -apps.put(application2.getString("name"), application2); +for (int i = 0; i < applications.length(); i++) { + JSONObject application = applications.getJSONObject(i); + apps.put(application.getString("name"), application); +} + +// Retrieve applications found +JSONObject myFirstApplication = apps.get("MyFirstApplication"); +JSONObject mySecondApplication = apps.get("MySecondApplication"); + +// Testing if expected applications are found +Assert.assertNotNull("MyFirstApplication not found", myFirstApplication); 
+Assert.assertNotNull("MySecondApplication not found", mySecondApplication); + +// Tests related to MyFirstApplication start +Assert.assertEquals("mydtapp-1.0-SNAPSHOT.jar", myFirstApplication.getString("file")); +String errorStackTrace = myFirstApplication.getString("errorStackTrace"); +Assert.assertEquals("ErrorStackTrace", "null", errorStackTrace); + +JSONObject dag = myFirstApplication.getJSONObject("dag"); +
[2/2] apex-core git commit: Merge branch 'APEXCORE-593' of http://github.com/patilvikram/apex-core into APEXCORE-593
Merge branch 'APEXCORE-593' of http://github.com/patilvikram/apex-core into APEXCORE-593 Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/b4a4e051 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/b4a4e051 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/b4a4e051 Branch: refs/heads/master Commit: b4a4e05170980547d791ff0d8fd42c2f00e2306a Parents: 5f95ee0 82f2761 Author: Vlad Rozov Authored: Fri Mar 24 12:57:42 2017 -0700 Committer: Vlad Rozov Committed: Fri Mar 24 12:57:42 2017 -0700 -- .../datatorrent/stram/client/AppPackage.java| 4 ++ .../stram/client/AppPackageTest.java| 71 +--- .../java/com/example/mydtapp/Application.java | 6 +- .../java/com/example/mydtapp/Application2.java | 2 +- .../com/example/mydtapp/StdoutOperator.java | 12 .../java/com/example/mydtapp/TestModule.java| 57 .../com/example/mydtapp/TestModuleOperator.java | 48 + .../src/main/resources/META-INF/properties.xml | 8 +++ 8 files changed, 195 insertions(+), 13 deletions(-) --
apex-core git commit: APEXCORE-662 Raising StramEvent in case of heartbeat miss
Repository: apex-core Updated Branches: refs/heads/master df8bc7e00 -> 16d1bf62d APEXCORE-662 Raising StramEvent in case of heartbeat miss Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/16d1bf62 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/16d1bf62 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/16d1bf62 Branch: refs/heads/master Commit: 16d1bf62d7a4c83aec1c3bdb9a8e5878fae42323 Parents: df8bc7e Author: Hitesh-ScorpioAuthored: Tue Mar 14 16:31:43 2017 +0530 Committer: Hitesh-Scorpio Committed: Thu Mar 23 22:06:53 2017 +0530 -- .../datatorrent/stram/StreamingContainerManager.java| 12 ++-- 1 file changed, 10 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/16d1bf62/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index 9b0c4f4..ee07af1 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -780,14 +780,22 @@ public class StreamingContainerManager implements PlanContext //LOG.debug("{} {} {}", c.getExternalId(), currentTms - sca.createdMillis, this.vars.heartbeatTimeoutMillis); // container allocated but process was either not launched or is not able to phone home if (currentTms - sca.createdMillis > 2 * this.vars.heartbeatTimeoutMillis) { -LOG.info("Container {}@{} startup timeout ({} ms).", c.getExternalId(), c.host, currentTms - sca.createdMillis); +LOG.error("Container {}@{} startup timeout ({} ms).", c.getExternalId(), c.host, currentTms - sca.createdMillis); containerStopRequests.put(c.getExternalId(), c.getExternalId()); } } else { if (currentTms - sca.lastHeartbeatMillis > this.vars.heartbeatTimeoutMillis) { if 
(!isApplicationIdle()) { + // Check if the heartbeat for this agent has already been missed to raise the StramEvent only once + if (sca.lastHeartbeatMillis != -1) { +String info = String.format("Container %s@%s heartbeat timeout (%d%n ms).", c.getExternalId(), c.host, currentTms - sca.lastHeartbeatMillis); +LOG.error(info); +StramEvent stramEvent = new StramEvent.ContainerErrorEvent(c.getExternalId(), info, null); +stramEvent.setReason(info); +recordEventAsync(stramEvent); +sca.lastHeartbeatMillis = -1; + } // request stop (kill) as process may still be hanging around (would have been detected by Yarn otherwise) - LOG.info("Container {}@{} heartbeat timeout ({} ms).", c.getExternalId(), c.host, currentTms - sca.lastHeartbeatMillis); containerStopRequests.put(c.getExternalId(), c.getExternalId()); } }
apex-core git commit: APEXCORE-676 Show description for DefaultProperties in get-app-package-info command only when user requests it by providing -withDescription flag
Repository: apex-core Updated Branches: refs/heads/master 3c063a441 -> df8bc7e00 APEXCORE-676 Show description for DefaultProperties in get-app-package-info command only when user requests it by providing -withDescription flag Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/df8bc7e0 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/df8bc7e0 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/df8bc7e0 Branch: refs/heads/master Commit: df8bc7e001041808a52a194375c3fcd1ae45f3b8 Parents: 3c063a4 Author: ajaygit158Authored: Thu Mar 23 03:25:55 2017 +0530 Committer: ajaygit158 Committed: Thu Mar 23 12:10:32 2017 +0530 -- .../java/com/datatorrent/stram/cli/ApexCli.java | 33 ++-- .../datatorrent/stram/client/AppPackage.java| 29 + .../stram/client/AppPackageTest.java| 18 +++ 3 files changed, 77 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/df8bc7e0/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java index e95a391..dfaae97 100644 --- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java +++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java @@ -672,10 +672,11 @@ public class ApexCli null, new Arg[]{new FileArg("parameter-name")}, "Get the configuration parameter")); -globalCommands.put("get-app-package-info", new CommandSpec(new GetAppPackageInfoCommand(), +globalCommands.put("get-app-package-info", new OptionsCommandSpec(new GetAppPackageInfoCommand(), new Arg[]{new FileArg("app-package-file")}, -null, -"Get info on the app package file")); +new Arg[]{new Arg("-withDescription")}, +"Get info on the app package file", +GET_APP_PACKAGE_INFO_OPTIONS)); globalCommands.put("get-app-package-operators", new OptionsCommandSpec(new GetAppPackageOperatorsCommand(), new Arg[]{new 
FileArg("app-package-file")}, new Arg[]{new Arg("search-term")}, @@ -2996,6 +2997,13 @@ public class ApexCli return tmpDir; } + private static Options GET_APP_PACKAGE_INFO_OPTIONS = new Options(); + + static { +GET_APP_PACKAGE_INFO_OPTIONS +.addOption(new Option("withDescription", false, "Get default properties with description")); + } + public static class GetOperatorClassesCommandLineOptions { final Options options = new Options(); @@ -3011,6 +3019,20 @@ public class ApexCli private static GetOperatorClassesCommandLineOptions GET_OPERATOR_CLASSES_OPTIONS = new GetOperatorClassesCommandLineOptions(); + static class GetAppPackageInfoCommandLineInfo + { +boolean provideDescription; + } + + static GetAppPackageInfoCommandLineInfo getGetAppPackageInfoCommandLineInfo(String[] args) throws ParseException + { +CommandLineParser parser = new PosixParser(); +GetAppPackageInfoCommandLineInfo result = new GetAppPackageInfoCommandLineInfo(); +CommandLine line = parser.parse(GET_APP_PACKAGE_INFO_OPTIONS, args); +result.provideDescription = line.hasOption("withDescription"); +return result; + } + static class GetOperatorClassesCommandLineInfo { String parent; @@ -3474,8 +3496,13 @@ public class ApexCli @Override public void execute(String[] args, ConsoleReader reader) throws Exception { + String[] tmpArgs = new String[args.length - 2]; + System.arraycopy(args, 2, tmpArgs, 0, args.length - 2); + GetAppPackageInfoCommandLineInfo commandLineInfo = getGetAppPackageInfoCommandLineInfo(tmpArgs); try (AppPackage ap = newAppPackageInstance(new File(expandFileName(args[1], true { JSONSerializationProvider jomp = new JSONSerializationProvider(); +jomp.addSerializer(PropertyInfo.class, +new AppPackage.PropertyInfoSerializer(commandLineInfo.provideDescription)); JSONObject apInfo = new JSONObject(jomp.getContext(null).writeValueAsString(ap)); apInfo.remove("name"); printJson(apInfo); 
http://git-wip-us.apache.org/repos/asf/apex-core/blob/df8bc7e0/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java index fcb1612..fd95649 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java +++
[2/2] apex-core git commit: APEXCORE-611 Changed default log level for StopContainer event from INFO to WARN
APEXCORE-611 Changed default log level for StopContainer event from INFO to WARN Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/3c063a44 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/3c063a44 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/3c063a44 Branch: refs/heads/master Commit: 3c063a44177eaf44fec70aa618e361266d357a86 Parents: 9f2dfaa Author: ajaygit158 Authored: Tue Mar 21 12:13:22 2017 +0530 Committer: Vlad Rozov Committed: Tue Mar 21 08:25:58 2017 -0700 -- engine/src/main/java/com/datatorrent/stram/api/StramEvent.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/3c063a44/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java b/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java index d80fa94..8af90bc 100644 --- a/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java +++ b/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java @@ -451,7 +451,7 @@ public abstract class StramEvent public StopContainerEvent(String containerId, int exitStatus) { - this(containerId, exitStatus, LogLevel.INFO); + this(containerId, exitStatus, LogLevel.WARN); } public StopContainerEvent(String containerId, int exitStatus, LogLevel logLevel)
apex-core git commit: APEXCORE-674 Change access specifier of DTConfiguration.ValueEntry to private
Repository: apex-core Updated Branches: refs/heads/master d6f17f23a -> 491e2e332 APEXCORE-674 Change access specifier of DTConfiguration.ValueEntry to private Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/491e2e33 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/491e2e33 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/491e2e33 Branch: refs/heads/master Commit: 491e2e332a223304209b7835a8ac762b112b7318 Parents: d6f17f2 Author: ajaygit158Authored: Sun Mar 19 15:48:50 2017 +0530 Committer: ajaygit158 Committed: Mon Mar 20 22:38:07 2017 +0530 -- .../stram/client/DTConfiguration.java | 21 +--- .../stram/client/DTConfigurationTest.java | 35 2 files changed, 44 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/491e2e33/engine/src/main/java/com/datatorrent/stram/client/DTConfiguration.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/client/DTConfiguration.java b/engine/src/main/java/com/datatorrent/stram/client/DTConfiguration.java index 1f19d71..45e7826 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/DTConfiguration.java +++ b/engine/src/main/java/com/datatorrent/stram/client/DTConfiguration.java @@ -70,7 +70,7 @@ public class DTConfiguration implements Iterable > private final Map map = new LinkedHashMap<>(); private static final Logger LOG = LoggerFactory.getLogger(DTConfiguration.class); - public static class ValueEntry + private static class ValueEntry { public String value; public boolean isFinal = false; @@ -275,36 +275,33 @@ public class DTConfiguration implements Iterable > map.remove(key); } - public ValueEntry setInternal(String key, String value) + public void setInternal(String key, String value) { -ValueEntry valueEntry; -if (map.containsKey(key)) { - valueEntry = map.get(key); +ValueEntry valueEntry = map.get(key); +if (valueEntry != null) { valueEntry.value = value; } else { valueEntry = 
new ValueEntry(); valueEntry.scope = isLocalKey(key) ? Scope.LOCAL : Scope.TRANSIENT; + valueEntry.value = value; map.put(key, valueEntry); } -return valueEntry; } - public ValueEntry set(String key, String value, Scope scope, String description) throws ConfigException + public void set(String key, String value, Scope scope, String description) throws ConfigException { -ValueEntry valueEntry; -if (map.containsKey(key)) { - valueEntry = map.get(key); +ValueEntry valueEntry = map.get(key); +if (valueEntry != null) { if (valueEntry.isFinal) { throw new ConfigException("Cannot set final property " + key); } } else { valueEntry = new ValueEntry(); + map.put(key, valueEntry); } valueEntry.value = value; valueEntry.description = description; valueEntry.scope = isLocalKey(key) ? Scope.LOCAL : scope; -map.put(key, valueEntry); -return valueEntry; } public static boolean isLocalKey(String key) http://git-wip-us.apache.org/repos/asf/apex-core/blob/491e2e33/engine/src/test/java/com/datatorrent/stram/client/DTConfigurationTest.java -- diff --git a/engine/src/test/java/com/datatorrent/stram/client/DTConfigurationTest.java b/engine/src/test/java/com/datatorrent/stram/client/DTConfigurationTest.java new file mode 100644 index 000..c43c1f9 --- /dev/null +++ b/engine/src/test/java/com/datatorrent/stram/client/DTConfigurationTest.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.stram.client; + +import org.junit.Assert; +import org.junit.Test; + +public class DTConfigurationTest +{ + @Test +
[1/2] apex-core git commit: APEXCORE-563: Add log filename and offset to the container/operator error events
Repository: apex-core Updated Branches: refs/heads/master 1e4785671 -> 8ce340ef3 APEXCORE-563: Add log filename and offset to the container/operator error events Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/764ca7b2 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/764ca7b2 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/764ca7b2 Branch: refs/heads/master Commit: 764ca7b285d5af4305b5c792cc1d9154da2f0c0d Parents: a6dd73b Author: Priyanka GugaleAuthored: Tue Dec 13 17:22:47 2016 +0530 Committer: priya Committed: Fri Mar 3 10:33:57 2017 +0530 -- .../datatorrent/stram/StramLocalCluster.java| 5 +- .../stram/StreamingContainerParent.java | 9 +- .../com/datatorrent/stram/api/StramEvent.java | 58 -- .../StreamingContainerUmbilicalProtocol.java| 5 +- .../stram/engine/StreamingContainer.java| 14 ++- .../com/datatorrent/stram/util/LoggerUtil.java | 77 + .../org/apache/apex/log/LogFileInformation.java | 45 .../datatorrent/stram/StramRecoveryTest.java| 6 +- .../stram/util/LogFileInformationTest.java | 111 +++ 9 files changed, 308 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/764ca7b2/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java index 4d452af..7ebeea6 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java +++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java @@ -33,6 +33,9 @@ import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import org.apache.apex.log.LogFileInformation; + import org.apache.commons.lang3.SerializationUtils; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; @@ -105,7 +108,7 @@ public class StramLocalCluster implements Runnable, 
Controller } @Override -public void reportError(String containerId, int[] operators, String msg) throws IOException +public void reportError(String containerId, int[] operators, String msg, LogFileInformation logFileInfo) throws IOException { log(containerId, msg); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/764ca7b2/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java index e5d8a97..76f89bd 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java @@ -24,6 +24,8 @@ import java.net.InetSocketAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.log.LogFileInformation; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.ipc.ProtocolSignature; @@ -172,15 +174,16 @@ public class StreamingContainerParent extends org.apache.hadoop.service.Composit } @Override - public void reportError(String containerId, int[] operators, String msg) throws IOException + public void reportError(String containerId, int[] operators, String msg, LogFileInformation logFileInfo) throws IOException { if (operators == null || operators.length == 0) { - dagManager.recordEventAsync(new ContainerErrorEvent(containerId, msg)); + dagManager.recordEventAsync(new ContainerErrorEvent(containerId, msg, logFileInfo)); } else { for (int operator : operators) { OperatorInfo operatorInfo = dagManager.getOperatorInfo(operator); if (operatorInfo != null) { - dagManager.recordEventAsync(new OperatorErrorEvent(operatorInfo.name, operator, containerId, msg)); + dagManager.recordEventAsync(new OperatorErrorEvent(operatorInfo.name, operator, containerId, msg, + logFileInfo)); } } } 
http://git-wip-us.apache.org/repos/asf/apex-core/blob/764ca7b2/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java b/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java index e9c5f13..d80fa94 100644 ---
[2/2] apex-core git commit: Merge branch 'APEXCORE-563-event-log-update' of http://github.com/DT-Priyanka/incubator-apex-core into APEXCORE-563
Merge branch 'APEXCORE-563-event-log-update' of http://github.com/DT-Priyanka/incubator-apex-core into APEXCORE-563 Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/8ce340ef Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/8ce340ef Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/8ce340ef Branch: refs/heads/master Commit: 8ce340ef3e363d188a58ae496d51cec170a58947 Parents: 1e47856 764ca7b Author: Vlad Rozov Authored: Fri Mar 3 08:33:27 2017 -0800 Committer: Vlad Rozov Committed: Fri Mar 3 08:33:27 2017 -0800 -- .../datatorrent/stram/StramLocalCluster.java| 5 +- .../stram/StreamingContainerParent.java | 9 +- .../com/datatorrent/stram/api/StramEvent.java | 58 -- .../StreamingContainerUmbilicalProtocol.java| 5 +- .../stram/engine/StreamingContainer.java| 14 ++- .../com/datatorrent/stram/util/LoggerUtil.java | 77 + .../org/apache/apex/log/LogFileInformation.java | 45 .../datatorrent/stram/StramRecoveryTest.java| 6 +- .../stram/util/LogFileInformationTest.java | 111 +++ 9 files changed, 308 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/8ce340ef/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/8ce340ef/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java --
apex-core git commit: APEXCORE-426 Reuse the running container, when the Stram restarts.
Repository: apex-core Updated Branches: refs/heads/master 58930cc57 -> 3b660c9c1 APEXCORE-426 Reuse the running container, when the Stram restarts. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/3b660c9c Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/3b660c9c Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/3b660c9c Branch: refs/heads/master Commit: 3b660c9c1dd639c49ae300fd3f70397d3f794d12 Parents: 58930cc Author: Sandesh HegdeAuthored: Fri Nov 4 17:31:59 2016 -0700 Committer: Sandesh Hegde Committed: Thu Mar 2 13:04:33 2017 -0800 -- .../java/com/datatorrent/stram/StramClient.java | 2 + .../datatorrent/stram/StramLocalCluster.java| 2 +- .../stram/StreamingAppMasterService.java| 38 +++ .../stram/StreamingContainerManager.java| 16 .../com/datatorrent/stram/CheckpointTest.java | 39 +--- .../stram/StramLocalClusterTest.java| 2 +- .../datatorrent/stram/StramRecoveryTest.java| 2 +- .../stram/StreamingContainerManagerTest.java| 8 ++-- .../stram/plan/logical/DelayOperatorTest.java | 2 +- 9 files changed, 50 insertions(+), 61 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/main/java/com/datatorrent/stram/StramClient.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java index 8b78c14..dad42e3 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramClient.java +++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java @@ -392,6 +392,8 @@ public class StramClient //appContext.setMaxAppAttempts(1); // no retries until Stram is HA } +appContext.setKeepContainersAcrossApplicationAttempts(true); + // Set up the container launch context for the application master ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); 
http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java index e5d855b..ff61868 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java +++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java @@ -501,7 +501,7 @@ public class StramLocalCluster implements Runnable, Controller if (heartbeatMonitoringEnabled) { // monitor child containers -dnmgr.monitorHeartbeat(); +dnmgr.monitorHeartbeat(false); } if (childContainers.isEmpty() && dnmgr.containerStartRequests.isEmpty()) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index 3898dbc..c0e09ab 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -27,7 +27,6 @@ import java.net.InetSocketAddress; import java.net.URI; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -63,9 +62,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.AMRMClient; import 
org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.YarnClient; @@ -75,7 +71,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.webapp.WebApp; @@
apex-core git commit: APEXCORE-624 decrement unallocated containers and also released containers so that the exit condition for the shutdown check is satisfied.
Repository: apex-core Updated Branches: refs/heads/release-3.3 232eba368 -> 8b359fc58 APEXCORE-624 decrement unallocated containers and also released containers so that the exit condition for the shutdown check is satisfied. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/8b359fc5 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/8b359fc5 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/8b359fc5 Branch: refs/heads/release-3.3 Commit: 8b359fc58188012f96b1f2b827987fe67c98d74c Parents: 232eba3 Author: Sanjay PujareAuthored: Fri Jan 27 10:35:09 2017 -0800 Committer: Vlad Rozov Committed: Fri Feb 24 17:31:32 2017 -0800 -- .../stram/StreamingAppMasterService.java| 16 1 file changed, 8 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/8b359fc5/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index 3b2c4de..087d6d5 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -770,6 +770,7 @@ public class StreamingAppMasterService extends CompositeService for (Map.Entry > entry : requestedResources.entrySet()) { if ((loopCounter - entry.getValue().getKey()) > NUMBER_MISSED_HEARTBEATS) { StreamingContainerAgent.ContainerStartRequest csr = entry.getKey(); +LOG.debug("Request for container {} timed out. 
Re-requesting container", csr.container); removedContainerRequests.add(entry.getValue().getRight()); ContainerRequest cr = resourceRequestor.createContainerRequest(csr, false); entry.getValue().setLeft(loopCounter); @@ -779,7 +780,7 @@ public class StreamingAppMasterService extends CompositeService } } - /* Remove nodes from blacklist after timeout */ + /* Remove nodes from blacklist after timeout */ long currentTime = System.currentTimeMillis(); List blacklistRemovals = new ArrayList(); for (Iterator > it = blacklistedNodesQueueWithTimeStamp.iterator(); it.hasNext();) { @@ -797,7 +798,7 @@ public class StreamingAppMasterService extends CompositeService } numTotalContainers += containerRequests.size(); - numRequestedContainers += containerRequests.size(); + numRequestedContainers += containerRequests.size() - removedContainerRequests.size(); AllocateResponse amResp = sendContainerAskToRM(containerRequests, removedContainerRequests, releasedContainers); if (amResp.getAMCommand() != null) { LOG.info(" statement executed:{}", amResp.getAMCommand()); @@ -836,7 +837,7 @@ public class StreamingAppMasterService extends CompositeService LOG.info("Releasing {} as resource with priority {} was already assigned", allocatedContainer.getId(), allocatedContainer.getPriority()); releasedContainers.add(allocatedContainer.getId()); numReleasedContainers++; - numRequestedContainers++; + numRequestedContainers--; continue; } if (csr != null) { @@ -964,7 +965,8 @@ public class StreamingAppMasterService extends CompositeService appDone = true; } - LOG.debug("Current application state: loop=" + loopCounter + ", appDone=" + appDone + ", total=" + numTotalContainers + ", requested=" + numRequestedContainers + ", released=" + numReleasedContainers + ", completed=" + numCompletedContainers + ", failed=" + numFailedContainers + ", currentAllocated=" + allocatedContainers.size()); + LOG.debug("Current application state: loop={}, appDone={}, total={}, requested={}, released={}, completed={}, 
failed={}, currentAllocated={}, dnmgr.containerStartRequests={}", +loopCounter, appDone, numTotalContainers, numRequestedContainers, numReleasedContainers, numCompletedContainers, numFailedContainers, allocatedContainers.size(), dnmgr.containerStartRequests); // monitor child containers dnmgr.monitorHeartbeat(); @@ -1038,16 +1040,14 @@ public class StreamingAppMasterService extends CompositeService private AllocateResponse sendContainerAskToRM(List containerRequests, List removedContainerRequests, List releasedContainers) throws YarnException, IOException { if
apex-core git commit: APEXCORE-624 decrement unallocated containers and released containers so exit condition for shutdown check is satisfied.
Repository: apex-core Updated Branches: refs/heads/release-3.4 2f34efd3f -> de967a4b5 APEXCORE-624 decrement unallocated containers and released containers so exit condition for shutdown check is satisfied. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/de967a4b Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/de967a4b Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/de967a4b Branch: refs/heads/release-3.4 Commit: de967a4b5b57a5876703fd7ef2d5b1bfcfffe3c0 Parents: 2f34efd Author: Sanjay PujareAuthored: Sat Feb 18 12:33:31 2017 -0800 Committer: Vlad Rozov Committed: Fri Feb 24 17:21:49 2017 -0800 -- .../stram/ResourceRequestHandler.java | 1 + .../stram/StreamingAppMasterService.java| 26 +--- 2 files changed, 13 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/de967a4b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java index 6ecc7c5..e760237 100644 --- a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java +++ b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java @@ -81,6 +81,7 @@ public class ResourceRequestHandler */ if ((loopCounter - entry.getValue().getKey()) > NUMBER_MISSED_HEARTBEATS) { StreamingContainerAgent.ContainerStartRequest csr = entry.getKey(); + LOG.debug("Request for container {} timed out. 
Re-requesting container", csr.container); removedContainerRequests.add(entry.getValue().getRight()); ContainerRequest cr = resourceRequestor.createContainerRequest(csr, false); entry.getValue().setLeft(loopCounter); http://git-wip-us.apache.org/repos/asf/apex-core/blob/de967a4b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index 1c7c893..88b64d3 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -699,7 +699,7 @@ public class StreamingAppMasterService extends CompositeService int loopCounter = -1; long nodeReportUpdateTime = 0; List releasedContainers = new ArrayList<>(); -int numTotalContainers = 0; + // keep track of already requested containers to not request them again while waiting for allocation int numRequestedContainers = 0; int numReleasedContainers = 0; @@ -723,7 +723,7 @@ public class StreamingAppMasterService extends CompositeService dnmgr.shutdownDiagnosticsMessage = String.format("Application master failed due to application %s with duplicate application name \"%s\" by the same user \"%s\" is already started.", ar.getApplicationId().toString(), ar.getName(), ar.getUser()); LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage); -finishApplication(FinalApplicationStatus.FAILED, numTotalContainers); +finishApplication(FinalApplicationStatus.FAILED); return; } resourceRequestor.updateNodeReports(clientRMService.getNodeReports()); @@ -823,7 +823,7 @@ public class StreamingAppMasterService extends CompositeService resourceRequestor.reissueContainerRequests(amRmClient, requestedResources, loopCounter, resourceRequestor, containerRequests, removedContainerRequests); - /* Remove nodes from blacklist after timeout */ + /* Remove nodes 
from blacklist after timeout */ List blacklistRemovals = new ArrayList<>(); for (String hostname : failedBlackListedNodes) { Long timeDiff = currentTimeMillis - failedContainerNodesMap.get(hostname).blackListAdditionTime; @@ -838,8 +838,7 @@ public class StreamingAppMasterService extends CompositeService failedBlackListedNodes.removeAll(blacklistRemovals); } - numTotalContainers += containerRequests.size(); - numRequestedContainers += containerRequests.size(); + numRequestedContainers += containerRequests.size() - removedContainerRequests.size(); AllocateResponse amResp = sendContainerAskToRM(containerRequests, removedContainerRequests, releasedContainers); if (amResp.getAMCommand() != null) { LOG.info(" statement executed:{}",
apex-core git commit: APEXCORE-624 decrement unallocated containers and released containers so exit condition for shutdown check is satisfied.
Repository: apex-core Updated Branches: refs/heads/release-3.5 66bf590c8 -> bd8f7bade APEXCORE-624 decrement unallocated containers and released containers so exit condition for shutdown check is satisfied. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/bd8f7bad Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/bd8f7bad Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/bd8f7bad Branch: refs/heads/release-3.5 Commit: bd8f7bade65f03e7c7729da383a29cd424664f91 Parents: 66bf590 Author: Sanjay PujareAuthored: Sat Feb 18 12:33:31 2017 -0800 Committer: Vlad Rozov Committed: Fri Feb 24 17:15:58 2017 -0800 -- .../stram/ResourceRequestHandler.java | 1 + .../stram/StreamingAppMasterService.java| 26 +--- 2 files changed, 13 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/bd8f7bad/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java index c56f64f..e7f9672 100644 --- a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java +++ b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java @@ -81,6 +81,7 @@ public class ResourceRequestHandler */ if ((loopCounter - entry.getValue().getKey()) > NUMBER_MISSED_HEARTBEATS) { StreamingContainerAgent.ContainerStartRequest csr = entry.getKey(); + LOG.debug("Request for container {} timed out. 
Re-requesting container", csr.container); removedContainerRequests.add(entry.getValue().getRight()); ContainerRequest cr = resourceRequestor.createContainerRequest(csr, false); entry.getValue().setLeft(loopCounter); http://git-wip-us.apache.org/repos/asf/apex-core/blob/bd8f7bad/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index 15b6402..3898dbc 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -705,7 +705,7 @@ public class StreamingAppMasterService extends CompositeService int loopCounter = -1; long nodeReportUpdateTime = 0; List releasedContainers = new ArrayList<>(); -int numTotalContainers = 0; + // keep track of already requested containers to not request them again while waiting for allocation int numRequestedContainers = 0; int numReleasedContainers = 0; @@ -729,7 +729,7 @@ public class StreamingAppMasterService extends CompositeService dnmgr.shutdownDiagnosticsMessage = String.format("Application master failed due to application %s with duplicate application name \"%s\" by the same user \"%s\" is already started.", ar.getApplicationId().toString(), ar.getName(), ar.getUser()); LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage); -finishApplication(FinalApplicationStatus.FAILED, numTotalContainers); +finishApplication(FinalApplicationStatus.FAILED); return; } resourceRequestor.updateNodeReports(clientRMService.getNodeReports()); @@ -829,7 +829,7 @@ public class StreamingAppMasterService extends CompositeService resourceRequestor.reissueContainerRequests(amRmClient, requestedResources, loopCounter, resourceRequestor, containerRequests, removedContainerRequests); - /* Remove nodes from blacklist after timeout */ + /* Remove nodes 
from blacklist after timeout */ List blacklistRemovals = new ArrayList<>(); for (String hostname : failedBlackListedNodes) { Long timeDiff = currentTimeMillis - failedContainerNodesMap.get(hostname).blackListAdditionTime; @@ -844,8 +844,7 @@ public class StreamingAppMasterService extends CompositeService failedBlackListedNodes.removeAll(blacklistRemovals); } - numTotalContainers += containerRequests.size(); - numRequestedContainers += containerRequests.size(); + numRequestedContainers += containerRequests.size() - removedContainerRequests.size(); AllocateResponse amResp = sendContainerAskToRM(containerRequests, removedContainerRequests, releasedContainers); if (amResp.getAMCommand() != null) { LOG.info(" statement executed:{}",
[2/2] apex-core git commit: Merge branch 'APEXCORE-624.master.sanjay' of http://github.com/sanjaypujare/apex-core into APEXCORE-624
Merge branch 'APEXCORE-624.master.sanjay' of http://github.com/sanjaypujare/apex-core into APEXCORE-624 Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/a6dd73b9 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/a6dd73b9 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/a6dd73b9 Branch: refs/heads/master Commit: a6dd73b96b78f5c2509c025fca9fcc96e917f0c1 Parents: 911ccb2 de4c11f Author: Vlad Rozov Authored: Fri Feb 24 17:10:12 2017 -0800 Committer: Vlad Rozov Committed: Fri Feb 24 17:10:12 2017 -0800 -- .../stram/ResourceRequestHandler.java | 1 + .../stram/StreamingAppMasterService.java| 26 +--- 2 files changed, 13 insertions(+), 14 deletions(-) --
[1/2] apex-core git commit: APEXCORE-570 Back pressure implementation to suspend publisher when subscriber is slow and the buffer fills up
Repository: apex-core Updated Branches: refs/heads/master abc836ca1 -> d80501bdc APEXCORE-570 Back pressure implementation to suspend publisher when subscriber is slow and the buffer fills up Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/f2d539d4 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/f2d539d4 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/f2d539d4 Branch: refs/heads/master Commit: f2d539d4d88e76c86ea23b803ac38c7a2584a18f Parents: 7ea7f60 Author: Pramod ImmaneniAuthored: Sat Jan 14 01:33:07 2017 -0800 Committer: Pramod Immaneni Committed: Sat Feb 18 07:24:46 2017 +0530 -- .../bufferserver/internal/DataList.java | 52 +--- .../bufferserver/internal/FastDataList.java | 4 +- .../datatorrent/bufferserver/server/Server.java | 10 ++-- .../bufferserver/storage/DiskStorageTest.java | 2 +- .../datatorrent/stram/StramLocalCluster.java| 2 +- 5 files changed, 55 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/f2d539d4/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java -- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java index 3f596d9..3a446b6 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java @@ -72,8 +72,9 @@ public class DataList private final AtomicInteger numberOfInMemBlockPermits; private MutableInt nextOffset = new MutableInt(); private Future future; + private final boolean backPressureEnabled; - public DataList(final String identifier, final int blockSize, final int numberOfCacheBlocks) + public DataList(final String identifier, final int blockSize, final int numberOfCacheBlocks, final boolean backPressureEnabled) { if (numberOfCacheBlocks < 1) { 
throw new IllegalArgumentException("Invalid number of Data List Memory blocks " + numberOfCacheBlocks); @@ -82,6 +83,7 @@ public class DataList numberOfInMemBlockPermits = new AtomicInteger(MAX_COUNT_OF_INMEM_BLOCKS - 1); this.identifier = identifier; this.blockSize = blockSize; +this.backPressureEnabled = backPressureEnabled; first = last = new Block(identifier, blockSize); } @@ -91,7 +93,7 @@ public class DataList * We use 64MB (the default HDFS block getSize) as the getSize of the memory pool so we can flush the data 1 block * at a time to the filesystem. We will use default value of 8 block sizes to be cached in memory */ -this(identifier, 64 * 1024 * 1024, 8); +this(identifier, 64 * 1024 * 1024, 8, true); } public int getBlockSize() @@ -172,6 +174,7 @@ public class DataList } } first = last; + first.prev = null; } numberOfInMemBlockPermits.set(MAX_COUNT_OF_INMEM_BLOCKS - 1); } @@ -188,6 +191,7 @@ public class DataList if (temp.ending_window > windowId || temp == last) { if (prev != null) { first = temp; +first.prev = null; } first.purge(windowId); break; @@ -436,7 +440,8 @@ public class DataList logger.warn("Exceeded allowed memory block allocation by {}", -numberOfInMemBlockPermits); } last.next = new Block(identifier, array, last.ending_window, last.ending_window); -last.release(false); +last.next.prev = last; +last.release(false, true); last = last.next; } @@ -554,6 +559,10 @@ public class DataList */ Block next; /** + * the previous in the chain + */ +Block prev; +/** * how count of references to this block. 
*/ private final AtomicInteger refCount; @@ -822,10 +831,10 @@ public class DataList }; } -protected void release(boolean wait) +protected void release(boolean wait, boolean writer) { final int refCount = this.refCount.decrementAndGet(); - if (refCount == 0 && storage != null) { + if (canEvict(refCount, writer)) { assert (next != null); final Runnable storer = getStorer(data, readingOffset, writingOffset, storage); if (future != null && future.cancel(false)) { @@ -840,8 +849,37 @@ public class DataList } else { future = null; } + } +} + +private boolean canEvict(final int refCount, boolean writer) +{ + if (refCount == 0 && storage != null) { +if (backPressureEnabled) { + if (!writer) {
apex-core git commit: APEXCORE-624 decrement unallocated containers and also released containers so that the exit condition for the shutdown check is satisfied.
Repository: apex-core Updated Branches: refs/heads/release-3.2 332810722 -> f84c035c2 APEXCORE-624 decrement unallocated containers and also released containers so that the exit condition for the shutdown check is satisfied. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/f84c035c Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/f84c035c Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/f84c035c Branch: refs/heads/release-3.2 Commit: f84c035c205526a7412a228661a9c7d45c5b7f15 Parents: 3328107 Author: Sanjay PujareAuthored: Fri Jan 27 10:35:09 2017 -0800 Committer: Sanjay Pujare Committed: Wed Feb 15 15:00:29 2017 -0800 -- .../stram/StreamingAppMasterService.java| 16 1 file changed, 8 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/f84c035c/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index d3f674a..10b78f2 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -770,6 +770,7 @@ public class StreamingAppMasterService extends CompositeService for (Map.Entry > entry : requestedResources.entrySet()) { if ((loopCounter - entry.getValue().getKey()) > NUMBER_MISSED_HEARTBEATS) { StreamingContainerAgent.ContainerStartRequest csr = entry.getKey(); +LOG.debug("Request for container {} timed out. 
Re-requesting container", csr.container); removedContainerRequests.add(entry.getValue().getRight()); ContainerRequest cr = resourceRequestor.createContainerRequest(csr, false); entry.getValue().setLeft(loopCounter); @@ -779,7 +780,7 @@ public class StreamingAppMasterService extends CompositeService } } - /* Remove nodes from blacklist after timeout */ + /* Remove nodes from blacklist after timeout */ long currentTime = System.currentTimeMillis(); List blacklistRemovals = new ArrayList(); for (Iterator > it = blacklistedNodesQueueWithTimeStamp.iterator(); it.hasNext();) { @@ -797,7 +798,7 @@ public class StreamingAppMasterService extends CompositeService } numTotalContainers += containerRequests.size(); - numRequestedContainers += containerRequests.size(); + numRequestedContainers += containerRequests.size() - removedContainerRequests.size(); AllocateResponse amResp = sendContainerAskToRM(containerRequests, removedContainerRequests, releasedContainers); if (amResp.getAMCommand() != null) { LOG.info(" statement executed:{}", amResp.getAMCommand()); @@ -836,7 +837,7 @@ public class StreamingAppMasterService extends CompositeService LOG.info("Releasing {} as resource with priority {} was already assigned", allocatedContainer.getId(), allocatedContainer.getPriority()); releasedContainers.add(allocatedContainer.getId()); numReleasedContainers++; - numRequestedContainers++; + numRequestedContainers--; continue; } if (csr != null) { @@ -964,7 +965,8 @@ public class StreamingAppMasterService extends CompositeService appDone = true; } - LOG.debug("Current application state: loop=" + loopCounter + ", appDone=" + appDone + ", total=" + numTotalContainers + ", requested=" + numRequestedContainers + ", released=" + numReleasedContainers + ", completed=" + numCompletedContainers + ", failed=" + numFailedContainers + ", currentAllocated=" + allocatedContainers.size()); + LOG.debug("Current application state: loop={}, appDone={}, total={}, requested={}, released={}, completed={}, 
failed={}, currentAllocated={}, dnmgr.containerStartRequests={}", +loopCounter, appDone, numTotalContainers, numRequestedContainers, numReleasedContainers, numCompletedContainers, numFailedContainers, allocatedContainers.size(), dnmgr.containerStartRequests); // monitor child containers dnmgr.monitorHeartbeat(); @@ -1038,16 +1040,14 @@ public class StreamingAppMasterService extends CompositeService private AllocateResponse sendContainerAskToRM(List containerRequests, List removedContainerRequests, List releasedContainers) throws YarnException, IOException { if
[2/2] apex-core git commit: Merge branch 'APEXCORE-644' of https://github.com/sgolovko/apex-core into APEXCORE-644
Merge branch 'APEXCORE-644' of https://github.com/sgolovko/apex-core into APEXCORE-644 Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/74f732a7 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/74f732a7 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/74f732a7 Branch: refs/heads/master Commit: 74f732a79d7e271af0820b3071a9619be97ddf39 Parents: 4302929 d18dae0 Author: Vlad Rozov Authored: Sun Feb 12 21:39:05 2017 -0800 Committer: Vlad Rozov Committed: Sun Feb 12 21:39:05 2017 -0800 -- .../main/java/com/datatorrent/stram/cli/ApexCli.java | 6 +++--- .../java/com/datatorrent/stram/cli/ApexCliTest.java | 14 ++ 2 files changed, 17 insertions(+), 3 deletions(-) --
apex-core git commit: APEXCORE-627 : Unit test AtMostOnceTest intermittently fails
Repository: apex-core Updated Branches: refs/heads/master 1e9896bb4 -> 527c70bf8 APEXCORE-627 : Unit test AtMostOnceTest intermittently fails Fixed a race problem for calling checkpointed and committed in RecoverableInputOperator. The original implementation of the class used the method checkpointed() of the interface CheckpointListener to refresh a value of one of the criteria variables checkpointedWindowId. The fix uses the method beforeCheckpoint() of the interface CheckpointNotificationListener. This guarantees that the update of the variable checkpointedWindowId will be done before the call of the method committed(). Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/527c70bf Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/527c70bf Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/527c70bf Branch: refs/heads/master Commit: 527c70bf8b4060d04d713f2b6e0ffb113a17ece6 Parents: 1e9896b Author: Sergey Golovko Authored: Fri Jan 27 15:15:09 2017 -0800 Committer: Sergey Golovko Committed: Mon Jan 30 16:20:14 2017 -0800 -- .../stram/engine/RecoverableInputOperator.java | 44 +--- 1 file changed, 30 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/527c70bf/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java -- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java b/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java index ed95874..6412da1 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java @@ -26,24 +26,28 @@ import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; + import 
com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.InputOperator; +import com.datatorrent.api.Operator; import com.datatorrent.bufferserver.util.Codec; import com.datatorrent.common.util.BaseOperator; /** * */ -public class RecoverableInputOperator implements InputOperator, com.datatorrent.api.Operator.CheckpointListener +public class RecoverableInputOperator implements InputOperator, Operator.CheckpointNotificationListener { public final transient DefaultOutputPort output = new DefaultOutputPort<>(); private long checkpointedWindowId; - boolean firstRun = true; - transient boolean first; - transient long windowId; - int maximumTuples = 20; - boolean simulateFailure; + private transient boolean firstRun = true; + private transient boolean first; + private transient long windowId; + private int maximumTuples = 20; + private boolean simulateFailure; private static final Map idMap = new HashMap<>(); private static long tuple = 0; @@ -95,7 +99,7 @@ public class RecoverableInputOperator implements InputOperator, com.datatorrent. public void setup(OperatorContext context) { firstRun = (checkpointedWindowId == 0); -logger.debug("firstRun={} checkpointedWindowId={}", firstRun, Codec.getStringWindowId(checkpointedWindowId)); +logger.debug("{}", this); } @Override @@ -106,18 +110,12 @@ public class RecoverableInputOperator implements InputOperator, com.datatorrent. 
@Override public void checkpointed(long windowId) { -if (checkpointedWindowId == 0) { - checkpointedWindowId = windowId; - logger.debug("firstRun={} checkpointedWindowId={}", firstRun, Codec.getStringWindowId(checkpointedWindowId)); -} - -logger.debug("{} checkpointed at {}", this, Codec.getStringWindowId(windowId)); } @Override public void committed(long windowId) { -logger.debug("{} committed at {} firstRun {}, checkpointedWindowId {}", this, Codec.getStringWindowId(windowId), firstRun, Codec.getStringWindowId(checkpointedWindowId)); +logger.debug("{}, windowId={}", this, Codec.getStringWindowId(windowId)); if (simulateFailure && firstRun && checkpointedWindowId > 0 && windowId > checkpointedWindowId) { throw new RuntimeException("Failure Simulation from " + this); } @@ -134,4 +132,22 @@ public class RecoverableInputOperator implements InputOperator, com.datatorrent. { simulateFailure = flag; } + + @Override + public void beforeCheckpoint(long windowId) + { +if (checkpointedWindowId == 0) { + checkpointedWindowId = windowId; +} +logger.debug("{},
apex-core git commit: APEXCORE-504 - Possible race condition in StreamingContainerAgent.getStreamCodec()
Repository: apex-core Updated Branches: refs/heads/release-3.4 4fb1a5097 -> 2f34efd3f APEXCORE-504 - Possible race condition in StreamingContainerAgent.getStreamCodec() Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/2f34efd3 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/2f34efd3 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/2f34efd3 Branch: refs/heads/release-3.4 Commit: 2f34efd3f8be4a8852471c61f0b596586bc2c51b Parents: 4fb1a50 Author: Vlad RozovAuthored: Wed Feb 1 08:03:45 2017 -0800 Committer: Vlad Rozov Committed: Wed Feb 1 11:12:46 2017 -0800 -- .../stram/StreamingContainerAgent.java | 39 +--- .../stram/StreamingContainerManager.java| 4 +- .../stram/plan/logical/LogicalPlan.java | 34 ++--- .../stram/plan/physical/StreamMapping.java | 7 ++-- .../com/datatorrent/stram/StreamCodecTest.java | 2 +- 5 files changed, 51 insertions(+), 35 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/2f34efd3/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java index 598fea5..b4349f5 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java @@ -200,7 +200,11 @@ public class StreamingContainerAgent if (ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) { // input attributes of the downstream operator for (InputPortMeta sink : streamMeta.getSinks()) { -portInfo.contextAttributes = sink.getAttributes(); +try { + portInfo.contextAttributes = sink.getAttributes().clone(); +} catch (CloneNotSupportedException e) { + throw new RuntimeException("Cannot clone attributes", e); +} break; } } @@ -214,7 +218,7 @@ public class StreamingContainerAgent // Create mappings for all non-inline 
operators if (input.target.getContainer() != out.source.getContainer()) { InputPortMeta inputPortMeta = getIdentifyingInputPortMeta(input); - StreamCodec streamCodecInfo = getStreamCodec(inputPortMeta); + StreamCodec streamCodecInfo = inputPortMeta.getStreamCodec(); Integer id = physicalPlan.getStreamCodecIdentifier(streamCodecInfo); if (!portInfo.streamCodecs.containsKey(id)) { portInfo.streamCodecs.put(id, streamCodecInfo); @@ -246,11 +250,19 @@ public class StreamingContainerAgent InputPortMeta inputPortMeta = getInputPortMeta(oper.getOperatorMeta(), streamMeta); if (inputPortMeta != null) { - inputInfo.contextAttributes = inputPortMeta.getAttributes(); + try { +inputInfo.contextAttributes = inputPortMeta.getAttributes().clone(); + } catch (CloneNotSupportedException e) { +throw new RuntimeException("Cannot clone attributes", e); + } } if (inputInfo.contextAttributes == null && ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) { - inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes(); + try { +inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes().clone(); + } catch (CloneNotSupportedException e) { +throw new RuntimeException("Cannot clone attributes", e); + } } inputInfo.sourceNodeId = sourceOutput.source.getId(); @@ -287,7 +299,7 @@ public class StreamingContainerAgent // On the input side there is a unlikely scenario of partitions even for inline stream that is being // handled. Always specifying a stream codec configuration in case that scenario happens. 
InputPortMeta idInputPortMeta = getIdentifyingInputPortMeta(in); -StreamCodec streamCodecInfo = getStreamCodec(idInputPortMeta); +StreamCodec streamCodecInfo = idInputPortMeta.getStreamCodec(); Integer id = physicalPlan.getStreamCodecIdentifier(streamCodecInfo); inputInfo.streamCodecs.put(id, streamCodecInfo); ndi.inputs.add(inputInfo); @@ -342,23 +354,6 @@ public class StreamingContainerAgent return operator; } - public static StreamCodec getStreamCodec(InputPortMeta inputPortMeta) - { -if (inputPortMeta != null) { - StreamCodec codec =
apex-core git commit: APEXCORE-504 - Possible race condition in StreamingContainerAgent.getStreamCodec()
Repository: apex-core Updated Branches: refs/heads/release-3.3 8655ffabe -> 232eba368 APEXCORE-504 - Possible race condition in StreamingContainerAgent.getStreamCodec() Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/232eba36 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/232eba36 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/232eba36 Branch: refs/heads/release-3.3 Commit: 232eba3681f2a7bf43a7967bacd33f5f1f35b98f Parents: 8655ffa Author: Vlad RozovAuthored: Wed Feb 1 08:03:45 2017 -0800 Committer: Vlad Rozov Committed: Wed Feb 1 08:03:45 2017 -0800 -- .../stram/StreamingContainerAgent.java | 39 +--- .../stram/StreamingContainerManager.java| 4 +- .../stram/plan/logical/LogicalPlan.java | 30 +-- .../stram/plan/physical/StreamMapping.java | 7 ++-- .../com/datatorrent/stram/StreamCodecTest.java | 2 +- 5 files changed, 49 insertions(+), 33 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/232eba36/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java index 81dc96e..a5fc3a5 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java @@ -185,7 +185,11 @@ public class StreamingContainerAgent { if (ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) { // input attributes of the downstream operator for (InputPortMeta sink : streamMeta.getSinks()) { -portInfo.contextAttributes = sink.getAttributes(); +try { + portInfo.contextAttributes = sink.getAttributes().clone(); +} catch (CloneNotSupportedException e) { + throw new RuntimeException("Cannot clone attributes", e); +} break; } } @@ -199,7 +203,7 @@ public class StreamingContainerAgent { // Create mappings for all 
non-inline operators if (input.target.getContainer() != out.source.getContainer()) { InputPortMeta inputPortMeta = getIdentifyingInputPortMeta(input); - StreamCodec streamCodecInfo = getStreamCodec(inputPortMeta); + StreamCodec streamCodecInfo = inputPortMeta.getStreamCodec(); Integer id = physicalPlan.getStreamCodecIdentifier(streamCodecInfo); if (!portInfo.streamCodecs.containsKey(id)) { portInfo.streamCodecs.put(id, streamCodecInfo); @@ -231,11 +235,19 @@ public class StreamingContainerAgent { InputPortMeta inputPortMeta = getInputPortMeta(oper.getOperatorMeta(), streamMeta); if (inputPortMeta != null) { - inputInfo.contextAttributes = inputPortMeta.getAttributes(); + try { +inputInfo.contextAttributes = inputPortMeta.getAttributes().clone(); + } catch (CloneNotSupportedException e) { +throw new RuntimeException("Cannot clone attributes", e); + } } if (inputInfo.contextAttributes == null && ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) { - inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes(); + try { +inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes().clone(); + } catch (CloneNotSupportedException e) { +throw new RuntimeException("Cannot clone attributes", e); + } } inputInfo.sourceNodeId = sourceOutput.source.getId(); @@ -272,7 +284,7 @@ public class StreamingContainerAgent { // On the input side there is a unlikely scenario of partitions even for inline stream that is being // handled. Always specifying a stream codec configuration in case that scenario happens. 
InputPortMeta idInputPortMeta = getIdentifyingInputPortMeta(in); -StreamCodec streamCodecInfo = getStreamCodec(idInputPortMeta); +StreamCodec streamCodecInfo = idInputPortMeta.getStreamCodec(); Integer id = physicalPlan.getStreamCodecIdentifier(streamCodecInfo); inputInfo.streamCodecs.put(id, streamCodecInfo); ndi.inputs.add(inputInfo); @@ -327,23 +339,6 @@ public class StreamingContainerAgent { return operator; } - public static StreamCodec getStreamCodec(InputPortMeta inputPortMeta) - { -if (inputPortMeta != null) { - StreamCodec codec =
[2/2] apex-core git commit: Merge branch 'APEXCORE-598' of https://github.com/tweise/apex-core
Merge branch 'APEXCORE-598' of https://github.com/tweise/apex-core Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/d9bc67d5 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/d9bc67d5 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/d9bc67d5 Branch: refs/heads/master Commit: d9bc67d5a62ca00d7ff1aea9a308fc8c6cf4be52 Parents: 51de67e f6f6d5f Author: Vlad Rozov Authored: Mon Jan 9 18:52:55 2017 -0800 Committer: Vlad Rozov Committed: Mon Jan 9 18:52:55 2017 -0800 -- .../datatorrent/stram/StramLocalCluster.java| 48 +--- .../stram/StramLocalClusterTest.java| 17 +++ 2 files changed, 48 insertions(+), 17 deletions(-) --
[1/2] apex-core git commit: APEXCORE-471 Reissue of the resource was failing for the BlackListBased scheduler, it happened because RequestedResource was never empty. Issue is fixed after making the op
Repository: apex-core Updated Branches: refs/heads/master 05c798d5c -> 51de67e61 APEXCORE-471 Reissue of the resource was failing for the BlackListBased scheduler, it happened because RequestedResource was never empty. Issue is fixed after making the operation similar to that of ResourceRequestHandler with the handling for Blacklist Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/893551b0 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/893551b0 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/893551b0 Branch: refs/heads/master Commit: 893551b02a5ebe86abbb9766f1d6d87aa2e3 Parents: 3f06ce7 Author: Sandesh HegdeAuthored: Sat Dec 24 21:25:31 2016 -1000 Committer: Sandesh Hegde Committed: Thu Jan 5 12:01:00 2017 -0800 -- .../stram/BlacklistBasedResourceRequestHandler.java | 16 +--- 1 file changed, 9 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/893551b0/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java b/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java index 412f535..53d91a5 100644 --- a/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java +++ b/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java @@ -54,8 +54,13 @@ public class BlacklistBasedResourceRequestHandler extends ResourceRequestHandler @Override public void reissueContainerRequests(AMRMClient amRmClient, Map > requestedResources, int loopCounter, ResourceRequestHandler resourceRequestor, List containerRequests, List removedContainerRequests) { +if (!requestedResources.isEmpty()) { + // Check if any requests timed out, create new requests in that case + recreateContainerRequest(requestedResources, loopCounter, resourceRequestor, 
removedContainerRequests); +} + // Issue all host specific requests first -if (!hostSpecificRequestsMap.isEmpty() && requestedResources.isEmpty()) { +if (!hostSpecificRequestsMap.isEmpty()) { LOG.info("Issue Host specific requests first"); // Blacklist all the nodes and issue request for host specific Entry set = hostSpecificRequestsMap.entrySet().iterator().next(); @@ -74,10 +79,7 @@ public class BlacklistBasedResourceRequestHandler extends ResourceRequestHandler hostSpecificRequests.remove(cr); } hostSpecificRequestsMap.remove(set.getKey()); -} else if (!requestedResources.isEmpty()) { - // Check if any requests timed out, create new requests in that case - recreateContainerRequest(requestedResources, loopCounter, resourceRequestor, removedContainerRequests); -} else { +} else { if (blacklistedNodesForHostSpecificRequests != null) { // Remove the blacklisted nodes during host specific requests LOG.debug("All requests done.. Removing nodes from blacklist {}", blacklistedNodesForHostSpecificRequests); @@ -98,7 +100,7 @@ public class BlacklistBasedResourceRequestHandler extends ResourceRequestHandler } } - public void recreateContainerRequest(Map > requestedResources, int loopCounter, ResourceRequestHandler resourceRequestor, List removedContainerRequests) + private void recreateContainerRequest(Map > requestedResources, int loopCounter, ResourceRequestHandler resourceRequestor, List removedContainerRequests) { for (Map.Entry > entry : requestedResources.entrySet()) { if ((loopCounter - entry.getValue().getKey()) > NUMBER_MISSED_HEARTBEATS) { @@ -126,7 +128,7 @@ public class BlacklistBasedResourceRequestHandler extends ResourceRequestHandler } } - public void addHostSpecificRequest(StreamingContainerAgent.ContainerStartRequest csr, ContainerRequest cr) + private void addHostSpecificRequest(StreamingContainerAgent.ContainerStartRequest csr, ContainerRequest cr) { String hostKey = StringUtils.join(cr.getNodes(), ":"); List requests;
[2/2] apex-core git commit: Merge branch 'APEXCORE-585' of https://github.com/davidyan74/apex-core into APEXCORE-585
Merge branch 'APEXCORE-585' of https://github.com/davidyan74/apex-core into APEXCORE-585 Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/93f790aa Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/93f790aa Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/93f790aa Branch: refs/heads/master Commit: 93f790aa7272a729ddcb2c296a280a2a9c95b88b Parents: b5ce049 5301fbf Author: Vlad Rozov Authored: Wed Dec 7 15:03:45 2016 -0800 Committer: Vlad Rozov Committed: Wed Dec 7 15:03:45 2016 -0800 -- .../stram/StreamingContainerManager.java| 6 ++-- .../java/com/datatorrent/stram/LatencyTest.java | 9 +- .../stram/StreamingContainerManagerTest.java| 32 +++- 3 files changed, 15 insertions(+), 32 deletions(-) --
apex-core git commit: APEXCORE-405 Make getDAG/prepareDAG available through EmbeddedAppLauncher.
Repository: apex-core Updated Branches: refs/heads/master c97dd7ccc -> d514859a8 APEXCORE-405 Make getDAG/prepareDAG available through EmbeddedAppLauncher. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/d514859a Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/d514859a Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/d514859a Branch: refs/heads/master Commit: d514859a8d51464a8a849286444ea9863ea7077f Parents: c97dd7c Author: Thomas WeiseAuthored: Sun Dec 4 22:03:25 2016 -0800 Committer: Thomas Weise Committed: Mon Dec 5 13:51:15 2016 -0800 -- .../java/com/datatorrent/api/LocalMode.java | 37 .../apache/apex/api/EmbeddedAppLauncher.java| 36 +++ 2 files changed, 36 insertions(+), 37 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/d514859a/api/src/main/java/com/datatorrent/api/LocalMode.java -- diff --git a/api/src/main/java/com/datatorrent/api/LocalMode.java b/api/src/main/java/com/datatorrent/api/LocalMode.java index 7d6f1ee..0387506 100644 --- a/api/src/main/java/com/datatorrent/api/LocalMode.java +++ b/api/src/main/java/com/datatorrent/api/LocalMode.java @@ -19,7 +19,6 @@ package com.datatorrent.api; import org.apache.apex.api.EmbeddedAppLauncher; -import org.apache.hadoop.conf.Configuration; /** * Local mode execution for single application @@ -30,42 +29,6 @@ import org.apache.hadoop.conf.Configuration; @Deprecated public abstract class LocalMode extends EmbeddedAppLauncher { - - /** - * - * getDAG. - * - * @return - */ - public abstract DAG getDAG(); - - /** - * - * cloneDAG. - * - * @return - * @throws java.lang.Exception - */ - public abstract DAG cloneDAG() throws Exception; - - /** - * Build the logical plan through the given streaming application instance and/or from configuration. - * - * The plan will be constructed through {@link StreamingApplication#populateDAG}. 
If configuration properties are - * specified, they function as override, as would be the case when launching an application through CLI. - * - * This method can also be used to construct the plan declaratively from configuration only, by passing null for the - * application. In this case the configuration contains all operators and streams. - * - * - * @param app - * @param conf - * @return - * @throws Exception - * @since 0.3.5 - */ - public abstract DAG prepareDAG(StreamingApplication app, Configuration conf) throws Exception; - /** * * getController. http://git-wip-us.apache.org/repos/asf/apex-core/blob/d514859a/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java -- diff --git a/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java b/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java index 4ff705b..860926c 100644 --- a/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java +++ b/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java @@ -21,6 +21,7 @@ package org.apache.apex.api; import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.Attribute; +import com.datatorrent.api.DAG; import com.datatorrent.api.StreamingApplication; /** @@ -65,6 +66,41 @@ public abstract class EmbeddedAppLauncher + * getDAG. + * + * @return + */ + public abstract DAG getDAG(); + + /** + * + * cloneDAG. + * + * @return + * @throws java.lang.Exception + */ + public abstract DAG cloneDAG() throws Exception; + + /** + * Build the logical plan through the given streaming application instance and/or from configuration. + * + * The plan will be constructed through {@link StreamingApplication#populateDAG}. If configuration properties are + * specified, they function as override, as would be the case when launching an application through CLI. + * + * This method can also be used to construct the plan declaratively from configuration only, by passing null for the + * application. 
In this case the configuration contains all operators and streams. + * + * + * @param app + * @param conf + * @return + * @throws Exception + * @since 0.3.5 + */ + public abstract DAG prepareDAG(StreamingApplication app, Configuration conf) throws Exception; + + /** * Shortcut to run an application. Used for testing. * * @param app
apex-core git commit: APEXCORE-525 return correct instance object from DefaultStatefulStreamCodec.newInstance
Repository: apex-core Updated Branches: refs/heads/master 891ed3ae9 -> c97dd7ccc APEXCORE-525 return correct instance object from DefaultStatefulStreamCodec.newInstance Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/c97dd7cc Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/c97dd7cc Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/c97dd7cc Branch: refs/heads/master Commit: c97dd7ccc495e7ac17e263ea82338a06987aa435 Parents: 891ed3a Author: Tushar R. GosaviAuthored: Thu Nov 24 22:46:28 2016 +0530 Committer: Vlad Rozov Committed: Mon Nov 28 21:40:31 2016 -0800 -- .../stram/codec/DefaultStatefulStreamCodec.java| 6 +- .../stram/codec/DefaultStatefulStreamCodecTest.java| 13 + 2 files changed, 18 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/c97dd7cc/engine/src/main/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodec.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodec.java b/engine/src/main/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodec.java index 20460de..094c2e2 100644 --- a/engine/src/main/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodec.java +++ b/engine/src/main/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodec.java @@ -232,7 +232,11 @@ public class DefaultStatefulStreamCodec extends Kryo implements StatefulStrea @Override public DefaultStatefulStreamCodec newInstance() { -return new DefaultStatefulStreamCodec<>(); +try { + return getClass().newInstance(); +} catch (InstantiationException | IllegalAccessException e) { + throw new RuntimeException("Unable to create new stateful streamcodec object", e); +} } private static final Logger logger = LoggerFactory.getLogger(DefaultStatefulStreamCodec.class); 
http://git-wip-us.apache.org/repos/asf/apex-core/blob/c97dd7cc/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java -- diff --git a/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java index c7a7aa0..42ac5dc 100644 --- a/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java +++ b/engine/src/test/java/com/datatorrent/stram/codec/DefaultStatefulStreamCodecTest.java @@ -214,4 +214,17 @@ public class DefaultStatefulStreamCodecTest } } + static class NoNewInstanceStatefulStreamCodec extends DefaultStatefulStreamCodec + { + + } + + @Test + public void testNewInstanceMethod() + { +NoNewInstanceStatefulStreamCodec codec = new NoNewInstanceStatefulStreamCodec(); +DefaultStatefulStreamCodec newCodec = codec.newInstance(); +Assert.assertNotEquals("Codec and newCodec are not same ", codec, newCodec); +Assert.assertEquals("Class of codec and newCodec is same ", newCodec.getClass(), codec.getClass()); + } }
[apex-core] Git Push Summary [forced push!] [Forced Update!]
Repository: apex-core Updated Branches: refs/heads/master 4a1570df9 -> 891ed3ae9 (forced update)
apex-core git commit: APEXCORE-505 Heartbeat loop was blocked waiting for operator activation, reason for this is that Stream activation(Only BufferServerSubscriber and WindowGenerator) waits for oper
Repository: apex-core Updated Branches: refs/heads/master 2c024cd84 -> fc3246e11 APEXCORE-505 Heartbeat loop was blocked waiting for operator activation, reason for this is that Stream activation(Only BufferServerSubscriber and WindowGenerator) waits for operator activation in heartbeat thread. After analysis and sanity testing, we don't see the need to have the synchronization between operator and stream activation Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/fc3246e1 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/fc3246e1 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/fc3246e1 Branch: refs/heads/master Commit: fc3246e11afa426eefe1dcaf5b8aface079d7d10 Parents: 2c024cd Author: Sandesh HegdeAuthored: Fri Nov 4 10:53:03 2016 -0700 Committer: Sandesh Hegde Committed: Wed Nov 9 16:00:38 2016 -0800 -- .../stram/engine/StreamingContainer.java| 35 + .../stram/engine/GenericNodeTest.java | 132 +++ .../stram/stream/SocketStreamTest.java | 101 ++ 3 files changed, 213 insertions(+), 55 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/fc3246e1/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java index 8e7e0a1..78f3421 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java @@ -38,7 +38,6 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1361,14 +1360,11 @@ public class StreamingContainer extends YarnContainerMain public synchronized void activate(final Map 
nodeMap, Map > newStreams) { for (ComponentContextPair pair : newStreams.values()) { - if (!(pair.component instanceof BufferServerSubscriber)) { -activeStreams.put(pair.component, pair.context); -pair.component.activate(pair.context); -eventBus.publish(new StreamActivationEvent(pair)); - } + activeStreams.put(pair.component, pair.context); + pair.component.activate(pair.context); + eventBus.publish(new StreamActivationEvent(pair)); } -final CountDownLatch signal = new CountDownLatch(nodeMap.size()); for (final OperatorDeployInfo ndi : nodeMap.values()) { /* * OiO nodes get activated with their primary nodes. @@ -1408,10 +1404,6 @@ public class StreamingContainer extends YarnContainerMain currentdi = null; -for (int i = setOperators.size(); i-- > 0; ) { - signal.countDown(); -} - node.run(); /* this is a blocking call */ } catch (Error error) { int[] operators; @@ -1448,8 +1440,6 @@ public class StreamingContainer extends YarnContainerMain failedNodes.add(ndi.id); logger.error("Shutdown of operator {} failed due to an exception.", ndi, ex); } -} else { - signal.countDown(); } List oioNodeIdList = oioGroups.get(ndi.id); @@ -1463,8 +1453,6 @@ public class StreamingContainer extends YarnContainerMain failedNodes.add(oiodi.id); logger.error("Shutdown of operator {} failed due to an exception.", oiodi, ex); } -} else { - signal.countDown(); } } } @@ -1475,23 +1463,6 @@ public class StreamingContainer extends YarnContainerMain thread.start(); } -/** - * we need to make sure that before any of the operators gets the first message, it's activated. 
- */ -try { - signal.await(); -} catch (InterruptedException ex) { - logger.debug("Activation of operators interrupted.", ex); -} - -for (ComponentContextPair pair : newStreams.values()) { - if (pair.component instanceof BufferServerSubscriber) { -activeStreams.put(pair.component, pair.context); -pair.component.activate(pair.context); -eventBus.publish(new StreamActivationEvent(pair)); - } -} - for (WindowGenerator wg : generators.values())
apex-core git commit: Documentation for CLI support for web service authentication for Kerberos SPNEGO, BASIC and DIGEST mechanisms
Repository: apex-core Updated Branches: refs/heads/master 0bdf771f8 -> a490ee04d Documentation for CLI support for web service authentication for Kerberos SPNEGO, BASIC and DIGEST mechanisms Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/a490ee04 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/a490ee04 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/a490ee04 Branch: refs/heads/master Commit: a490ee04d028a4d8a6285ab75e13d663b1d671b7 Parents: 0bdf771 Author: Pramod ImmaneniAuthored: Wed Oct 5 13:55:56 2016 -0700 Committer: Pramod Immaneni Committed: Wed Oct 5 13:55:56 2016 -0700 -- docs/security.md | 40 ++-- 1 file changed, 38 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/a490ee04/docs/security.md -- diff --git a/docs/security.md b/docs/security.md index fb4a486..6b1b8b6 100644 --- a/docs/security.md +++ b/docs/security.md @@ -15,7 +15,7 @@ The Apex command line interface (CLI) program, `apex`, is used to launch applica ###CLI Configuration - Â When Kerberos security is enabled in Hadoop, a Kerberos ticket granting ticket (TGT) or the Kerberos credentials of the user are needed by the CLI program `apex` to authenticate with Hadoop for any operation. Kerberos credentials are composed of a principal and either a _keytab_ or a password. For security and operational reasons only keytabs are supported in Hadoop and by extension in Apex platform. When user credentials are specified, all operations including launching application are performed as that user. +When Kerberos security is enabled in Hadoop, a Kerberos ticket granting ticket (TGT) or the Kerberos credentials of the user are needed by the CLI program `apex` to authenticate with Hadoop for any operation. Kerberos credentials are composed of a principal and either a _keytab_ or a password. 
For security and operational reasons only keytabs are supported in Hadoop and by extension in Apex platform. When user credentials are specified, all operations including launching application are performed as that user. Using kinit @@ -49,7 +49,7 @@ The property `dt.authentication.principal` specifies the Kerberos user principal ### Web Services security -Alongside every Apex application is an application master process running called Streaming Container Manager (STRAM). STRAM manages the application by handling the various control aspects of the application such as orchestrating the execution of the application on the cluster, playing a key role in scalability and fault tolerance, providing application insight by collecting statistics among other functionality. +Alongside every Apex application, there is an application master process called Streaming Container Manager (STRAM) running. STRAM manages the application by handling the various control aspects of the application such as orchestrating the execution of the application on the cluster, playing a key role in scalability and fault tolerance, providing application insight by collecting statistics among other functionality. STRAM provides a web service interface to introspect the state of the application and its various components and to make dynamic changes to the applications. Some examples of supported functionality are getting resource usage and partition information of various operators, getting operator statistics and changing properties of running operators. @@ -75,6 +75,42 @@ The security option value can be `ENABLED`, `FOLLOW_HADOOP_AUTH`, `FOLLOW_HADOOP The subsequent sections talk about how security works in Apex. This information is not needed by users but is intended for the inquisitive technical audience who want to know how security works. 
+ CLI setup + +The CLI program `apex` connects to the web service endpoint of the STRAM for a running application to query for information or to make changes to it. In order to do that, it has to first connect to the YARN proxy web service and get the necessary connection information and credentials to connect to STRAM. The proxy web service may have security enabled and in that case, the CLI program `apex` would first need to authenticate with the service before it can get any information. + +Hadoop allows a lot of flexibility in the kind of security to use for the proxy. It allows the user to plug-in their own authentication provider. The authentication provider is specified as a JAVA class name. It also comes bundled with a provider for Kerberos SPNEGO authentication. Some distributions also include a provider for BASIC authentication via SASL. + +The
apex-malhar git commit: japicmp 0.9.0
Repository: apex-malhar Updated Branches: refs/heads/master f5f1943d2 -> 139d89fd3 japicmp 0.9.0 Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/139d89fd Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/139d89fd Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/139d89fd Branch: refs/heads/master Commit: 139d89fd37fbdf504fa81dbffea6bdc3f1f1acee Parents: f5f1943 Author: Thomas Weise Authored: Wed Sep 28 08:16:02 2016 -0700 Committer: Thomas Weise Committed: Wed Sep 28 08:16:02 2016 -0700 -- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/139d89fd/pom.xml -- diff --git a/pom.xml b/pom.xml index beb22b4..f9ce982 100644 --- a/pom.xml +++ b/pom.xml @@ -110,7 +110,7 @@ com.github.siom79.japicmp japicmp-maven-plugin - 0.7.1 + 0.9.0
[5/6] apex-malhar git commit: Fix trailing whitespace.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java -- diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java index a19417c..2bbb903 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java @@ -74,7 +74,7 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator transient Channel channel = null; transient String exchange = "testEx"; transient String queueName="testQ"; - + private WindowDataManager windowDataManager; private transient long currentWindowId; private transient long largestRecoveryWindowId; @@ -86,7 +86,7 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator @Override public void setup(OperatorContext context) { -// Needed to setup idempotency storage manager in setter +// Needed to setup idempotency storage manager in setter this.context = context; this.operatorContextId = context.getId(); @@ -104,11 +104,11 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator DTThrowable.rethrow(ex); } } - + @Override public void beginWindow(long windowId) { -currentWindowId = windowId; +currentWindowId = windowId; largestRecoveryWindowId = windowDataManager.getLargestCompletedWindow(); if (windowId <= largestRecoveryWindowId) { // Do not resend already sent tuples @@ -119,7 +119,7 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator skipProcessingTuple = false; } } - + /** * {@inheritDoc} */ @@ -158,11 +158,11 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator logger.debug(ex.toString()); } } - + public WindowDataManager getWindowDataManager() { return windowDataManager; } - + public void 
setWindowDataManager(WindowDataManager windowDataManager) { this.windowDataManager = windowDataManager; } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperator.java -- diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperator.java index 74ae181..1ddd9d4 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperator.java @@ -38,7 +38,7 @@ import com.datatorrent.netlet.util.DTThrowable; public class RabbitMQOutputOperator extends AbstractSinglePortRabbitMQOutputOperator{ private static final Logger logger = LoggerFactory.getLogger(RabbitMQOutputOperator.class); - + @Override public void processTuple(byte[] tuple) { @@ -46,6 +46,6 @@ public class RabbitMQOutputOperator extends AbstractSinglePortRabbitMQOutputOper channel.basicPublish(exchange, "", null, tuple); } catch (IOException e) { DTThrowable.rethrow(e); -} +} } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java -- diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java index 59b320d..0b12574 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java @@ -38,7 +38,7 @@ import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator; /** * This is the base implementation of a Redis input operator. 
- * + * * @displayName Abstract Redis Input * @category Input * @tags redis, key value @@ -161,7 +161,7 @@ public abstract class AbstractRedisInputOperator extends AbstractKeyValueStor scanComplete = false; scanParameters = new ScanParams(); scanParameters.count(scanCount); - + // For the 1st window after checkpoint, windowID - 1 would not have recovery // offset stored in windowDataManager // But recoveryOffset is non-transient, so will be recovered with http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java
[6/6] apex-malhar git commit: Fix trailing whitespace.
Fix trailing whitespace. Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/763d14fc Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/763d14fc Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/763d14fc Branch: refs/heads/master Commit: 763d14fca6b84fdda1b6853235e5d4b71ca87fca Parents: 90b5c9b Author: CI SupportAuthored: Mon Sep 26 20:36:22 2016 -0700 Committer: CI Support Committed: Mon Sep 26 20:36:22 2016 -0700 -- .../LogstreamWidgetOutputOperator.java | 2 +- .../benchmark/CouchBaseAppOutput.java | 2 +- ...nchmarkPartitionableKafkaOutputOperator.java | 2 +- .../state/ManagedStateBenchmarkApp.java | 2 +- .../benchmark/state/StoreOperator.java | 28 .../state/ManagedStateBenchmarkAppTester.java | 14 ++-- .../contrib/avro/AvroFileInputOperator.java | 6 +- .../datatorrent/contrib/avro/AvroToPojo.java| 6 +- .../datatorrent/contrib/avro/PojoToAvro.java| 10 +-- .../AbstractElasticSearchOutputOperator.java| 2 +- .../elasticsearch/ElasticSearchConnectable.java | 6 +- .../ElasticSearchMapInputOperator.java | 6 +- .../ElasticSearchMapOutputOperator.java | 10 +-- .../ElasticSearchPercolatorStore.java | 12 ++-- .../contrib/enrich/DelimitedFSLoader.java | 4 +- .../datatorrent/contrib/enrich/FSLoader.java| 2 +- .../contrib/enrich/FixedWidthFSLoader.java | 10 +-- .../contrib/formatter/CsvFormatter.java | 6 +- .../geode/AbstractGeodeInputOperator.java | 4 +- .../geode/AbstractGeodeOutputOperator.java | 4 +- .../contrib/geode/GeodeCheckpointStore.java | 18 +++--- .../geode/GeodeKeyValueStorageAgent.java| 2 +- .../contrib/geode/GeodePOJOOutputOperator.java | 2 +- .../datatorrent/contrib/geode/GeodeStore.java | 6 +- .../contrib/geode/RegionCreateFunction.java | 2 +- .../contrib/hbase/HBaseFieldInfo.java | 34 +- .../kafka/AbstractKafkaInputOperator.java | 6 +- .../contrib/kafka/HighlevelKafkaConsumer.java | 8 +-- .../contrib/kafka/KafkaPartition.java | 16 ++--- 
.../contrib/kafka/OffsetManager.java| 4 +- .../contrib/kafka/SimpleKafkaConsumer.java | 10 +-- .../kinesis/AbstractKinesisOutputOperator.java | 12 ++-- .../contrib/memcache/MemcacheStore.java | 2 +- .../contrib/mqtt/MqttClientConfig.java | 2 +- .../parquet/AbstractParquetFileReader.java | 6 +- .../contrib/parquet/ParquetFilePOJOReader.java | 10 +-- .../contrib/parser/CellProcessorBuilder.java| 34 +- .../datatorrent/contrib/parser/CsvParser.java | 12 ++-- .../contrib/parser/DelimitedSchema.java | 26 .../datatorrent/contrib/parser/JsonParser.java | 16 ++--- .../contrib/r/REngineConnectable.java | 8 +-- .../java/com/datatorrent/contrib/r/RScript.java | 8 +-- .../rabbitmq/AbstractRabbitMQInputOperator.java | 30 - .../AbstractRabbitMQOutputOperator.java | 14 ++-- .../rabbitmq/RabbitMQOutputOperator.java| 4 +- .../redis/AbstractRedisInputOperator.java | 4 +- .../redis/RedisKeyValueInputOperator.java | 2 +- .../redis/RedisMapAsValueInputOperator.java | 4 +- .../datatorrent/contrib/redis/RedisStore.java | 6 +- .../solr/AbstractSolrOutputOperator.java| 2 +- .../ConcurrentUpdateSolrServerConnector.java| 2 +- .../datatorrent/contrib/splunk/SplunkStore.java | 2 +- .../contrib/zmq/ZeroMQInputOperator.java| 2 +- .../apex/malhar/contrib/misc/math/Change.java | 4 +- .../contrib/misc/math/CompareExceptMap.java | 4 +- .../malhar/contrib/misc/math/ExceptMap.java | 2 +- .../apex/malhar/contrib/misc/math/Quotient.java | 2 +- .../malhar/contrib/misc/math/QuotientMap.java | 2 +- .../malhar/contrib/misc/math/SumCountMap.java | 6 +- .../contrib/parser/StreamingJsonParser.java | 8 +-- .../contrib/couchbase/CouchBaseSetTest.java | 2 +- .../ElasticSearchOperatorTest.java | 6 +- .../ElasticSearchPercolateTest.java | 10 +-- .../contrib/geode/GeodeCheckpointStoreTest.java | 2 +- .../hbase/HBasePOJOInputOperatorTest.java | 34 +- .../contrib/hbase/HBasePOJOPutOperatorTest.java | 36 +-- .../HBaseTransactionalPutOperatorTest.java | 12 ++-- .../datatorrent/contrib/hbase/HBaseUtil.java| 8 +-- 
.../contrib/helper/MessageQueueTestHelper.java | 2 +- .../KafkaExactlyOnceOutputOperatorTest.java | 20 +++--- .../contrib/kafka/KafkaTestPartitioner.java | 2 +- .../contrib/kafka/KafkaTestProducer.java| 4 +- .../kinesis/KinesisOperatorTestBase.java| 12 ++-- .../kinesis/KinesisOutputOperatorTest.java | 16 ++---
[3/6] apex-malhar git commit: Fix trailing whitespace.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java -- diff --git a/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java b/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java index c8eeacc..bdf6fad 100644 --- a/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java +++ b/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java @@ -54,7 +54,7 @@ import com.datatorrent.netlet.util.DTThrowable; * the Pojo Class. * dateFormats: Comma separated string of date formats e.g * dd/mm/,dd-mmm- where first one would be considered default - * + * * @displayName XmlParser * @category Parsers * @tags xml pojo parser http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java -- diff --git a/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java b/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java index 8c22140..6c17529 100644 --- a/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java +++ b/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java @@ -59,7 +59,7 @@ import com.datatorrent.lib.util.PojoUtils; * - projected port emits POJOs with projected fields from input POJOs * - remainder port, if connected, emits POJOs with remainder fields from input POJOs * - error port emits input POJOs as is upon error situations - * + * * Examples * For {a, b, c} type of input tuples * - when selectFields = "" and dropFields = "", projected port shall emit {a, b, c} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/script/ScriptOperator.java -- diff --git a/library/src/main/java/com/datatorrent/lib/script/ScriptOperator.java b/library/src/main/java/com/datatorrent/lib/script/ScriptOperator.java index 9532180..d6589ee 100644 --- 
a/library/src/main/java/com/datatorrent/lib/script/ScriptOperator.java +++ b/library/src/main/java/com/datatorrent/lib/script/ScriptOperator.java @@ -31,8 +31,8 @@ import com.datatorrent.api.annotation.OutputPortFieldAnnotation; import com.datatorrent.common.util.BaseOperator; /** - * A base implementation of a BaseOperator for language script operator. Subclasses should provide the - implementation of getting the bindings and process method. + * A base implementation of a BaseOperator for language script operator. Subclasses should provide the + implementation of getting the bindings and process method. * Interface for language script operator. * * @displayName Script @@ -55,13 +55,13 @@ public abstract class ScriptOperator extends BaseOperator } }; - + /** * Output outBindings port that emits a map of String, Object */ @OutputPortFieldAnnotation(optional = true) public final transient DefaultOutputPort
[4/6] apex-malhar git commit: Fix trailing whitespace.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java -- diff --git a/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java b/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java index 8e12fcc..21079d7 100644 --- a/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java +++ b/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java @@ -30,7 +30,7 @@ public class CalculatorTest { @Test public void testSomeMethod() throws Exception - { + { LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); conf.addResource("dt-site-pilibrary.xml"); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java -- diff --git a/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java b/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java index 6360768..3c9c4da 100755 --- a/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java +++ b/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java @@ -210,7 +210,7 @@ public abstract class AbstractFSRollingOutputOperator extends AbstractFileOut * written to. Example: If hive partitions are date='2014-12-12',country='USA' * then this method returns {"2014-12-12","USA"} The implementation is left to * the user. - * + * * @param tuple * A received tuple to be written to a hive partition. * @return ArrayList containing hive partition values. 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/hive/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java -- diff --git a/hive/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java b/hive/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java index 8e3b143..ed4ca85 100755 --- a/hive/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java +++ b/hive/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java @@ -61,7 +61,7 @@ public class HiveOperator extends AbstractStoreOutputOperator getHivePartitionColumns() @@ -236,7 +236,7 @@ public class HiveOperator extends AbstractStoreOutputOperator hivePartitionColumns) @@ -246,7 +246,7 @@ public class HiveOperator extends AbstractStoreOutputOperatorhttp://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/hive/src/main/java/org/apache/apex/malhar/hive/HiveOutputModule.java -- diff --git a/hive/src/main/java/org/apache/apex/malhar/hive/HiveOutputModule.java b/hive/src/main/java/org/apache/apex/malhar/hive/HiveOutputModule.java index 3491b3c..d859634 100644 --- a/hive/src/main/java/org/apache/apex/malhar/hive/HiveOutputModule.java +++ b/hive/src/main/java/org/apache/apex/malhar/hive/HiveOutputModule.java @@ -164,7 +164,7 @@ public class HiveOutputModule implements Module /** * The path of the directory to where files are written. - * + * * @return file path */ public String getFilePath() @@ -174,7 +174,7 @@ public class HiveOutputModule implements Module /** * The path of the directory to where files are written. - * + * * @param filePath * file path */ @@ -185,7 +185,7 @@ public class HiveOutputModule implements Module /** * Names of the columns in hive table (excluding partitioning columns). - * + * * @return Hive column names */ public String[] getHiveColumns() @@ -195,7 +195,7 @@ public class HiveOutputModule implements Module /** * Names of the columns in hive table (excluding partitioning columns). 
- * + * * @param hiveColumns * Hive column names */ @@ -207,7 +207,7 @@ public class HiveOutputModule implements Module /** * Data types of the columns in hive table (excluding partitioning columns). * This sequence should match to the fields in hiveColumnDataTypes - * + * * @return Hive column data types */ public FIELD_TYPE[] getHiveColumnDataTypes() @@ -218,7 +218,7 @@ public class HiveOutputModule implements Module /** * Data types of the columns in hive table (excluding partitioning columns). * This sequence should match to the fields in hiveColumnDataTypes * - * + * * @param hiveColumnDataTypes * Hive column data types */ @@ -230,7 +230,7 @@ public class HiveOutputModule implements Module /** * Expressions for the hive columns (excluding partitioning columns). This * sequence should match to the fields in hiveColumnDataTypes - * + * * @return
apex-core git commit: APEXCORE-540 Enforce exclusion of provided dependencies.
Repository: apex-core Updated Branches: refs/heads/master 25ca79f0b -> 6fdd73d6f APEXCORE-540 Enforce exclusion of provided dependencies. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/6fdd73d6 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/6fdd73d6 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/6fdd73d6 Branch: refs/heads/master Commit: 6fdd73d6f34c25c1ce091173f8def2b2a6768e4c Parents: 25ca79f Author: Thomas WeiseAuthored: Fri Sep 23 10:38:37 2016 -0700 Committer: Thomas Weise Committed: Fri Sep 23 10:38:37 2016 -0700 -- .../main/resources/archetype-resources/pom.xml | 27 ++-- 1 file changed, 25 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/6fdd73d6/apex-app-archetype/src/main/resources/archetype-resources/pom.xml -- diff --git a/apex-app-archetype/src/main/resources/archetype-resources/pom.xml b/apex-app-archetype/src/main/resources/archetype-resources/pom.xml index 0709a7b..d378b33 100644 --- a/apex-app-archetype/src/main/resources/archetype-resources/pom.xml +++ b/apex-app-archetype/src/main/resources/archetype-resources/pom.xml @@ -57,7 +57,30 @@ - + +org.apache.maven.plugins +maven-enforcer-plugin +1.4.1 + + +enforce-provided-dependencies + + enforce + + + + + +org.apache.hadoop:*:*:*:runtime +org.apache.apex:apex-*:*:*:runtime + + Found (transitive) dependencies that should be excluded or not scope "runtime" to remain excluded from the application package. See https://issues.apache.org/jira/browse/APEXCORE-540 + + + + + + maven-assembly-plugin @@ -237,7 +260,7 @@ org.apache.apex malhar-library - 3.4.0 + 3.5.0
apex-core git commit: APEXCORE-521 upate parent pom references.
Repository: apex-core Updated Branches: refs/heads/master 70ceae3db -> eacb7a4cf APEXCORE-521 upate parent pom references. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/eacb7a4c Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/eacb7a4c Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/eacb7a4c Branch: refs/heads/master Commit: eacb7a4cf4231cb7a4225c6a2b2f10d61c0a9863 Parents: 70ceae3 Author: Thomas Weise Authored: Fri Sep 2 11:25:25 2016 -0700 Committer: Thomas Weise Committed: Tue Sep 6 18:19:01 2016 -0700 -- pom.xml | 2 +- shaded-ning19/pom.xml | 7 --- 2 files changed, 5 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/eacb7a4c/pom.xml -- diff --git a/pom.xml b/pom.xml index a26d960..e48adc3 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ org.apache apache -16 +18 org.apache.apex http://git-wip-us.apache.org/repos/asf/apex-core/blob/eacb7a4c/shaded-ning19/pom.xml -- diff --git a/shaded-ning19/pom.xml b/shaded-ning19/pom.xml index 9a2a8d4..70b90b1 100644 --- a/shaded-ning19/pom.xml +++ b/shaded-ning19/pom.xml @@ -23,9 +23,10 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;> 4.0.0 -org.apache.apex -apex -3.3.0-incubating +org.apache +apache +18 + org.apache.apex apex-shaded-ning19
[1/2] apex-core git commit: APEXCORE-513 Reducing the log level for node reports
Repository: apex-core Updated Branches: refs/heads/master 3c503a91f -> 8b55fbe91 APEXCORE-513 Reducing the log level for node reports Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/9e3940fd Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/9e3940fd Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/9e3940fd Branch: refs/heads/master Commit: 9e3940fdc1e6a75f9a092c531bffaa2ee9c562f2 Parents: 65a721f Author: sandeshhAuthored: Tue Aug 30 14:47:59 2016 -0700 Committer: sandeshh Committed: Tue Aug 30 20:46:40 2016 -0700 -- .../java/com/datatorrent/stram/ResourceRequestHandler.java | 6 ++ 1 file changed, 2 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/9e3940fd/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java index 6ecc7c5..c56f64f 100644 --- a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java +++ b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java @@ -153,11 +153,9 @@ public class ResourceRequestHandler */ public void updateNodeReports(List nodeReports) { -// LOG.debug("Got {} updated node reports.", nodeReports.size()); for (NodeReport nr : nodeReports) { - StringBuilder sb = new StringBuilder(); - sb.append("rackName=").append(nr.getRackName()).append(",nodeid=").append(nr.getNodeId()).append(",numContainers=").append(nr.getNumContainers()).append(",capability=").append(nr.getCapability()).append("used=").append(nr.getUsed()).append("state=").append(nr.getNodeState()); - LOG.info("Node report: " + sb); + + LOG.debug("Node report: rackName={}, nodeid={}, numContainers={}, capability={}, used={}, state={}", nr.getRackName(), nr.getNodeId(), nr.getNumContainers(), nr.getCapability(), nr.getUsed(), 
nr.getNodeState()); nodeReportMap.put(nr.getNodeId().getHost(), nr); nodeToRack.put(nr.getNodeId().getHost(), nr.getRackName()); }
[2/2] apex-core git commit: Merge branch 'APEXCORE-513' of https://github.com/sandeshh/apex-core into APEXCORE-513
Merge branch 'APEXCORE-513' of https://github.com/sandeshh/apex-core into APEXCORE-513 Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/8b55fbe9 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/8b55fbe9 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/8b55fbe9 Branch: refs/heads/master Commit: 8b55fbe916994b2c0ab2f2d37f355eecf80d5110 Parents: 3c503a9 9e3940f Author: Vlad Rozov Authored: Fri Sep 2 15:55:10 2016 -0700 Committer: Vlad Rozov Committed: Fri Sep 2 15:55:10 2016 -0700 -- .../java/com/datatorrent/stram/ResourceRequestHandler.java | 6 ++ 1 file changed, 2 insertions(+), 4 deletions(-) --
apex-core git commit: Fix trailing whitespace.
Repository: apex-core Updated Branches: refs/heads/master 5f06c7feb -> 3c503a91f Fix trailing whitespace. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/3c503a91 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/3c503a91 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/3c503a91 Branch: refs/heads/master Commit: 3c503a91f07ac076d5168f270cf783ef63aff008 Parents: 5f06c7f Author: CI SupportAuthored: Thu Sep 1 19:27:39 2016 -0700 Committer: Thomas Weise Committed: Fri Sep 2 15:28:13 2016 -0700 -- .../com/datatorrent/api/AffinityRulesSet.java | 2 +- .../java/com/datatorrent/api/StorageAgent.java | 5 +- .../api/annotation/ApplicationAnnotation.java | 8 +-- .../common/security/SecurityContext.java| 6 +-- .../common/util/ScheduledExecutorService.java | 8 +-- .../BlacklistBasedResourceRequestHandler.java | 6 ++- .../java/com/datatorrent/stram/StramClient.java | 2 +- .../stram/StreamingContainerAgent.java | 2 +- .../com/datatorrent/stram/api/StramEvent.java | 2 +- .../api/StramToNodeStartRecordingRequest.java | 2 +- .../stram/client/DTConfiguration.java | 2 +- .../stram/client/FSPartFileAgent.java | 5 +- .../stram/plan/logical/LogicalPlan.java | 4 +- .../stram/util/ObjectMapperFactory.java | 2 +- .../stram/util/SharedPubSubWebSocketClient.java | 2 +- .../datatorrent/stram/util/package-info.java| 4 +- .../stram/webapp/OperatorDiscoverer.java| 2 +- .../com/datatorrent/stram/webapp/TypeGraph.java | 2 +- .../stram/webapp/TypeGraphFactory.java | 4 +- .../stram/webapp/asm/BaseSignatureVisitor.java | 52 +--- .../stram/webapp/asm/CompactClassNode.java | 2 +- .../stram/webapp/asm/CompactMethodNode.java | 4 +- .../stram/webapp/asm/CompactUtil.java | 12 ++--- .../datatorrent/stram/webapp/asm/FieldNode.java | 6 +-- .../stram/webapp/asm/MethodNode.java| 10 ++-- .../webapp/asm/MethodSignatureVisitor.java | 30 +-- .../datatorrent/stram/AffinityRulesTest.java| 2 +- 
.../com/datatorrent/stram/cli/ApexCliTest.java | 2 +- .../stram/webapp/OperatorDiscoveryTest.java | 5 +- pom.xml | 2 +- 30 files changed, 97 insertions(+), 100 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/3c503a91/api/src/main/java/com/datatorrent/api/AffinityRulesSet.java -- diff --git a/api/src/main/java/com/datatorrent/api/AffinityRulesSet.java b/api/src/main/java/com/datatorrent/api/AffinityRulesSet.java index 31098af..cdc833d 100644 --- a/api/src/main/java/com/datatorrent/api/AffinityRulesSet.java +++ b/api/src/main/java/com/datatorrent/api/AffinityRulesSet.java @@ -29,7 +29,7 @@ import java.util.Collection; public class AffinityRulesSet implements Serializable { private Collection affinityRules; - + private static final long serialVersionUID = -8393974533796177171L; public Collection getAffinityRules() http://git-wip-us.apache.org/repos/asf/apex-core/blob/3c503a91/api/src/main/java/com/datatorrent/api/StorageAgent.java -- diff --git a/api/src/main/java/com/datatorrent/api/StorageAgent.java b/api/src/main/java/com/datatorrent/api/StorageAgent.java index b5dcf39..ed3681f 100644 --- a/api/src/main/java/com/datatorrent/api/StorageAgent.java +++ b/api/src/main/java/com/datatorrent/api/StorageAgent.java @@ -81,15 +81,14 @@ public interface StorageAgent /** * Interface to pass application attributes to storage agent - * * */ public interface ApplicationAwareStorageAgent extends StorageAgent { - + /** * Passes attributes of application to storage agent - * + * * @param map attributes of application */ public void setApplicationAttributes(AttributeMap map); http://git-wip-us.apache.org/repos/asf/apex-core/blob/3c503a91/api/src/main/java/com/datatorrent/api/annotation/ApplicationAnnotation.java -- diff --git a/api/src/main/java/com/datatorrent/api/annotation/ApplicationAnnotation.java b/api/src/main/java/com/datatorrent/api/annotation/ApplicationAnnotation.java index f81c15e..7393cd5 100644 --- 
a/api/src/main/java/com/datatorrent/api/annotation/ApplicationAnnotation.java +++ b/api/src/main/java/com/datatorrent/api/annotation/ApplicationAnnotation.java @@ -35,15 +35,15 @@ import java.lang.annotation.Target; @Retention(RetentionPolicy.RUNTIME) public @interface ApplicationAnnotation { - + /** - * Compile time
[1/2] apex-core git commit: APEXCORE-515 Providing principal for token refresh
Repository: apex-core Updated Branches: refs/heads/master ae0ec2464 -> c13b0dd41 APEXCORE-515 Providing principal for token refresh Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/dd5e95a0 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/dd5e95a0 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/dd5e95a0 Branch: refs/heads/master Commit: dd5e95a090214bdcff3232cbdcad3dc59c24eff9 Parents: d651edc Author: Pramod ImmaneniAuthored: Wed Aug 24 17:25:25 2016 -0700 Committer: Pramod Immaneni Committed: Thu Sep 1 12:26:43 2016 -0700 -- .../stram/StreamingAppMasterService.java| 3 +- .../stram/client/StramAppLauncher.java | 28 +- .../stram/engine/StreamingContainer.java| 3 +- .../stram/plan/logical/LogicalPlan.java | 1 + .../stram/security/StramUserLogin.java | 20 + .../stram/client/StramAppLauncherTest.java | 30 ++-- 6 files changed, 54 insertions(+), 31 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index 43ab743..15b6402 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -676,6 +676,7 @@ public class StreamingAppMasterService extends CompositeService long tokenLifeTime = (long)(dag.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * Math.min(dag.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME), dag.getValue(LogicalPlan.RM_TOKEN_LIFE_TIME))); long expiryTime = System.currentTimeMillis() + tokenLifeTime; LOG.debug(" expiry token time {}", tokenLifeTime); +String principal = dag.getValue(LogicalPlan.PRINCIPAL); String hdfsKeyTabFile = dag.getValue(LogicalPlan.KEY_TAB_FILE); // Register self 
with ResourceManager @@ -753,7 +754,7 @@ public class StreamingAppMasterService extends CompositeService if (UserGroupInformation.isSecurityEnabled() && currentTimeMillis >= expiryTime && hdfsKeyTabFile != null) { String applicationId = appAttemptID.getApplicationId().toString(); -expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, hdfsKeyTabFile, credentials, rmAddress, true); +expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, principal, hdfsKeyTabFile, credentials, rmAddress, true); } if (currentTimeMillis > nodeReportUpdateTime) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java index 5024c38..619252f 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java +++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java @@ -560,14 +560,12 @@ public class StramAppLauncher return cl; } - private void setTokenRefreshKeytab(LogicalPlan dag, Configuration conf) throws IOException + private void setTokenRefreshCredentials(LogicalPlan dag, Configuration conf) throws IOException { -String keytabPath; -if ((keytabPath = conf.get(StramClientUtils.KEY_TAB_FILE)) == null) { - String keytab; - if ((keytab = StramUserLogin.getKeytab()) == null) { -keytab = conf.get(StramUserLogin.DT_AUTH_KEYTAB); - } +String principal = StramUserLogin.getPrincipal(); +String keytabPath = conf.get(StramClientUtils.KEY_TAB_FILE); +if (keytabPath == null) { + String keytab = StramUserLogin.getKeytab(); if (keytab != null) { Path localKeyTabPath = new Path(keytab); try (FileSystem fs = StramClientUtils.newFileSystemInstance(conf)) { @@ -579,10 +577,11 @@ public class StramAppLauncher } } } 
-if (keytabPath != null) { +if ((principal != null) && (keytabPath != null)) { + dag.setAttribute(LogicalPlan.PRINCIPAL, principal); dag.setAttribute(LogicalPlan.KEY_TAB_FILE, keytabPath); } else { - LOG.warn("No keytab specified for refreshing tokens, application may not be able to run indefinitely"); + LOG.warn("Credentials
[apex-core] Git Push Summary
Repository: apex-core Updated Branches: refs/heads/APEXCORE-515 [deleted] c13b0dd41
[1/2] apex-core git commit: APEXCORE-515 Providing principal for token refresh
Repository: apex-core Updated Branches: refs/heads/APEXCORE-515 [created] c13b0dd41 APEXCORE-515 Providing principal for token refresh Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/dd5e95a0 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/dd5e95a0 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/dd5e95a0 Branch: refs/heads/APEXCORE-515 Commit: dd5e95a090214bdcff3232cbdcad3dc59c24eff9 Parents: d651edc Author: Pramod ImmaneniAuthored: Wed Aug 24 17:25:25 2016 -0700 Committer: Pramod Immaneni Committed: Thu Sep 1 12:26:43 2016 -0700 -- .../stram/StreamingAppMasterService.java| 3 +- .../stram/client/StramAppLauncher.java | 28 +- .../stram/engine/StreamingContainer.java| 3 +- .../stram/plan/logical/LogicalPlan.java | 1 + .../stram/security/StramUserLogin.java | 20 + .../stram/client/StramAppLauncherTest.java | 30 ++-- 6 files changed, 54 insertions(+), 31 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index 43ab743..15b6402 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -676,6 +676,7 @@ public class StreamingAppMasterService extends CompositeService long tokenLifeTime = (long)(dag.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * Math.min(dag.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME), dag.getValue(LogicalPlan.RM_TOKEN_LIFE_TIME))); long expiryTime = System.currentTimeMillis() + tokenLifeTime; LOG.debug(" expiry token time {}", tokenLifeTime); +String principal = dag.getValue(LogicalPlan.PRINCIPAL); String hdfsKeyTabFile = dag.getValue(LogicalPlan.KEY_TAB_FILE); // Register 
self with ResourceManager @@ -753,7 +754,7 @@ public class StreamingAppMasterService extends CompositeService if (UserGroupInformation.isSecurityEnabled() && currentTimeMillis >= expiryTime && hdfsKeyTabFile != null) { String applicationId = appAttemptID.getApplicationId().toString(); -expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, hdfsKeyTabFile, credentials, rmAddress, true); +expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, principal, hdfsKeyTabFile, credentials, rmAddress, true); } if (currentTimeMillis > nodeReportUpdateTime) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/dd5e95a0/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java -- diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java index 5024c38..619252f 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java +++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java @@ -560,14 +560,12 @@ public class StramAppLauncher return cl; } - private void setTokenRefreshKeytab(LogicalPlan dag, Configuration conf) throws IOException + private void setTokenRefreshCredentials(LogicalPlan dag, Configuration conf) throws IOException { -String keytabPath; -if ((keytabPath = conf.get(StramClientUtils.KEY_TAB_FILE)) == null) { - String keytab; - if ((keytab = StramUserLogin.getKeytab()) == null) { -keytab = conf.get(StramUserLogin.DT_AUTH_KEYTAB); - } +String principal = StramUserLogin.getPrincipal(); +String keytabPath = conf.get(StramClientUtils.KEY_TAB_FILE); +if (keytabPath == null) { + String keytab = StramUserLogin.getKeytab(); if (keytab != null) { Path localKeyTabPath = new Path(keytab); try (FileSystem fs = StramClientUtils.newFileSystemInstance(conf)) { @@ -579,10 +577,11 @@ public class StramAppLauncher } } 
} -if (keytabPath != null) { +if ((principal != null) && (keytabPath != null)) { + dag.setAttribute(LogicalPlan.PRINCIPAL, principal); dag.setAttribute(LogicalPlan.KEY_TAB_FILE, keytabPath); } else { - LOG.warn("No keytab specified for refreshing tokens, application may not be able to run indefinitely"); +
[2/2] apex-core git commit: Merge branch 'APEXCORE-515' of https://github.com/PramodSSImmaneni/apex-core into APEXCORE-515
Merge branch 'APEXCORE-515' of https://github.com/PramodSSImmaneni/apex-core into APEXCORE-515 Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/c13b0dd4 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/c13b0dd4 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/c13b0dd4 Branch: refs/heads/APEXCORE-515 Commit: c13b0dd417e805601169f090fdeebf5b42c78651 Parents: ae0ec24 dd5e95a Author: Vlad Rozov Authored: Thu Sep 1 12:54:37 2016 -0700 Committer: Vlad Rozov Committed: Thu Sep 1 12:54:37 2016 -0700 -- .../stram/StreamingAppMasterService.java| 3 +- .../stram/client/StramAppLauncher.java | 28 +- .../stram/engine/StreamingContainer.java| 3 +- .../stram/plan/logical/LogicalPlan.java | 1 + .../stram/security/StramUserLogin.java | 20 + .../stram/client/StramAppLauncherTest.java | 30 ++-- 6 files changed, 54 insertions(+), 31 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/c13b0dd4/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java --
apex-core git commit: Remove obsolete japicmp exclude, use latest version.
Repository: apex-core Updated Branches: refs/heads/master 9c48c41e9 -> a53a5839c Remove obsolete japicmp exclude, use latest version. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/a53a5839 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/a53a5839 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/a53a5839 Branch: refs/heads/master Commit: a53a5839cce5b40945bfe8e21062655c78c1fb05 Parents: 9c48c41 Author: Thomas Weise Authored: Mon Aug 29 19:07:38 2016 -0700 Committer: Thomas Weise Committed: Mon Aug 29 19:07:38 2016 -0700 -- pom.xml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/a53a5839/pom.xml -- diff --git a/pom.xml b/pom.xml index dbcebad..ba64906 100644 --- a/pom.xml +++ b/pom.xml @@ -367,7 +367,7 @@ com.github.siom79.japicmp japicmp-maven-plugin - 0.7.0 + 0.9.0 @@ -391,8 +391,6 @@ @org.apache.hadoop.classification.InterfaceStability$Evolving @org.apache.hadoop.classification.InterfaceStability$Unstable - - com.datatorrent.common.util.PubSubWebSocketClient
apex-core git commit: APEXCORE-222 purging of the buffer server is done from the streaming container, instead of StreamingContainerManager
Repository: apex-core Updated Branches: refs/heads/master 1b1813f96 -> d32ea3c04 APEXCORE-222 purging of the buffer server is done from the streaming container, instead of StreamingContainerManager Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/d32ea3c0 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/d32ea3c0 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/d32ea3c0 Branch: refs/heads/master Commit: d32ea3c04498ed473364140ce444b6fe6e732149 Parents: 1b1813f Author: sandeshhAuthored: Mon Jun 27 20:58:07 2016 -0700 Committer: sandeshh Committed: Fri Jul 15 10:26:35 2016 -0700 -- .../bufferserver/internal/DataList.java | 11 - .../datatorrent/bufferserver/server/Server.java | 12 ++--- .../stram/StreamingContainerManager.java| 26 .../stram/engine/StreamingContainer.java| 8 ++ 4 files changed, 22 insertions(+), 35 deletions(-) -- http://git-wip-us.apache.org/repos/asf/apex-core/blob/d32ea3c0/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java -- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java index 2a01102..3f596d9 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java @@ -176,21 +176,20 @@ public class DataList numberOfInMemBlockPermits.set(MAX_COUNT_OF_INMEM_BLOCKS - 1); } - public void purge(final int baseSeconds, final int windowId) + public void purge(final long windowId) { -final long longWindowId = (long)baseSeconds << 32 | windowId; logger.debug("Purging {} from window ID {} to window ID {}", this, Codec.getStringWindowId(first.starting_window), -Codec.getStringWindowId(longWindowId)); +Codec.getStringWindowId(windowId)); int numberOfInMemBlockPurged = 0; synchronized (this) { - for (Block prev 
= null, temp = first; temp != null && temp.starting_window <= longWindowId; + for (Block prev = null, temp = first; temp != null && temp.starting_window <= windowId; prev = temp, temp = temp.next) { -if (temp.ending_window > longWindowId || temp == last) { +if (temp.ending_window > windowId || temp == last) { if (prev != null) { first = temp; } - first.purge(longWindowId); + first.purge(windowId); break; } temp.discard(false); http://git-wip-us.apache.org/repos/asf/apex-core/blob/d32ea3c0/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java -- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index 83b50d2..12eed5f 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -26,7 +26,6 @@ import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.ArrayBlockingQueue; @@ -169,7 +168,7 @@ public class Server implements ServerListener return identity; } - private final HashMap publisherBuffers = new HashMap (); + private final ConcurrentHashMap publisherBuffers = new ConcurrentHashMap<>(1, 0.75f, 1); private final ConcurrentHashMap subscriberGroups = new ConcurrentHashMap (); private final ConcurrentHashMap publisherChannels = new ConcurrentHashMap<>(); private final ConcurrentHashMap subscriberChannels = new ConcurrentHashMap<>(); @@ -185,7 +184,7 @@ public class Server implements ServerListener if (dl == null) { message = ("Invalid identifier '" + request.getIdentifier() + "'").getBytes(); } else { - dl.purge(request.getBaseSeconds(), request.getWindowId()); + dl.purge((long)request.getBaseSeconds() << 32 | request.getWindowId()); 
message = ("Request sent for processing: " + request).getBytes(); } @@ -199,6 +198,13 @@ public class Server implements ServerListener } } + public void