[apex-core] branch master updated: APEXCORE-754 Add plugin dependency jar-files to application package
This is an automated email from the ASF dual-hosted git repository. vrozov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apex-core.git The following commit(s) were added to refs/heads/master by this push: new 5994a0b APEXCORE-754 Add plugin dependency jar-files to application package 5994a0b is described below commit 5994a0b0fa9d2b60ca7595e7c6a1ebaab168060c Author: Sergey Golovko AuthorDate: Thu Jul 6 11:25:08 2017 -0700 APEXCORE-754 Add plugin dependency jar-files to application package Included plugin jar-files into the application package and added names of the plugin jar-files to the application classpath. --- .../org/apache/apex/common/util/JarHelper.java | 76 +- .../java/com/datatorrent/stram/StramClient.java| 22 +-- .../stram/StreamingAppMasterService.java | 2 + 3 files changed, 92 insertions(+), 8 deletions(-) diff --git a/common/src/main/java/org/apache/apex/common/util/JarHelper.java b/common/src/main/java/org/apache/apex/common/util/JarHelper.java index d40cec8..bd75b44 100644 --- a/common/src/main/java/org/apache/apex/common/util/JarHelper.java +++ b/common/src/main/java/org/apache/apex/common/util/JarHelper.java @@ -27,11 +27,14 @@ import java.net.URL; import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; import java.security.CodeSource; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.jar.JarEntry; import java.util.jar.JarFile; import java.util.jar.JarOutputStream; @@ -41,6 +44,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -52,6 +56,7 @@ import 
org.apache.hadoop.classification.InterfaceStability; public class JarHelper { private static final Logger logger = LoggerFactory.getLogger(JarHelper.class); + private static final String APEX_DEPENDENCIES = "apex-dependencies"; private final Map sourceToJar = new HashMap<>(); @@ -68,7 +73,7 @@ public class JarHelper return temp.getAbsolutePath(); } - public String getJar(Class jarClass) + public String getJar(Class jarClass, boolean makeJarFromFolder) { String jar = null; final CodeSource codeSource = jarClass.getProtectionDomain().getCodeSource(); @@ -88,6 +93,9 @@ public class JarHelper jar = location.getFile(); final File dir = new File(jar); if (dir.isDirectory()) { +if (!makeJarFromFolder) { + throw new AssertionError("Cannot resolve jar file for " + jarClass + ". URL " + location); +} try { jar = createJar("apex-", dir, false); } catch (IOException e) { @@ -107,6 +115,72 @@ public class JarHelper return jar; } + public String getJar(Class jarClass) + { +return getJar(jarClass, true); + } + + /** + * Returns a full path to the jar-file that contains the given class and all full paths to dependent jar-files + * that are defined in the property "apex-dependencies" of the manifest of the root jar-file. 
+ * If the class is an independent file the method makes jar file from the folder that contains the class + * @param jarClass Class + * @param makeJarFromFolder True if the method should make jar from folder that contains the independent class + * @param addJarDependencies True if the method should include dependent jar files + * @return Set of names of the jar-files + */ + public Set getJars(Class jarClass, boolean makeJarFromFolder, boolean addJarDependencies) + { +String jar = getJar(jarClass, makeJarFromFolder); +Set set = new HashSet<>(); +if (jar != null) { + set.add(jar); + if (addJarDependencies) { +try { + getDependentJarsFromManifest(new JarFile(jar), set); +} catch (IOException ex) { + logger.warn("Cannot open Jar-file {}", jar); +} + } +} +return set; + } + + /** + * Returns a full path to the jar-file that contains the given class and all full paths to dependent jar-files + * that are defined in the property "apex-dependencies" of the manifest of the root jar-file. + * If the class is an independent file the method makes jar file from the folder that contains the class + * @param jarClass Class + * @return Set of names of the jar-files + */ + public Set getJars(Class jarClass) + { +return getJars(jarClass, true, true); + } + + /** + * Adds dependent jar-files from manifest to the target list of jar-files + * @param jarFile Jar file + *
[apex-malhar] branch master updated: APEXMALHAR-2434 Use fixed/settable metaQueueName in JMSTransactionableStore. Sanjay Pujare <sanjaypuj...@users.noreply.github.com> is a contributing author. This closes #612.
This is an automated email from the ASF dual-hosted git repository. pramod pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apex-malhar.git The following commit(s) were added to refs/heads/master by this push: new afb4252 APEXMALHAR-2434 Use fixed/settable metaQueueName in JMSTransactionableStore. Sanjay Pujare is a contributing author. This closes #612. afb4252 is described below commit afb4252435745669bc999c8a6b4e7fa2b58cf096 Author: oliverwnk AuthorDate: Wed Jul 12 23:17:37 2017 -0700 APEXMALHAR-2434 Use fixed/settable metaQueueName in JMSTransactionableStore. Sanjay Pujare is a contributing author. This closes #612. --- .../lib/io/jms/JMSTransactionableStore.java| 75 +++ .../io/jms/JMSTransactionableStoreTestBase.java| 137 +++-- 2 files changed, 147 insertions(+), 65 deletions(-) diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java b/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java index 11b8447..996bfd4 100644 --- a/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java +++ b/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java @@ -31,9 +31,14 @@ import javax.jms.QueueBrowser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datatorrent.api.annotation.Stateless; + /** * This transactionable store commits the messages sent within a window along with the windowId of the completed window - * to JMS. This store ensures that the JMS output operator is capable of outputting data to JMS exactly once. + * to JMS. WindowIds will be sent to a subject specific meta-data queue with the name of the form '{subject}.metadata'. + * It is the responsibility of the user to create the meta-data queue in the JMS provider. + * A MessageSelector and an unique 'appOperatorId' message property ensure each operator receives its own windowId. 
+ * This store ensures that the JMS output operator is capable of outputting data to JMS exactly once. * * @since 2.0.0 */ @@ -44,6 +49,8 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore private transient MessageProducer producer; private transient MessageConsumer consumer; + private String metaQueueName; + private static final String APP_OPERATOR_ID = "appOperatorId"; /** * Indicates whether the store is connected or not. @@ -58,6 +65,26 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore { } + /** + * Get the meta queue name for this store + * + * @return the metaQueueName + */ + public String getMetaQueueName() + { +return metaQueueName; + } + + /** + * Set the meta queue name for this store + * + * @param metaQueueName the metaQueueName to set + */ + public void setMetaQueueName(String metaQueueName) + { +this.metaQueueName = metaQueueName; + } + @Override @SuppressWarnings("rawtypes") public long getCommittedWindowId(String appId, int operatorId) @@ -65,17 +92,15 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore logger.debug("Getting committed windowId appId {} operatorId {}", appId, operatorId); try { - beginTransaction(); BytesMessage message = (BytesMessage)consumer.receive(); - logger.debug("Retrieved committed window message id {}", message.getJMSMessageID()); + logger.debug("Retrieved committed window messageId: {}, messageAppOperatorIdProp: {}", message.getJMSMessageID(), + message.getStringProperty(APP_OPERATOR_ID)); long windowId = message.readLong(); - message = getBase().getSession().createBytesMessage(); - message.writeLong(windowId); - producer.send(message); + writeWindowId(appId, operatorId, windowId); commitTransaction(); - + logger.debug("metaQueueName: " + metaQueueName); logger.debug("Retrieved windowId {}", windowId); return windowId; } catch (JMSException ex) { @@ -94,10 +119,7 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore appId, 
operatorId, windowId); try { removeCommittedWindowId(appId, operatorId); - BytesMessage bytesMessage = this.getBase().getSession().createBytesMessage(); - bytesMessage.writeLong(windowId); - producer.send(bytesMessage); - logger.debug("Retrieved committed window message id {}", bytesMessage.getJMSMessageID()); + writeWindowId(appId, operatorId, windowId); } catch (JMSException ex) { throw new RuntimeException(ex); } @@ -113,6 +135,15 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore } } + private void writeWindowId(String appId, int operatorId, long windowId) throws JMSException + { +BytesMessage