[jira] [Commented] (APEXMALHAR-2548) Pubsub operators do not use the correct URL to connect to web socket server in SSL mode
    [ https://issues.apache.org/jira/browse/APEXMALHAR-2548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283057#comment-16283057 ]

ASF GitHub Bot commented on APEXMALHAR-2548:

tweise closed pull request #682: APEXMALHAR-2548 Fixed XmlParserApplicationTest test failure
URL: https://github.com/apache/apex-malhar/pull/682

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/PubSubHelper.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/PubSubHelper.java
index 51eaeee7ea..439a53b57e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/PubSubHelper.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/PubSubHelper.java
@@ -24,6 +24,8 @@
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import com.google.common.base.Preconditions;
+
 import com.datatorrent.api.Context;
 
 @InterfaceStability.Evolving
@@ -57,9 +59,7 @@ public static URI getURI(String address)
 
   public static URI getURI(String address, boolean useSSL)
   {
-    if (address == null) {
-      throw new NullPointerException("No address specified");
-    }
+    Preconditions.checkNotNull(address,"No address specified");
     String uri = (useSSL ? "wss://" : "ws://") + address + "/pubsub";
     logger.debug("PubSub uri {}", uri);
     return URI.create(uri);
diff --git a/library/src/test/java/com/datatorrent/lib/parser/XmlParserApplicationTest.java b/library/src/test/java/com/datatorrent/lib/parser/XmlParserApplicationTest.java
index bd8b4a7b22..e35d6b0d1d 100644
--- a/library/src/test/java/com/datatorrent/lib/parser/XmlParserApplicationTest.java
+++ b/library/src/test/java/com/datatorrent/lib/parser/XmlParserApplicationTest.java
@@ -41,7 +41,7 @@
  */
 public class XmlParserApplicationTest
 {
-  public static int TupleCount;
+  public static volatile int TupleCount;
   public static com.datatorrent.lib.parser.XmlParserTest.EmployeeBean obj;
   @Test
   public void testApplication()

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Pubsub operators do not use the correct URL to connect to web socket server in SSL mode
> ---
>
>                 Key: APEXMALHAR-2548
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2548
>             Project: Apache Apex Malhar
>          Issue Type: Bug
>            Reporter: Pramod Immaneni
>            Assignee: Pramod Immaneni
>             Fix For: 4.0.0, 3.9.0
>
>
> In SSL mode, pub sub operators need to use wss protocol instead of ws. Today
> they use ws.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
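For readers skimming the diff, the scheme selection described in the issue can be sketched in isolation. This is only an illustration, not the Malhar class: `PubSubUriSketch` is a hypothetical name, and `Objects.requireNonNull` from the JDK stands in for Guava's `Preconditions.checkNotNull` so the example has no external dependencies.

```java
import java.net.URI;
import java.util.Objects;

// Hypothetical stand-in for the getURI(String, boolean) helper shown in the diff.
final class PubSubUriSketch
{
  static URI getURI(String address, boolean useSSL)
  {
    // Fail fast on a missing address, as the patched helper does.
    Objects.requireNonNull(address, "No address specified");
    // Secure clusters need the wss scheme; plain clusters keep ws.
    String scheme = useSSL ? "wss://" : "ws://";
    return URI.create(scheme + address + "/pubsub");
  }

  public static void main(String[] args)
  {
    System.out.println(getURI("localhost:9090", false)); // ws://localhost:9090/pubsub
    System.out.println(getURI("localhost:9090", true));  // wss://localhost:9090/pubsub
  }
}
```

The address `localhost:9090` is taken from the old frauddetect default visible in the pull request #676 diff; any host:port string would do.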
[jira] [Commented] (APEXMALHAR-2548) Pubsub operators do not use the correct URL to connect to web socket server in SSL mode
    [ https://issues.apache.org/jira/browse/APEXMALHAR-2548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282188#comment-16282188 ]

ASF GitHub Bot commented on APEXMALHAR-2548:

PramodSSImmaneni opened a new pull request #682: APEXMALHAR-2548 Fixed XmlParserApplicationTest test failure
URL: https://github.com/apache/apex-malhar/pull/682

> Pubsub operators do not use the correct URL to connect to web socket server in SSL mode
> ---
>
>                 Key: APEXMALHAR-2548
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2548
>             Project: Apache Apex Malhar
>          Issue Type: Bug
>            Reporter: Pramod Immaneni
>            Assignee: Pramod Immaneni
>             Fix For: 4.0.0, 3.9.0
>
>
> In SSL mode, pub sub operators need to use wss protocol instead of ws. Today
> they use ws.
[jira] [Commented] (APEXMALHAR-2548) Pubsub operators do not use the correct URL to connect to web socket server in SSL mode
    [ https://issues.apache.org/jira/browse/APEXMALHAR-2548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281850#comment-16281850 ]

ASF GitHub Bot commented on APEXMALHAR-2548:

tweise closed pull request #676: APEXMALHAR-2548 Using the correct websocket scheme when connecting to a SSL cluster
URL: https://github.com/apache/apex-malhar/pull/676

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java b/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java
index 98dfebd7b8..82c9214fa9 100644
--- a/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java
+++ b/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java
@@ -18,19 +18,24 @@
  */
 package com.datatorrent.apps.logstream;
 
-import java.net.URI;
-import java.util.*;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.apex.malhar.contrib.misc.streamquery.SelectOperator;
 import org.apache.apex.malhar.contrib.misc.streamquery.condition.EqualValueCondition;
-
-import org.apache.commons.lang.StringUtils;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
 import org.apache.hadoop.conf.Configuration;
 
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.contrib.redis.RedisKeyValPairOutputOperator;
+import com.datatorrent.contrib.redis.RedisMapOutputOperator;
+import com.datatorrent.contrib.redis.RedisNumberSummationMapOutputOperator;
 import com.datatorrent.lib.algo.TopN;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
 import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
@@ -43,13 +48,6 @@
 import com.datatorrent.lib.util.AbstractDimensionTimeBucketOperator;
 import com.datatorrent.lib.util.DimensionTimeBucketSumOperator;
 
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.Operator.InputPort;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.contrib.redis.RedisKeyValPairOutputOperator;
-import com.datatorrent.contrib.redis.RedisMapOutputOperator;
-import com.datatorrent.contrib.redis.RedisNumberSummationMapOutputOperator;
-
 /**
  * Log stream processing application based on Apex platform.
  * This application consumes log data generated by running systems and services
@@ -156,14 +154,12 @@ private SYSTEM_KEYS(String value)
 
   private InputPort wsOutput(DAG dag, String operatorName)
   {
-    String daemonAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
-    if (!StringUtils.isEmpty(daemonAddress)) {
-      URI uri = URI.create("ws://" + daemonAddress + "/pubsub");
+    if (PubSubHelper.isGatewayConfigured(dag)) {
       String appId = "appid";
       //appId = dag.attrValue(DAG.APPLICATION_ID, null); // will be used once UI is able to pick applications from list and listen to corresponding application
       String topic = "apps.logstream." + appId + "." + operatorName;
       PubSubWebSocketOutputOperator wsOut = dag.addOperator(operatorName, new PubSubWebSocketOutputOperator());
-      wsOut.setUri(uri);
+      wsOut.setUri(PubSubHelper.getURI(dag));
       wsOut.setTopic(topic);
       return wsOut.input;
     }
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java
index 73c38ef6cb..eed9a683e7 100644
--- a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java
@@ -20,11 +20,13 @@
 
 import java.io.Serializable;
 import java.net.URI;
+
 import org.apache.apex.examples.frauddetect.operator.HdfsStringOutputOperator;
 import org.apache.apex.examples.frauddetect.operator.MongoDBOutputOperator;
+import org.apache.apex.malhar.lib.utils.PubSubHelper;
 import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Context;
-import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
@@ -88,11 +90,7 @@ public void populateDAG(DAG dag, Configuration conf)
   {
     try {
-      String gatewayAddress = dag.getValue(DAGContext.GATEWAY_CONNECT_ADDRESS);
-      if (gatewayAddress == null) {
-        gatewayAddress = "localhost:9090";
-      }
-      URI duri = URI.create("ws://" + gatewayAddress + "/pubsub");
+      URI duri
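The logstream change above replaces a hand-built `ws://` URI with a "gateway configured?" check followed by a helper call. A rough, dependency-free sketch of that pattern follows. The class `GatewayUriSketch` and both of its methods are hypothetical; they take a plain address string instead of a `DAG`, and the emptiness test is an assumption modeled on the removed `StringUtils.isEmpty` check rather than on the actual `PubSubHelper.isGatewayConfigured(dag)` implementation, which this message does not show.

```java
import java.net.URI;
import java.util.Optional;

// Hypothetical illustration of "only wire the pub/sub operator when a gateway is set".
final class GatewayUriSketch
{
  // Assumption: a gateway counts as configured when the address is non-null and non-empty.
  static boolean isGatewayConfigured(String gatewayAddress)
  {
    return gatewayAddress != null && !gatewayAddress.isEmpty();
  }

  // Returns the pub/sub URI, or empty when no gateway address is available.
  static Optional<URI> pubSubUri(String gatewayAddress, boolean useSSL)
  {
    if (!isGatewayConfigured(gatewayAddress)) {
      return Optional.empty();
    }
    String scheme = useSSL ? "wss://" : "ws://";
    return Optional.of(URI.create(scheme + gatewayAddress + "/pubsub"));
  }

  public static void main(String[] args)
  {
    System.out.println(pubSubUri("gateway:9090", true).isPresent()); // true
    System.out.println(pubSubUri("", true).isPresent());             // false
  }
}
```

Keeping the scheme choice in one place means callers such as `wsOutput` no longer need to know whether the cluster runs in SSL mode.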
[jira] [Commented] (APEXMALHAR-2548) Pubsub operators do not use the correct URL to connect to web socket server in SSL mode
    [ https://issues.apache.org/jira/browse/APEXMALHAR-2548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238807#comment-16238807 ]

ASF GitHub Bot commented on APEXMALHAR-2548:

tweise closed pull request #676: APEXMALHAR-2548 Using the correct websocket scheme when connecting to a SSL cluster
URL: https://github.com/apache/apex-malhar/pull/676
[jira] [Commented] (APEXMALHAR-2548) Pubsub operators do not use the correct URL to connect to web socket server in SSL mode
    [ https://issues.apache.org/jira/browse/APEXMALHAR-2548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238808#comment-16238808 ]

ASF GitHub Bot commented on APEXMALHAR-2548:

PramodSSImmaneni opened a new pull request #676: APEXMALHAR-2548 Using the correct websocket scheme when connecting to a SSL cluster
URL: https://github.com/apache/apex-malhar/pull/676

> Pubsub operators do not use the correct URL to connect to web socket server in SSL mode
> ---
>
>                 Key: APEXMALHAR-2548
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2548
>             Project: Apache Apex Malhar
>          Issue Type: Bug
>            Reporter: Pramod Immaneni
>            Assignee: Pramod Immaneni
>            Priority: Major
>
> In SSL mode, pub sub operators need to use wss protocol instead of ws. Today
> they use ws.
[jira] [Commented] (APEXMALHAR-2548) Pubsub operators do not use the correct URL to connect to web socket server in SSL mode
    [ https://issues.apache.org/jira/browse/APEXMALHAR-2548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238640#comment-16238640 ]

ASF GitHub Bot commented on APEXMALHAR-2548:

PramodSSImmaneni closed pull request #676: APEXMALHAR-2548 Using the correct websocket scheme when connecting to a SSL cluster
URL: https://github.com/apache/apex-malhar/pull/676
[jira] [Commented] (APEXMALHAR-2548) Pubsub operators do not use the correct URL to connect to web socket server in SSL mode
    [ https://issues.apache.org/jira/browse/APEXMALHAR-2548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16238641#comment-16238641 ]

ASF GitHub Bot commented on APEXMALHAR-2548:

PramodSSImmaneni opened a new pull request #676: APEXMALHAR-2548 Using the correct websocket scheme when connecting to a SSL cluster
URL: https://github.com/apache/apex-malhar/pull/676

> Pubsub operators do not use the correct URL to connect to web socket server in SSL mode
> ---
>
>                 Key: APEXMALHAR-2548
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2548
>             Project: Apache Apex Malhar
>          Issue Type: Bug
>            Reporter: Pramod Immaneni
>            Assignee: Pramod Immaneni
>            Priority: Major
>
> In SSL mode, pub sub operators need to use wss protocol instead of ws. Today
> they use ws.
[jira] [Commented] (APEXMALHAR-2548) Pubsub operators do not use the correct URL to connect to web socket server in SSL mode
    [ https://issues.apache.org/jira/browse/APEXMALHAR-2548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237171#comment-16237171 ]

ASF GitHub Bot commented on APEXMALHAR-2548:

tweise closed pull request #676: APEXMALHAR-2548 Using the correct websocket scheme when connecting to a SSL cluster
URL: https://github.com/apache/apex-malhar/pull/676
[jira] [Commented] (APEXMALHAR-2548) Pubsub operators do not use the correct URL to connect to web socket server in SSL mode
    [ https://issues.apache.org/jira/browse/APEXMALHAR-2548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237172#comment-16237172 ]

ASF GitHub Bot commented on APEXMALHAR-2548:

PramodSSImmaneni opened a new pull request #676: APEXMALHAR-2548 Using the correct websocket scheme when connecting to a SSL cluster
URL: https://github.com/apache/apex-malhar/pull/676

> Pubsub operators do not use the correct URL to connect to web socket server in SSL mode
> ---
>
>                 Key: APEXMALHAR-2548
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2548
>             Project: Apache Apex Malhar
>          Issue Type: Bug
>            Reporter: Pramod Immaneni
>            Assignee: Pramod Immaneni
>            Priority: Major
>
> In SSL mode, pub sub operators need to use wss protocol instead of ws. Today
> they use ws.
[jira] [Commented] (APEXMALHAR-2548) Pubsub operators do not use the correct URL to connect to web socket server in SSL mode
    [ https://issues.apache.org/jira/browse/APEXMALHAR-2548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16217911#comment-16217911 ]

ASF GitHub Bot commented on APEXMALHAR-2548:

PramodSSImmaneni opened a new pull request #676: APEXMALHAR-2548 Using the correct websocket scheme when connecting to a SSL cluster
URL: https://github.com/apache/apex-malhar/pull/676

> Pubsub operators do not use the correct URL to connect to web socket server in SSL mode
> ---
>
>                 Key: APEXMALHAR-2548
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2548
>             Project: Apache Apex Malhar
>          Issue Type: Bug
>            Reporter: Pramod Immaneni
>            Assignee: Pramod Immaneni
>
> In SSL mode, pub sub operators need to use wss protocol instead of ws. Today
> they use ws.