[GitHub] storm pull request #2906: STORM-3123 - add support for Kafka security config...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2906#discussion_r233734291 --- Diff: external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java --- @@ -27,12 +27,15 @@ private final String consumerGroupId; // consumer group id for which the offset needs to be calculated private final String bootStrapBrokers; // bootstrap brokers private final String securityProtocol; // security protocol to connect to kafka +private final String consumerConfig; // security configuration file to connect to secure kafka --- End diff -- nit: IMHO a more descriptive name (like securityConfFilePath?) would better represent what the comment says. The other parameters are self-describing, but I couldn't tell from its name alone that the new parameter points to a file. ---
[GitHub] storm pull request #2906: STORM-3123 - add support for Kafka security config...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2906#discussion_r233734900 --- Diff: storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java --- @@ -68,14 +77,38 @@ commands.add((String) jsonConf.get(GROUPID_CONFIG)); commands.add("-b"); commands.add((String) jsonConf.get(BOOTSTRAP_CONFIG)); -String securityProtocol = (String) jsonConf.get(CONFIG_KEY_PREFIX + "security.protocol"); +String securityProtocol = (String) jsonConf.get(SECURITY_PROTOCOL_CONFIG); if (securityProtocol != null && !securityProtocol.isEmpty()) { commands.add("-s"); commands.add(securityProtocol); } return commands; } +private static File getExtraPropertiesFile(Map jsonConf) { --- End diff -- nit: maybe using `create` or `build` instead of `get` would make it clearer that a new (temporary) file is generated. ---
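The suggested rename could look like the following minimal sketch. The class name, method body, and property handling are illustrative assumptions, not the actual TopologySpoutLag code:

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch: "create" in the name signals that each call generates
// a new temporary file, unlike "get", which suggests returning an existing one.
public class ExtraPropsSketch {
    static File createExtraPropertiesFile(Map<String, Object> jsonConf) {
        Properties props = new Properties();
        for (Map.Entry<String, Object> entry : jsonConf.entrySet()) {
            props.setProperty(entry.getKey(), String.valueOf(entry.getValue()));
        }
        try {
            // temp file lives until JVM exit; its path would be handed to storm-kafka-monitor
            File file = File.createTempFile("kafka-consumer-extra", ".properties");
            file.deleteOnExit();
            try (FileOutputStream out = new FileOutputStream(file)) {
                props.store(out, "extra kafka consumer properties");
            }
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```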
[GitHub] storm pull request #2906: STORM-3123 - add support for Kafka security config...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2906#discussion_r233733766 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -692,11 +692,28 @@ public String toString() { configuration.put(configKeyPrefix + "topics", getTopicsString()); configuration.put(configKeyPrefix + "groupid", kafkaSpoutConfig.getConsumerGroupId()); -configuration.put(configKeyPrefix + "bootstrap.servers", kafkaSpoutConfig.getKafkaProps().get("bootstrap.servers")); -configuration.put(configKeyPrefix + "security.protocol", kafkaSpoutConfig.getKafkaProps().get("security.protocol")); +for (Entry conf: kafkaSpoutConfig.getKafkaProps().entrySet()) { +if (conf.getValue() != null && isPrimitiveOrWrapper(conf.getValue().getClass())) { --- End diff -- nit: It might be better to log the configuration keys that get dropped here. DEBUG is probably fine, since I guess they're only used by storm-kafka-monitor. ---
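The logging suggestion could be sketched roughly as below. The class and method names are hypothetical, and `java.util.logging` stands in for the SLF4J logger the real KafkaSpout uses, to keep the sketch self-contained:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

public class MonitorConfigFilter {
    private static final Logger LOG = Logger.getLogger(MonitorConfigFilter.class.getName());

    // Keep only values the monitor config can carry; log what gets dropped.
    static Map<String, Object> filterForMonitor(Map<String, Object> kafkaProps) {
        Map<String, Object> kept = new HashMap<>();
        for (Map.Entry<String, Object> conf : kafkaProps.entrySet()) {
            Object value = conf.getValue();
            if (value instanceof String || value instanceof Number
                    || value instanceof Boolean || value instanceof Character) {
                kept.put(conf.getKey(), value);
            } else {
                // DEBUG-level (fine) is enough: these keys only feed storm-kafka-monitor
                LOG.fine("Dropping non-primitive Kafka config key: " + conf.getKey());
            }
        }
        return kept;
    }
}
```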
[GitHub] storm issue #2853: STORM-2963 Adding notes on GC to Performance.md
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2853 @roshannaik No worries. Please comment when you're done. Adding `WIP` to the title would also help make the status clear in this case. ---
[GitHub] storm issue #2792: Add the getName() method in order to obtain the applied l...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2792 @alex-87 Sorry I'm late to the party. Could you follow up on @srdo's guidance? ---
[GitHub] storm issue #2773: Blobstore sync bug fix
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2773 @arunmahadevan I'd be really happy if you could take this forward. Looks like we lost @jiangzhileaf. ---
[GitHub] storm issue #2829: STORM-3222: Fix KafkaSpout internals to use LinkedList in...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2829 If my understanding is right, there's pendingEmits (unbounded), which comes into play when a tuple can't be pushed immediately, so emit should not block, and neither should nextTuple. If emit could block in a spout, it would be a pretty big risk (ack/fail would not be handled either, tuples would fail into timeout, and that may trigger backpressure again). @roshannaik Could you confirm? ---
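The behavior described here can be illustrated with a toy sketch. This is not the actual Storm executor code, just an illustration under the stated assumption that pendingEmits is an unbounded overflow queue in front of a bounded downstream queue:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model of a non-blocking emit path: when the bounded downstream queue is
// full, tuples are parked in an unbounded pendingEmits queue instead of
// blocking the spout thread.
public class NonBlockingEmit {
    final Queue<Object> downstream;                         // bounded in the real executor
    final Queue<Object> pendingEmits = new ArrayDeque<>();  // unbounded overflow
    final int capacity;

    NonBlockingEmit(int capacity) {
        this.capacity = capacity;
        this.downstream = new ArrayDeque<>(capacity);
    }

    void emit(Object tuple) {
        // First try to flush previously parked tuples to keep ordering,
        // then place the new tuple; never block in either case.
        while (!pendingEmits.isEmpty() && downstream.size() < capacity) {
            downstream.add(pendingEmits.poll());
        }
        if (pendingEmits.isEmpty() && downstream.size() < capacity) {
            downstream.add(tuple);
        } else {
            pendingEmits.add(tuple);
        }
    }
}
```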
[GitHub] storm issue #2829: STORM-3222: Fix KafkaSpout internals to use LinkedList in...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2829 @srdo is right. The reason is to enforce "max.spout.pending.limit", but the concept was introduced when backpressure was not in place. IMHO, the concept is something we should drop once backpressure is stabilized. While I wonder whether backpressure would work correctly without any critical performance hit on 1.x (we disabled it by default, AFAIK), STORM-2306 renewed the backpressure mechanism, which we may now be OK to rely on. If my memory is right, setting "max.spout.pending" was still a way to optimize even with STORM-2306, but even without it things worked correctly with sane performance. At least for Storm 2.0 we could try relying only on backpressure, and drop the concept if we are happy with the result. ---
[GitHub] storm pull request #2812: STORM-3203: add back in the permission updates
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2812#discussion_r213125537 --- Diff: storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java --- @@ -53,21 +54,24 @@ private final boolean isLocalMode; private final Path topologyBasicBlobsRootDir; private final AdvancedFSOps fsOps; +private final String owner; private volatile long version = NOT_DOWNLOADED_VERSION; private volatile long size = 0; + /** * Create a new LocallyCachedBlob. - * - * @param topologyId the ID of the topology. + * @param topologyId the ID of the topology. * @param type the type of the blob. + * @param assignment the assignment, mostly to know who the owner is. */ protected LocallyCachedTopologyBlob(final String topologyId, final boolean isLocalMode, final Map conf, -final AdvancedFSOps fsOps, final TopologyBlobType type) throws IOException { +final AdvancedFSOps fsOps, final TopologyBlobType type, LocalAssignment assignment) throws IOException { super(topologyId + " " + type.getFileName(), type.getKey(topologyId)); this.topologyId = topologyId; this.type = type; this.isLocalMode = isLocalMode; this.fsOps = fsOps; +this.owner = assignment.get_owner(); topologyBasicBlobsRootDir = Paths.get(ConfigUtils.supervisorStormDistRoot(conf, topologyId)); --- End diff -- Yes please. Let's decouple unnecessary things. ---
[GitHub] storm pull request #2809: STORM-3199: Remove metrics-ganglia due to LGPL dep...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2809#discussion_r212838254 --- Diff: pom.xml --- @@ -1011,12 +1005,12 @@ log4j log4j - -org.slf4j + --- End diff -- @srdo Looks like some nits on indentation. Could you fix them? ---
[GitHub] storm pull request #2812: STORM-3203: add back in the permission updates
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2812#discussion_r212838182 --- Diff: storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java --- @@ -53,21 +54,24 @@ private final boolean isLocalMode; private final Path topologyBasicBlobsRootDir; private final AdvancedFSOps fsOps; +private final String owner; private volatile long version = NOT_DOWNLOADED_VERSION; private volatile long size = 0; + /** * Create a new LocallyCachedBlob. - * - * @param topologyId the ID of the topology. + * @param topologyId the ID of the topology. * @param type the type of the blob. + * @param assignment the assignment, mostly to know who the owner is. */ protected LocallyCachedTopologyBlob(final String topologyId, final boolean isLocalMode, final Map conf, -final AdvancedFSOps fsOps, final TopologyBlobType type) throws IOException { +final AdvancedFSOps fsOps, final TopologyBlobType type, LocalAssignment assignment) throws IOException { super(topologyId + " " + type.getFileName(), type.getKey(topologyId)); this.topologyId = topologyId; this.type = type; this.isLocalMode = isLocalMode; this.fsOps = fsOps; +this.owner = assignment.get_owner(); topologyBasicBlobsRootDir = Paths.get(ConfigUtils.supervisorStormDistRoot(conf, topologyId)); --- End diff -- +1 on this, but not a big deal. Just curious on my side too. ---
[GitHub] storm pull request #2812: STORM-3203: add back in the permission updates
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2812#discussion_r212838119 --- Diff: storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java --- @@ -125,6 +129,12 @@ public long getRemoteVersion(ClientBlobStore store) throws KeyNotFoundException, @Override public long fetchUnzipToTemp(ClientBlobStore store) throws IOException, KeyNotFoundException, AuthorizationException { +synchronized (LocallyCachedTopologyBlob.class) { +if (!Files.exists(topologyBasicBlobsRootDir)) { --- End diff -- If my understanding is right, your point is only valid when there are multiple Supervisors running on a host with the same base directory, and IMHO that is not something we support. Are there other cases we need to handle with a file lock? ---
[GitHub] storm issue #2811: STORM-3184: Replace the usage of redact-value with Config...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2811 Thanks @arunmahadevan , merged follow-up patch to 1.x-branch. ---
[GitHub] storm issue #2760: STORM-3123: Storm Kafka Monitor cannot connect to Kafka o...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2760 @VipinRathor Any updates? It is completely OK for you to just close this and create a new PR. ---
[GitHub] storm issue #2807: STORM-3199: Remove metrics-ganglia due to LGPL dependency
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2807 @srdo I guess it's worth discussing on the user/dev mailing lists, because I don't know who is using the Ganglia Reporter. Actually, I feel Metrics V2 may not be actively used, so IMHO the risk is small, but we may want to run some kind of poll before removing it anyway. If someone is using the Ganglia Reporter on 1.x, we can deprecate it in 1.x and remove it in 2.0. ---
[GitHub] storm pull request #2808: [STORM-3131] Support hostname-substitution for blo...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2808#discussion_r211073836 --- Diff: storm-client/src/jvm/org/apache/storm/Config.java --- @@ -1884,4 +1888,11 @@ public void setTopologyStrategy(String strategy) { this.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, strategy); } +public static String getBlobstoreHDFSPrincipal(Map conf) throws UnknownHostException { +String principal = (String)conf.get(Config.BLOBSTORE_HDFS_PRINCIPAL); +if (principal != null) { +principal = principal.replace("HOSTNAME", Utils.localHostname()); --- End diff -- Hadoop uses `_HOST` as the placeholder, and it has less chance of conflicting. So unless we have a good reason to use an alternative, `_HOST` would be better to me. Also, I would feel safer if we tried to parse the principal and only replaced the placeholder when the host part of the principal is exactly the placeholder. So `_HOST` in `yarn/_h...@example.com` can be substituted, but `_HOST` in `yarn/_ho...@example.com` and `_HOST` in `_HOST/ho...@example.com` should not be substituted. @revans2 Could you provide some suggestions? ---
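The safer substitution described here might be sketched as follows. The parsing is deliberately simple, the method name is illustrative (not the proposed Config.java API), and the sample principals are my own examples, not the (obfuscated) ones quoted above:

```java
// Only substitute the placeholder when it occupies the whole host part of a
// Kerberos principal of the form primary/host@REALM.
public class PrincipalHostSubstitution {
    static String substituteHost(String principal, String hostname) {
        int slash = principal.indexOf('/');
        int at = principal.indexOf('@');
        if (slash < 0 || at < 0 || at < slash) {
            return principal; // not a host-based principal, leave untouched
        }
        String host = principal.substring(slash + 1, at);
        if (!"_HOST".equals(host)) {
            return principal; // host part is not purely the placeholder
        }
        return principal.substring(0, slash + 1) + hostname + principal.substring(at);
    }
}
```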
[GitHub] storm pull request #2803: STORM-2578: Apply new code style to storm-elastics...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2803#discussion_r211055229 --- Diff: external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsLookupBolt.java --- @@ -76,12 +78,12 @@ public void process(Tuple tuple) { } private Collection lookupValuesInEs(Tuple tuple) throws IOException { - String index = tupleMapper.getIndex(tuple); - String type = tupleMapper.getType(tuple); - String id = tupleMapper.getId(tuple); - Map params = tupleMapper.getParams(tuple, new HashMap<>()); +String index = tupleMapper.getIndex(tuple); +String type = tupleMapper.getType(tuple); +String id = tupleMapper.getId(tuple); +Map params = tupleMapper.getParams(tuple, new HashMap<>()); --- End diff -- The indentation was broken (mixed tabs and spaces), and this change fixes it. ---
[GitHub] storm pull request #2803: STORM-2578: Apply new code style to storm-elastics...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2803#discussion_r211055345 --- Diff: external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/doc/Index.java --- @@ -28,6 +29,13 @@ public Index() { } +/** + * Create a Index with the specified index, type and id. + * + * @param index index name + * @param type document type to be stored + * @param id unique document id in Elastisearch --- End diff -- nit: Elastisearch -> Elasticsearch ---
[GitHub] storm pull request #2803: STORM-2578: Apply new code style to storm-elastics...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2803#discussion_r211055054 --- Diff: external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/EsLookupResultOutput.java --- @@ -25,17 +26,22 @@ import org.elasticsearch.client.Response; /** - * @since 0.11 * The adapter to convert the results fetched from Elasticsearch to values. + * + * @since 0.11 */ public interface EsLookupResultOutput extends Serializable { /** + * Covert Elasticsearch response to a collection of {@link Values}. --- End diff -- nit: Covert -> Convert ---
[GitHub] storm issue #2803: STORM-2578: Apply new code style to storm-elasticsearch
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2803 @milantracy Thanks for the contribution! Could you please reduce the max violation count, so that we can see how many spots your patch addresses and make sure we never break it again? ---
[GitHub] storm issue #2760: STORM-3123: Storm Kafka Monitor cannot connect to Kafka o...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2760 @VipinRathor You can check out a new branch based on recent master, cherry-pick your commits, and then reset your PR branch to the last commit of the new branch via `git reset --hard`. ---
[GitHub] storm pull request #2801: STORM-3184: Mask the plaintext passwords from the ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2801#discussion_r209438801 --- Diff: storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java --- @@ -52,6 +79,16 @@ public static ConfigUtils setInstance(ConfigUtils u) { return oldInstance; } +public static Map maskPasswords(final Map conf) { +Maps.EntryTransformer maskPasswords = --- End diff -- @arunmahadevan I also think this approach could replace `Utils.redactValue` by adding `@password` to STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD. What do you think? It looks like we don't need both approaches. ---
[GitHub] storm pull request #2801: STORM-3184: Mask the plaintext passwords from the ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2801#discussion_r209438732 --- Diff: storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java --- @@ -52,6 +79,16 @@ public static ConfigUtils setInstance(ConfigUtils u) { return oldInstance; } +public static Map maskPasswords(final Map conf) { +Maps.EntryTransformer maskPasswords = --- End diff -- @arunmahadevan Ah, OK, that makes sense. I should have made it clear that my concern was adding Guava as a dependency without shading. You can revert to your logic using the shaded Guava. https://github.com/apache/storm/blob/master/shaded-deps/pom.xml#L207-L218 Sorry about the confusion. ---
[GitHub] storm issue #2801: STORM-3184: Mask the plaintext passwords from the logs
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2801 @arunmahadevan Looks like we don't need to introduce Guava for the master branch. I understand it is annoying to target Java 7 / Java 8 per branch, but let's keep this approach until we ship Storm 2.0.0. ---
[GitHub] storm pull request #2801: STORM-3184: Mask the plaintext passwords from the ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2801#discussion_r209424838 --- Diff: storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java --- @@ -52,6 +79,16 @@ public static ConfigUtils setInstance(ConfigUtils u) { return oldInstance; } +public static Map maskPasswords(final Map conf) { +Maps.EntryTransformer maskPasswords = --- End diff -- Could we replace this with the Java streams API and remove the Guava dependency? ---
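A possible Guava-free version using the streams API could look like this sketch. It assumes the masking rule is "any key containing \"password\""; the marker rule in the actual patch may differ:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of replacing Guava's Maps.EntryTransformer with the Java streams API.
public class PasswordMasker {
    static Map<String, Object> maskPasswords(Map<String, Object> conf) {
        // The 3-arg collect tolerates null values, unlike Collectors.toMap.
        HashMap<String, Object> masked = conf.entrySet().stream().collect(
            HashMap::new,
            (map, e) -> map.put(e.getKey(),
                e.getKey().toLowerCase().contains("password") ? "*****" : e.getValue()),
            HashMap::putAll);
        return masked;
    }
}
```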
[GitHub] storm issue #2798: STORM-3184: Mask the plaintext passwords from the logs
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2798 @arunmahadevan Totally agreed. We should move on to Storm 2.0.0; actually, most recent pull requests have only been applied to the master branch. ---
[GitHub] storm issue #2798: STORM-3184: Mask the plaintext passwords from the logs
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2798 ping @arunmahadevan ---
[GitHub] storm issue #2799: STORM-3183: Fix for visualization on Storm API
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2799 https://user-images.githubusercontent.com/1317309/43955215-21c167e0-9cda-11e8-818a-6bbf8b65ed5b.png Looks like there is still a missed spot; could you check this screenshot? ---
[GitHub] storm issue #2794: STORM-3180 Total executors in Cluster Summary in main UI ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2794 @srdo No, unfortunately we don't apply consistent naming, so `totalExecutors` is right for the others. Renaming them would break clients that rely on the REST API, so I'm not sure we should fix them, though I think it would be ideal to. ---
[GitHub] storm pull request #2794: STORM-3180 Total executors in Cluster Summary in m...
GitHub user HeartSaVioR opened a pull request: https://github.com/apache/storm/pull/2794 STORM-3180 Total executors in Cluster Summary in main UI page is not exposed even a topology is running * rename the field of /cluster/summary output: `totalExecutors` to `executorsTotal` Please compare with doc regarding the output format of API: https://github.com/apache/storm/blob/master/docs/STORM-UI-REST-API.md#apiv1clustersummary-get You can merge this pull request into a Git repository by running: $ git pull https://github.com/HeartSaVioR/storm STORM-3180 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2794.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2794 commit fdfeb14166301fed82b045f41934b693e95577c4 Author: Jungtaek Lim Date: 2018-08-06T13:07:53Z STORM-3180 Total executors in Cluster Summary in main UI page is not exposed even a topology is running * rename the field of /cluster/summary output: totalExecutors to executorsTotal ---
[GitHub] storm issue #2752: Storm 1311 Migration of UI from clj to Java
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2752 Sorry for revisiting this so late. Thanks @govind-menon for doing this great work, and thanks @revans2 for the detailed review. I'll try to spend time playing with the UI and will report issues, if any. ---
[GitHub] storm issue #2783: [WIP] Make StormMetricsRegistry a regular class rather th...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2783 I love any approach that switches static to non-static, as long as it doesn't require hacks or long parameter lists for injection, and this looks great. +1 to move on. ---
[GitHub] storm issue #2773: Blobstore sync bug fix
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2773 @jiangzhileaf Any updates? ---
[GitHub] storm issue #2775: MINOR - Make raw type assignment type safe
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2775 Thanks @hmcl for providing the patch. Merged into master. ---
[GitHub] storm pull request #2773: Blobstore sync bug fix
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2773#discussion_r205973135 --- Diff: storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java --- @@ -197,7 +198,7 @@ public void run() { throw new RuntimeException(e); } } -}, 0, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_CODE_SYNC_FREQ_SECS))); +}, 0, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_CODE_SYNC_FREQ_SECS))*1000); --- End diff -- Nice catch! ---
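For reference, the fix works because `NIMBUS_CODE_SYNC_FREQ_SECS` is configured in seconds while the scheduler's period parameter is in milliseconds (assuming the scheduler here behaves like `java.util.Timer.scheduleAtFixedRate`). The conversion, shown standalone:

```java
// Seconds-to-milliseconds conversion behind the `* 1000` fix above.
public class SyncPeriod {
    static long periodMillis(int freqSecs) {
        return freqSecs * 1000L; // long literal avoids int overflow for large values
    }
}
```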
[GitHub] storm pull request #2773: Blobstore sync bug fix
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2773#discussion_r205973204 --- Diff: storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java --- @@ -191,6 +192,8 @@ public static boolean downloadUpdatedBlob(Map conf, BlobStore bl out.close(); } isSuccess = true; +} catch(FileNotFoundException fnf) { --- End diff -- I'd rather leave it as it is, unless we feel confident that it is OK to swallow here. ---
[GitHub] storm issue #2778: (1.x) STORM-3121: Fix flaky metrics tests in storm-core
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2778 @srdo Thanks for the review. Since this is a backport patch, I'll skip waiting 24 hours on this one. ---
[GitHub] storm pull request #2778: (1.x) STORM-3121: Fix flaky metrics tests in storm...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2778#discussion_r205965687 --- Diff: storm-core/test/resources/log4j2-test.xml --- @@ -25,7 +25,8 @@ - + --- End diff -- @srdo I think we need to use the original package name. I missed it here. Fixed. ---
[GitHub] storm issue #2778: (1.x) STORM-3121: Fix flaky metrics tests in storm-core
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2778 https://travis-ci.org/HeartSaVioR/storm/jobs/409469987 (passed) https://travis-ci.org/HeartSaVioR/storm/jobs/409469988 (passed) https://travis-ci.org/apache/storm/jobs/409470450 (integration.org.apache.storm.integration-test failed) https://travis-ci.org/apache/storm/jobs/409470451 (integration.org.apache.storm.integration-test failed) ---
[GitHub] storm pull request #2778: (1.x) STORM-3121: Fix flaky metrics tests in storm...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2778#discussion_r205964723 --- Diff: storm-core/test/clj/org/apache/storm/metrics_test.clj --- @@ -15,16 +15,14 @@ ;; limitations under the License. (ns org.apache.storm.metrics-test (:use [clojure test]) - (:import [org.apache.storm.topology TopologyBuilder]) - (:import [org.apache.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus]) - (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount -TestAggregatesCounter TestConfBolt AckFailMapTracker PythonShellMetricsBolt PythonShellMetricsSpout]) - (:import [org.apache.storm.task ShellBolt]) - (:import [org.apache.storm.spout ShellSpout]) - (:import [org.apache.storm.metric.api CountMetric IMetricsConsumer$DataPoint IMetricsConsumer$TaskInfo]) - (:import [org.apache.storm.metric.api.rpc CountShellMetric]) - (:import [org.apache.storm.utils Utils]) - + (:import [org.apache.storm.testing AckFailMapTracker PythonShellMetricsBolt PythonShellMetricsSpout]) --- End diff -- Most of the import changes just remove unused imports. ---
[GitHub] storm pull request #2778: (1.x) STORM-3121: Fix flaky metrics tests in storm...
GitHub user HeartSaVioR opened a pull request: https://github.com/apache/storm/pull/2778 (1.x) STORM-3121: Fix flaky metrics tests in storm-core This ports #2735 back to the 1.x-branch, since I can see flaky metrics tests on 1.x-branch. https://travis-ci.org/apache/storm/jobs/408847970#L1079-L1082 I also fixed a merge conflict, so I would like this reviewed rather than pushing the change without notice. @srdo Could you take a look? Thanks in advance! You can merge this pull request into a Git repository by running: $ git pull https://github.com/HeartSaVioR/storm STORM-3121-1.x Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2778.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2778 commit 50ea2149776adecdb270e81c60ab5192fbd50e79 Author: Stig Rohde Døssing Date: 2018-06-24T10:55:11Z STORM-3121: Fix flaky metrics tests in storm-core ---
[GitHub] storm issue #2711: STORM-3100 : Introducing CustomIndexArray to speedup Hash...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2711 Looks like the build still fails, and the failure is relevant to this patch. ---
[GitHub] storm issue #2776: STORM-3161 Local mode should force setting min replicatio...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2776 @danny0405 It is not an error; it just gets stuck with no progress. https://github.com/apache/storm/blob/3b5f9e7c75b378881060f454d39e6bf653e0bcb3/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj#L502 In local mode we always launch only one nimbus, so replication can never be achieved if the min replication count is set higher than 1. ---
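The fix this PR describes boils down to an override like the hedged sketch below; the config key is the real one, but the helper class and method are illustrative, not the actual nimbus code:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative helper: with a single local-mode Nimbus, any
// topology.min.replication.count above 1 can never be satisfied,
// so local mode forces it back to 1.
public class LocalModeConfSketch {
    static Map<String, Object> forceMinReplication(Map<String, Object> conf, boolean localMode) {
        Map<String, Object> adjusted = new HashMap<>(conf);
        if (localMode) {
            adjusted.put("topology.min.replication.count", 1);
        }
        return adjusted;
    }
}
```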
[GitHub] storm pull request #2777: (1.x) STORM-3161 Local mode should force setting m...
GitHub user HeartSaVioR opened a pull request: https://github.com/apache/storm/pull/2777 (1.x) STORM-3161 Local mode should force setting min replication count to 1 When topology.min.replication.count is set to more than 1, nimbus in local mode can never meet the replication condition, and hence gets stuck handling blobs. We should force it to 1 in local mode to avoid this situation. Also set storm.home to a new temporary directory to avoid using the actual log directory. Patch for master branch: #2776 You can merge this pull request into a Git repository by running: $ git pull https://github.com/HeartSaVioR/storm STORM-3161-1.x Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2777.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2777 commit f1e9a7941d6c90ab687a49eee49eb272e771d94b Author: Jungtaek Lim Date: 2018-07-27T08:31:29Z STORM-3161 Local mode should force setting min replication count to 1 When topology.min.replication.count is set to more than 1, nimbus in local mode can never meet the replication condition, and hence gets stuck handling blobs. We should force it to 1 in local mode to avoid this situation. Also set storm.home to a new temporary directory to avoid using the actual log directory. ---
[GitHub] storm pull request #2776: STORM-3161 Local mode should force setting min rep...
GitHub user HeartSaVioR opened a pull request: https://github.com/apache/storm/pull/2776 STORM-3161 Local mode should force setting min replication count to 1 When topology.min.replication.count is set to more than 1, nimbus in local mode can never meet the replication condition, and hence gets stuck handling blobs. We should force it to 1 in local mode to avoid this situation. Will also submit a patch for the 1.x-branch. You can merge this pull request into a Git repository by running: $ git pull https://github.com/HeartSaVioR/storm STORM-3161 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2776.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2776 commit 882cfff9f8249248ff09b5898389523512ede40b Author: Jungtaek Lim Date: 2018-07-27T07:58:37Z STORM-3161 Local mode should force setting min replication count to 1 When topology.min.replication.count is set to more than 1, nimbus in local mode can never meet the replication condition, and hence gets stuck handling blobs. We should force it to 1 in local mode to avoid this situation. ---
[GitHub] storm issue #2768: STORM-3156: Remove the transactional topology API
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2768 @revans2 Could you review this? I'm a soft +1, assuming there's no usage of the transactional API. ---
[GitHub] storm issue #2761: STORM-2947: Remove deprecated field and method from Thrif...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2761 @revans2 Could you revisit this PR? ---
[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2752#discussion_r204259473 --- Diff: storm-core/src/clj/org/apache/storm/ui/helpers.clj --- @@ -56,7 +56,7 @@ ;; TODO this function and its callings will be replace when ui.core and logviewer move to Java (defnk json-response - [data callback :need-serialize true :status 200 :headers {}] - {:status status + [data callback :need-serialize true :status 200 :headers {}] --- End diff -- Same here. ---
[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2752#discussion_r204276809 --- Diff: storm-core/src/jvm/org/apache/storm/ui/filters/AuthorizedUserFilter.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2752#discussion_r204258860 --- Diff: external/storm-opentsdb/pom.xml --- @@ -36,7 +36,7 @@ -2.23 +2.27 --- End diff -- If we want to sync the jersey version to the root pom, this property would not be needed. ---
[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2752#discussion_r204260307 --- Diff: storm-dist/binary/storm-webapp-bin/src/main/assembly/storm-webapp.xml --- @@ -27,6 +27,18 @@ false lib-webapp false + --- End diff -- Once we also move the UI to the webapp, this will no longer be needed at all. ---
[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2752#discussion_r204262254 --- Diff: storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java --- @@ -1,12 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) --- End diff -- Is it for fixing checkstyle, or unneeded change? ---
[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2752#discussion_r204259623 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java --- @@ -28,10 +28,14 @@ import javax.servlet.ServletResponse; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.container.PreMatching; +import javax.ws.rs.ext.Provider; import org.apache.storm.security.auth.IHttpCredentialsPlugin; import org.apache.storm.security.auth.ReqContext; +@Provider +@PreMatching public class ReqContextFilter implements Filter { --- End diff -- Once the UI also leverages this class, I feel it would be better to have a common webservice package and move this class there. ---
[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2752#discussion_r204262044 --- Diff: storm-core/src/jvm/org/apache/storm/ui/filters/HeaderResponseServletFilter.java --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.ui.filters; + +import com.codahale.metrics.Meter; +import org.apache.storm.Constants; +import org.apache.storm.metric.StormMetricsRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.Filter; +import javax.servlet.FilterChain; +import javax.servlet.FilterConfig; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; + +public class HeaderResponseServletFilter implements Filter { +public static final Logger LOG = LoggerFactory.getLogger(HeaderResponseServletFilter.class); + +public static Meter webRequestMeter = +StormMetricsRegistry.registerMeter("num-web-requests"); + +public static Meter mainPageRequestMeter = + StormMetricsRegistry.registerMeter("ui:num-main-page-http-requests"); +@Override +public void init(FilterConfig filterConfig) throws ServletException { + +} + +@Override +public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException { +webRequestMeter.mark(); +HttpServletRequest httpRequest = (HttpServletRequest) servletRequest; +HttpServletResponse httpResponse = (HttpServletResponse) servletResponse; +if ((httpRequest.getPathInfo()).equals("/index.html")) { --- End diff -- It looks like porting `request-middleware` to filter and the conditions between `request-middleware` and here are not same. https://github.com/apache/storm/blob/a7e817bcd1424d300ab5bad06c5d9f4729d9f347/storm-core/src/clj/org/apache/storm/ui/helpers.clj#L39-L55 ---
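Beyond the condition mismatch with `request-middleware`, the quoted check `(httpRequest.getPathInfo()).equals("/index.html")` is also fragile: `HttpServletRequest#getPathInfo()` may return null, so calling `equals()` on its result can throw a NullPointerException. Below is a minimal, hypothetical sketch of a null-safe variant of that check; the helper name `isMainPageRequest` and the decision to treat null and `/` as main-page requests are assumptions, not what the PR implements — the exact conditions would still need to be ported from `request-middleware` in helpers.clj.

```java
public class MainPageCheck {
    // Hypothetical null-safe variant of the doFilter path check discussed above.
    // getPathInfo() can be null, so compare with constants on the left-hand side.
    // Treating null and "/" as main-page requests is an assumption for illustration.
    static boolean isMainPageRequest(String pathInfo) {
        return pathInfo == null || "/".equals(pathInfo) || "/index.html".equals(pathInfo);
    }

    public static void main(String[] args) {
        System.out.println(isMainPageRequest("/index.html"));
        System.out.println(isMainPageRequest(null));
        System.out.println(isMainPageRequest("/api/v1/cluster/summary"));
    }
}
```

With this shape, the meter logic in `doFilter` stays a one-line call and the null case no longer reaches `equals()`.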
[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2752#discussion_r204259460 --- Diff: storm-core/src/clj/org/apache/storm/ui/core.clj --- @@ -1613,24 +1613,24 @@ https-want-client-auth (conf UI-HTTPS-WANT-CLIENT-AUTH) https-need-client-auth (conf UI-HTTPS-NEED-CLIENT-AUTH)] (StormMetricsRegistry/startMetricsReporters conf) - (UIHelpers/stormRunJetty (int (conf UI-PORT)) -(conf UI-HOST) -https-port -header-buffer-size -(reify IConfigurator + (UIHelpers/stormRunJetty (int (conf UI-PORT)) + (conf UI-HOST) + https-port + header-buffer-size + (reify IConfigurator (execute [this server] (UIHelpers/configSsl server - https-port - https-ks-path - https-ks-password - https-ks-type - https-key-password - https-ts-path - https-ts-password - https-ts-type - https-need-client-auth - https-want-client-auth - header-buffer-size) + https-port + https-ks-path + https-ks-password + https-ks-type + https-key-password + https-ts-path + https-ts-password + https-ts-type + https-need-client-auth + https-want-client-auth + header-buffer-size) --- End diff -- I think we will remove clj files later, but let's revert unneeded changes so that we can see smaller set of change. ---
[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2752#discussion_r204259546 --- Diff: storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java --- @@ -34,6 +34,11 @@ private static final Logger LOG = LoggerFactory.getLogger(UploadCredentials.class); +/** + * Uploads credentials for a topology. + * @param args To accept topology name. + * @throws Exception Ignored. --- End diff -- Same here. ---
[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2752#discussion_r204259528 --- Diff: storm-core/src/jvm/org/apache/storm/command/GetErrors.java --- @@ -25,6 +25,11 @@ import org.json.simple.JSONValue; public class GetErrors { +/** + * Only get errors for a topology. + * @param args Used to accept the topology name. + * @throws Exception Ignored. --- End diff -- Rewrite this to `@throws Exception on errors` for consistency with BasicDrpcClient, or just remove `throws Exception`. ---
[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2752#discussion_r204258951 --- Diff: storm-client/src/jvm/org/apache/storm/Constants.java --- @@ -55,5 +55,7 @@ public static final String COMMON_ONHEAP_MEMORY_RESOURCE_NAME = "onheap.memory.mb"; public static final String COMMON_OFFHEAP_MEMORY_RESOURCE_NAME = "offheap.memory.mb"; public static final String COMMON_TOTAL_MEMORY_RESOURCE_NAME = "memory.mb"; + +public static final String STORM_API_URL_PREFIX = "/api/v1/"; --- End diff -- I can only see the usage from UIServer (storm-core, ideally needs to be moved to webserver module), and I'm not sure client side needs to know about this. ---
[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2752#discussion_r204259185 --- Diff: storm-core/pom.xml --- @@ -32,6 +32,9 @@ /etc/storm ${project.build.directory}/native/worker-launcher +2.27 +2.4.3 +1.19.4 --- End diff -- I'd like to see this placed in web-app or individual module before merging this in. Once we pull this in and release, follow-up issues would be easy to forget. And our final goal for breaking down `storm-core` is to remove `storm-core`, so moving this to web-app also helps on reducing the work on this. ---
[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2752#discussion_r204258845 --- Diff: pom.xml --- @@ -329,7 +329,7 @@ 3.1.0 1.0 0.13.1 -2.24.1 +2.27 --- End diff -- Could you elaborate why we upgrade the version here? ---
[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2752#discussion_r204276473 --- Diff: storm-core/src/jvm/org/apache/storm/ui/filters/AuthorizedUserFilter.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.ui.filters; + +import java.io.IOException; +import java.lang.annotation.Annotation; +import java.lang.reflect.Method; +import java.net.InetAddress; +import java.security.Principal; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.StringJoiner; +import javax.ws.rs.container.ContainerRequestContext; +import javax.ws.rs.container.ContainerRequestFilter; +import javax.ws.rs.container.ResourceInfo; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.PathSegment; +import javax.ws.rs.ext.Provider; + +import org.apache.storm.DaemonConfig; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.security.auth.IAuthorizer; +import org.apache.storm.security.auth.ReqContext; +import org.apache.storm.thrift.TException; +import org.apache.storm.ui.resources.NimbusOp; +import org.apache.storm.utils.NimbusClient; +import org.apache.storm.utils.Utils; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Provider +public class AuthorizedUserFilter implements ContainerRequestFilter { + +public static final Logger LOG = LoggerFactory.getLogger(AuthorizedUserFilter.class); +public static Map conf = Utils.readStormConfig(); +public static IAuthorizer uiImpersonationHandler; +public static IAuthorizer uiAclHandler; + +@Context private ResourceInfo resourceInfo; + +static { +try { +uiImpersonationHandler = StormCommon.mkAuthorizationHandler( +(String) conf.get(DaemonConfig.NIMBUS_IMPERSONATION_AUTHORIZER), conf); +uiAclHandler = StormCommon.mkAuthorizationHandler( +(String) conf.get(DaemonConfig.NIMBUS_AUTHORIZER), conf); +} catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) { +LOG.error("Error initializing AuthorizedUserFilter: ", e); +throw new RuntimeException(e); +} +} + +@Override +public void filter(ContainerRequestContext containerRequestContext) throws 
IOException { +NimbusOp annotation = resourceInfo.getResourceMethod().getAnnotation(NimbusOp.class); +if (annotation == null) { +return; +} +String op = annotation.value(); +if (op == null) { +return; +} + +Map topoConf = null; +if (containerRequestContext.getUriInfo().getPathParameters().containsKey("id")) { +NimbusClient nimbusClient = NimbusClient.getConfiguredClient(Utils.readStormConfig()); +try { +topoConf = (Map) JSONValue.parse(nimbusClient.getClient().getTopologyConf( + containerRequestContext.getUriInfo().getPathParameters().get("id").get(0))); +} catch (TException e) { +e.printStackTrace(); +} +} + +ReqContext reqContext = ReqContext.context(); + +if (reqContext.isImpersonating()) { +if (uiImpersonationHandler != null) { +if (!uiImpersonationHandler.permit(reqContext, op, topoConf)) { +Principal realPrincipal = reqContext.realPrincipal(); +Principal principal = reqContext.principal(); +String user = "unknown"; +if (principal != n
[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2752#discussion_r204260283 --- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java --- @@ -37,7 +37,6 @@ import static java.util.stream.Collectors.toList; import static org.apache.commons.lang.StringEscapeUtils.escapeHtml; -import j2html.TagCreator; --- End diff -- Are these for removing unused import? ---
[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2752#discussion_r204260022 --- Diff: storm-core/src/jvm/org/apache/storm/utils/Monitor.java --- @@ -1,12 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) --- End diff -- Could you explain why this change is necessary? ---
[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2752#discussion_r204276964 --- Diff: storm-core/src/jvm/org/apache/storm/ui/exceptionmappers/NotAliveExceptionMapper.java --- @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.ui.exceptionmappers; + +import org.apache.storm.generated.NotAliveException; + +import javax.ws.rs.core.Response; +import javax.ws.rs.ext.ExceptionMapper; +import javax.ws.rs.ext.Provider; + +@Provider +public class NotAliveExceptionMapper implements ExceptionMapper { +public Response toResponse(NotAliveException ex) { +return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); --- End diff -- I'd rather not accept unless the response is at least similar detail on current. (This also means current response is not a good format IMHO, and I don't want to make it worse.) ---
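The review asks that the mapped response carry at least as much detail as the current UI error response, rather than a bare 500 with no body. A minimal, hypothetical sketch of what building such a payload could look like is below; the helper name `ErrorBody.of`, the field names, and the choice of status code are illustrative assumptions, not the project's actual error format.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ErrorBody {
    // Hypothetical helper building a detailed error payload. An ExceptionMapper
    // could serialize this map to JSON and attach it as the response entity
    // (e.g. via Response.status(...).entity(json).build()) instead of returning
    // an empty INTERNAL_SERVER_ERROR.
    static Map<String, String> of(String error, String message) {
        Map<String, String> body = new LinkedHashMap<>();
        body.put("error", error);
        body.put("errorMessage", message);
        return body;
    }

    public static void main(String[] args) {
        System.out.println(of("Not Found", "Topology is not alive"));
    }
}
```

For a `NotAliveException` specifically, a 404 status with the topology id in `errorMessage` would arguably be closer in detail to the existing response than a blank 500.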
[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2752#discussion_r204276808 --- Diff: storm-core/src/jvm/org/apache/storm/ui/filters/AuthorizedUserFilter.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.ui.filters; + +import java.io.IOException; +import java.lang.annotation.Annotation; +import java.lang.reflect.Method; +import java.net.InetAddress; +import java.security.Principal; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.StringJoiner; +import javax.ws.rs.container.ContainerRequestContext; +import javax.ws.rs.container.ContainerRequestFilter; +import javax.ws.rs.container.ResourceInfo; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.PathSegment; +import javax.ws.rs.ext.Provider; + +import org.apache.storm.DaemonConfig; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.security.auth.IAuthorizer; +import org.apache.storm.security.auth.ReqContext; +import org.apache.storm.thrift.TException; +import org.apache.storm.ui.resources.NimbusOp; +import org.apache.storm.utils.NimbusClient; +import org.apache.storm.utils.Utils; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Provider +public class AuthorizedUserFilter implements ContainerRequestFilter { + +public static final Logger LOG = LoggerFactory.getLogger(AuthorizedUserFilter.class); +public static Map conf = Utils.readStormConfig(); +public static IAuthorizer uiImpersonationHandler; +public static IAuthorizer uiAclHandler; + +@Context private ResourceInfo resourceInfo; + +static { +try { +uiImpersonationHandler = StormCommon.mkAuthorizationHandler( +(String) conf.get(DaemonConfig.NIMBUS_IMPERSONATION_AUTHORIZER), conf); +uiAclHandler = StormCommon.mkAuthorizationHandler( +(String) conf.get(DaemonConfig.NIMBUS_AUTHORIZER), conf); +} catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) { +LOG.error("Error initializing AuthorizedUserFilter: ", e); +throw new RuntimeException(e); +} +} + +@Override +public void filter(ContainerRequestContext containerRequestContext) throws 
IOException { +NimbusOp annotation = resourceInfo.getResourceMethod().getAnnotation(NimbusOp.class); +if (annotation == null) { +return; +} +String op = annotation.value(); +if (op == null) { +return; +} + +Map topoConf = null; +if (containerRequestContext.getUriInfo().getPathParameters().containsKey("id")) { +NimbusClient nimbusClient = NimbusClient.getConfiguredClient(Utils.readStormConfig()); +try { +topoConf = (Map) JSONValue.parse(nimbusClient.getClient().getTopologyConf( + containerRequestContext.getUriInfo().getPathParameters().get("id").get(0))); +} catch (TException e) { +e.printStackTrace(); +} +} + +ReqContext reqContext = ReqContext.context(); + +if (reqContext.isImpersonating()) { +if (uiImpersonationHandler != null) { +if (!uiImpersonationHandler.permit(reqContext, op, topoConf)) { +Principal realPrincipal = reqContext.realPrincipal(); +Principal principal = reqContext.principal(); +String user = "unknown"; +if (principal != n
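The quoted diff is cut off at `if (principal != n`, so the rest of the user-name handling is not visible here. As a hedged illustration only, the defensive pattern this kind of code typically follows is falling back to a placeholder when the request principal is null; the class and method names below are hypothetical and not taken from the PR.

```java
import java.security.Principal;

public class UserName {
    // Hypothetical sketch of a defensive user-name lookup: never dereference a
    // possibly-null Principal, fall back to "unknown" instead.
    static String displayName(Principal principal) {
        return principal != null ? principal.getName() : "unknown";
    }

    public static void main(String[] args) {
        // Principal's single abstract method is getName(), so a lambda works here.
        Principal alice = () -> "alice";
        System.out.println(displayName(alice));
        System.out.println(displayName(null));
    }
}
```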
[GitHub] storm pull request #2752: Storm 1311 Migration of UI from clj to Java
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2752#discussion_r204259143 --- Diff: storm-core/pom.xml --- @@ -32,6 +32,9 @@ /etc/storm ${project.build.directory}/native/worker-launcher +2.27 --- End diff -- Can we rely on root pom version of jersey? ---
[GitHub] storm issue #2768: STORM-3156: Remove the transactional topology API
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2768 I haven't seen any use case for the transactional API (even since I started contributing to Apache Storm), so I'm completely OK with removing it, but it feels safer to ask @revans2 once more to verify that Bobby doesn't have a use case for the transactional API... ---
[GitHub] storm pull request #2766: STORM-2972: Replace storm-kafka with storm-kafka-c...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2766#discussion_r20357 --- Diff: docs/storm-sql-internal.md --- @@ -18,18 +18,16 @@ Figure 1 describes the workflow of executing a SQL query in StormSQL. First, use Figure 1: Workflow of StormSQL. -Note: Trident Topology is now replaced with normal Storm topology leveraging Streams API. --- End diff -- Yeah I forgot to remove the line after renewing diagrams. Nice finding. ---
[GitHub] storm pull request #2766: STORM-2972: Replace storm-kafka with storm-kafka-c...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2766#discussion_r203588814 --- Diff: docs/storm-sql-example.md --- @@ -291,9 +291,4 @@ That's it! Supposing we have UDF which queries geo location via remote ip, we ca ## Summary -We looked through several simple use cases for Storm SQL to learn Storm SQL features. If you haven't looked at [Storm SQL integration](storm-sql.html) and [Storm SQL language](storm-sql-reference.html), you need to read it to see full supported features. - -Note that Storm SQL is running on Trident, which is micro-batch, and also no strong typed. Sink doesn't actually check the type. --- End diff -- Nice finding! Thanks for handling this. ---
[GitHub] storm issue #2443: STORM-2406 [Storm SQL] Change underlying API to Streams A...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2443 @srdo Thanks for the detailed review and nice suggestions! ---
[GitHub] storm issue #2711: STORM-3100 : Introducing CustomIndexArray to speedup Hash...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2711 @roshannaik https://travis-ci.org/apache/storm/jobs/403972342 https://travis-ci.org/apache/storm/jobs/403972347 https://travis-ci.org/apache/storm/jobs/403972348 https://travis-ci.org/apache/storm/jobs/403972349 The Travis CI build is failing, and the failures look related to the changeset. Could you check the build results above and track them down? ---
[GitHub] storm issue #2443: STORM-2406 [Storm SQL] Change underlying API to Streams A...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2443 @srdo I'd like to get final review and approval from you to check if I miss anything before merging. Could you please help to do this? Thanks! ---
[GitHub] storm issue #2443: STORM-2406 [Storm SQL] Change underlying API to Streams A...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2443 @srdo Nice finding. Fixed. Also exported to xml once again. Please check again. Thanks for your patience! [storm-sql-internal-example.xml.txt](https://github.com/apache/storm/files/2198168/storm-sql-internal-example.xml.txt) ---
[GitHub] storm issue #2443: STORM-2406 [Storm SQL] Change underlying API to Streams A...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2443 @srdo I have exported these diagrams into xml : please download and rename below files to remove `.txt` (github doesn't allow uploading xml file directly) and see they're properly imported in your account of draw.io. [storm-sql-internal-example.xml.txt](https://github.com/apache/storm/files/2198021/storm-sql-internal-example.xml.txt) [storm-sql-internal-workflow.xml.txt](https://github.com/apache/storm/files/2198022/storm-sql-internal-workflow.xml.txt) ---
[GitHub] storm issue #2443: STORM-2406 [Storm SQL] Change underlying API to Streams A...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2443 Raised https://issues.apache.org/jira/browse/STORM-3153 for addressing restoring tests. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202684941 --- Diff: docs/storm-sql-internal.md --- @@ -1,59 +0,0 @@ --- End diff -- Nice finding. Will fix it and also will attach source file of draw.io, but not sure where to add. I'll draw for second diagram as well. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202678116 --- Diff: docs/storm-sql-internal.md --- @@ -1,59 +0,0 @@ --- End diff -- Just finished it. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202674112 --- Diff: docs/storm-sql-internal.md --- @@ -1,59 +0,0 @@ --- End diff -- Yeah right. I didn't think about that. Drawing a new one. ---
[GitHub] storm issue #2443: STORM-2406 [Storm SQL] Change underlying API to Streams A...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2443 Yes, the test failures look unrelated; one is from server, another is from cassandra. I'd like to address https://github.com/apache/storm/pull/2443#discussion_r202639091 but it's OK to address it in a new issue, since a smaller patch would be easier to merge. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202644352 --- Diff: docs/storm-sql-internal.md --- @@ -1,59 +0,0 @@ --- End diff -- We could even do some photo editing but not sure it is easy to do since I don't know which font is used for `storm-sql-internal-workflow.png`. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202639091 --- Diff: sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java --- @@ -88,46 +75,12 @@ public void shutDown() throws IOException { @SuppressWarnings("unchecked") @Test public void testHdfsSink() throws Exception { -ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource( +ISqlStreamsDataSource ds = DataSourcesRegistry.constructStreamsDataSource( URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS); Assert.assertNotNull(ds); -ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer(); - -Assert.assertEquals(HdfsStateFactory.class, consumer.getStateFactory().getClass()); -Assert.assertEquals(HdfsUpdater.class, consumer.getStateUpdater().getClass()); - -HdfsState state = (HdfsState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1); -StateUpdater stateUpdater = consumer.getStateUpdater(); - -HdfsFileOptions options = mock(HdfsFileOptions.class); -Field optionsField = state.getClass().getDeclaredField("options"); -optionsField.setAccessible(true); -optionsField.set(state, options); +IRichBolt consumer = ds.getConsumer(); -List tupleList = mockTupleList(); - -for (TridentTuple t : tupleList) { -stateUpdater.updateState(state, Collections.singletonList(t), null); -try { -verify(options).execute(Collections.singletonList(t)); -} catch (IOException e) { -throw new RuntimeException(e); -} -} -} - -private static List mockTupleList() { -List tupleList = new ArrayList<>(); -TridentTuple t0 = mock(TridentTuple.class); -TridentTuple t1 = mock(TridentTuple.class); -doReturn(1).when(t0).get(0); -doReturn(2).when(t1).get(0); -doReturn(Lists.newArrayList(1, "2")).when(t0).getValues(); -doReturn(Lists.newArrayList(2, "3")).when(t1).getValues(); -tupleList.add(t0); -tupleList.add(t1); -return tupleList; +Assert.assertEquals(HdfsBolt.class, 
consumer.getClass()); --- End diff -- Thanks for the information! I'll think about applying it but let me address other comments first. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202635190 --- Diff: sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java --- @@ -47,54 +45,60 @@ * The properties are in JSON format which specifies the name of the MongoDB collection and etc. */ public class MongoDataSourcesProvider implements DataSourcesProvider { +public static final String SCHEME_NAME = "mongodb"; +public static final String VALUE_SERIALIZED_FIELD = "ser.field"; +public static final String TRIDENT_VALUE_SERIALIZED_FIELD = "trident.ser.field"; +public static final String DEFAULT_VALUE_SERIALIZED_FIELD = "tridentSerField"; +public static final String COLLECTION_NAME = "collection.name"; -private static class MongoTridentDataSource implements ISqlTridentDataSource { +private static class MongoStreamsDataSource implements ISqlStreamsDataSource { private final String url; private final Properties props; private final IOutputSerializer serializer; -private MongoTridentDataSource(String url, Properties props, IOutputSerializer serializer) { +private MongoStreamsDataSource(String url, Properties props, IOutputSerializer serializer) { this.url = url; this.props = props; this.serializer = serializer; } @Override -public ITridentDataSource getProducer() { +public IRichSpout getProducer() { throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide Producer"); } @Override -public SqlTridentConsumer getConsumer() { +public IRichBolt getConsumer() { Preconditions.checkArgument(!props.isEmpty(), "Writable MongoDB must contain collection config"); -String serField = props.getProperty("trident.ser.field", "tridentSerField"); -MongoMapper mapper = new TridentMongoMapper(serField, serializer); - -MongoState.Options options = new MongoState.Options() -.withUrl(url) - .withCollectionName(props.getProperty("collection.name")) -.withMapper(mapper); - -StateFactory 
stateFactory = new MongoStateFactory(options); -StateUpdater stateUpdater = new MongoStateUpdater(); - -return new SimpleSqlTridentConsumer(stateFactory, stateUpdater); +String serField; +if (props.contains(VALUE_SERIALIZED_FIELD)) { +serField = props.getProperty(VALUE_SERIALIZED_FIELD); +} else if (props.contains(TRIDENT_VALUE_SERIALIZED_FIELD)) { +// backward compatibility +serField = props.getProperty(TRIDENT_VALUE_SERIALIZED_FIELD); --- End diff -- Yeah, right, but it's an existing property and supporting backward compatibility is easy, so it doesn't hurt to keep supporting it. We never deprecated it either. ---
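A minimal sketch of the fallback discussed above, using the property names from the diff. One caveat worth noting: `java.util.Properties` inherits `contains(Object)` from `Hashtable`, which matches *values*, not keys, so a key-based check such as `containsKey` (used below) is presumably what the fallback intends. The class and method names here are hypothetical, not Storm's actual code.

```java
import java.util.Properties;

public class SerFieldLookup {
    // Property names taken from the diff above; default value from the Trident-era code.
    static final String VALUE_SERIALIZED_FIELD = "ser.field";
    static final String TRIDENT_VALUE_SERIALIZED_FIELD = "trident.ser.field";
    static final String DEFAULT_VALUE_SERIALIZED_FIELD = "tridentSerField";

    // Prefer the new key, fall back to the legacy Trident key, then the default.
    // Note: Properties.contains(Object) inherits Hashtable.contains, which checks
    // VALUES rather than keys -- containsKey is the key-based check we want here.
    static String resolveSerField(Properties props) {
        if (props.containsKey(VALUE_SERIALIZED_FIELD)) {
            return props.getProperty(VALUE_SERIALIZED_FIELD);
        } else if (props.containsKey(TRIDENT_VALUE_SERIALIZED_FIELD)) {
            // backward compatibility with the Trident-era property name
            return props.getProperty(TRIDENT_VALUE_SERIALIZED_FIELD);
        }
        return DEFAULT_VALUE_SERIALIZED_FIELD;
    }

    public static void main(String[] args) {
        Properties legacy = new Properties();
        legacy.setProperty(TRIDENT_VALUE_SERIALIZED_FIELD, "payload");
        System.out.println(resolveSerField(new Properties())); // prints the default
        System.out.println(resolveSerField(legacy));           // legacy key honored
    }
}
```

With `getProperty` returning `null` for missing keys, an equivalent formulation could chain null checks instead of `containsKey`; the point is only that a value-based `contains` would silently skip the legacy key.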
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202631700 --- Diff: sql/README.md --- @@ -1,187 +1,8 @@ # Storm SQL -Compile SQL queries to Storm topologies. +Compile SQL queries to Storm topologies and run. -## Usage - -Run the ``storm sql`` command to compile SQL statements into Trident topology, and submit it to the Storm cluster - -``` -$ bin/storm sql -``` - -In which `sql-file` contains a list of SQL statements to be executed, and `topo-name` is the name of the topology. - -StormSQL activates `explain mode` and shows query plan instead of submitting topology when user specifies `topo-name` as `--explain`. -Detailed explanation is available from `Showing Query Plan (explain mode)` section. - -## Supported Features - -The following features are supported in the current repository: - -* Streaming from and to external data sources -* Filtering tuples -* Projections -* Aggregations (Grouping) -* User defined function (scalar and aggregate) -* Join (Inner, Left outer, Right outer, Full outer) - -## Specifying External Data Sources - -In StormSQL data is represented by external tables. Users can specify data sources using the `CREATE EXTERNAL TABLE` -statement. For example, the following statement specifies a Kafka spouts and sink: - -``` -CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'kafka://localhost:2181/brokers?topic=test' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}' -``` - -The syntax of `CREATE EXTERNAL TABLE` closely follows the one defined in -[Hive Data Definition Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL). - -`PARALLELISM` is StormSQL's own keyword which describes parallelism hint for input data source. This is same as providing parallelism hint to Trident Spout. 
-Downstream operators are executed with same parallelism before repartition (Aggregation triggers repartition). - -Default value is 1, and this option has no effect on output data source. (We might change if needed. Normally repartition is the thing to avoid.) - -## Plugging in External Data Sources - -Users plug in external data sources through implementing the `ISqlTridentDataSource` interface and registers them using -the mechanisms of Java's service loader. The external data source will be chosen based on the scheme of the URI of the -tables. Please refer to the implementation of `storm-sql-kafka` for more details. - -## Specifying User Defined Function (UDF) - -Users can define user defined function (scalar or aggregate) using `CREATE FUNCTION` statement. -For example, the following statement defines `MYPLUS` function which uses `org.apache.storm.sql.TestUtils$MyPlus` class. - -``` -CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus' -``` - -Storm SQL determines whether the function as scalar or aggregate by checking which methods are defined. -If the class defines `evaluate` method, Storm SQL treats the function as `scalar`, -and if the class defines `add` method, Storm SQL treats the function as `aggregate`. - -Example of class for scalar function is here: - -``` - public class MyPlus { -public static Integer evaluate(Integer x, Integer y) { - return x + y; -} - } - -``` - -and class for aggregate function is here: - -``` - public class MyConcat { -public static String init() { - return ""; -} -public static String add(String accumulator, String val) { - return accumulator + val; -} -public static String result(String accumulator) { - return accumulator; -} - } -``` - -If users don't define `result` method, result is the last return value of `add` method. -Users need to define `result` method only when we need to transform accumulated value. 
- -## Example: Filtering Kafka Stream - -Let's say there is a Kafka stream that represents the transactions of orders. Each message in the stream contains the id -of the order, the unit price of the product and the quantity of the orders. The goal is to filter orders where the -transactions are significant and to insert these orders into another Kafka stream for further analysis. - -The user can specify the following SQL statements in the SQL file: - -``` -CREATE EXTERNAL TABLE ORDERS (ID I
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202632910 --- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/StreamsStormRuleSets.java --- @@ -72,13 +72,13 @@ // merge and push unions rules UnionEliminatorRule.INSTANCE, -TridentScanRule.INSTANCE, -TridentFilterRule.INSTANCE, -TridentProjectRule.INSTANCE, -TridentAggregateRule.INSTANCE, -TridentJoinRule.INSTANCE, -TridentModifyRule.INSTANCE, -TridentCalcRule.INSTANCE +StreamsScanRule.INSTANCE, +StreamsFilterRule.INSTANCE, +StreamsProjectRule.INSTANCE, +StreamsAggregateRule.INSTANCE, --- End diff -- These rules give us a hook to warn end users when an aggregate or join is used in their query. I haven't tested what happens if we drop our own rule and a user's query triggers that feature. And yes, keeping them also helps us not forget about the missing features. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202638476 --- Diff: sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java --- @@ -41,7 +26,46 @@ import java.util.Map; import java.util.PriorityQueue; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.sql.runtime.ISqlStreamsDataSource; +import org.apache.storm.streams.Pair; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.junit.rules.ExternalResource; + public class TestUtils { + public static final ExternalResource mockInsertBoltValueResource = new ExternalResource() { +@Override +protected void before() throws Throwable { + MockInsertBolt.getCollectedValues().clear(); +} + +@Override +protected void after() { + // no-op --- End diff -- Will fix. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202638239 --- Diff: sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.storm.sql.runtime.datasource.socket.bolt; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.net.Socket; +import java.util.Map; + +import org.apache.storm.shade.org.apache.commons.io.IOUtils; +import org.apache.storm.sql.runtime.IOutputSerializer; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The Bolt implementation for Socket. Only available for Storm SQL. + * The class doesn't handle reconnection so you may not want to use this for production. 
--- End diff -- You could experiment with your production query by replacing the input table and/or the output table with a socket. We may also want a console output table (it can't be an input table, for sure), but we could address that in a follow-up issue. For reference, here is how others handle this: Spark has a couple of input sources and output sinks for testing, whereas Flink doesn't appear to provide table sources and table sinks for testing. I prefer Spark's approach for this case. http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202633872 --- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/planner/streams/rel/StreamsStreamInsertRel.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.storm.sql.planner.streams.rel; + +import java.util.List; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.prepare.Prepare; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexNode; +import org.apache.storm.sql.planner.StormRelUtils; +import org.apache.storm.sql.planner.rel.StormStreamInsertRelBase; +import org.apache.storm.sql.planner.streams.StreamsPlanCreator; +import org.apache.storm.sql.runtime.streams.functions.StreamInsertMapToPairFunction; +import org.apache.storm.streams.Stream; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.tuple.Values; + +public class StreamsStreamInsertRel extends StormStreamInsertRelBase implements StreamsRel { +private final int primaryKeyIndex; + +public StreamsStreamInsertRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, Prepare.CatalogReader catalogReader, + RelNode child, Operation operation, List updateColumnList, List sourceExpressionList, + boolean flattened, int primaryKeyIndex) { +super(cluster, traits, table, catalogReader, child, operation, updateColumnList, sourceExpressionList, flattened); +this.primaryKeyIndex = primaryKeyIndex; +} + +@Override +public RelNode copy(RelTraitSet traitSet, List inputs) { +return new StreamsStreamInsertRel(getCluster(), traitSet, getTable(), getCatalogReader(), +sole(inputs), getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened(), primaryKeyIndex); +} + +@Override +public void streamsPlan(StreamsPlanCreator planCreator) throws Exception { +// SingleRel +RelNode input = getInput(); +StormRelUtils.getStormRelInput(input).streamsPlan(planCreator); +Stream inputStream = planCreator.pop(); + +Preconditions.checkArgument(isInsert(), "Only INSERT statement is supported."); + +// 
Calcite ensures that the value is structurized to the table definition +// hence we can use PK index directly +// To elaborate, if table BAR is defined as ID INTEGER PK, NAME VARCHAR, DEPTID INTEGER +// and query like INSERT INTO BAR SELECT NAME, ID FROM FOO is executed, +// Calcite makes the projection ($1 <- ID, $0 <- NAME, null) to the value before INSERT. + +// FIXME: this should be really different... --- End diff -- Removing the line would be right for now... since, sadly, I forgot why I added the FIXME line here. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202631939 --- Diff: sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java --- @@ -118,16 +138,32 @@ public RelDataType getRowType( @Override public Statistic getStatistic() { return stat != null ? stat : Statistics.of(rows.size(), - ImmutableList.of()); +ImmutableList.of()); } @Override public Schema.TableType getJdbcTableType() { return Schema.TableType.STREAM; } + +@Override +public boolean isRolledUp(String s) { --- End diff -- OK, I'll find the original methods and restore their original names. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202638350 --- Diff: sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/bolt/SocketBolt.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.storm.sql.runtime.datasource.socket.bolt; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.net.Socket; +import java.util.Map; + +import org.apache.storm.shade.org.apache.commons.io.IOUtils; +import org.apache.storm.sql.runtime.IOutputSerializer; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The Bolt implementation for Socket. Only available for Storm SQL. + * The class doesn't handle reconnection so you may not want to use this for production. + */ +public class SocketBolt implements IRichBolt { --- End diff -- Yeah nice finding! 
Will fix. ---
[GitHub] storm issue #2443: STORM-2406 [Storm SQL] Change underlying API to Streams A...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2443 @srdo Yes, since the Streams API doesn't have a rich function with executor-side initialization for now. We could address it, but it's probably better to file a new issue and handle it independently. What do you think? ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202631502 --- Diff: docs/storm-sql-internal.md --- @@ -1,59 +0,0 @@ --- End diff -- @srdo Agreed. Regarding updating the doc, I guess most of the content is still correct, but unfortunately I don't have the source file for the diagram, so it would need to be redrawn. I'd rather explain the change in the text to avoid redrawing it (something like "The Trident topology in the diagram is now replaced with a normal Storm topology"). What do you think? ---
[GitHub] storm pull request #2744: [STORM-3132] Avoid NPE in the Values Constructor
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2744#discussion_r202568702 --- Diff: storm-client/test/jvm/org/apache/storm/tuple/ValuesTest.java --- @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.tuple; + +import org.junit.Assert; +import org.junit.Test; + +public class ValuesTest { + +@Test +public void testNoArgsToValues() { +Values vals = new Values(); +Assert.assertTrue("Failed to add null to Values", vals.size() == 0); +} + +@Test +public void testNullArgsToValues() { +Values vals = new Values(null); +Assert.assertTrue("Failed to add null to Values", vals.size() == 1); --- End diff -- I'm sorry to comment on this so late, but let's make sure we also check the elements in Values, not just the size. The same applies to the other tests. ---
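A minimal sketch of the element-level checks the review asks for. The `Values` class here is a hypothetical stand-in for `org.apache.storm.tuple.Values` (which extends `ArrayList<Object>`), with null handling assumed to match the `size() == 1` expectation in the test above; it is not the committed Storm code.

```java
import java.util.ArrayList;

public class ValuesElementCheck {
    // Hypothetical stand-in for org.apache.storm.tuple.Values, mimicking the
    // behavior the test above expects: a null varargs array becomes one null element.
    static class Values extends ArrayList<Object> {
        Values(Object... vals) {
            if (vals == null) {
                add(null); // assumed null-handling, per the size() == 1 assertion
            } else {
                for (Object v : vals) {
                    add(v);
                }
            }
        }
    }

    public static void main(String[] args) {
        // new Values(null) binds null to the varargs array itself.
        Values vals = new Values((Object[]) null);
        // Beyond size, also verify the element itself, as the review suggests:
        if (vals.size() != 1 || vals.get(0) != null) {
            throw new AssertionError("expected a single null element");
        }
        // And for non-null arguments, verify values and their order are preserved:
        Values pair = new Values(1, "a");
        if (!Integer.valueOf(1).equals(pair.get(0)) || !"a".equals(pair.get(1))) {
            throw new AssertionError("elements not preserved in order");
        }
        System.out.println("ok");
    }
}
```

In the JUnit test itself this would translate to assertions like `Assert.assertNull(vals.get(0))` alongside the existing size check.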
[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2762#discussion_r202345723 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java --- @@ -47,12 +47,24 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except BackPressureStatus status = (BackPressureStatus) message; if (status.bpTasks != null) { for (Integer bpTask : status.bpTasks) { -remoteBpStatus[bpTask].set(true); +try { +remoteBpStatus[bpTask].set(true); +} catch (ArrayIndexOutOfBoundsException e) { +//Just in case we get something we are confused about +// we can continue processing the rest of the tasks +LOG.error("BP index out of bounds {}", e); --- End diff -- Ah! I realized `e` is bound to `{}`, so my suggestion (showing the index on the same line) is already applied, but then the stack trace is not logged. ---
[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2762#discussion_r202343054 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java --- @@ -47,12 +47,20 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except BackPressureStatus status = (BackPressureStatus) message; if (status.bpTasks != null) { for (Integer bpTask : status.bpTasks) { -remoteBpStatus[bpTask].set(true); +try { +remoteBpStatus[bpTask].set(true); +} catch (ArrayIndexOutOfBoundsException e) { --- End diff -- OK, makes sense. I know the exception's message contains the index and will end up in the log, but writing the index explicitly in the log message would help users find it while grepping, since it would then appear on the same line as the message. ---
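A sketch of the guarded update being discussed, with the offending index written explicitly into the message so it can be grepped on a single line. `System.err` stands in for the slf4j `LOG` used in `StormClientHandler`; the method name is hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class BackPressureUpdate {
    // Guarded back-pressure flag update: an out-of-range task index is logged
    // (with the index and array length spelled out) and the loop keeps going.
    static void setBpStatus(AtomicBoolean[] remoteBpStatus, int bpTask) {
        try {
            remoteBpStatus[bpTask].set(true);
        } catch (ArrayIndexOutOfBoundsException e) {
            // Keep processing the remaining tasks; surface the index explicitly.
            System.err.println("BP task index " + bpTask + " out of bounds (array length "
                + remoteBpStatus.length + "): " + e);
        }
    }

    public static void main(String[] args) {
        AtomicBoolean[] status = { new AtomicBoolean(), new AtomicBoolean() };
        setBpStatus(status, 1);  // valid index: flag is set
        setBpStatus(status, 5);  // out of bounds: logged, not fatal
        System.out.println(status[1].get()); // prints true
    }
}
```

With slf4j specifically, `LOG.error("BP index {} out of bounds", bpTask, e)` would both show the index inline and keep the stack trace, because slf4j treats a trailing `Throwable` beyond the placeholders as the log's throwable; binding `e` to the `{}` placeholder (as in the diff) prints only the exception's `toString()`.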
[GitHub] storm issue #2590: STORM-2974: Add transactional non-opaque spout to storm-k...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2590 I found that a couple of public classes were renamed, but they're in the `internal` package, which signals to others that they're not part of the public API, so I think we are OK. ---
[GitHub] storm pull request #2443: STORM-2406 [Storm SQL] Change underlying API to St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202282820 --- Diff: docs/storm-sql-internal.md --- @@ -1,59 +0,0 @@ --- End diff -- I removed the document since it is documented based on Trident implementation. ---
[GitHub] storm issue #2443: STORM-2406 [Storm SQL] Change underlying API to Streams A...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2443 @srdo Rebased. I'm seeing intermittent test failure like below, but not consistent failure. Will try to take a look at once I have time to, but let's move it out of this PR. ``` 17:32:06.845 [SLOT_1027] INFO o.a.s.m.StormMetricRegistry - Starting metrics reporters... 17:32:06.845 [SLOT_1027] INFO o.a.s.s.a.ClientAuthUtils - Got AutoCreds [] 17:32:06.846 [SLOT_1027] INFO o.a.s.d.w.WorkerState - Reading assignments 17:32:06.846 [SLOT_1027] ERROR o.a.s.d.s.Slot - Error when processing event java.io.IOException: java.lang.NullPointerException at org.apache.storm.daemon.supervisor.LocalContainer.launch(LocalContainer.java:54) ~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] at org.apache.storm.daemon.supervisor.LocalContainerLauncher.launchContainer(LocalContainerLauncher.java:42) ~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] at org.apache.storm.daemon.supervisor.Slot.handleWaitingForBlobUpdate(Slot.java:528) ~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] at org.apache.storm.daemon.supervisor.Slot.stateMachineStep(Slot.java:232) ~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] at org.apache.storm.daemon.supervisor.Slot.run(Slot.java:902) [storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] Caused by: java.lang.NullPointerException at org.apache.storm.daemon.worker.WorkerState.readWorkerExecutors(WorkerState.java:630) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] at org.apache.storm.daemon.worker.WorkerState.(WorkerState.java:153) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] at org.apache.storm.daemon.worker.Worker.loadWorker(Worker.java:172) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] at org.apache.storm.daemon.worker.Worker.lambda$start$39(Worker.java:164) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_66] at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_66] at 
org.apache.storm.daemon.worker.Worker.start(Worker.java:163) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] at org.apache.storm.daemon.supervisor.LocalContainer.launch(LocalContainer.java:52) ~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] ... 4 more 17:32:06.847 [SLOT_1027] ERROR o.a.s.u.Utils - Halting process: Error when processing an event java.lang.RuntimeException: Halting process: Error when processing an event at org.apache.storm.utils.Utils.exitProcess(Utils.java:473) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] at org.apache.storm.daemon.supervisor.Slot.run(Slot.java:949) [storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] ``` ---
[GitHub] storm pull request #2762: STORM-3148: Avoid threading issues with kryo
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2762#discussion_r202196741 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java --- @@ -47,12 +47,20 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except BackPressureStatus status = (BackPressureStatus) message; if (status.bpTasks != null) { for (Integer bpTask : status.bpTasks) { -remoteBpStatus[bpTask].set(true); +try { +remoteBpStatus[bpTask].set(true); +} catch (ArrayIndexOutOfBoundsException e) { --- End diff -- Could this happen in a normal situation? I guess we don't expect it. I don't think catching the exception would hurt, but I just want to be sure about this. ---
[GitHub] storm pull request #2759: STORM-2947: Remove some deprecated methods from St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2759#discussion_r202034158 --- Diff: storm-client/src/jvm/org/apache/storm/generated/ClusterSummary.java --- @@ -29,24 +29,21 @@ private static final org.apache.storm.thrift.protocol.TStruct STRUCT_DESC = new org.apache.storm.thrift.protocol.TStruct("ClusterSummary"); private static final org.apache.storm.thrift.protocol.TField SUPERVISORS_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("supervisors", org.apache.storm.thrift.protocol.TType.LIST, (short)1); - private static final org.apache.storm.thrift.protocol.TField NIMBUS_UPTIME_SECS_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("nimbus_uptime_secs", org.apache.storm.thrift.protocol.TType.I32, (short)2); - private static final org.apache.storm.thrift.protocol.TField TOPOLOGIES_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("topologies", org.apache.storm.thrift.protocol.TType.LIST, (short)3); - private static final org.apache.storm.thrift.protocol.TField NIMBUSES_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("nimbuses", org.apache.storm.thrift.protocol.TType.LIST, (short)4); + private static final org.apache.storm.thrift.protocol.TField TOPOLOGIES_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("topologies", org.apache.storm.thrift.protocol.TType.LIST, (short)2); + private static final org.apache.storm.thrift.protocol.TField NIMBUSES_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("nimbuses", org.apache.storm.thrift.protocol.TType.LIST, (short)3); private static final org.apache.storm.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new ClusterSummaryStandardSchemeFactory(); private static final org.apache.storm.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new ClusterSummaryTupleSchemeFactory(); private java.util.List supervisors; // required - private int nimbus_uptime_secs; // optional private java.util.List topologies; // required private 
java.util.List nimbuses; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.storm.thrift.TFieldIdEnum { SUPERVISORS((short)1, "supervisors"), -NIMBUS_UPTIME_SECS((short)2, "nimbus_uptime_secs"), --- End diff -- Nice! Thanks for addressing. ---
[GitHub] storm pull request #2759: STORM-2947: Remove some deprecated methods from St...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2759#discussion_r202033999 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java --- @@ -85,11 +84,6 @@ public static StormCommon setInstance(StormCommon common) { return oldInstance; } -@Deprecated --- End diff -- Yeah, I don't have a strong opinion on this, so it's OK to remove it. I haven't used it directly. ---