[GitHub] storm pull request #2941: STORM-3318: Complete information in Class NewKafka...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2941#discussion_r249026503 --- Diff: external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java --- @@ -64,27 +64,51 @@ public String toString() { "topics='" + topics + '\'' + ", consumerGroupId='" + consumerGroupId + '\'' + ", bootStrapBrokers='" + bootStrapBrokers + '\'' + + ", securityProtocol='" + securityProtocol + '\'' + + ", consumerPropertiesFileName='" + consumerPropertiesFileName + '\'' + '}'; } @Override public boolean equals(Object o) { -if (this == o) return true; -if (o == null || getClass() != o.getClass()) return false; +if (this == o) { +return true; +} +if (o == null || getClass() != o.getClass()) { +return false; +} NewKafkaSpoutOffsetQuery that = (NewKafkaSpoutOffsetQuery) o; -if (topics != null ? !topics.equals(that.topics) : that.topics != null) return false; -if (consumerGroupId != null ? !consumerGroupId.equals(that.consumerGroupId) : that.consumerGroupId != null) return false; -return !(bootStrapBrokers != null ? !bootStrapBrokers.equals(that.bootStrapBrokers) : that.bootStrapBrokers != null); - +if (topics != null ? !topics.equals(that.topics) : that.topics != null) { +return false; +} +if (consumerGroupId != null ? !consumerGroupId.equals(that.consumerGroupId) +: that.consumerGroupId != null) { +return false; +} +if (bootStrapBrokers != null ? !bootStrapBrokers.equals(that.bootStrapBrokers) + : that.bootStrapBrokers != null) { +return false; +} +if (securityProtocol != null ? !securityProtocol.equals(that.securityProtocol) + : that.securityProtocol != null) { +return false; +} +return consumerPropertiesFileName != null ? consumerPropertiesFileName +.equals(that.consumerPropertiesFileName) : that.consumerPropertiesFileName == null; } @Override public int hashCode() { int result = topics != null ? topics.hashCode() : 0; result = 31 * result + (consumerGroupId != null ? consumerGroupId.hashCode() : 0); result = 31 * result + (bootStrapBrokers != null ? bootStrapBrokers.hashCode() : 0); +result = 31 * result + (securityProtocol != null ? securityProtocol.hashCode() : 0); +result = +31 * result + (consumerPropertiesFileName != null ? consumerPropertiesFileName +.hashCode() + : 0); --- End diff -- The formatting here is off. ---
[GitHub] storm pull request #2940: STORM-3318: Complete information in Class NewKafka...
Github user MichealShin closed the pull request at: https://github.com/apache/storm/pull/2940 ---
[GitHub] storm pull request #2941: STORM-3318: Complete information in Class NewKafka...
GitHub user MichealShin opened a pull request: https://github.com/apache/storm/pull/2941 STORM-3318: Complete information in Class NewKafkaSpoutOffsetQuery Just complete information in three methods(toString , equals, hashCode). You can merge this pull request into a Git repository by running: $ git pull https://github.com/MichealShin/storm master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2941.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2941 commit 3eba918afdc8dbc777e790d71be9dc30d17f125d Author: MichealShin Date: 2019-01-18T12:21:49Z STORM-3318: Complete information in Class NewKafkaSpoutOffsetQuery Just complete information in three methods(toString , equals, hashCode). ---
[GitHub] storm pull request #2940: STORM-3318: Complete information in Class NewKafka...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2940#discussion_r248985594 --- Diff: external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java --- @@ -75,8 +77,13 @@ public boolean equals(Object o) { NewKafkaSpoutOffsetQuery that = (NewKafkaSpoutOffsetQuery) o; if (topics != null ? !topics.equals(that.topics) : that.topics != null) return false; -if (consumerGroupId != null ? !consumerGroupId.equals(that.consumerGroupId) : that.consumerGroupId != null) return false; -return !(bootStrapBrokers != null ? !bootStrapBrokers.equals(that.bootStrapBrokers) : that.bootStrapBrokers != null); +if (consumerGroupId != null ? !consumerGroupId.equals(that.consumerGroupId) : that.consumerGroupId != null) --- End diff -- I realize this was an issue before as well, but please add braces. ---
[GitHub] storm pull request #2940: STORM-3318: Complete information in Class NewKafka...
GitHub user MichealShin opened a pull request: https://github.com/apache/storm/pull/2940 STORM-3318: Complete information in Class NewKafkaSpoutOffsetQuery Just complete information in three methods(toString , equals, hashCode). You can merge this pull request into a Git repository by running: $ git pull https://github.com/MichealShin/storm master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2940.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2940 commit a4fde2c43cb529197572c9524d0e9800c61bac34 Author: MichealShin Date: 2018-01-30T09:42:38Z STORM-2917: Check the derecated config nimbus.host There is a situation: the deployer wants to use the new nimbus config(nimbus.seeds), but still leave the blank deprecated config(nimbus.host) in storm.yaml. It will not work. To avoid this, the program should at least check whether the deprecated config is blank. commit 59952ef37293196b160d5854ce834c5c5bfc202a Author: MichealShin Date: 2018-01-30T09:44:29Z Merge pull request #2 from MichealShin/MichealShin-patch-2 STORM-2917: Check the derecated config nimbus.host commit f5eddafef4fde0de27c2a4033f382d89f17821c5 Author: shichenglong Date: 2019-01-18T02:20:07Z Merge branch 'master' of git://github.com/apache/storm into apache-master # Conflicts: # storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java commit cd5d64b6bb8eb6f34f65044577689871e035877e Author: MichealShin Date: 2019-01-18T02:35:34Z Merge pull request #3 from apache/master update commit afe1b071059c6b3d91ba77187f3ca0d50d85e37e Author: MichealShin Date: 2019-01-18T02:55:36Z STORM-3318: Complete information in Class NewKafkaSpoutOffsetQuery Just complete information in three methods(toString , equals, hashCode). ---
[GitHub] storm pull request #2939: STORM-3315: Upgrade to Kryo 4
GitHub user srdo opened a pull request: https://github.com/apache/storm/pull/2939 STORM-3315: Upgrade to Kryo 4 https://issues.apache.org/jira/browse/STORM-3315 Tested compatibility with Kryo 3.0.3 serialization by running the TVL topology for a couple of minutes with two supervisors, one using the unmodified Storm code, and one running this patch. You can merge this pull request into a Git repository by running: $ git pull https://github.com/srdo/storm STORM-3315 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2939.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2939 commit 28b22cd183caa32b51bc3a89260a37427f6a6619 Author: Stig Rohde Døssing Date: 2019-01-17T18:42:49Z STORM-3315: Upgrade to Kryo 4 ---
[GitHub] storm pull request #2938: STORM-3317 fix upload credentials when using a dif...
GitHub user agresch opened a pull request: https://github.com/apache/storm/pull/2938 STORM-3317 fix upload credentials when using a differing path for jav⦠â¦a.security.auth.login.config If the launcher has a java.security.auth.login.config file locally specified that differs from the system property in the topology conf, we need to honor that setting for upload credentials to work properly. You can merge this pull request into a Git repository by running: $ git pull https://github.com/agresch/storm agresch_uploadcreds Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2938.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2938 commit 75f9f5c25dfece734c4e9bed1df2a4fbe4842c79 Author: Aaron Gresch Date: 2019-01-17T20:22:23Z STORM-3317 fix upload credentials when using a differing path for java.security.auth.login.config ---
[GitHub] storm pull request #2930: STORM-3274: Migrates storm CLI to using argparse m...
Github user govind-menon commented on a diff in the pull request: https://github.com/apache/storm/pull/2930#discussion_r247197760 --- Diff: bin/storm.py --- @@ -296,787 +239,1044 @@ def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[] elif is_windows(): # handling whitespaces in JAVA_CMD try: -ret = sub.check_output(all_args, stderr=sub.STDOUT) +ret = subprocess.check_output(all_args, stderr=subprocess.STDOUT) print(ret) -except sub.CalledProcessError as e: +except subprocess.CalledProcessError as e: print(e.output) sys.exit(e.returncode) else: os.execvp(JAVA_CMD, all_args) return exit_code -def run_client_jar(jarfile, klass, args, daemon=False, client=True, extrajvmopts=[]): -global DEP_JARS_OPTS, DEP_ARTIFACTS_OPTS, DEP_ARTIFACTS_REPOSITORIES_OPTS, DEP_MAVEN_LOCAL_REPOSITORY_DIRECTORY, DEP_PROXY_URL, DEP_PROXY_USERNAME, DEP_PROXY_PASSWORD -local_jars = DEP_JARS_OPTS -artifact_to_file_jars = resolve_dependencies(DEP_ARTIFACTS_OPTS, DEP_ARTIFACTS_REPOSITORIES_OPTS, DEP_MAVEN_LOCAL_REPOSITORY_DIRECTORY, DEP_PROXY_URL, DEP_PROXY_USERNAME, DEP_PROXY_PASSWORD) +def run_client_jar(klass, args, daemon=False, client=True, extrajvmopts=[]): +local_jars = args.jars.split(",") +jarfile = args.topology_jar_path + +artifact_to_file_jars = resolve_dependencies( +args.artifacts, args.artifactRepositories, +args.mavenLocalRepositoryDirectory, args.proxyUrl, +args.proxyUsername, args.proxyPassword +) -extra_jars=[jarfile, USER_CONF_DIR, STORM_BIN_DIR] +extra_jars = [jarfile, USER_CONF_DIR, STORM_BIN_DIR] extra_jars.extend(local_jars) extra_jars.extend(artifact_to_file_jars.values()) exec_storm_class( -klass, +klass, args.c, jvmtype="-client", extrajars=extra_jars, -args=args, +args=args.topology_main_args, daemon=False, jvmopts=JAR_JVM_OPTS + extrajvmopts + ["-Dstorm.jar=" + jarfile] + ["-Dstorm.dependency.jars=" + ",".join(local_jars)] + ["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)]) -def local(jarfile, klass, *args): -"""Syntax: [storm local topology-jar-path class ...] -Runs the main method of class with the specified arguments but pointing to a local cluster -The storm jars and configs in ~/.storm are put on the classpath. -The process is configured so that StormSubmitter - (http://storm.apache.org/releases/current/javadocs/org/apache/storm/StormSubmitter.html) -and others will interact with a local cluster instead of the one configured by default. +def print_localconfvalue(args): +print(args.conf_name + ": " + confvalue(args.conf_name, args.c, [USER_CONF_DIR])) + + +def print_remoteconfvalue(args): +print(args.conf_name + ": " + confvalue(args.conf_name, args.c, [CLUSTER_CONF_DIR])) + + +def initialize_main_command(): +main_parser = argparse.ArgumentParser(prog="storm") +main_parser.add_argument("--config", default=None, help="Override default storm conf") +main_parser.add_argument("-c", action="append", default=[], help="Override storm conf properties") + +subparsers = main_parser.add_subparsers(help="") + +initialize_jar_subcommand(subparsers) +initialize_localconfvalue_subcommand(subparsers) +initialize_remoteconfvalue_subcommand(subparsers) +initialize_local_subcommand(subparsers) +initialize_sql_subcommand(subparsers) +initialize_kill_subcommand(subparsers) +initialize_upload_credentials_subcommand(subparsers) +initialize_blobstore_subcommand(subparsers) +initialize_heartbeats_subcommand(subparsers) +initialize_activate_subcommand(subparsers) +initialize_listtopos_subcommand(subparsers) +initialize_deactivate_subcommand(subparsers) +initialize_rebalance_subcommand(subparsers) +initialize_get_errors_subcommand(subparsers) +initialize_healthcheck_subcommand(subparsers) +initialize_kill_workers_subcommand(subparsers) +initialize_admin_subcommand(subparsers) +initialize_shell_subcommand(subparsers) +initialize_repl_subcommand(subparsers) +initialize_nimbus_subcommand(subparsers) +initialize_pacemaker_subcommand(subparsers) +initialize_supervisor_subcommand(subparsers) +initialize_ui_subcommand(subparsers) +initialize_logviewer_subcommand(subparsers) +initialize_drpc_client_subcommand(subparsers) +
[GitHub] storm pull request #2937: MINOR: Remove unused parameter from storm-kafka-cl...
GitHub user srdo opened a pull request: https://github.com/apache/storm/pull/2937 MINOR: Remove unused parameter from storm-kafka-client-example docs The kafka_artifact_id parameter is not used anymore. You can merge this pull request into a Git repository by running: $ git pull https://github.com/srdo/storm fix-kafka-example-docs Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2937.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2937 commit 111aa8d599d9216423f8ce5a79362460a5f6aa36 Author: Stig Rohde Døssing Date: 2019-01-11T10:58:05Z MINOR: Remove unused parameter from storm-kafka-client-example docs ---
[GitHub] storm pull request #2936: STORM-3312: Upgrade Guava to latest version where ...
GitHub user srdo opened a pull request: https://github.com/apache/storm/pull/2936 STORM-3312: Upgrade Guava to latest version where possible https://issues.apache.org/jira/browse/STORM-3312 You can merge this pull request into a Git repository by running: $ git pull https://github.com/srdo/storm STORM-3312 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2936.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2936 commit f43bcc7addc7c75db3f6cda803b70cd111038700 Author: Stig Rohde Døssing Date: 2019-01-09T21:07:37Z STORM-3312: Upgrade Guava to latest version where possible ---
[GitHub] storm pull request #2935: Doc clarification on returning `null` from RecordT...
GitHub user esamson opened a pull request: https://github.com/apache/storm/pull/2935 Doc clarification on returning `null` from RecordTranslator.apply() According to KafkaSpout implementation, record is discarded if Builder.setEmitNullTuples is set to `false`, instead of `true`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/esamson/storm master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2935.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2935 commit b1a8a062332b301ca05be740699862f23d9b540a Author: Edward Samson Date: 2019-01-09T02:19:39Z Doc clarification on returning `null` from RecordTranslator.apply() According to KafkaSpout implementation, record is discarded if Builder.setEmitNullTuples is set to `false`, instead of `true`. ---
[GitHub] storm pull request #2930: STORM-3274: Migrates storm CLI to using argparse m...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2930#discussion_r245727413 --- Diff: bin/storm.py --- @@ -296,787 +239,1044 @@ def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[] elif is_windows(): # handling whitespaces in JAVA_CMD try: -ret = sub.check_output(all_args, stderr=sub.STDOUT) +ret = subprocess.check_output(all_args, stderr=subprocess.STDOUT) print(ret) -except sub.CalledProcessError as e: +except subprocess.CalledProcessError as e: print(e.output) sys.exit(e.returncode) else: os.execvp(JAVA_CMD, all_args) return exit_code -def run_client_jar(jarfile, klass, args, daemon=False, client=True, extrajvmopts=[]): -global DEP_JARS_OPTS, DEP_ARTIFACTS_OPTS, DEP_ARTIFACTS_REPOSITORIES_OPTS, DEP_MAVEN_LOCAL_REPOSITORY_DIRECTORY, DEP_PROXY_URL, DEP_PROXY_USERNAME, DEP_PROXY_PASSWORD -local_jars = DEP_JARS_OPTS -artifact_to_file_jars = resolve_dependencies(DEP_ARTIFACTS_OPTS, DEP_ARTIFACTS_REPOSITORIES_OPTS, DEP_MAVEN_LOCAL_REPOSITORY_DIRECTORY, DEP_PROXY_URL, DEP_PROXY_USERNAME, DEP_PROXY_PASSWORD) +def run_client_jar(klass, args, daemon=False, client=True, extrajvmopts=[]): +local_jars = args.jars.split(",") +jarfile = args.topology_jar_path + +artifact_to_file_jars = resolve_dependencies( +args.artifacts, args.artifactRepositories, +args.mavenLocalRepositoryDirectory, args.proxyUrl, +args.proxyUsername, args.proxyPassword +) -extra_jars=[jarfile, USER_CONF_DIR, STORM_BIN_DIR] +extra_jars = [jarfile, USER_CONF_DIR, STORM_BIN_DIR] extra_jars.extend(local_jars) extra_jars.extend(artifact_to_file_jars.values()) exec_storm_class( -klass, +klass, args.c, --- End diff -- Can we rename `c` to something like `storm_config_opts`? If it's named that way by argparse, then don't worry about it. ---
[GitHub] storm pull request #2930: STORM-3274: Migrates storm CLI to using argparse m...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2930#discussion_r245744213 --- Diff: bin/storm.py --- @@ -296,787 +239,1044 @@ def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[] elif is_windows(): # handling whitespaces in JAVA_CMD try: -ret = sub.check_output(all_args, stderr=sub.STDOUT) +ret = subprocess.check_output(all_args, stderr=subprocess.STDOUT) print(ret) -except sub.CalledProcessError as e: +except subprocess.CalledProcessError as e: print(e.output) sys.exit(e.returncode) else: os.execvp(JAVA_CMD, all_args) return exit_code -def run_client_jar(jarfile, klass, args, daemon=False, client=True, extrajvmopts=[]): -global DEP_JARS_OPTS, DEP_ARTIFACTS_OPTS, DEP_ARTIFACTS_REPOSITORIES_OPTS, DEP_MAVEN_LOCAL_REPOSITORY_DIRECTORY, DEP_PROXY_URL, DEP_PROXY_USERNAME, DEP_PROXY_PASSWORD -local_jars = DEP_JARS_OPTS -artifact_to_file_jars = resolve_dependencies(DEP_ARTIFACTS_OPTS, DEP_ARTIFACTS_REPOSITORIES_OPTS, DEP_MAVEN_LOCAL_REPOSITORY_DIRECTORY, DEP_PROXY_URL, DEP_PROXY_USERNAME, DEP_PROXY_PASSWORD) +def run_client_jar(klass, args, daemon=False, client=True, extrajvmopts=[]): +local_jars = args.jars.split(",") +jarfile = args.topology_jar_path + +artifact_to_file_jars = resolve_dependencies( +args.artifacts, args.artifactRepositories, +args.mavenLocalRepositoryDirectory, args.proxyUrl, +args.proxyUsername, args.proxyPassword +) -extra_jars=[jarfile, USER_CONF_DIR, STORM_BIN_DIR] +extra_jars = [jarfile, USER_CONF_DIR, STORM_BIN_DIR] extra_jars.extend(local_jars) extra_jars.extend(artifact_to_file_jars.values()) exec_storm_class( -klass, +klass, args.c, jvmtype="-client", extrajars=extra_jars, -args=args, +args=args.topology_main_args, daemon=False, jvmopts=JAR_JVM_OPTS + extrajvmopts + ["-Dstorm.jar=" + jarfile] + ["-Dstorm.dependency.jars=" + ",".join(local_jars)] + ["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)]) -def local(jarfile, klass, *args): -"""Syntax: [storm local topology-jar-path class ...] -Runs the main method of class with the specified arguments but pointing to a local cluster -The storm jars and configs in ~/.storm are put on the classpath. -The process is configured so that StormSubmitter - (http://storm.apache.org/releases/current/javadocs/org/apache/storm/StormSubmitter.html) -and others will interact with a local cluster instead of the one configured by default. +def print_localconfvalue(args): +print(args.conf_name + ": " + confvalue(args.conf_name, args.c, [USER_CONF_DIR])) + + +def print_remoteconfvalue(args): +print(args.conf_name + ": " + confvalue(args.conf_name, args.c, [CLUSTER_CONF_DIR])) + + +def initialize_main_command(): +main_parser = argparse.ArgumentParser(prog="storm") +main_parser.add_argument("--config", default=None, help="Override default storm conf") --- End diff -- Nit: It isn't obvious from the description what the difference between `-c` and `--config` is, or how to use them. Can we update the descriptions? For `--config` something like "Override default storm conf file", and for `-c` something like "Override storm conf properties, e.g. key=val,key2=val2". ---
[GitHub] storm pull request #2930: STORM-3274: Migrates storm CLI to using argparse m...
Github user govind-menon commented on a diff in the pull request: https://github.com/apache/storm/pull/2930#discussion_r245726485 --- Diff: storm-client/pom.xml --- @@ -240,6 +240,29 @@ + +org.codehaus.mojo +exec-maven-plugin + + + +python2.7 --- End diff -- @srdo It's a little messier to do that and better to leave it up to the environment to do it's own test setup. --user is for then there's no virtualenv (which will vary from machine to machine) - which is a good example of how the host itself should do the setup. ---
[GitHub] storm pull request #2930: STORM-3274: Migrates storm CLI to using argparse m...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2930#discussion_r245725371 --- Diff: storm-client/pom.xml --- @@ -240,6 +240,29 @@ + +org.codehaus.mojo +exec-maven-plugin + + + +python2.7 --- End diff -- Thanks, it looks good. I'm wondering if we can get away with also doing `pip install mock --user` as part of the Maven run, or is that a bad idea? ---
[GitHub] storm pull request #2930: STORM-3274: Migrates storm CLI to using argparse m...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2930#discussion_r245722204 --- Diff: bin/storm.py --- @@ -296,787 +239,1044 @@ def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[] elif is_windows(): # handling whitespaces in JAVA_CMD try: -ret = sub.check_output(all_args, stderr=sub.STDOUT) +ret = subprocess.check_output(all_args, stderr=subprocess.STDOUT) print(ret) -except sub.CalledProcessError as e: +except subprocess.CalledProcessError as e: print(e.output) sys.exit(e.returncode) else: os.execvp(JAVA_CMD, all_args) return exit_code -def run_client_jar(jarfile, klass, args, daemon=False, client=True, extrajvmopts=[]): -global DEP_JARS_OPTS, DEP_ARTIFACTS_OPTS, DEP_ARTIFACTS_REPOSITORIES_OPTS, DEP_MAVEN_LOCAL_REPOSITORY_DIRECTORY, DEP_PROXY_URL, DEP_PROXY_USERNAME, DEP_PROXY_PASSWORD -local_jars = DEP_JARS_OPTS -artifact_to_file_jars = resolve_dependencies(DEP_ARTIFACTS_OPTS, DEP_ARTIFACTS_REPOSITORIES_OPTS, DEP_MAVEN_LOCAL_REPOSITORY_DIRECTORY, DEP_PROXY_URL, DEP_PROXY_USERNAME, DEP_PROXY_PASSWORD) +def run_client_jar(klass, args, daemon=False, client=True, extrajvmopts=[]): +local_jars = args.jars.split(",") +jarfile = args.topology_jar_path + +artifact_to_file_jars = resolve_dependencies( +args.artifacts, args.artifactRepositories, +args.mavenLocalRepositoryDirectory, args.proxyUrl, +args.proxyUsername, args.proxyPassword +) -extra_jars=[jarfile, USER_CONF_DIR, STORM_BIN_DIR] +extra_jars = [jarfile, USER_CONF_DIR, STORM_BIN_DIR] extra_jars.extend(local_jars) extra_jars.extend(artifact_to_file_jars.values()) exec_storm_class( -klass, +klass, args.c, jvmtype="-client", extrajars=extra_jars, -args=args, +args=args.topology_main_args, daemon=False, jvmopts=JAR_JVM_OPTS + extrajvmopts + ["-Dstorm.jar=" + jarfile] + ["-Dstorm.dependency.jars=" + ",".join(local_jars)] + ["-Dstorm.dependency.artifacts=" + json.dumps(artifact_to_file_jars)]) -def local(jarfile, klass, *args): -"""Syntax: [storm local topology-jar-path class ...] -Runs the main method of class with the specified arguments but pointing to a local cluster -The storm jars and configs in ~/.storm are put on the classpath. -The process is configured so that StormSubmitter - (http://storm.apache.org/releases/current/javadocs/org/apache/storm/StormSubmitter.html) -and others will interact with a local cluster instead of the one configured by default. +def print_localconfvalue(args): +print(args.conf_name + ": " + confvalue(args.conf_name, args.c, [USER_CONF_DIR])) + + +def print_remoteconfvalue(args): +print(args.conf_name + ": " + confvalue(args.conf_name, args.c, [CLUSTER_CONF_DIR])) + + +def initialize_main_command(): +main_parser = argparse.ArgumentParser(prog="storm") +main_parser.add_argument("--config", default=None, help="Override default storm conf") +main_parser.add_argument("-c", action="append", default=[], help="Override storm conf properties") + +subparsers = main_parser.add_subparsers(help="") + +initialize_jar_subcommand(subparsers) +initialize_localconfvalue_subcommand(subparsers) +initialize_remoteconfvalue_subcommand(subparsers) +initialize_local_subcommand(subparsers) +initialize_sql_subcommand(subparsers) +initialize_kill_subcommand(subparsers) +initialize_upload_credentials_subcommand(subparsers) +initialize_blobstore_subcommand(subparsers) +initialize_heartbeats_subcommand(subparsers) +initialize_activate_subcommand(subparsers) +initialize_listtopos_subcommand(subparsers) +initialize_deactivate_subcommand(subparsers) +initialize_rebalance_subcommand(subparsers) +initialize_get_errors_subcommand(subparsers) +initialize_healthcheck_subcommand(subparsers) +initialize_kill_workers_subcommand(subparsers) +initialize_admin_subcommand(subparsers) +initialize_shell_subcommand(subparsers) +initialize_repl_subcommand(subparsers) +initialize_nimbus_subcommand(subparsers) +initialize_pacemaker_subcommand(subparsers) +initialize_supervisor_subcommand(subparsers) +initialize_ui_subcommand(subparsers) +initialize_logviewer_subcommand(subparsers) +initialize_drpc_client_subcommand(subparsers) +
[GitHub] storm pull request #2930: STORM-3274: Migrates storm CLI to using argparse m...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2930#discussion_r245714702 --- Diff: bin/storm.py --- @@ -132,13 +56,8 @@ def get_jars_full(adir): elif os.path.exists(adir): files = [adir] -ret = [] -for f in files: -if f.endswith(".jar"): -ret.append(os.path.join(adir, f)) -return ret +return [os.path.join(adir, f) for f in files if f.endswith(".jar")] -# If given path is a dir, make it a wildcard so the JVM will include all JARs in the directory. --- End diff -- Nit: Is this comment no longer relevant? ---
[GitHub] storm pull request #2930: STORM-3274: Migrates storm CLI to using argparse m...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2930#discussion_r245715647 --- Diff: bin/storm.py --- @@ -156,49 +95,92 @@ def get_classpath(extrajars, daemon=True, client=False): ret.extend(get_wildcard_dir(os.path.join(STORM_DIR, "extlib"))) if daemon: ret.extend(get_wildcard_dir(os.path.join(STORM_DIR, "extlib-daemon"))) -if STORM_EXT_CLASSPATH != None: +if STORM_EXT_CLASSPATH: ret.append(STORM_EXT_CLASSPATH) -if daemon and STORM_EXT_CLASSPATH_DAEMON != None: +if daemon and STORM_EXT_CLASSPATH_DAEMON: ret.append(STORM_EXT_CLASSPATH_DAEMON) ret.extend(extrajars) -return normclasspath(os.pathsep.join(ret)) +return NORMAL_CLASS_PATH(os.pathsep.join(ret)) -def confvalue(name, extrapaths, daemon=True): -global CONFFILE -command = [ -JAVA_CMD, "-client", get_config_opts(), "-Dstorm.conf.file=" + CONFFILE, -"-cp", get_classpath(extrapaths, daemon), "org.apache.storm.command.ConfigValue", name -] -p = sub.Popen(command, stdout=sub.PIPE) -output, errors = p.communicate() -# python 3 -if not isinstance(output, str): --- End diff -- Is this check not necessary for Python 3 anymore? ---
[GitHub] storm pull request #2934: STORM-3310: Make JCQueueTest wait for consumer to ...
GitHub user srdo opened a pull request: https://github.com/apache/storm/pull/2934 STORM-3310: Make JCQueueTest wait for consumer to read all queued ite⦠â¦ms before terminating https://issues.apache.org/jira/browse/STORM-3310 I've only seen the test fail once, but it should be pretty easy to see that the test has a race. The tests all expect some messages to go through the queue, but the `run` method doesn't make sure the producers have actually produced anything, and even if they do, the consumer may be shut down before it can read the messages the producers put in the queue. You can merge this pull request into a Git repository by running: $ git pull https://github.com/srdo/storm STORM-3310 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2934.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2934 commit a84c879d3429afef946ab8e9c0d0cddf497e1c6d Author: Stig Rohde Døssing Date: 2019-01-07T16:11:23Z STORM-3310: Make JCQueueTest wait for consumer to read all queued items before terminating ---
[GitHub] storm pull request #2930: STORM-3274: Migrates storm CLI to using argparse m...
Github user govind-menon commented on a diff in the pull request: https://github.com/apache/storm/pull/2930#discussion_r245709584 --- Diff: storm-client/pom.xml --- @@ -240,6 +240,29 @@ + +org.codehaus.mojo +exec-maven-plugin + + + +python2.7 --- End diff -- @srdo I've added a profile that would execute only on Unix - I didn't add one for Windows because that would change the file path separators that we're asserting on. I can change the test to make that configureable if you want ---
[GitHub] storm pull request #2930: STORM-3274: Migrates storm CLI to using argparse m...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2930#discussion_r245602260 --- Diff: storm-client/pom.xml --- @@ -240,6 +240,29 @@ + +org.codehaus.mojo +exec-maven-plugin + + + +python2.7 --- End diff -- Or even better, set the executable name to python2.7 on Linux, and python on Windows. ---
[GitHub] storm pull request #2933: STORM-3309: Fix flaky tick tuple test
GitHub user srdo opened a pull request: https://github.com/apache/storm/pull/2933 STORM-3309: Fix flaky tick tuple test https://issues.apache.org/jira/browse/STORM-3309 I've made the following changes: * When message timeout is disabled, the acker shouldn't time out tuples. Disable ticks for the acker if message timeouts are disabled * The spout and bolt executors don't integrate with time simulation, in the sense that they don't require simulated time to increment in order to run. This is fine, but if they aren't going to pause for simulated time to increase, they also shouldn't potentially pause during initialization, waiting for Nimbus to activate the topology. * InProcMessaging (used by the FeederSpout) will wait for the receiver to show up when sending the initial message. It waits at most 20 seconds, but if time simulation is enabled, it only waits 2. This is not enough for the topology/spout to start most of the time. I set the simulated time increment to match the real time spent waiting. * The Zookeeper log drowns out any useful logging, set its level to WARN in storm-server The TickTupleTest has been amended a bit. The problem with the current code is that LocalCluster.waitForIdle doesn't cover spout and bolt executor async loops, so we can end up in a situations where the test fails spuriously. Example: The test starts by incrementing cluster time until the bolt receives a tick tuple. Starting from t=0, it is possible that the test sets cluster time to 10 and waits until the tick thread has added some tuples. The bolt thread runs independently of time simulation, and will consume the first tick at some arbitrary time. If we are unlucky, we can get the following sequence: * 10 ticks are added by tick thread * Bolt consumes first tick * All threads covered by LocalCluster.waitForIdle (but not the bolt thread) are now idle, so the test exits the loop waiting for ticks * The received ticks list is cleared * The test stores what time the list was cleared at, advances cluster time by 1 and checks that a tick is received * The bolt may just now be processing some of the previously queued ticks. This will cause the test to fail, because the bolt may receive multiple ticks at the same simulated time. The replacement test instead uses a bootstrap tuple to verify that the executor (and tick thread) have started, and then increments the full tick interval. The tick interval is chosen so the tick thread will not produce any ticks until the test advances time enough to trigger one. This allows the test to verify that exactly one tick is received per second. You can merge this pull request into a Git repository by running: $ git pull https://github.com/srdo/storm STORM-3309 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2933.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2933 commit 6ca657d13d7f0ec50be2baed7fd8c70df5c9deca Author: Stig Rohde Døssing Date: 2019-01-05T13:38:04Z STORM-3309: Fix flaky tick tuple test ---
[GitHub] storm pull request #2930: STORM-3274: Migrates storm CLI to using argparse m...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2930#discussion_r245480567 --- Diff: storm-client/pom.xml --- @@ -240,6 +240,29 @@ + +org.codehaus.mojo +exec-maven-plugin + + + +python2.7 --- End diff -- I'd like the storm-client build to still work on Windows. One option would be to only run this execution when the OS is UNIX-y, e.g. ``` unix ``` ---
[GitHub] storm pull request #2927: STORM-1307: Port testing4j_test.clj to Java
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2927 ---
[GitHub] storm pull request #2924: STORM-1289: Port integration-test.clj to Java
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2924 ---
[GitHub] storm pull request #2929: MINOR: Correct comment about overflow limiting in ...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2929 ---
[GitHub] storm pull request #2931: STORM-3307: Fixes error time on UI for time of las...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2931 ---
[GitHub] storm pull request #2932: STORM-3274: Adds mock to Travis pipeline so it can...
Github user govind-menon closed the pull request at: https://github.com/apache/storm/pull/2932 ---
[GitHub] storm pull request #2932: STORM-3274: Adds mock to Travis pipeline so it can...
GitHub user govind-menon opened a pull request: https://github.com/apache/storm/pull/2932 STORM-3274: Adds mock to Travis pipeline so it can run Python tests f⦠â¦or cli migration You can merge this pull request into a Git repository by running: $ git pull https://github.com/govind-menon/storm STORM-3274-II Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2932.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2932 commit 53bba3498a94d72d9388cf68c6bdfa7dbb264df0 Author: Govind Menon Date: 2019-01-03T20:16:05Z STORM-3274: Adds mock to Travis pipeline so it can run Python tests for cli migration ---
[GitHub] storm pull request #2931: STORM-3307: Fixes error time on UI for time of las...
GitHub user govind-menon opened a pull request: https://github.com/apache/storm/pull/2931 STORM-3307: Fixes error time on UI for time of last error on componen⦠â¦t page You can merge this pull request into a Git repository by running: $ git pull https://github.com/govind-menon/storm STORM-3307 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2931.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2931 commit ff5b645134cfab98b2421b187e5afe0132b85e76 Author: Govind Menon Date: 2019-01-03T16:50:28Z STORM-3307: Fixes error time on UI for time of last error on component page ---
[GitHub] storm pull request #2930: STORM-3274: Migrates storm CLI to using argparse m...
GitHub user govind-menon opened a pull request: https://github.com/apache/storm/pull/2930 STORM-3274: Migrates storm CLI to using argparse making documentation⦠⦠more accessible You can merge this pull request into a Git repository by running: $ git pull https://github.com/govind-menon/storm STORM-3274 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2930.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2930 commit a5584814670beba3f2ce927f99be33f0d37edbe5 Author: Govind Menon Date: 2018-12-26T17:18:33Z STORM-3274: Migrates storm CLI to using argparse making documentation more accessible ---
[GitHub] storm pull request #2908: STORM-3276: Updated Flux to deal with storm local ...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2908 ---
[GitHub] storm pull request #2929: MINOR: Correct comment about overflow limiting in ...
GitHub user srdo opened a pull request: https://github.com/apache/storm/pull/2929 MINOR: Correct comment about overflow limiting in JCQueue. Setting ov⦠â¦erflow limit to 0 disables limiting, does not disable overflow You can merge this pull request into a Git repository by running: $ git pull https://github.com/srdo/storm MINOR-jcqueue-overflow-limit-comment Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2929.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2929 commit 7d14d02ed309fd724b3ecccf2a75a65b0359eee6 Author: Stig Rohde Døssing Date: 2018-12-21T17:51:17Z MINOR: Correct comment about overflow limiting in JCQueue. Setting overflow limit to 0 disables limiting, does not disable overflow ---
[GitHub] storm pull request #2927: STORM-1307: Port testing4j_test.clj to Java
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2927#discussion_r242739488 --- Diff: storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java --- @@ -0,0 +1,927 @@ +/* + * Copyright 2018 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.integration; + +import static org.apache.storm.integration.AssertLoop.assertAcked; +import static org.apache.storm.integration.AssertLoop.assertFailed; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.Testing; +import org.apache.storm.Thrift; +import org.apache.storm.Thrift.BoltDetails; +import org.apache.storm.Thrift.SpoutDetails; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.generated.SubmitOptions; +import org.apache.storm.generated.TopologyInitialStatus; +import org.apache.storm.hooks.BaseTaskHook; +import org.apache.storm.hooks.info.BoltAckInfo; +import org.apache.storm.hooks.info.BoltExecuteInfo; +import org.apache.storm.hooks.info.BoltFailInfo; +import org.apache.storm.hooks.info.EmitInfo; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.testing.AckFailMapTracker; +import org.apache.storm.testing.CompleteTopologyParam; +import org.apache.storm.testing.FeederSpout; +import org.apache.storm.testing.FixedTuple; +import org.apache.storm.testing.IntegrationTest; +import org.apache.storm.testing.MockedSources; +import org.apache.storm.testing.TestAggregatesCounter; +import org.apache.storm.testing.TestConfBolt; +import org.apache.storm.testing.TestGlobalCount; +import org.apache.storm.testing.TestPlannerSpout; +import org.apache.storm.testing.TestWordCounter; +import org.apache.storm.testing.TestWordSpout; +import org.apache.storm.testing.TrackedTopology; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +@IntegrationTest +public class TopologyIntegrationTest { + +@ParameterizedTest +@ValueSource(strings = {"true", "false"}) +public void testBasicTopology(boolean useLocalMessaging) throws Exception { +try (LocalCluster cluster = new LocalCluster.Builder() +.withSimulatedTime() +.withSupervisors(4) + .withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ, !useLocalMessaging)) +.build()) { +Map spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3)); +Map boltMap = new HashMap<>(); +boltMap.put("2", +Thrift.prepareBoltDetails( +Collections.singletonMap( +Utils.getGlobalStreamId("1", null), + Thrift.prepareFieldsGrouping(Collections.singletonList("word"))), +new TestWordCounter(), 4)); +
[GitHub] storm pull request #2927: STORM-1307: Port testing4j_test.clj to Java
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2927#discussion_r242699069 --- Diff: storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java --- @@ -0,0 +1,927 @@ +/* + * Copyright 2018 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.integration; + +import static org.apache.storm.integration.AssertLoop.assertAcked; +import static org.apache.storm.integration.AssertLoop.assertFailed; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.Testing; +import org.apache.storm.Thrift; +import org.apache.storm.Thrift.BoltDetails; +import org.apache.storm.Thrift.SpoutDetails; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.generated.SubmitOptions; +import org.apache.storm.generated.TopologyInitialStatus; +import org.apache.storm.hooks.BaseTaskHook; +import org.apache.storm.hooks.info.BoltAckInfo; +import org.apache.storm.hooks.info.BoltExecuteInfo; +import org.apache.storm.hooks.info.BoltFailInfo; +import org.apache.storm.hooks.info.EmitInfo; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.testing.AckFailMapTracker; +import org.apache.storm.testing.CompleteTopologyParam; +import org.apache.storm.testing.FeederSpout; +import org.apache.storm.testing.FixedTuple; +import org.apache.storm.testing.IntegrationTest; +import org.apache.storm.testing.MockedSources; +import org.apache.storm.testing.TestAggregatesCounter; +import org.apache.storm.testing.TestConfBolt; +import org.apache.storm.testing.TestGlobalCount; +import org.apache.storm.testing.TestPlannerSpout; +import org.apache.storm.testing.TestWordCounter; +import org.apache.storm.testing.TestWordSpout; +import org.apache.storm.testing.TrackedTopology; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +@IntegrationTest +public class TopologyIntegrationTest { + +@ParameterizedTest +@ValueSource(strings = {"true", "false"}) +public void testBasicTopology(boolean useLocalMessaging) throws Exception { +try (LocalCluster cluster = new LocalCluster.Builder() +.withSimulatedTime() +.withSupervisors(4) + .withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ, !useLocalMessaging)) +.build()) { +Map spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3)); +Map boltMap = new HashMap<>(); +boltMap.put("2", +Thrift.prepareBoltDetails( +Collections.singletonMap( +Utils.getGlobalStreamId("1", null), + Thrift.prepareFieldsGrouping(Collections.singletonList("word"))), +new TestWordCounter(), 4)); +
[GitHub] storm pull request #2927: STORM-1307: Port testing4j_test.clj to Java
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2927#discussion_r242698501 --- Diff: storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java --- @@ -0,0 +1,927 @@ +/* + * Copyright 2018 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.integration; + +import static org.apache.storm.integration.AssertLoop.assertAcked; +import static org.apache.storm.integration.AssertLoop.assertFailed; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.Testing; +import org.apache.storm.Thrift; +import org.apache.storm.Thrift.BoltDetails; +import org.apache.storm.Thrift.SpoutDetails; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.generated.SubmitOptions; +import org.apache.storm.generated.TopologyInitialStatus; +import org.apache.storm.hooks.BaseTaskHook; +import org.apache.storm.hooks.info.BoltAckInfo; +import org.apache.storm.hooks.info.BoltExecuteInfo; +import org.apache.storm.hooks.info.BoltFailInfo; +import org.apache.storm.hooks.info.EmitInfo; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.testing.AckFailMapTracker; +import org.apache.storm.testing.CompleteTopologyParam; +import org.apache.storm.testing.FeederSpout; +import org.apache.storm.testing.FixedTuple; +import org.apache.storm.testing.IntegrationTest; +import org.apache.storm.testing.MockedSources; +import org.apache.storm.testing.TestAggregatesCounter; +import org.apache.storm.testing.TestConfBolt; +import org.apache.storm.testing.TestGlobalCount; +import org.apache.storm.testing.TestPlannerSpout; +import org.apache.storm.testing.TestWordCounter; +import org.apache.storm.testing.TestWordSpout; +import org.apache.storm.testing.TrackedTopology; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +@IntegrationTest +public class TopologyIntegrationTest { + +@ParameterizedTest +@ValueSource(strings = {"true", "false"}) +public void testBasicTopology(boolean useLocalMessaging) throws Exception { +try (LocalCluster cluster = new LocalCluster.Builder() +.withSimulatedTime() +.withSupervisors(4) + .withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ, !useLocalMessaging)) +.build()) { +Map spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3)); +Map boltMap = new HashMap<>(); +boltMap.put("2", +Thrift.prepareBoltDetails( +Collections.singletonMap( +Utils.getGlobalStreamId("1", null), + Thrift.prepareFieldsGrouping(Collections.singletonList("word"))), +new TestWordCounter(), 4)); +
[GitHub] storm pull request #2927: STORM-1307: Port testing4j_test.clj to Java
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2927#discussion_r242663110 --- Diff: storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java --- @@ -0,0 +1,927 @@ +/* + * Copyright 2018 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.integration; + +import static org.apache.storm.integration.AssertLoop.assertAcked; +import static org.apache.storm.integration.AssertLoop.assertFailed; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.Testing; +import org.apache.storm.Thrift; +import org.apache.storm.Thrift.BoltDetails; +import org.apache.storm.Thrift.SpoutDetails; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.generated.SubmitOptions; +import org.apache.storm.generated.TopologyInitialStatus; +import org.apache.storm.hooks.BaseTaskHook; +import org.apache.storm.hooks.info.BoltAckInfo; +import org.apache.storm.hooks.info.BoltExecuteInfo; +import org.apache.storm.hooks.info.BoltFailInfo; +import org.apache.storm.hooks.info.EmitInfo; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.testing.AckFailMapTracker; +import org.apache.storm.testing.CompleteTopologyParam; +import org.apache.storm.testing.FeederSpout; +import org.apache.storm.testing.FixedTuple; +import org.apache.storm.testing.IntegrationTest; +import org.apache.storm.testing.MockedSources; +import org.apache.storm.testing.TestAggregatesCounter; +import org.apache.storm.testing.TestConfBolt; +import org.apache.storm.testing.TestGlobalCount; +import org.apache.storm.testing.TestPlannerSpout; +import org.apache.storm.testing.TestWordCounter; +import org.apache.storm.testing.TestWordSpout; +import org.apache.storm.testing.TrackedTopology; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +@IntegrationTest +public class TopologyIntegrationTest { + +@ParameterizedTest +@ValueSource(strings = {"true", "false"}) +public void testBasicTopology(boolean useLocalMessaging) throws Exception { +try (LocalCluster cluster = new LocalCluster.Builder() +.withSimulatedTime() +.withSupervisors(4) + .withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ, !useLocalMessaging)) +.build()) { +Map spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3)); +Map boltMap = new HashMap<>(); +boltMap.put("2", +Thrift.prepareBoltDetails( +Collections.singletonMap( +Utils.getGlobalStreamId("1", null), + Thrift.prepareFieldsGrouping(Collections.singletonList("word"))), +new TestWordCounter(), 4)); +
[GitHub] storm pull request #2927: STORM-1307: Port testing4j_test.clj to Java
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2927#discussion_r242661816 --- Diff: storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java --- @@ -0,0 +1,927 @@ +/* + * Copyright 2018 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.integration; + +import static org.apache.storm.integration.AssertLoop.assertAcked; +import static org.apache.storm.integration.AssertLoop.assertFailed; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.Testing; +import org.apache.storm.Thrift; +import org.apache.storm.Thrift.BoltDetails; +import org.apache.storm.Thrift.SpoutDetails; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.generated.SubmitOptions; +import org.apache.storm.generated.TopologyInitialStatus; +import org.apache.storm.hooks.BaseTaskHook; +import org.apache.storm.hooks.info.BoltAckInfo; +import org.apache.storm.hooks.info.BoltExecuteInfo; +import org.apache.storm.hooks.info.BoltFailInfo; +import org.apache.storm.hooks.info.EmitInfo; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.testing.AckFailMapTracker; +import org.apache.storm.testing.CompleteTopologyParam; +import org.apache.storm.testing.FeederSpout; +import org.apache.storm.testing.FixedTuple; +import org.apache.storm.testing.IntegrationTest; +import org.apache.storm.testing.MockedSources; +import org.apache.storm.testing.TestAggregatesCounter; +import org.apache.storm.testing.TestConfBolt; +import org.apache.storm.testing.TestGlobalCount; +import org.apache.storm.testing.TestPlannerSpout; +import org.apache.storm.testing.TestWordCounter; +import org.apache.storm.testing.TestWordSpout; +import org.apache.storm.testing.TrackedTopology; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +@IntegrationTest +public class TopologyIntegrationTest { + +@ParameterizedTest +@ValueSource(strings = {"true", "false"}) +public void testBasicTopology(boolean useLocalMessaging) throws Exception { +try (LocalCluster cluster = new LocalCluster.Builder() +.withSimulatedTime() +.withSupervisors(4) + .withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ, !useLocalMessaging)) +.build()) { +Map spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3)); +Map boltMap = new HashMap<>(); +boltMap.put("2", +Thrift.prepareBoltDetails( +Collections.singletonMap( +Utils.getGlobalStreamId("1", null), + Thrift.prepareFieldsGrouping(Collections.singletonList("word"))), +new TestWordCounter(), 4)); +
[GitHub] storm pull request #2918: STORM-3295 allow blacklist scheduling to function ...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2918 ---
[GitHub] storm pull request #2926: STORM-3303 adjust some logging priorities, log top...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2926 ---
[GitHub] storm pull request #2925: STORM-3302: Ensures we close streams to HDFS
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2925 ---
[GitHub] storm pull request #2928: STORM-3270: Build Storm with Java 11, excluding so...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2928 ---
[GitHub] storm pull request #2921: STORM-3300: Fix NPE in Acker that could occur if s...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2921 ---
[GitHub] storm pull request #2928: STORM-3270: Build Storm with Java 11, excluding so...
GitHub user srdo opened a pull request: https://github.com/apache/storm/pull/2928 STORM-3270: Build Storm with Java 11, excluding some incompatible mod⦠â¦ules https://issues.apache.org/jira/browse/STORM-3270 All Hadoop-related modules are excluded from tests, since Hadoop doesn't work with Java 11 yet. Same for Cassandra. Libraries I'm aware of that need to be updated, other than Netty which is bumped in this PR: * Clojure to 1.10 (not released yet, currently going through RCs) * Hadoop/Hive/HBase need to be updated once https://issues.apache.org/jira/browse/HADOOP-15338 is resolved and released. * Cassandra needs to be updated to 4.0.0. This update should be easy, since we only depend on it in tests, the production code only relies on the Datastax driver, which should be compatible with most Cassandra versions. * Dropwizard Metrics should probably be updated to 4.x, but we're blocked by https://issues.apache.org/jira/browse/CASSANDRA-14667. I'm not sure how critical the fixes for Metrics are. * Kafka should probably be upgraded to 2.1.0. I don't think it is an issue for storm-kafka-client, since users can just specify another client library version, but for storm-kafka-monitor it might be necessary for us to upgrade. You can merge this pull request into a Git repository by running: $ git pull https://github.com/srdo/storm STORM-3270 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2928.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2928 commit ec634bd65087d1b67a53c99ef3d5d3efeaea0afe Author: Stig Rohde Døssing Date: 2018-09-25T16:06:43Z STORM-3270: Build Storm with Java 11, excluding some incompatible modules ---
[GitHub] storm pull request #2907: STORM-2990, STORM-3279: Fix issue where Kafka Trid...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2907 ---
[GitHub] storm pull request #2907: STORM-2990, STORM-3279: Fix issue where Kafka Trid...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2907#discussion_r241782960 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java --- @@ -170,15 +170,25 @@ public void reEmitPartitionBatch(TransactionAttempt tx, TridentCollector collect seek(currBatchTp, lastBatchMeta); -final ConsumerRecords records = consumer.poll(pollTimeoutMs); -LOG.debug("Polled [{}] records from Kafka.", records.count()); +final List> records = consumer.poll(pollTimeoutMs).records(currBatchTp); +LOG.debug("Polled [{}] records from Kafka.", records.size()); if (!records.isEmpty()) { for (ConsumerRecord record : records) { emitTuple(collector, record); } -// build new metadata -currentBatch = new KafkaTridentSpoutBatchMetadata(records.records(currBatchTp), this.topologyContext.getStormId()); +// build new metadata based on emitted records +currentBatch = new KafkaTridentSpoutBatchMetadata( +records.get(0).offset(), +records.get(records.size() - 1).offset(), +topologyContext.getStormId()); +} else { +//Build new metadata based on the consumer position. +//We want the next emit to start at the current consumer position, +//so make a meta that indicates that position - 1 is the last emitted offset +//This helps us avoid cases like STORM-3279, and simplifies the seek logic. +long lastEmittedOffset = consumer.position(currBatchTp) - 1; --- End diff -- okay sounds good. ---
[GitHub] storm pull request #2927: STORM-1307: Port testing4j_test.clj to Java
GitHub user srdo opened a pull request: https://github.com/apache/storm/pull/2927 STORM-1307: Port testing4j_test.clj to Java Follow up to https://github.com/apache/storm/pull/2924, please review that one first. You can merge this pull request into a Git repository by running: $ git pull https://github.com/srdo/storm STORM-1307 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2927.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2927 commit a491d3e1b6a85726648d609a79c074a0ae66932c Author: Stig Rohde Døssing Date: 2018-12-07T21:07:36Z STORM-1289: Port integration-test.clj to Java commit e408283fd01d6b1146859265336f794203054901 Author: Stig Rohde Døssing Date: 2018-12-12T00:36:09Z STORM-1307: Port testing4j_test.clj to Java ---
[GitHub] storm pull request #2920: STORM-3297 prevent supervisor restart when no nimb...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2920 ---
[GitHub] storm pull request #2926: STORM-3303 adjust some logging priorities, log top...
GitHub user agresch opened a pull request: https://github.com/apache/storm/pull/2926 STORM-3303 adjust some logging priorities, log topology info You can merge this pull request into a Git repository by running: $ git pull https://github.com/agresch/storm agresch_logging Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2926.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2926 commit abca1540be9e6875f93dc0b9a276710717bc0ff6 Author: Aaron Gresch Date: 2018-12-11T20:18:23Z STORM-3303 adjust some logging priorities, log topology info ---
[GitHub] storm pull request #2925: STORM-3302: Ensures we close streams to HDFS
Github user kishorvpatil commented on a diff in the pull request: https://github.com/apache/storm/pull/2925#discussion_r240759867 --- Diff: storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java --- @@ -154,11 +154,23 @@ private boolean uploadDependencyToBlobStore(String key, File dependency) acls.add(new AccessControl(AccessControlType.OTHER, BlobStoreAclHandler.READ)); -AtomicOutputStream blob = getBlobStore().createBlob(key, new SettableBlobMeta(acls)); -Files.copy(dependency.toPath(), blob); -blob.close(); +AtomicOutputStream blob = null; +try { +blob = getBlobStore().createBlob(key, new SettableBlobMeta(acls)); +Files.copy(dependency.toPath(), blob); +blob.close(); +blob = null; -uploadNew = true; +uploadNew = true; +} finally { +try { +if (blob != null) { +blob.cancel(); +} +} catch (IOException throwaway) { +// Ignore. --- End diff -- Ignore that. This exception will appear only while attempting to cancel the blob connection. We should ignore it. ---
[GitHub] storm pull request #2925: STORM-3302: Ensures we close streams to HDFS
Github user kishorvpatil commented on a diff in the pull request: https://github.com/apache/storm/pull/2925#discussion_r240759159 --- Diff: storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java --- @@ -154,11 +154,23 @@ private boolean uploadDependencyToBlobStore(String key, File dependency) acls.add(new AccessControl(AccessControlType.OTHER, BlobStoreAclHandler.READ)); -AtomicOutputStream blob = getBlobStore().createBlob(key, new SettableBlobMeta(acls)); -Files.copy(dependency.toPath(), blob); -blob.close(); +AtomicOutputStream blob = null; +try { +blob = getBlobStore().createBlob(key, new SettableBlobMeta(acls)); +Files.copy(dependency.toPath(), blob); +blob.close(); +blob = null; -uploadNew = true; +uploadNew = true; +} finally { +try { +if (blob != null) { +blob.cancel(); +} +} catch (IOException throwaway) { +// Ignore. --- End diff -- It would be useful to log Error/warning in case there is `IOException`. ---
[GitHub] storm pull request #2923: STORM-3301: Fix case where KafkaSpout could emit t...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2923 ---
[GitHub] storm pull request #2922: STORM-3301: Fix case where KafkaSpout could emit t...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2922 ---
[GitHub] storm pull request #2917: [STORM-3294] Upgrade jetty version to latest stabl...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2917 ---
[GitHub] storm pull request #2919: STORM-3296: Upgrade curator-test to avoid CURATOR-...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2919 ---
[GitHub] storm pull request #2912: STORM-3289: Add note about KAFKA-7044 to storm-kaf...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2912 ---
[GitHub] storm pull request #2925: STORM-3302: Ensures we close streams to HDFS
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2925#discussion_r240657555 --- Diff: storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java --- @@ -288,13 +288,15 @@ public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subjec while ((len = in.read(buffer)) > 0) { out.write(buffer, 0, len); } -out.close(); -} catch (AuthorizationException | IOException | RuntimeException e) { --- End diff -- sorry I was wrong ---
[GitHub] storm pull request #2925: STORM-3302: Ensures we close streams to HDFS
Github user d2r commented on a diff in the pull request: https://github.com/apache/storm/pull/2925#discussion_r240656033 --- Diff: storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java --- @@ -288,13 +288,15 @@ public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subjec while ((len = in.read(buffer)) > 0) { out.write(buffer, 0, len); } -out.close(); -} catch (AuthorizationException | IOException | RuntimeException e) { --- End diff -- Formerly they were caught. Removing these lines will result in them being thrown as both the method documentation and declarations intend. We may not want this change however, and instead we could update the declaration and the documentation such that the behavior remains unchanged. I am willing to do either one, as this PR is focused on eliminating the possibility of leaking some sockets. ---
[GitHub] storm pull request #2925: STORM-3302: Ensures we close streams to HDFS
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2925#discussion_r240642462 --- Diff: storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java --- @@ -288,13 +288,15 @@ public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subjec while ((len = in.read(buffer)) > 0) { out.write(buffer, 0, len); } -out.close(); -} catch (AuthorizationException | IOException | RuntimeException e) { --- End diff -- It looks to me that the exception is still caught instead of thrown? ---
[GitHub] storm pull request #2925: STORM-3302: Ensures we close streams to HDFS
Github user d2r commented on a diff in the pull request: https://github.com/apache/storm/pull/2925#discussion_r240369252 --- Diff: storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java --- @@ -288,13 +288,15 @@ public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subjec while ((len = in.read(buffer)) > 0) { out.write(buffer, 0, len); } -out.close(); -} catch (AuthorizationException | IOException | RuntimeException e) { -if (out != null) { -out.cancel(); -} } finally { -in.close(); +try { +if (out != null) { --- End diff -- I see what you mean. We need to preserve a distinction between cleaning up in the normal case and cleaning up in the error case. In the normal case, we want `close`. In the error case, we want `cancel`. ---
[GitHub] storm pull request #2925: STORM-3302: Ensures we close streams to HDFS
Github user agresch commented on a diff in the pull request: https://github.com/apache/storm/pull/2925#discussion_r240368505 --- Diff: storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java --- @@ -288,13 +288,15 @@ public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subjec while ((len = in.read(buffer)) > 0) { out.write(buffer, 0, len); } -out.close(); -} catch (AuthorizationException | IOException | RuntimeException e) { -if (out != null) { -out.cancel(); -} } finally { -in.close(); +try { +if (out != null) { --- End diff -- Previously closed was called in the normal case. Now we're always calling cancel(). These seem to have slightly different functionality. ---
[GitHub] storm pull request #2925: STORM-3302: Ensures we close streams to HDFS
Github user d2r commented on a diff in the pull request: https://github.com/apache/storm/pull/2925#discussion_r240366774 --- Diff: storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java --- @@ -204,6 +204,14 @@ public static boolean downloadUpdatedBlob(Map conf, BlobStore bl } catch (Exception exp) { // Logging an exception while client is connecting LOG.error("Exception", exp); +} finally { +if (null != out) { --- End diff -- Same as aboveâwe probably want to remove the previous call to `close`. I'll also fix the style of the conditional to match elsewhere in the file: `out != null`. ---
[GitHub] storm pull request #2925: STORM-3302: Ensures we close streams to HDFS
Github user d2r commented on a diff in the pull request: https://github.com/apache/storm/pull/2925#discussion_r240366381 --- Diff: storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java --- @@ -154,11 +154,22 @@ private boolean uploadDependencyToBlobStore(String key, File dependency) acls.add(new AccessControl(AccessControlType.OTHER, BlobStoreAclHandler.READ)); -AtomicOutputStream blob = getBlobStore().createBlob(key, new SettableBlobMeta(acls)); -Files.copy(dependency.toPath(), blob); -blob.close(); +AtomicOutputStream blob = null; +try { +blob = getBlobStore().createBlob(key, new SettableBlobMeta(acls)); +Files.copy(dependency.toPath(), blob); +blob.close(); -uploadNew = true; +uploadNew = true; --- End diff -- Actually we probably want to remove the call to `close`, since it will happen in the `finally` block now. ---
[GitHub] storm pull request #2925: STORM-3302: Ensures we close streams to HDFS
Github user d2r commented on a diff in the pull request: https://github.com/apache/storm/pull/2925#discussion_r240365742 --- Diff: storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java --- @@ -288,13 +288,15 @@ public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subjec while ((len = in.read(buffer)) > 0) { out.write(buffer, 0, len); } -out.close(); -} catch (AuthorizationException | IOException | RuntimeException e) { -if (out != null) { -out.cancel(); -} } finally { -in.close(); +try { +if (out != null) { --- End diff -- That is intentional. Calling `cancel` will also cancel writes associated with the stream, and will clean up any transient resources ("part" files) that may have been left around as a result of an error. It seems best to do this in the error case, and I do not think it will do any harm in other cases. ---
[GitHub] storm pull request #2925: STORM-3302: Ensures we close streams to HDFS
Github user agresch commented on a diff in the pull request: https://github.com/apache/storm/pull/2925#discussion_r240360762 --- Diff: storm-client/src/jvm/org/apache/storm/dependency/DependencyUploader.java --- @@ -154,11 +154,22 @@ private boolean uploadDependencyToBlobStore(String key, File dependency) acls.add(new AccessControl(AccessControlType.OTHER, BlobStoreAclHandler.READ)); -AtomicOutputStream blob = getBlobStore().createBlob(key, new SettableBlobMeta(acls)); -Files.copy(dependency.toPath(), blob); -blob.close(); +AtomicOutputStream blob = null; +try { +blob = getBlobStore().createBlob(key, new SettableBlobMeta(acls)); +Files.copy(dependency.toPath(), blob); +blob.close(); -uploadNew = true; +uploadNew = true; --- End diff -- shouldn't we call blob = null here to prevent a dupe cancel after close? ---
[GitHub] storm pull request #2925: STORM-3302: Ensures we close streams to HDFS
Github user agresch commented on a diff in the pull request: https://github.com/apache/storm/pull/2925#discussion_r240360980 --- Diff: storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java --- @@ -204,6 +204,14 @@ public static boolean downloadUpdatedBlob(Map conf, BlobStore bl } catch (Exception exp) { // Logging an exception while client is connecting LOG.error("Exception", exp); +} finally { +if (null != out) { --- End diff -- same here, shouldn't we have nulled out out on close? ---
[GitHub] storm pull request #2925: STORM-3302: Ensures we close streams to HDFS
Github user agresch commented on a diff in the pull request: https://github.com/apache/storm/pull/2925#discussion_r240360428 --- Diff: storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java --- @@ -288,13 +288,15 @@ public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subjec while ((len = in.read(buffer)) > 0) { out.write(buffer, 0, len); } -out.close(); -} catch (AuthorizationException | IOException | RuntimeException e) { -if (out != null) { -out.cancel(); -} } finally { -in.close(); +try { +if (out != null) { --- End diff -- we're always calling cancel here instead of close() for the normal case. ---
[GitHub] storm pull request #2925: STORM-3302: Ensures we close streams to HDFS
Github user d2r commented on a diff in the pull request: https://github.com/apache/storm/pull/2925#discussion_r240358080 --- Diff: storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java --- @@ -288,13 +288,15 @@ public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subjec while ((len = in.read(buffer)) > 0) { out.write(buffer, 0, len); } -out.close(); -} catch (AuthorizationException | IOException | RuntimeException e) { --- End diff -- The documentation for this method indicates that we throw the above exceptions, however they were being caught instead of thrown. This change will result in the exceptions being thrown while still helping to guarantee the `out` stream is closed as the `in` stream was already being ensured to be closed. ---
[GitHub] storm pull request #2925: STORM-3302: Ensures we close streams to HDFS
GitHub user d2r opened a pull request: https://github.com/apache/storm/pull/2925 STORM-3302: Ensures we close streams to HDFS You can merge this pull request into a Git repository by running: $ git pull https://github.com/d2r/storm storm-3302-fix-socket-leaks Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2925.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2925 commit 4a431cb9c7ce972657b4548df602bf8840a9297f Author: Derek Dagit Date: 2018-12-10T19:01:49Z Ensures we close streams to HDFS ---
[GitHub] storm pull request #2924: STORM-1289: Port integration-test.clj to Java
GitHub user srdo opened a pull request: https://github.com/apache/storm/pull/2924 STORM-1289: Port integration-test.clj to Java https://issues.apache.org/jira/browse/STORM-1289 I have also ported a few tests to JUnit 5 and changed the IntegrationTest and PerformanceTest categories to be JUnit 5 Tags instead. I think it improves readability of the root pom. Let me know if these changes are too noisy here, and I'll move them to a separate PR. You can merge this pull request into a Git repository by running: $ git pull https://github.com/srdo/storm STORM-1289 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2924.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2924 commit ad5772d29e5314975e89cbf653ca3b4a40adc4d7 Author: Stig Rohde Døssing Date: 2018-12-07T21:07:36Z STORM-1289: Port integration-test.clj to Java ---
[GitHub] storm pull request #2923: STORM-3301: Fix case where KafkaSpout could emit t...
GitHub user srdo opened a pull request: https://github.com/apache/storm/pull/2923 STORM-3301: Fix case where KafkaSpout could emit tuples that were alr⦠â¦eady committed 1.x version of https://github.com/apache/storm/pull/2922 You can merge this pull request into a Git repository by running: $ git pull https://github.com/srdo/storm STORM-3301-1.x Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2923.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2923 commit 66081f8422833bc2c2f95ca8d236b5c114e985ed Author: Stig Rohde Døssing Date: 2018-12-08T21:15:04Z STORM-3301: Fix case where KafkaSpout could emit tuples that were already committed ---
[GitHub] storm pull request #2922: STORM-3301: Fix case where KafkaSpout could emit t...
GitHub user srdo opened a pull request: https://github.com/apache/storm/pull/2922 STORM-3301: Fix case where KafkaSpout could emit tuples that were alr⦠â¦eady committed https://issues.apache.org/jira/browse/STORM-3301 I removed an unnecessary contains check in KafkaSpout during init, it would always return false. I made a slight change to log compaction handling, the spout will now commit an acked offset after a gap, even if it's the only one. Previously there had to be at least 2 acked offsets after the gap before commit. You can merge this pull request into a Git repository by running: $ git pull https://github.com/srdo/storm STORM-3301 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2922.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2922 commit a657686e150edf3de6942d2b51fe19461d47376a Author: Stig Rohde Døssing Date: 2018-12-08T21:15:04Z STORM-3301: Fix case where KafkaSpout could emit tuples that were already committed ---
[GitHub] storm pull request #2921: STORM-3300: Fix NPE in Acker that could occur if s...
GitHub user srdo opened a pull request: https://github.com/apache/storm/pull/2921 STORM-3300: Fix NPE in Acker that could occur if sending reset timeou⦠â¦t tuples The error occurs when `int task = curr.spoutTask;` is hit during processing of a reset timeout tuple, if the acker hasn't received the ack_init tuple for the id yet. You can merge this pull request into a Git repository by running: $ git pull https://github.com/srdo/storm STORM-3300 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2921.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2921 commit df84dec0a55c7dba4c5f400329481eee8d20492a Author: Stig Rohde Døssing Date: 2018-12-08T15:37:57Z STORM-3300: Fix NPE in Acker that could occur if sending reset timeout tuples ---
[GitHub] storm pull request #2918: STORM-3295 allow blacklist scheduling to function ...
Github user agresch commented on a diff in the pull request: https://github.com/apache/storm/pull/2918#discussion_r239522894 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java --- @@ -79,25 +81,46 @@ if (shortage.areAnyOverZero() || shortageSlots > 0) { LOG.info("Need {} and {} slots more. Releasing some blacklisted nodes to cover it.", shortage, shortageSlots); -//release earliest blacklist -for (String supervisor : blacklistedNodeIds) { -SupervisorDetails sd = availableSupervisors.get(supervisor); -if (sd != null) { -NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd); -int sdAvailableSlots = cluster.getAvailablePorts(sd).size(); -readyToRemove.add(supervisor); -shortage.remove(sdAvailable, cluster.getResourceMetrics()); -shortageSlots -= sdAvailableSlots; -LOG.debug("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisor, -sdAvailable, sdAvailableSlots, shortage, shortageSlots); -if (!shortage.areAnyOverZero() && shortageSlots <= 0) { -// we have enough resources now... -break; + +//release earliest blacklist - but release all supervisors on a given blacklisted host. +Map> hostToSupervisorIds = createHostToSupervisorMap(blacklistedNodeIds, cluster); +for (Set supervisorIds : hostToSupervisorIds.values()) { +for (String supervisorId : supervisorIds) { +SupervisorDetails sd = availableSupervisors.get(supervisorId); +if (sd != null) { +NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd); +int sdAvailableSlots = cluster.getAvailablePorts(sd).size(); +readyToRemove.add(supervisorId); +shortage.remove(sdAvailable, cluster.getResourceMetrics()); +shortageSlots -= sdAvailableSlots; +LOG.info("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisorId, +sdAvailable, sdAvailableSlots, shortage, shortageSlots); } } +// make sure we've handled all supervisors on the host before we break +if (!shortage.areAnyOverZero() && shortageSlots <= 0) { +// we have enough resources now... +break; --- End diff -- I am doing what you are indicating I create a list of supervisors on the host and release all the supervisors before breaking. ---
[GitHub] storm pull request #2918: STORM-3295 allow blacklist scheduling to function ...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2918#discussion_r239517824 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java --- @@ -79,25 +81,46 @@ if (shortage.areAnyOverZero() || shortageSlots > 0) { LOG.info("Need {} and {} slots more. Releasing some blacklisted nodes to cover it.", shortage, shortageSlots); -//release earliest blacklist -for (String supervisor : blacklistedNodeIds) { -SupervisorDetails sd = availableSupervisors.get(supervisor); -if (sd != null) { -NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd); -int sdAvailableSlots = cluster.getAvailablePorts(sd).size(); -readyToRemove.add(supervisor); -shortage.remove(sdAvailable, cluster.getResourceMetrics()); -shortageSlots -= sdAvailableSlots; -LOG.debug("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisor, -sdAvailable, sdAvailableSlots, shortage, shortageSlots); -if (!shortage.areAnyOverZero() && shortageSlots <= 0) { -// we have enough resources now... -break; + +//release earliest blacklist - but release all supervisors on a given blacklisted host. +Map> hostToSupervisorIds = createHostToSupervisorMap(blacklistedNodeIds, cluster); +for (Set supervisorIds : hostToSupervisorIds.values()) { +for (String supervisorId : supervisorIds) { +SupervisorDetails sd = availableSupervisors.get(supervisorId); +if (sd != null) { +NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd); +int sdAvailableSlots = cluster.getAvailablePorts(sd).size(); +readyToRemove.add(supervisorId); +shortage.remove(sdAvailable, cluster.getResourceMetrics()); +shortageSlots -= sdAvailableSlots; +LOG.info("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisorId, +sdAvailable, sdAvailableSlots, shortage, shortageSlots); --- End diff -- @danny0405 This code only happens when the cluster is full. Meaning a topology is not scheduled and there are not enough free resources on the cluster to run that topology. The idea is that if a cluster is full and there are blacklisted supervisors it is better to try and run things on possibly bad hosts instead of not running anything. The issue here is that the existing code is broken if there are multiple supervisors on a single node, and this is fixing the existing code to operate properly in that case. If you have problems with releasing blacklisted supervisors/nodes at all then that is a separate JIRA. ---
[GitHub] storm pull request #2918: STORM-3295 allow blacklist scheduling to function ...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2918#discussion_r239519351 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java --- @@ -79,25 +81,46 @@ if (shortage.areAnyOverZero() || shortageSlots > 0) { LOG.info("Need {} and {} slots more. Releasing some blacklisted nodes to cover it.", shortage, shortageSlots); -//release earliest blacklist -for (String supervisor : blacklistedNodeIds) { -SupervisorDetails sd = availableSupervisors.get(supervisor); -if (sd != null) { -NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd); -int sdAvailableSlots = cluster.getAvailablePorts(sd).size(); -readyToRemove.add(supervisor); -shortage.remove(sdAvailable, cluster.getResourceMetrics()); -shortageSlots -= sdAvailableSlots; -LOG.debug("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisor, -sdAvailable, sdAvailableSlots, shortage, shortageSlots); -if (!shortage.areAnyOverZero() && shortageSlots <= 0) { -// we have enough resources now... -break; + +//release earliest blacklist - but release all supervisors on a given blacklisted host. +Map> hostToSupervisorIds = createHostToSupervisorMap(blacklistedNodeIds, cluster); +for (Set supervisorIds : hostToSupervisorIds.values()) { +for (String supervisorId : supervisorIds) { +SupervisorDetails sd = availableSupervisors.get(supervisorId); +if (sd != null) { +NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd); +int sdAvailableSlots = cluster.getAvailablePorts(sd).size(); +readyToRemove.add(supervisorId); +shortage.remove(sdAvailable, cluster.getResourceMetrics()); +shortageSlots -= sdAvailableSlots; +LOG.info("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisorId, +sdAvailable, sdAvailableSlots, shortage, shortageSlots); } } +// make sure we've handled all supervisors on the host before we break +if (!shortage.areAnyOverZero() && shortageSlots <= 0) { +// we have enough resources now... +break; --- End diff -- I think we need to break after we have released all of the supervisors on the node, otherwise we can have released a single supervisor on a node and got enough resources, but but because there is still at least one supervisor on that node still blacklisted the whole node is still blacklisted. ---
[GitHub] storm pull request #2918: STORM-3295 allow blacklist scheduling to function ...
Github user agresch commented on a diff in the pull request: https://github.com/apache/storm/pull/2918#discussion_r239461731 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java --- @@ -79,25 +81,46 @@ if (shortage.areAnyOverZero() || shortageSlots > 0) { LOG.info("Need {} and {} slots more. Releasing some blacklisted nodes to cover it.", shortage, shortageSlots); -//release earliest blacklist -for (String supervisor : blacklistedNodeIds) { -SupervisorDetails sd = availableSupervisors.get(supervisor); -if (sd != null) { -NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd); -int sdAvailableSlots = cluster.getAvailablePorts(sd).size(); -readyToRemove.add(supervisor); -shortage.remove(sdAvailable, cluster.getResourceMetrics()); -shortageSlots -= sdAvailableSlots; -LOG.debug("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisor, -sdAvailable, sdAvailableSlots, shortage, shortageSlots); -if (!shortage.areAnyOverZero() && shortageSlots <= 0) { -// we have enough resources now... -break; + +//release earliest blacklist - but release all supervisors on a given blacklisted host. +Map> hostToSupervisorIds = createHostToSupervisorMap(blacklistedNodeIds, cluster); +for (Set supervisorIds : hostToSupervisorIds.values()) { +for (String supervisorId : supervisorIds) { +SupervisorDetails sd = availableSupervisors.get(supervisorId); +if (sd != null) { +NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd); +int sdAvailableSlots = cluster.getAvailablePorts(sd).size(); +readyToRemove.add(supervisorId); +shortage.remove(sdAvailable, cluster.getResourceMetrics()); +shortageSlots -= sdAvailableSlots; +LOG.info("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisorId, +sdAvailable, sdAvailableSlots, shortage, shortageSlots); --- End diff -- It sounds like you don't agree even with the existing code that releases blacklisted supervisors when needed for scheduling? Should I be looking at converting all the existing code to blacklist supervisors instead of hosts to get things functional when there are more than one supervisor on a host? Thanks. ---
[GitHub] storm pull request #2918: STORM-3295 allow blacklist scheduling to function ...
Github user danny0405 commented on a diff in the pull request: https://github.com/apache/storm/pull/2918#discussion_r239306840 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java --- @@ -79,25 +81,46 @@ if (shortage.areAnyOverZero() || shortageSlots > 0) { LOG.info("Need {} and {} slots more. Releasing some blacklisted nodes to cover it.", shortage, shortageSlots); -//release earliest blacklist -for (String supervisor : blacklistedNodeIds) { -SupervisorDetails sd = availableSupervisors.get(supervisor); -if (sd != null) { -NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd); -int sdAvailableSlots = cluster.getAvailablePorts(sd).size(); -readyToRemove.add(supervisor); -shortage.remove(sdAvailable, cluster.getResourceMetrics()); -shortageSlots -= sdAvailableSlots; -LOG.debug("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisor, -sdAvailable, sdAvailableSlots, shortage, shortageSlots); -if (!shortage.areAnyOverZero() && shortageSlots <= 0) { -// we have enough resources now... -break; + +//release earliest blacklist - but release all supervisors on a given blacklisted host. +Map> hostToSupervisorIds = createHostToSupervisorMap(blacklistedNodeIds, cluster); +for (Set supervisorIds : hostToSupervisorIds.values()) { +for (String supervisorId : supervisorIds) { +SupervisorDetails sd = availableSupervisors.get(supervisorId); +if (sd != null) { +NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd); +int sdAvailableSlots = cluster.getAvailablePorts(sd).size(); +readyToRemove.add(supervisorId); +shortage.remove(sdAvailable, cluster.getResourceMetrics()); +shortageSlots -= sdAvailableSlots; +LOG.info("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisorId, +sdAvailable, sdAvailableSlots, shortage, shortageSlots); --- End diff -- @agresch Got your idea. In my personal opinion, for production, if a node has a blacklist supervisor(which means the supervisor does not send any HBs for some time), most of the cases it because the node machine itself has some problems(for now there are a few causes like: disk is full or network is in disconnection), so a safer way is we never schedule workers to the node if there are some blacklist supervisors on it. If you want to make use of the healthy supervisor on the node(has some blacklist supervisors also), at least there is a decision to make sure the supervisor is healthy, we can do this through checking the heartbeats of it. ---
[GitHub] storm pull request #2920: STORM-3297 prevent supervisor restart when no nimb...
Github user agresch commented on a diff in the pull request: https://github.com/apache/storm/pull/2920#discussion_r239235259 --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/NimbusMetricProcessor.java --- @@ -24,7 +24,7 @@ public void processWorkerMetrics(Map conf, WorkerMetrics metrics) throws MetricException { try (NimbusClient client = NimbusClient.getConfiguredClient(conf)) { client.getClient().processWorkerMetrics(metrics); -} catch (TException e) { --- End diff -- done ---
[GitHub] storm pull request #2920: STORM-3297 prevent supervisor restart when no nimb...
Github user kishorvpatil commented on a diff in the pull request: https://github.com/apache/storm/pull/2920#discussion_r239231045 --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/NimbusMetricProcessor.java --- @@ -24,7 +24,7 @@ public void processWorkerMetrics(Map conf, WorkerMetrics metrics) throws MetricException { try (NimbusClient client = NimbusClient.getConfiguredClient(conf)) { client.getClient().processWorkerMetrics(metrics); -} catch (TException e) { --- End diff -- Could we instead of handling all exceptions, be more specific on capturing `NimbusLeaderNotFoundException` ? `catch (TException | NimbusLeaderNotFoundException e)` ---
[GitHub] storm pull request #2920: STORM-3297 prevent supervisor restart when no nimb...
GitHub user agresch opened a pull request: https://github.com/apache/storm/pull/2920 STORM-3297 prevent supervisor restart when no nimbus leader exists You can merge this pull request into a Git repository by running: $ git pull https://github.com/agresch/storm agresch_processWorkerMetrics Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2920.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2920 commit d8e649d2139a7fb7e71c022a28eefd40f684308e Author: Aaron Gresch Date: 2018-12-05T17:27:43Z STORM-3297 prevent supervisor restart when no nimbus leader exists ---
[GitHub] storm pull request #2919: STORM-3296: Upgrade curator-test to avoid CURATOR-...
GitHub user srdo opened a pull request: https://github.com/apache/storm/pull/2919 STORM-3296: Upgrade curator-test to avoid CURATOR-409 https://issues.apache.org/jira/browse/STORM-3296 You can merge this pull request into a Git repository by running: $ git pull https://github.com/srdo/storm STORM-3296 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2919.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2919 commit 3c4646a14dff42fc23809192d56efcb29a14525e Author: Stig Rohde Døssing Date: 2018-12-05T13:56:41Z STORM-3296: Upgrade curator-test to avoid CURATOR-409 ---
[GitHub] storm pull request #2918: STORM-3295 allow blacklist scheduling to function ...
Github user agresch commented on a diff in the pull request: https://github.com/apache/storm/pull/2918#discussion_r239066404 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java --- @@ -79,25 +81,46 @@ if (shortage.areAnyOverZero() || shortageSlots > 0) { LOG.info("Need {} and {} slots more. Releasing some blacklisted nodes to cover it.", shortage, shortageSlots); -//release earliest blacklist -for (String supervisor : blacklistedNodeIds) { -SupervisorDetails sd = availableSupervisors.get(supervisor); -if (sd != null) { -NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd); -int sdAvailableSlots = cluster.getAvailablePorts(sd).size(); -readyToRemove.add(supervisor); -shortage.remove(sdAvailable, cluster.getResourceMetrics()); -shortageSlots -= sdAvailableSlots; -LOG.debug("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisor, -sdAvailable, sdAvailableSlots, shortage, shortageSlots); -if (!shortage.areAnyOverZero() && shortageSlots <= 0) { -// we have enough resources now... -break; + +//release earliest blacklist - but release all supervisors on a given blacklisted host. +Map> hostToSupervisorIds = createHostToSupervisorMap(blacklistedNodeIds, cluster); +for (Set supervisorIds : hostToSupervisorIds.values()) { +for (String supervisorId : supervisorIds) { +SupervisorDetails sd = availableSupervisors.get(supervisorId); +if (sd != null) { +NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd); +int sdAvailableSlots = cluster.getAvailablePorts(sd).size(); +readyToRemove.add(supervisorId); +shortage.remove(sdAvailable, cluster.getResourceMetrics()); +shortageSlots -= sdAvailableSlots; +LOG.info("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisorId, +sdAvailable, sdAvailableSlots, shortage, shortageSlots); --- End diff -- If there are two supervisors on a host that are blacklisted and we release one, one supervisor will remain blacklisted. This single blacklisted supervisor will be returned here: https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java#L167 Then this code will see that the same host that both supervisors are on is blacklisted, preventing any scheduling on that node: https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java#L167 I think a better overall long term solution is to blacklist supervisors instead of hosts, but that touches a lot more code. In the short term I think this is a relatively small tradeoff to allow blacklisting to work with multiple supervisors per host. ---
[GitHub] storm pull request #2918: STORM-3295 allow blacklist scheduling to function ...
Github user danny0405 commented on a diff in the pull request: https://github.com/apache/storm/pull/2918#discussion_r239039664 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java --- @@ -79,25 +81,46 @@ if (shortage.areAnyOverZero() || shortageSlots > 0) { LOG.info("Need {} and {} slots more. Releasing some blacklisted nodes to cover it.", shortage, shortageSlots); -//release earliest blacklist -for (String supervisor : blacklistedNodeIds) { -SupervisorDetails sd = availableSupervisors.get(supervisor); -if (sd != null) { -NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd); -int sdAvailableSlots = cluster.getAvailablePorts(sd).size(); -readyToRemove.add(supervisor); -shortage.remove(sdAvailable, cluster.getResourceMetrics()); -shortageSlots -= sdAvailableSlots; -LOG.debug("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisor, -sdAvailable, sdAvailableSlots, shortage, shortageSlots); -if (!shortage.areAnyOverZero() && shortageSlots <= 0) { -// we have enough resources now... -break; + +//release earliest blacklist - but release all supervisors on a given blacklisted host. +Map> hostToSupervisorIds = createHostToSupervisorMap(blacklistedNodeIds, cluster); +for (Set supervisorIds : hostToSupervisorIds.values()) { +for (String supervisorId : supervisorIds) { +SupervisorDetails sd = availableSupervisors.get(supervisorId); +if (sd != null) { +NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd); +int sdAvailableSlots = cluster.getAvailablePorts(sd).size(); +readyToRemove.add(supervisorId); +shortage.remove(sdAvailable, cluster.getResourceMetrics()); +shortageSlots -= sdAvailableSlots; +LOG.info("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisorId, +sdAvailable, sdAvailableSlots, shortage, shortageSlots); --- End diff -- So what is the purpose we need to release supervisors blacklist grouping by nodes. What if the node is stuck or broken ? If we shuffle the release by nodes, may be the risk would be spread evenly. ---
[GitHub] storm pull request #2918: STORM-3295 allow blacklist scheduling to function ...
GitHub user agresch opened a pull request: https://github.com/apache/storm/pull/2918 STORM-3295 allow blacklist scheduling to function properly with multi⦠â¦ple supervisors on a host If any supervisor on a host remains blacklisted, BlacklistScheduler.getBlacklistHosts() still considers the host blacklisted. This change forces all supervisors on a supervisor to be released, which will free the host. It may be nicer to consider blacklisting strictly based on supervisors, but that is open to discussion, and a much larger change than this. You can merge this pull request into a Git repository by running: $ git pull https://github.com/agresch/storm agresch_blacklist Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2918.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2918 commit b4d3df2167b2962bdf7039ba064ad7a646f32644 Author: Aaron Gresch Date: 2018-12-04T22:23:35Z STORM-3295 allow blacklist scheduling to function properly with multiple supervisors on a host ---
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
GitHub user govind-menon reopened a pull request: https://github.com/apache/storm/pull/2881 STORM-3259: NUMA Support for Storm Only functional changes - putting up for review now and tests soon to follow. Have Done the following tests 1. Mixed cluster - numa supervisors and normal supervisor 2. Numa only supervisors 3. Profiling actions 4. Rebalance You can merge this pull request into a Git repository by running: $ git pull https://github.com/govind-menon/storm STORM-3259 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2881.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2881 commit 4597f7ac25fafb70dcdf2a63551650cee90ab8e3 Author: Govind Menon Date: 2018-12-04T19:50:20Z STORM-3259: NUMA Support for Storm ---
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user govind-menon closed the pull request at: https://github.com/apache/storm/pull/2881 ---
[GitHub] storm pull request #2917: [STORM-3294] Upgrade jetty version to latest stabl...
GitHub user kishorvpatil opened a pull request: https://github.com/apache/storm/pull/2917 [STORM-3294] Upgrade jetty version to latest stable 9.4.14.v20181114 version You can merge this pull request into a Git repository by running: $ git pull https://github.com/kishorvpatil/incubator-storm upgrade-jetty-version Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2917.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2917 commit d460bfe18b3cf5c6bbeb8231fe018d15ce5d62c2 Author: Kishor Patil Date: 2018-12-03T19:22:25Z Upgrade jetty version to latest stable ---
[GitHub] storm pull request #2916: STORM-3292: flush writers in HiveState when the tr...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2916 ---
[GitHub] storm pull request #2913: STORM-3290: Split configuration for storm-kafka-cl...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2913 ---
[GitHub] storm pull request #2908: STORM-3276: Updated Flux to deal with storm local ...
Github user jnioche commented on a diff in the pull request: https://github.com/apache/storm/pull/2908#discussion_r237006771 --- Diff: flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java --- @@ -52,17 +52,22 @@ public class Flux { private static final Logger LOG = LoggerFactory.getLogger(Flux.class); +@Deprecated --- End diff -- Got you! Your comment was pretty clear, I just need another coffee :-) ---
[GitHub] storm pull request #2908: STORM-3276: Updated Flux to deal with storm local ...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2908#discussion_r237005496 --- Diff: flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java --- @@ -52,17 +52,22 @@ public class Flux { private static final Logger LOG = LoggerFactory.getLogger(Flux.class); +@Deprecated --- End diff -- Sure, what I meant was if we're going to break the API anyway I think it makes sense to remove the options entirely, rather than deprecating them. ---
[GitHub] storm pull request #2908: STORM-3276: Updated Flux to deal with storm local ...
Github user jnioche commented on a diff in the pull request: https://github.com/apache/storm/pull/2908#discussion_r237004334 --- Diff: flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java --- @@ -52,17 +52,22 @@ public class Flux { private static final Logger LOG = LoggerFactory.getLogger(Flux.class); +@Deprecated --- End diff -- IMHO this it is acceptable for a major release. Projects need a drastic cleanup once in a while. ---
[GitHub] storm pull request #2916: STORM-3292: flush writers in HiveState when the tr...
GitHub user arunmahadevan opened a pull request: https://github.com/apache/storm/pull/2916 STORM-3292: flush writers in HiveState when the trident batch commits You can merge this pull request into a Git repository by running: $ git pull https://github.com/arunmahadevan/storm STORM-3292 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2916.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2916 commit 07634fcb881a53d548827f27b1a9388ff8982f5e Author: Arun Mahadevan Date: 2018-11-27T02:15:58Z STORM-3292: flush writers in HiveState when the trident batch commits ---
[GitHub] storm pull request #2908: STORM-3276: Updated Flux to deal with storm local ...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2908#discussion_r236360412 --- Diff: storm-server/src/main/java/org/apache/storm/LocalCluster.java --- @@ -323,6 +342,13 @@ private static boolean areAllWorkersWaiting() { } } +private static final long DEFAULT_ZK_PORT = 2181; --- End diff -- Nit: Please move this to the other constants so it's not in the middle of method declarations. ---
[GitHub] storm pull request #2908: STORM-3276: Updated Flux to deal with storm local ...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2908#discussion_r236358832 --- Diff: flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java --- @@ -52,17 +52,22 @@ public class Flux { private static final Logger LOG = LoggerFactory.getLogger(Flux.class); +@Deprecated --- End diff -- Given that current scripts to start a Flux topology will have to be changed to make this work, is there any value to preserving these options? If someone is using one of these, most likely their script will end up failing or doing the wrong thing. ---
[GitHub] storm pull request #2910: STORM-3288: Fix issue with reources in jar files
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2910 ---
[GitHub] storm pull request #2914: STORM-1311: Removing superfluous debugging
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2914 ---
[GitHub] storm pull request #2915: [STORM-3291]Worker can't run as the user who submi...
GitHub user liu-zhaokun opened a pull request: https://github.com/apache/storm/pull/2915 [STORM-3291]Worker can't run as the user who submitted the topology [https://issues.apache.org/jira/browse/STORM-3291](https://issues.apache.org/jira/browse/STORM-3291) Without principal, worker can't be launched as the user who submitted the topology even we set "supervisor.run.worker.as.user" to true.Because the submitterUser will be overwrited by the user who launched nimbus.Even in ui,we can see the owner is the user who launch nimbus. You can merge this pull request into a Git repository by running: $ git pull https://github.com/liu-zhaokun/storm master1123 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2915.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2915 commit 8f3b1e8f12c80ef6908696817671e3a4fe9757a3 Author: liuzhaokun Date: 2018-11-23T07:10:21Z [STORM-3291]Worker can't run as the user who submitted the topology ---
[GitHub] storm pull request #2914: STORM-1311: Removing superfluous debugging
GitHub user govind-menon opened a pull request: https://github.com/apache/storm/pull/2914 STORM-1311: Removing superfluous debugging You can merge this pull request into a Git repository by running: $ git pull https://github.com/govind-menon/storm patch-2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2914.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2914 commit c92918949c3b8830c153807b761ac1c10022f1c6 Author: Govind Menon Date: 2018-11-20T23:11:06Z STORM-1311: Removing superfluous debugging ---