[apex-core] branch master updated: APEXCORE-754 Add plugin dependency jar-files to application package

2017-07-13 Thread vrozov
This is an automated email from the ASF dual-hosted git repository.

vrozov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-core.git


The following commit(s) were added to refs/heads/master by this push:
 new 5994a0b  APEXCORE-754 Add plugin dependency jar-files to application 
package
5994a0b is described below

commit 5994a0b0fa9d2b60ca7595e7c6a1ebaab168060c
Author: Sergey Golovko 
AuthorDate: Thu Jul 6 11:25:08 2017 -0700

APEXCORE-754 Add plugin dependency jar-files to application package

Included plugin jar-files into the application package and added names of 
the plugin jar-files to the application classpath.
---
 .../org/apache/apex/common/util/JarHelper.java | 76 +-
 .../java/com/datatorrent/stram/StramClient.java| 22 +--
 .../stram/StreamingAppMasterService.java   |  2 +
 3 files changed, 92 insertions(+), 8 deletions(-)

diff --git a/common/src/main/java/org/apache/apex/common/util/JarHelper.java 
b/common/src/main/java/org/apache/apex/common/util/JarHelper.java
index d40cec8..bd75b44 100644
--- a/common/src/main/java/org/apache/apex/common/util/JarHelper.java
+++ b/common/src/main/java/org/apache/apex/common/util/JarHelper.java
@@ -27,11 +27,14 @@ import java.net.URL;
 import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.security.CodeSource;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
 import java.util.jar.JarOutputStream;
@@ -41,6 +44,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -52,6 +56,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 public class JarHelper
 {
   private static final Logger logger = 
LoggerFactory.getLogger(JarHelper.class);
+  private static final String APEX_DEPENDENCIES = "apex-dependencies";
 
   private final Map sourceToJar = new HashMap<>();
 
@@ -68,7 +73,7 @@ public class JarHelper
 return temp.getAbsolutePath();
   }
 
-  public String getJar(Class jarClass)
+  public String getJar(Class jarClass, boolean makeJarFromFolder)
   {
 String jar = null;
 final CodeSource codeSource = 
jarClass.getProtectionDomain().getCodeSource();
@@ -88,6 +93,9 @@ public class JarHelper
   jar = location.getFile();
   final File dir = new File(jar);
   if (dir.isDirectory()) {
+if (!makeJarFromFolder) {
+  throw new AssertionError("Cannot resolve jar file for " + 
jarClass + ". URL " + location);
+}
 try {
   jar = createJar("apex-", dir, false);
 } catch (IOException e) {
@@ -107,6 +115,72 @@ public class JarHelper
 return jar;
   }
 
+  public String getJar(Class jarClass)
+  {
+return getJar(jarClass, true);
+  }
+
+  /**
+   * Returns a full path to the jar-file that contains the given class and all 
full paths to dependent jar-files
+   * that are defined in the property "apex-dependencies" of the manifest of 
the root jar-file.
+   * If the class is an independent file the method makes jar file from the 
folder that contains the class
+   * @param jarClass Class
+   * @param makeJarFromFolder True if the method should make jar from folder 
that contains the independent class
+   * @param addJarDependencies True if the method should include dependent jar 
files
+   * @return Set of names of the jar-files
+   */
+  public Set getJars(Class jarClass, boolean makeJarFromFolder, 
boolean addJarDependencies)
+  {
+String jar = getJar(jarClass, makeJarFromFolder);
+Set set = new HashSet<>();
+if (jar != null) {
+  set.add(jar);
+  if (addJarDependencies) {
+try {
+  getDependentJarsFromManifest(new JarFile(jar), set);
+} catch (IOException ex) {
+  logger.warn("Cannot open Jar-file {}", jar);
+}
+  }
+}
+return set;
+  }
+
+  /**
+   * Returns a full path to the jar-file that contains the given class and all 
full paths to dependent jar-files
+   * that are defined in the property "apex-dependencies" of the manifest of 
the root jar-file.
+   * If the class is an independent file the method makes jar file from the 
folder that contains the class
+   * @param jarClass Class
+   * @return Set of names of the jar-files
+   */
+  public Set getJars(Class jarClass)
+  {
+return getJars(jarClass, true, true);
+  }
+
+  /**
+   * Adds dependent jar-files from manifest to the target list of jar-files
+   * @param jarFile Jar file
+   * 

[apex-malhar] branch master updated: APEXMALHAR-2434 Use fixed/settable metaQueueName in JMSTransactionableStore. Sanjay Pujare <sanjaypuj...@users.noreply.github.com> is a contributing author. This c

2017-07-13 Thread pramod
This is an automated email from the ASF dual-hosted git repository.

pramod pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git


The following commit(s) were added to refs/heads/master by this push:
 new afb4252  APEXMALHAR-2434 Use fixed/settable metaQueueName in 
JMSTransactionableStore. Sanjay Pujare  
is a contributing author. This closes #612.
afb4252 is described below

commit afb4252435745669bc999c8a6b4e7fa2b58cf096
Author: oliverwnk 
AuthorDate: Wed Jul 12 23:17:37 2017 -0700

APEXMALHAR-2434 Use fixed/settable metaQueueName in 
JMSTransactionableStore. Sanjay Pujare  
is a contributing author. This closes #612.
---
 .../lib/io/jms/JMSTransactionableStore.java|  75 +++
 .../io/jms/JMSTransactionableStoreTestBase.java| 137 +++--
 2 files changed, 147 insertions(+), 65 deletions(-)

diff --git 
a/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java 
b/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java
index 11b8447..996bfd4 100644
--- 
a/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java
+++ 
b/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java
@@ -31,9 +31,14 @@ import javax.jms.QueueBrowser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.annotation.Stateless;
+
 /**
  * This transactionable store commits the messages sent within a window along 
with the windowId of the completed window
- * to JMS. This store ensures that the JMS output operator is capable of 
outputting data to JMS exactly once.
+ * to JMS. WindowIds will be sent to a subject specific meta-data queue with 
the name of the form '{subject}.metadata'.
+ * It is the responsibility of the user to create the meta-data queue in the 
JMS provider.
+ * A MessageSelector and an unique 'appOperatorId' message property ensure 
each operator receives its own windowId.
+ * This store ensures that the JMS output operator is capable of outputting 
data to JMS exactly once.
  *
  * @since 2.0.0
  */
@@ -44,6 +49,8 @@ public class JMSTransactionableStore extends 
JMSBaseTransactionableStore
 
   private transient MessageProducer producer;
   private transient MessageConsumer consumer;
+  private String metaQueueName;
+  private static final String APP_OPERATOR_ID = "appOperatorId";
 
   /**
* Indicates whether the store is connected or not.
@@ -58,6 +65,26 @@ public class JMSTransactionableStore extends 
JMSBaseTransactionableStore
   {
   }
 
+  /**
+   * Get the meta queue name for this store
+   *
+   * @return the metaQueueName
+   */
+  public String getMetaQueueName()
+  {
+return metaQueueName;
+  }
+
+  /**
+   * Set the meta queue name for this store
+   *
+   * @param metaQueueName the metaQueueName to set
+   */
+  public void setMetaQueueName(String metaQueueName)
+  {
+this.metaQueueName = metaQueueName;
+  }
+
   @Override
   @SuppressWarnings("rawtypes")
   public long getCommittedWindowId(String appId, int operatorId)
@@ -65,17 +92,15 @@ public class JMSTransactionableStore extends 
JMSBaseTransactionableStore
 logger.debug("Getting committed windowId appId {} operatorId {}", appId, 
operatorId);
 
 try {
-
   beginTransaction();
   BytesMessage message = (BytesMessage)consumer.receive();
-  logger.debug("Retrieved committed window message id {}", 
message.getJMSMessageID());
+  logger.debug("Retrieved committed window messageId: {}, 
messageAppOperatorIdProp: {}", message.getJMSMessageID(),
+  message.getStringProperty(APP_OPERATOR_ID));
   long windowId = message.readLong();
 
-  message = getBase().getSession().createBytesMessage();
-  message.writeLong(windowId);
-  producer.send(message);
+  writeWindowId(appId, operatorId, windowId);
   commitTransaction();
-
+  logger.debug("metaQueueName: " + metaQueueName);
   logger.debug("Retrieved windowId {}", windowId);
   return windowId;
 } catch (JMSException ex) {
@@ -94,10 +119,7 @@ public class JMSTransactionableStore extends 
JMSBaseTransactionableStore
 appId, operatorId, windowId);
 try {
   removeCommittedWindowId(appId, operatorId);
-  BytesMessage bytesMessage = 
this.getBase().getSession().createBytesMessage();
-  bytesMessage.writeLong(windowId);
-  producer.send(bytesMessage);
-  logger.debug("Retrieved committed window message id {}", 
bytesMessage.getJMSMessageID());
+  writeWindowId(appId, operatorId, windowId);
 } catch (JMSException ex) {
   throw new RuntimeException(ex);
 }
@@ -113,6 +135,15 @@ public class JMSTransactionableStore extends 
JMSBaseTransactionableStore
 }
   }
 
+  private void writeWindowId(String appId, int operatorId, long windowId) 
throws JMSException
+  {
+BytesMessage