[GitHub] storm pull request #2831: STORM-3224: Fix FLUX YAML Viewer icon location/pos...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2831 ---
[GitHub] storm pull request #2840: STORM-3147: Add metrics based on ClusterSummary
GitHub user srdo opened a pull request: https://github.com/apache/storm/pull/2840 STORM-3147: Add metrics based on ClusterSummary Rebase and update of https://github.com/apache/storm/pull/2764. @zd-project Please take a look when you have a chance. You can merge this pull request into a Git repository by running: $ git pull https://github.com/srdo/storm STORM-3147 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2840.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2840 commit 02428594b53801572f657f4d0bff85b2213b3723 Author: Zhengdai Hu Date: 2018-07-13T19:22:20Z STORM-3147: Port ClusterSummary to StormMetricsRegistry commit 392803c9bb2fe81a5df57f695a0e6d7c37bd1f2e Author: Stig Rohde Døssing Date: 2018-09-17T20:21:12Z STORM-3147: Fix minor nits, rebase to use non-static metrics registry ---
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2836 ---
[GitHub] storm pull request #2805: STORM-3197: Make StormMetricsRegistry non-static
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2805 ---
[GitHub] storm issue #2831: STORM-3224: Fix FLUX YAML Viewer icon location/position o...
Github user kishorvpatil commented on the issue: https://github.com/apache/storm/pull/2831 https://user-images.githubusercontent.com/6090397/45646222-afa9d500-ba90-11e8-9651-56b153e7ce42.png";> @revans2 I fixed the position and removed duplicate space entries showing FLUX image icon. ---
[GitHub] storm issue #2836: STORM-3162: Cleanup heartbeats cache and make it thread s...
Github user srdo commented on the issue: https://github.com/apache/storm/pull/2836 Looks good, thanks. +1 ---
[GitHub] storm issue #2836: STORM-3162: Cleanup heartbeats cache and make it thread s...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2836 @srdo I think I addressed all of your review comments. ---
[GitHub] storm issue #2831: STORM-3224: Fix FLUX YAML Viewer icon location/position o...
Github user kishorvpatil commented on the issue: https://github.com/apache/storm/pull/2831 OMG, my bad.. looks like the span tag is added twice. Let me fix it.. ---
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2836#discussion_r218177587 --- Diff: storm-server/pom.xml --- @@ -171,7 +171,7 @@ maven-checkstyle-plugin -780 +853 --- End diff -- No, it's fine. ---
[GitHub] storm issue #2831: STORM-3224: Fix FLUX YAML Viewer icon location/position o...
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2831 Sorry to pull my +1 back. On both Firefox and Chrome it looks off to me. Could you take a look at this again? ![ui](https://user-images.githubusercontent.com/3441321/45642002-a151be00-ba7c-11e8-9101-161114487d54.png) ---
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2836#discussion_r218174414 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.nimbus; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import org.apache.storm.generated.Assignment; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.SupervisorWorkerHeartbeat; +import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting; +import org.apache.storm.stats.ClientStatsUtil; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Holds a cache of heartbeats from the workers. + */ +public class HeartbeatCache { +private static final Logger LOG = LoggerFactory.getLogger(HeartbeatCache.class); +private static final Function, ExecutorCache>> MAKE_MAP = (k) -> new ConcurrentHashMap<>(); + +private static class ExecutorCache { +private Boolean isTimedOut; +private Integer nimbusTime; +private Integer executorReportedTime; + +public ExecutorCache(Map newBeat) { +if (newBeat != null) { +executorReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0); +} else { +executorReportedTime = 0; +} + +nimbusTime = Time.currentTimeSecs(); +isTimedOut = false; +} + +public ExecutorCache(boolean isTimedOut, Integer nimbusTime, Integer executorReportedTime) { +this.isTimedOut = isTimedOut; +this.nimbusTime = nimbusTime; +this.executorReportedTime = executorReportedTime; +} + +public synchronized Boolean isTimedOut() { +return isTimedOut; +} + +public synchronized Integer getNimbusTime() { +return nimbusTime; +} + +public synchronized Integer getExecutorReportedTime() { +return executorReportedTime; +} + +public synchronized void updateTimeout(Integer timeout) { +isTimedOut = Time.deltaSecs(getNimbusTime()) >= timeout; +} + +public synchronized void updateFromHb(Integer timeout, Map newBeat) { +if (newBeat != null) { +Integer newReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0); +if (!newReportedTime.equals(executorReportedTime)) { +nimbusTime = Time.currentTimeSecs(); +} +executorReportedTime = newReportedTime; +} +updateTimeout(timeout); +} +} + +//Topology Id -> executor ids -> component -> stats(...) +private final ConcurrentHashMap, ExecutorCache>> cache; + +/** + * Create an empty cache. + */ +public HeartbeatCache() { +this.cache = new ConcurrentHashMap<>(); +} + +/** + * Add an empty topology to the cache for testing purposes. + * @param topoId the id of the topology to add. + */ +@VisibleForTesting +public void addEmptyTopoForTests(String topoId) { +cache.put(topoId, new ConcurrentHashMap<>()); +} + +/** + * Get the number of topologies with cached heartbeats. + * @return the number of topologies with cached heartbeats. +
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2836#discussion_r218174140 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4624,7 +4569,7 @@ public boolean isRemoteBlobExists(String blobKey) throws AuthorizationException, return true; } -private static final class Assoc implements UnaryOperator> { +static final class Assoc implements UnaryOperator> { --- End diff -- Yes, I'll make them private again. This was because as I was refactoring I kept the Assoc and Dissoc until the very end when I fixed the threading. ---
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2836#discussion_r218173878 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.nimbus; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import org.apache.storm.generated.Assignment; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.SupervisorWorkerHeartbeat; +import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting; +import org.apache.storm.stats.ClientStatsUtil; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Holds a cache of heartbeats from the workers. + */ +public class HeartbeatCache { +private static final Logger LOG = LoggerFactory.getLogger(HeartbeatCache.class); +private static final Function, ExecutorCache>> MAKE_MAP = (k) -> new ConcurrentHashMap<>(); + +private static class ExecutorCache { +private Boolean isTimedOut; +private Integer nimbusTime; +private Integer executorReportedTime; + +public ExecutorCache(Map newBeat) { +if (newBeat != null) { +executorReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0); +} else { +executorReportedTime = 0; +} + +nimbusTime = Time.currentTimeSecs(); +isTimedOut = false; +} + +public ExecutorCache(boolean isTimedOut, Integer nimbusTime, Integer executorReportedTime) { +this.isTimedOut = isTimedOut; +this.nimbusTime = nimbusTime; +this.executorReportedTime = executorReportedTime; +} + +public synchronized Boolean isTimedOut() { +return isTimedOut; +} + +public synchronized Integer getNimbusTime() { +return nimbusTime; +} + +public synchronized Integer getExecutorReportedTime() { +return executorReportedTime; +} + +public synchronized void updateTimeout(Integer timeout) { +isTimedOut = Time.deltaSecs(getNimbusTime()) >= timeout; +} + +public synchronized void updateFromHb(Integer timeout, Map newBeat) { +if (newBeat != null) { +Integer newReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0); +if (!newReportedTime.equals(executorReportedTime)) { +nimbusTime = Time.currentTimeSecs(); +} +executorReportedTime = newReportedTime; +} +updateTimeout(timeout); +} +} + +//Topology Id -> executor ids -> component -> stats(...) +private final ConcurrentHashMap, ExecutorCache>> cache; + +/** + * Create an empty cache. + */ +public HeartbeatCache() { +this.cache = new ConcurrentHashMap<>(); +} + +/** + * Add an empty topology to the cache for testing purposes. + * @param topoId the id of the topology to add. + */ +@VisibleForTesting +public void addEmptyTopoForTests(String topoId) { +cache.put(topoId, new ConcurrentHashMap<>()); +} + +/** + * Get the number of topologies with cached heartbeats. + * @return the number of topologies with cached heartbeats. +
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2836#discussion_r218173746 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.nimbus; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import org.apache.storm.generated.Assignment; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.SupervisorWorkerHeartbeat; +import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting; +import org.apache.storm.stats.ClientStatsUtil; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Holds a cache of heartbeats from the workers. + */ +public class HeartbeatCache { +private static final Logger LOG = LoggerFactory.getLogger(HeartbeatCache.class); +private static final Function, ExecutorCache>> MAKE_MAP = (k) -> new ConcurrentHashMap<>(); + +private static class ExecutorCache { +private Boolean isTimedOut; +private Integer nimbusTime; +private Integer executorReportedTime; + +public ExecutorCache(Map newBeat) { +if (newBeat != null) { +executorReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0); +} else { +executorReportedTime = 0; +} + +nimbusTime = Time.currentTimeSecs(); +isTimedOut = false; +} + +public ExecutorCache(boolean isTimedOut, Integer nimbusTime, Integer executorReportedTime) { +this.isTimedOut = isTimedOut; +this.nimbusTime = nimbusTime; +this.executorReportedTime = executorReportedTime; +} + +public synchronized Boolean isTimedOut() { +return isTimedOut; +} + +public synchronized Integer getNimbusTime() { +return nimbusTime; +} + +public synchronized Integer getExecutorReportedTime() { +return executorReportedTime; +} + +public synchronized void updateTimeout(Integer timeout) { +isTimedOut = Time.deltaSecs(getNimbusTime()) >= timeout; +} + +public synchronized void updateFromHb(Integer timeout, Map newBeat) { +if (newBeat != null) { +Integer newReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0); +if (!newReportedTime.equals(executorReportedTime)) { +nimbusTime = Time.currentTimeSecs(); +} +executorReportedTime = newReportedTime; +} +updateTimeout(timeout); +} +} + +//Topology Id -> executor ids -> component -> stats(...) +private final ConcurrentHashMap, ExecutorCache>> cache; + +/** + * Create an empty cache. + */ +public HeartbeatCache() { +this.cache = new ConcurrentHashMap<>(); +} + +/** + * Add an empty topology to the cache for testing purposes. + * @param topoId the id of the topology to add. + */ +@VisibleForTesting +public void addEmptyTopoForTests(String topoId) { +cache.put(topoId, new ConcurrentHashMap<>()); +} + +/** + * Get the number of topologies with cached heartbeats. + * @return the number of topologies with cached heartbeats. +
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2836#discussion_r218173536 --- Diff: storm-server/pom.xml --- @@ -171,7 +171,7 @@ maven-checkstyle-plugin -780 +853 --- End diff -- Yes it is all the issues that came with StatsUtil, which is why the other one went in a good direction. I can try and clean them up more if you want. ---
[GitHub] storm pull request #2839: STORM-3228 allow refernce counting of differing Po...
GitHub user agresch opened a pull request: https://github.com/apache/storm/pull/2839 STORM-3228 allow refernce counting of differing PortAndAssignment obj⦠â¦ects to work properly You can merge this pull request into a Git repository by running: $ git pull https://github.com/agresch/storm agresch_storm-3228 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2839.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2839 commit 5c47b2d7c1f3628c892e16b4d0f2dc78beddbb28 Author: Aaron Gresch Date: 2018-09-17T17:58:31Z STORM-3228 allow refernce counting of differing PortAndAssignment objects to work properly ---
[GitHub] storm issue #2805: STORM-3197: Make StormMetricsRegistry non-static
Github user srdo commented on the issue: https://github.com/apache/storm/pull/2805 Test failure looks unrelated, it's getting a permission error when writing to the local maven repo. ---
[GitHub] storm pull request #2832: STORM-3205 Optimization in TuplImpl
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2832 ---
[GitHub] storm pull request #2838: STORM-3227: Only push credentials if going to expe...
GitHub user revans2 opened a pull request: https://github.com/apache/storm/pull/2838 STORM-3227: Only push credentials if going to expected user You can merge this pull request into a Git repository by running: $ git pull https://github.com/revans2/incubator-storm STORM-3227 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2838.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2838 commit 7607778bb5387a98f2e0bf5cb0c70e69c967d137 Author: Robert (Bobby) Evans Date: 2018-09-17T16:29:29Z STORM-3227: Only push credentials if going to expected user ---
[GitHub] storm pull request #2805: STORM-3197: Make StormMetricsRegistry non-static
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2805#discussion_r218150464 --- Diff: storm-server/src/main/java/org/apache/storm/LocalDRPC.java --- @@ -38,9 +39,9 @@ private final DRPC drpc; private final String serviceId; -public LocalDRPC() { +public LocalDRPC(StormMetricsRegistry metricsRegistry) { --- End diff -- Added a no-args constructor that creates a new registry. ---
[GitHub] storm pull request #2805: STORM-3197: Make StormMetricsRegistry non-static
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2805#discussion_r218148492 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java --- @@ -219,11 +201,11 @@ public void kill() throws IOException { shutdownTimer = shutdownDuration.time(); } try { -Set pids = getAllPids(); +Set pids = getAllPids(); -for (Long pid : pids) { -kill(pid); -} +for (Long pid : pids) { +kill(pid); +} --- End diff -- Will reformat ---
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2836#discussion_r218142320 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.nimbus; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import org.apache.storm.generated.Assignment; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.SupervisorWorkerHeartbeat; +import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting; +import org.apache.storm.stats.ClientStatsUtil; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Holds a cache of heartbeats from the workers. + */ +public class HeartbeatCache { +private static final Logger LOG = LoggerFactory.getLogger(HeartbeatCache.class); +private static final Function, ExecutorCache>> MAKE_MAP = (k) -> new ConcurrentHashMap<>(); + +private static class ExecutorCache { +private Boolean isTimedOut; +private Integer nimbusTime; +private Integer executorReportedTime; + +public ExecutorCache(Map newBeat) { +if (newBeat != null) { +executorReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0); +} else { +executorReportedTime = 0; +} + +nimbusTime = Time.currentTimeSecs(); +isTimedOut = false; +} + +public ExecutorCache(boolean isTimedOut, Integer nimbusTime, Integer executorReportedTime) { +this.isTimedOut = isTimedOut; +this.nimbusTime = nimbusTime; +this.executorReportedTime = executorReportedTime; +} + +public synchronized Boolean isTimedOut() { +return isTimedOut; +} + +public synchronized Integer getNimbusTime() { +return nimbusTime; +} + +public synchronized Integer getExecutorReportedTime() { +return executorReportedTime; +} + +public synchronized void updateTimeout(Integer timeout) { +isTimedOut = Time.deltaSecs(getNimbusTime()) >= timeout; +} + +public synchronized void updateFromHb(Integer timeout, Map newBeat) { +if (newBeat != null) { +Integer newReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0); +if (!newReportedTime.equals(executorReportedTime)) { +nimbusTime = Time.currentTimeSecs(); +} +executorReportedTime = newReportedTime; +} +updateTimeout(timeout); +} +} + +//Topology Id -> executor ids -> component -> stats(...) +private final ConcurrentHashMap, ExecutorCache>> cache; + +/** + * Create an empty cache. + */ +public HeartbeatCache() { +this.cache = new ConcurrentHashMap<>(); +} + +/** + * Add an empty topology to the cache for testing purposes. + * @param topoId the id of the topology to add. + */ +@VisibleForTesting +public void addEmptyTopoForTests(String topoId) { +cache.put(topoId, new ConcurrentHashMap<>()); +} + +/** + * Get the number of topologies with cached heartbeats. + * @return the number of topologies with cached heartbeats. +
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2836#discussion_r218121407 --- Diff: storm-server/pom.xml --- @@ -171,7 +171,7 @@ maven-checkstyle-plugin -780 +853 --- End diff -- This number seems to be moving in the wrong direction :) ---
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2836#discussion_r217911368 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.nimbus; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import org.apache.storm.generated.Assignment; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.SupervisorWorkerHeartbeat; +import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting; +import org.apache.storm.stats.ClientStatsUtil; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Holds a cache of heartbeats from the workers. + */ +public class HeartbeatCache { +private static final Logger LOG = LoggerFactory.getLogger(HeartbeatCache.class); +private static final Function, ExecutorCache>> MAKE_MAP = (k) -> new ConcurrentHashMap<>(); + +private static class ExecutorCache { +private Boolean isTimedOut; +private Integer nimbusTime; --- End diff -- Please rename to include the unit of time. ---
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2836#discussion_r217911059 --- Diff: storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java --- @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.stats; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.storm.generated.ClusterWorkerHeartbeat; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.ExecutorStats; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.shade.com.google.common.collect.Lists; +import org.apache.storm.utils.Time; + +/** + * Stats calculations needed by storm client code. + */ +public class ClientStatsUtil { +public static final String SPOUT = "spout"; +public static final String BOLT = "bolt"; +static final String EXECUTOR_STATS = "executor-stats"; +static final String UPTIME = "uptime"; +public static final String TIME_SECS = "time-secs"; +public static final ToGlobalStreamIdTransformer TO_GSID = new ToGlobalStreamIdTransformer(); +public static final IdentityTransformer IDENTITY = new IdentityTransformer(); + +/** + * Convert a List executor to java List. + */ +public static List convertExecutor(List executor) { +return Lists.newArrayList(executor.get(0).intValue(), executor.get(1).intValue()); +} + +/** + * Make and map of executors to empty stats. --- End diff -- Nit: Couple of places saying "and" where it should say "a"/"an" ---
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2836#discussion_r218140315 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.nimbus; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import org.apache.storm.generated.Assignment; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.SupervisorWorkerHeartbeat; +import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting; +import org.apache.storm.stats.ClientStatsUtil; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Holds a cache of heartbeats from the workers. + */ +public class HeartbeatCache { +private static final Logger LOG = LoggerFactory.getLogger(HeartbeatCache.class); +private static final Function, ExecutorCache>> MAKE_MAP = (k) -> new ConcurrentHashMap<>(); + +private static class ExecutorCache { +private Boolean isTimedOut; +private Integer nimbusTime; +private Integer executorReportedTime; + +public ExecutorCache(Map newBeat) { +if (newBeat != null) { +executorReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0); +} else { +executorReportedTime = 0; +} + +nimbusTime = Time.currentTimeSecs(); +isTimedOut = false; +} + +public ExecutorCache(boolean isTimedOut, Integer nimbusTime, Integer executorReportedTime) { +this.isTimedOut = isTimedOut; +this.nimbusTime = nimbusTime; +this.executorReportedTime = executorReportedTime; +} + +public synchronized Boolean isTimedOut() { +return isTimedOut; +} + +public synchronized Integer getNimbusTime() { +return nimbusTime; +} + +public synchronized Integer getExecutorReportedTime() { +return executorReportedTime; +} + +public synchronized void updateTimeout(Integer timeout) { +isTimedOut = Time.deltaSecs(getNimbusTime()) >= timeout; +} + +public synchronized void updateFromHb(Integer timeout, Map newBeat) { +if (newBeat != null) { +Integer newReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0); +if (!newReportedTime.equals(executorReportedTime)) { +nimbusTime = Time.currentTimeSecs(); +} +executorReportedTime = newReportedTime; +} +updateTimeout(timeout); +} +} + +//Topology Id -> executor ids -> component -> stats(...) +private final ConcurrentHashMap, ExecutorCache>> cache; + +/** + * Create an empty cache. + */ +public HeartbeatCache() { +this.cache = new ConcurrentHashMap<>(); +} + +/** + * Add an empty topology to the cache for testing purposes. + * @param topoId the id of the topology to add. + */ +@VisibleForTesting +public void addEmptyTopoForTests(String topoId) { +cache.put(topoId, new ConcurrentHashMap<>()); +} + +/** + * Get the number of topologies with cached heartbeats. + * @return the number of topologies with cached heartbeats. +
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2836#discussion_r218124022 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.nimbus; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import org.apache.storm.generated.Assignment; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.SupervisorWorkerHeartbeat; +import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting; +import org.apache.storm.stats.ClientStatsUtil; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Holds a cache of heartbeats from the workers. + */ +public class HeartbeatCache { +private static final Logger LOG = LoggerFactory.getLogger(HeartbeatCache.class); +private static final Function, ExecutorCache>> MAKE_MAP = (k) -> new ConcurrentHashMap<>(); + +private static class ExecutorCache { +private Boolean isTimedOut; +private Integer nimbusTime; +private Integer executorReportedTime; + +public ExecutorCache(Map newBeat) { +if (newBeat != null) { +executorReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0); +} else { +executorReportedTime = 0; +} + +nimbusTime = Time.currentTimeSecs(); +isTimedOut = false; +} + +public ExecutorCache(boolean isTimedOut, Integer nimbusTime, Integer executorReportedTime) { +this.isTimedOut = isTimedOut; +this.nimbusTime = nimbusTime; +this.executorReportedTime = executorReportedTime; +} + +public synchronized Boolean isTimedOut() { +return isTimedOut; +} + +public synchronized Integer getNimbusTime() { +return nimbusTime; +} + +public synchronized Integer getExecutorReportedTime() { +return executorReportedTime; +} + +public synchronized void updateTimeout(Integer timeout) { +isTimedOut = Time.deltaSecs(getNimbusTime()) >= timeout; +} + +public synchronized void updateFromHb(Integer timeout, Map newBeat) { +if (newBeat != null) { +Integer newReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0); +if (!newReportedTime.equals(executorReportedTime)) { +nimbusTime = Time.currentTimeSecs(); +} +executorReportedTime = newReportedTime; +} +updateTimeout(timeout); +} +} + +//Topology Id -> executor ids -> component -> stats(...) +private final ConcurrentHashMap, ExecutorCache>> cache; + +/** + * Create an empty cache. + */ +public HeartbeatCache() { +this.cache = new ConcurrentHashMap<>(); +} + +/** + * Add an empty topology to the cache for testing purposes. + * @param topoId the id of the topology to add. + */ +@VisibleForTesting +public void addEmptyTopoForTests(String topoId) { +cache.put(topoId, new ConcurrentHashMap<>()); +} + +/** + * Get the number of topologies with cached heartbeats. + * @return the number of topologies with cached heartbeats. +
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2836#discussion_r218140867 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4624,7 +4569,7 @@ public boolean isRemoteBlobExists(String blobKey) throws AuthorizationException, return true; } -private static final class Assoc implements UnaryOperator> { +static final class Assoc implements UnaryOperator> { --- End diff -- Assoc and Dissoc don't seem to be used in new places, so is making them package private still necessary? ---
[GitHub] storm pull request #2833: STORM-3225: Use MediaType for check
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2833 ---
[GitHub] storm pull request #2837: Remove powered-by.md, it lives in the storm-site r...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2837 ---
[GitHub] storm pull request #2834: STORM-3226: Update error message to be more clear
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2834 ---
[GitHub] storm issue #2805: STORM-3197: Make StormMetricsRegistry non-static
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2805 @kishorvpatil yes but the merge conflicts are minor. I wanted to get this reviewed so @srdo can address any review comments at the same time as fixing the merge conflicts so we can get this in ASAP and have a 2.0.0 RC out ASAP. ---
[GitHub] storm issue #2805: STORM-3197: Make StormMetricsRegistry non-static
Github user kishorvpatil commented on the issue: https://github.com/apache/storm/pull/2805 @revans2 Looks like we have few conflicts on this PR? ---
[GitHub] storm pull request #2834: STORM-3226: Update error message to be more clear
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2834#discussion_r218116277 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java --- @@ -135,7 +135,7 @@ public Supervisor(Map conf, IContext sharedContext, ISupervisor (String) conf.get(DaemonConfig.SUPERVISOR_AUTHORIZER), conf); if (authorizationHandler == null && conf.get(DaemonConfig.NIMBUS_AUTHORIZER) != null) { throw new IllegalStateException("It looks like authorization is turned on for nimbus but not for the " -+ "supervisor"); ++ "supervisor ( " + DaemonConfig.SUPERVISOR_AUTHORIZER + " is not set)"); --- End diff -- Will do on checkin. ---
[GitHub] storm issue #2805: STORM-3197: Make StormMetricsRegistry non-static
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2805 Also the merge conflicts are really minor. ---
[GitHub] storm pull request #2805: STORM-3197: Make StormMetricsRegistry non-static
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2805#discussion_r218107287 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java --- @@ -219,11 +201,11 @@ public void kill() throws IOException { shutdownTimer = shutdownDuration.time(); } try { -Set pids = getAllPids(); +Set pids = getAllPids(); -for (Long pid : pids) { -kill(pid); -} +for (Long pid : pids) { +kill(pid); +} --- End diff -- It looks like indentation is off around here. could be spaces vs tabs or something, not really sure. ---
[GitHub] storm pull request #2805: STORM-3197: Make StormMetricsRegistry non-static
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2805#discussion_r218106022 --- Diff: storm-server/src/main/java/org/apache/storm/LocalDRPC.java --- @@ -38,9 +39,9 @@ private final DRPC drpc; private final String serviceId; -public LocalDRPC() { +public LocalDRPC(StormMetricsRegistry metricsRegistry) { --- End diff -- This feels problematic to me. At a minimum we need to update ./docs/Local-mode.md to have the new way to create it documented, but I would really prefer to just have at least the option for a default constructor. This is local mode the only reason we would need to have a single metrics registry would be if we intended to gather metrics from it afterwards as a part of a test, which we don't really do right now. ---
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2836#discussion_r218088968 --- Diff: storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java --- @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.stats; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.storm.generated.ClusterWorkerHeartbeat; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.ExecutorStats; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.shade.com.google.common.collect.Lists; +import org.apache.storm.utils.Time; + +/** + * Stats calculations needed by storm client code. + */ +public class ClientStatsUtil { +public static final String SPOUT = "spout"; +public static final String BOLT = "bolt"; +static final String EXECUTOR_STATS = "executor-stats"; +static final String UPTIME = "uptime"; +public static final String TIME_SECS = "time-secs"; +public static final ToGlobalStreamIdTransformer TO_GSID = new ToGlobalStreamIdTransformer(); +public static final IdentityTransformer IDENTITY = new IdentityTransformer(); + +/** + * Convert a List executor to java List. + */ +public static List convertExecutor(List executor) { +return Lists.newArrayList(executor.get(0).intValue(), executor.get(1).intValue()); +} + +/** + * Make and map of executors to empty stats. + * @param executors the executors as keys of the map. + * @return and empty map of executors to stats. + */ +public static Map, ExecutorStats> mkEmptyExecutorZkHbs(Set> executors) { +Map, ExecutorStats> ret = new HashMap<>(); +for (Object executor : executors) { +List startEnd = (List) executor; +ret.put(convertExecutor(startEnd), null); +} +return ret; +} + +/** + * Convert Long Executor Ids in ZkHbs to Integer ones structure to java maps. + */ +public static Map, ExecutorStats> convertExecutorZkHbs(Map, ExecutorStats> executorBeats) { +Map, ExecutorStats> ret = new HashMap<>(); +for (Map.Entry, ExecutorStats> entry : executorBeats.entrySet()) { +ret.put(convertExecutor(entry.getKey()), entry.getValue()); +} +return ret; +} + +/** + * Create a new worker heartbeat for zookeeper. + * @param topoId the topology id + * @param executorStats the stats for the executors + * @param uptime the uptime for the worker. + * @return the heartbeat map. + */ +public static Map mkZkWorkerHb(String topoId, Map, ExecutorStats> executorStats, Integer uptime) { +Map ret = new HashMap<>(); +ret.put("storm-id", topoId); +ret.put(EXECUTOR_STATS, executorStats); +ret.put(UPTIME, uptime); +ret.put(TIME_SECS, Time.currentTimeSecs()); + +return ret; +} + +private static Number getByKeyOr0(Map m, String k) { +if (m == null) { +return 0; +} + +Number n = (Number) m.get(k); +if (n == null) { +return 0; +} +return n; +} + +/** + * Get a sub-map by a given key. + * @param map the original map + * @param key the key to get it from. + * @return the map stored under key. + */ +public static Map getMapByKey(Map map, String key) { +if (map == null) { +return null; +} +return (Map) map.get(key); +} + +public static ClusterWorkerHeartbeat thriftifyZkWorkerHb(Map heartbeat) { +ClusterWorkerHeartbeat ret = new ClusterW
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Github user kishorvpatil commented on a diff in the pull request: https://github.com/apache/storm/pull/2836#discussion_r218086663 --- Diff: storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java --- @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.stats; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.storm.generated.ClusterWorkerHeartbeat; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.ExecutorStats; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.shade.com.google.common.collect.Lists; +import org.apache.storm.utils.Time; + +/** + * Stats calculations needed by storm client code. + */ +public class ClientStatsUtil { +public static final String SPOUT = "spout"; +public static final String BOLT = "bolt"; +static final String EXECUTOR_STATS = "executor-stats"; +static final String UPTIME = "uptime"; +public static final String TIME_SECS = "time-secs"; +public static final ToGlobalStreamIdTransformer TO_GSID = new ToGlobalStreamIdTransformer(); +public static final IdentityTransformer IDENTITY = new IdentityTransformer(); + +/** + * Convert a List executor to java List. + */ +public static List convertExecutor(List executor) { +return Lists.newArrayList(executor.get(0).intValue(), executor.get(1).intValue()); +} + +/** + * Make and map of executors to empty stats. + * @param executors the executors as keys of the map. + * @return and empty map of executors to stats. + */ +public static Map, ExecutorStats> mkEmptyExecutorZkHbs(Set> executors) { +Map, ExecutorStats> ret = new HashMap<>(); +for (Object executor : executors) { +List startEnd = (List) executor; +ret.put(convertExecutor(startEnd), null); +} +return ret; +} + +/** + * Convert Long Executor Ids in ZkHbs to Integer ones structure to java maps. + */ +public static Map, ExecutorStats> convertExecutorZkHbs(Map, ExecutorStats> executorBeats) { +Map, ExecutorStats> ret = new HashMap<>(); +for (Map.Entry, ExecutorStats> entry : executorBeats.entrySet()) { +ret.put(convertExecutor(entry.getKey()), entry.getValue()); +} +return ret; +} + +/** + * Create a new worker heartbeat for zookeeper. + * @param topoId the topology id + * @param executorStats the stats for the executors + * @param uptime the uptime for the worker. + * @return the heartbeat map. + */ +public static Map mkZkWorkerHb(String topoId, Map, ExecutorStats> executorStats, Integer uptime) { +Map ret = new HashMap<>(); +ret.put("storm-id", topoId); +ret.put(EXECUTOR_STATS, executorStats); +ret.put(UPTIME, uptime); +ret.put(TIME_SECS, Time.currentTimeSecs()); + +return ret; +} + +private static Number getByKeyOr0(Map m, String k) { +if (m == null) { +return 0; +} + +Number n = (Number) m.get(k); +if (n == null) { +return 0; +} +return n; +} + +/** + * Get a sub-map by a given key. + * @param map the original map + * @param key the key to get it from. + * @return the map stored under key. + */ +public static Map getMapByKey(Map map, String key) { +if (map == null) { +return null; +} +return (Map) map.get(key); +} + +public static ClusterWorkerHeartbeat thriftifyZkWorkerHb(Map heartbeat) { +ClusterWorkerHeartbeat ret = new Clu
Re: Regarding releasing Apache Storm 2.0.0
I think we are really close on this and I would love to see us get an RC out ASAP. We are still missing some things that Stig called out. https://github.com/apache/storm/pull/2719 has a build issue, not sure if we need to make an alternative patch or not. https://github.com/apache/storm/pull/2800 has a newer alternative patch https://github.com/apache/storm/pull/2836 please take a look. https://github.com/apache/storm/pull/2805 has some merge conflicts currently, but everyone please take a chance to review it. Thanks, Bobby On Fri, Sep 14, 2018 at 2:57 AM Jungtaek Lim wrote: > I have sought the name of client artifact from some of streaming > frameworks. Please refer below: > > Spark: spark-core > Kafka: kafka-clients > Flink: flink-clients > Heron: heron-api > > Based on divergence, I don't see the reason "storm-core" is the only name > which avoid confusion. Actually, if my understanding is right, we need to > let end users including "storm-server" when running local cluster, then > "storm-core" vs "storm-server" would give real confusion. I guess we > already discussed about the naming, and given that we don't rename it we > are OK with renamed artifacts. > > 2018년 9월 14일 (금) 오후 4:07, Roshan Naik 님이 > 작성: > > > Happy to see consensus in moving fwd with 2.0 soon. > > I will try to get a minor patch (STORM-3205) within 24 hours ... as it > > seems like it has potential to deliver a decent perf boost and energy > > savings. > > One thing I am hoping we can address before releasing Storm 2 is... to > fix > > the naming of the storm-client.jar. Its such a core jar really, it > should > > have been really called storm-core or something like that... but > > unfortunately we already have another jar with that name. Retaining the > > 'client' name for this new jar would be confusing and give wrong > > impressions to users and any new devs IMO. > > -roshan > > > > On Thursday, September 13, 2018, 2:12:40 PM PDT, Govind Menon > > wrote: > > > > STORM-3217 and STORM-3221 have been fixed - +1 from me for 2.0 RC. > > > > On Wed, Sep 12, 2018 at 10:01 AM Govind Menon wrote: > > > > > Hi all, > > > > > > There are some regressions that I introduced as part of STORM-1311 > which > > > I'm working on as part of > > https://issues.apache.org/jira/browse/STORM-3217 > > > and https://issues.apache.org/jira/browse/STORM-3221. These should be > > > fixed before a 2.x release > > > > > > I have code working on the Yahoo internal branch and should have PRs up > > > for them in community soon. > > > > > > I apologize for slowing things up. > > > > > > Thanks, > > > Govind. > > > > > > On Tue, Sep 11, 2018 at 3:31 PM Arun Mahadevan > wrote: > > > > > >> +1 for releasing 2.0. > > >> > > >> May be the RC can be cut once critical patches are merged. > > >> > > >> On Tue, 11 Sep 2018 at 10:28, Stig Rohde Døssing < > > stigdoess...@gmail.com> > > >> wrote: > > >> > > >> > +1 to cut an RC. > > >> > > > >> > Here are a couple of PRs that could maybe go in > > >> > > > >> > https://github.com/apache/storm/pull/2719 > > >> > https://github.com/apache/storm/pull/2800 (this one requires some > > >> changes, > > >> > but we should be able to fix it pretty quickly) > > >> > also would like to get https://github.com/apache/storm/pull/2805 > > >> reviewed, > > >> > it might change some public methods. > > >> > > > >> > Other than that, we should try to remove as much deprecated code as > we > > >> can > > >> > before release > > >> > > > >> > https://issues.apache.org/jira/browse/STORM-2947 > > >> > > > >> > Den man. 10. sep. 2018 kl. 21.59 skrev Alexandre Vermeerbergen < > > >> > avermeerber...@gmail.com>: > > >> > > > >> > > +1 for an Storm 2.0 as soon as possible, let's jump into the > future > > :) > > >> > > Le lun. 10 sept. 2018 à 21:50, Kishorkumar Patil > > >> > > a écrit : > > >> > > > > > >> > > > Looking into all issues reported under epic > > >> > > > https://issues.apache.org/jira/browse/STORM-2714 are > > >> resolved/closed. > > >> > I > > >> > > > don't see any open issues/blockers at this point for going ahead > > >> with > > >> > 2.x > > >> > > > release. > > >> > > > > > >> > > > I am +1 to 2.0 release. > > >> > > > > > >> > > > Regards, > > >> > > > -Kishor > > >> > > > > > >> > > > On Mon, Sep 10, 2018 at 2:24 PM, P. Taylor Goetz < > > ptgo...@gmail.com > > >> > > > >> > > wrote: > > >> > > > > > >> > > > > I agree, and looking through the JIRAs against 2.0, I would > say > > a > > >> > > majority > > >> > > > > of the ones marked critical are not critical. > > >> > > > > > > >> > > > > I’m +1 on moving forward with a 2.0 release, but will give > > others > > >> > time > > >> > > to > > >> > > > > respond with any JIRAs they think should be included. > > >> > > > > > > >> > > > > > p.s. I don't want to create branch-2.x or branch-2.0.x until > > >> > > absolutely > > >> > > > > > necessary, I don't see any major features with pull requests > > up > > >> but > > >> > > if > > >> > > > > you > > >> > > > >