STORM-3133: Extend metrics on Nimbus and LogViewer: STORM-3157: Added registration method for MetricSet
STORM-3133: Refactored and added metrics to LogViewer components STORM-3133: Fixed up Unit test for LogViewer STORM-3133: Refactored and added metrics to Nimbus components. STORM-3133: Add nimbus scheduling metrics STORM-3133: Add metrics for disk usage of workers' logs and performance of LogCleaner routine STORM-3133: Refactored code and added file partial read count metric for logviewer STORM-3133: Add metrics for counting LogViewer's IOExceptions Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bf81b684 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bf81b684 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bf81b684 Branch: refs/heads/master Commit: bf81b6840dba16b506187c53448db8e09a6a9f14 Parents: c9efe3b Author: Zhengdai Hu <hu.zheng...@gmail.com> Authored: Fri Jul 20 13:41:51 2018 -0500 Committer: Zhengdai Hu <zhengdai...@oath.com> Committed: Fri Aug 10 12:01:36 2018 -0500 ---------------------------------------------------------------------- .../org/apache/storm/scheduler/WorkerSlot.java | 7 + .../org/apache/storm/daemon/nimbus/Nimbus.java | 228 ++++++++++++------- .../daemon/supervisor/SupervisorUtils.java | 9 +- .../storm/localizer/LocallyCachedBlob.java | 3 +- .../storm/metric/StormMetricsRegistry.java | 25 +- .../storm/nimbus/LeaderListenerCallback.java | 7 + .../org/apache/storm/scheduler/Cluster.java | 12 +- .../apache/storm/scheduler/ExecutorDetails.java | 9 +- .../storm/metric/StormMetricsRegistryTest.java | 111 +++++++++ .../storm/daemon/logviewer/LogviewerServer.java | 6 +- .../handler/LogviewerLogPageHandler.java | 102 +++++---- .../handler/LogviewerLogSearchHandler.java | 159 ++++++++----- .../daemon/logviewer/utils/DeletionMeta.java | 31 +++ .../logviewer/utils/DirectoryCleaner.java | 25 +- .../daemon/logviewer/utils/ExceptionMeters.java | 66 ++++++ .../daemon/logviewer/utils/LogCleaner.java | 48 ++-- .../logviewer/utils/LogFileDownloader.java | 8 + .../utils/LogviewerResponseBuilder.java | 19 +- .../daemon/logviewer/utils/WorkerLogs.java | 16 +- .../logviewer/webapp/LogviewerResource.java | 78 ++++++- .../handler/LogviewerLogSearchHandlerTest.java | 3 +- .../daemon/logviewer/utils/LogCleanerTest.java | 9 +- 22 files changed, 722 insertions(+), 259 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java b/storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java index 07064db..fa963d2 100644 --- a/storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java +++ b/storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java @@ -12,6 +12,9 @@ package org.apache.storm.scheduler; +import java.util.Arrays; +import java.util.List; + public class WorkerSlot { private final String nodeId; private final int port; @@ -39,6 +42,10 @@ public class WorkerSlot { return getNodeId() + ":" + getPort(); } + public List<Object> toList() { + return Arrays.asList(nodeId, (long) port); + } + @Override public int hashCode() { return nodeId.hashCode() + 13 * ((Integer) port).hashCode(); http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java index c401f60..a096217 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java @@ -18,9 +18,9 @@ package org.apache.storm.daemon.nimbus; -import com.codahale.metrics.ExponentiallyDecayingReservoir; import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; +import com.codahale.metrics.Timer; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -45,6 +46,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.NavigableMap; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -53,6 +55,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; import javax.security.auth.Subject; + import org.apache.storm.Config; import org.apache.storm.Constants; import org.apache.storm.DaemonConfig; @@ -181,6 +184,8 @@ import org.apache.storm.security.auth.workertoken.WorkerTokenManager; import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting; import org.apache.storm.shade.com.google.common.base.Strings; import org.apache.storm.shade.com.google.common.collect.ImmutableMap; +import org.apache.storm.shade.com.google.common.collect.MapDifference; +import org.apache.storm.shade.com.google.common.collect.Maps; import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework; import org.apache.storm.shade.org.apache.zookeeper.ZooDefs; import org.apache.storm.shade.org.apache.zookeeper.data.ACL; @@ -251,10 +256,18 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { private static final Meter getTopologyPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getTopologyPageInfo-calls"); private static final Meter getSupervisorPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getSupervisorPageInfo-calls"); private static final Meter getComponentPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getComponentPageInfo-calls"); - private static final Histogram scheduleTopologyTimeMs = StormMetricsRegistry.registerHistogram("nimbus:time-scheduleTopology-ms", - new ExponentiallyDecayingReservoir()); private static final Meter getOwnerResourceSummariesCalls = StormMetricsRegistry.registerMeter( "nimbus:num-getOwnerResourceSummaries-calls"); + //Timer + private static final Timer fileUploadDuration = StormMetricsRegistry.registerTimer("nimbus:files-upload-duration-ms"); + private static final Timer schedulingDuration = StormMetricsRegistry.registerTimer("nimbus:topology-scheduling-duration-ms"); + //Scheduler histogram + private static final Histogram numAddedExecPerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-added-executors-per-scheduling"); + private static final Histogram numAddedSlotPerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-added-slots-per-scheduling"); + private static final Histogram numRemovedExecPerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-removed-executors-per-scheduling"); + private static final Histogram numRemovedSlotPerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-removed-slots-per-scheduling"); + private static final Histogram numNetExecIncreasePerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-net-executors-increase-per-scheduling"); + private static final Histogram numNetSlotIncreasePerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-net-slots-increase-per-scheduling"); // END Metrics private static final Meter shutdownCalls = StormMetricsRegistry.registerMeter("nimbus:num-shutdown-calls"); private static final Meter processWorkerMetricsCalls = StormMetricsRegistry.registerMeter("nimbus:process-worker-metric-calls"); @@ -411,6 +424,10 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { private final StormTimer timer; private final IScheduler scheduler; private final IScheduler underlyingScheduler; + //Metrics related + private final AtomicReference<Long> schedulingStartTimeNs = new AtomicReference<>(null); + private final AtomicLong longestSchedulingTime = new AtomicLong(); + private final ILeaderElector leaderElector; private final AssignmentDistributionService assignmentsDistributer; private final AtomicReference<Map<String, String>> idToSchedStatus; @@ -550,6 +567,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { }); } + //Not symmetric difference. Performing A.entrySet() - B.entrySet() private static <K, V> Map<K, V> mapDiff(Map<? extends K, ? extends V> first, Map<? extends K, ? extends V> second) { Map<K, V> ret = new HashMap<>(); for (Entry<? extends K, ? extends V> entry : second.entrySet()) { @@ -689,26 +707,20 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { * @return {topology-id -> {executor [node port]}} mapping */ private static Map<String, Map<List<Long>, List<Object>>> computeTopoToExecToNodePort( - Map<String, SchedulerAssignment> schedAssignments) { + Map<String, SchedulerAssignment> schedAssignments, List<String> assignedTopologyIds) { Map<String, Map<List<Long>, List<Object>>> ret = new HashMap<>(); for (Entry<String, SchedulerAssignment> schedEntry : schedAssignments.entrySet()) { Map<List<Long>, List<Object>> execToNodePort = new HashMap<>(); for (Entry<ExecutorDetails, WorkerSlot> execAndNodePort : schedEntry.getValue().getExecutorToSlot().entrySet()) { ExecutorDetails exec = execAndNodePort.getKey(); WorkerSlot slot = execAndNodePort.getValue(); - - List<Long> listExec = new ArrayList<>(2); - listExec.add((long) exec.getStartTask()); - listExec.add((long) exec.getEndTask()); - - List<Object> nodePort = new ArrayList<>(2); - nodePort.add(slot.getNodeId()); - nodePort.add((long) slot.getPort()); - - execToNodePort.put(listExec, nodePort); + execToNodePort.put(exec.toList(), slot.toList()); } ret.put(schedEntry.getKey(), execToNodePort); } + for (String id : assignedTopologyIds) { + ret.putIfAbsent(id, null); + } return ret; } @@ -735,39 +747,95 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { return ret; } - private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort( - Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) { - Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments); - // Print some useful information - if (existingAssignments != null && !existingAssignments.isEmpty()) { - for (Entry<String, Map<List<Long>, List<Object>>> entry : ret.entrySet()) { - String topoId = entry.getKey(); - Map<List<Long>, List<Object>> execToNodePort = entry.getValue(); - Assignment assignment = existingAssignments.get(topoId); - if (assignment == null) { - continue; + private boolean auditAssignmentChanges(Map<String, Assignment> existingAssignments, + Map<String, Assignment> newAssignments) { + assert existingAssignments != null && newAssignments != null; + boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); + long numRemovedExec = 0; + long numRemovedSlot = 0; + long numAddedExec = 0; + long numAddedSlot = 0; + if (existingAssignments.isEmpty()) { + for (Entry<String, Assignment> entry : newAssignments.entrySet()) { + final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); + final long count = new HashSet<>(execToPort.values()).size(); + LOG.info("Assigning {} to {} slots", entry.getKey(), count); + LOG.info("Assign executors: {}", execToPort.keySet()); + numAddedSlot += count; + numAddedExec += execToPort.size(); + } + } else if (newAssignments.isEmpty()) { + for (Entry<String, Assignment> entry : existingAssignments.entrySet()) { + final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); + final long count = new HashSet<>(execToPort.values()).size(); + LOG.info("Removing {} from {} slots", entry.getKey(), count); + LOG.info("Remove executors: {}", execToPort.keySet()); + numRemovedSlot += count; + numRemovedExec += execToPort.size(); + } + } else { + MapDifference<String, Assignment> difference = Maps.difference(existingAssignments, newAssignments); + if (anyChanged = !difference.areEqual()) { + for (Entry<String, Assignment> entry : difference.entriesOnlyOnLeft().entrySet()) { + final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); + final long count = new HashSet<>(execToPort.values()).size(); + LOG.info("Removing {} from {} slots", entry.getKey(), count); + LOG.info("Remove executors: {}", execToPort.keySet()); + numRemovedSlot += count; + numRemovedExec += execToPort.size(); } - Map<List<Long>, NodeInfo> old = assignment.get_executor_node_port(); - Map<List<Long>, List<Object>> reassigned = new HashMap<>(); - for (Entry<List<Long>, List<Object>> execAndNodePort : execToNodePort.entrySet()) { - NodeInfo oldAssigned = old.get(execAndNodePort.getKey()); - String node = (String) execAndNodePort.getValue().get(0); - Long port = (Long) execAndNodePort.getValue().get(1); - if (oldAssigned == null || !oldAssigned.get_node().equals(node) - || !port.equals(oldAssigned.get_port_iterator().next())) { - reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue()); - } + for (Entry<String, Assignment> entry : difference.entriesOnlyOnRight().entrySet()) { + final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); + final long count = new HashSet<>(execToPort.values()).size(); + LOG.info("Assigning {} to {} slots", entry.getKey(), count); + LOG.info("Assign executors: {}", execToPort.keySet()); + numAddedSlot += count; + numAddedExec += execToPort.size(); } + for (Entry<String, MapDifference.ValueDifference<Assignment>> entry : difference.entriesDiffering().entrySet()) { + final Map<List<Long>, NodeInfo> execToSlot = entry.getValue().rightValue().get_executor_node_port(); + final Set<NodeInfo> slots = new HashSet<>(execToSlot.values()); + LOG.info("Reassigning {} to {} slots", entry.getKey(), slots.size()); + LOG.info("Reassign executors: {}", execToSlot.keySet()); + + final Map<List<Long>, NodeInfo> oldExecToSlot = entry.getValue().leftValue().get_executor_node_port(); + + long commonExecCount = 0; + Set<NodeInfo> commonSlots = new HashSet<>(execToSlot.size()); + for (Entry<List<Long>, NodeInfo> execEntry : execToSlot.entrySet()) { + if (execEntry.getValue().equals(oldExecToSlot.get(execEntry.getKey()))) { + commonExecCount++; + commonSlots.add(execEntry.getValue()); + } + } + long commonSlotCount = commonSlots.size(); - if (!reassigned.isEmpty()) { - int count = (new HashSet<>(execToNodePort.values())).size(); - Set<List<Long>> reExecs = reassigned.keySet(); - LOG.info("Reassigning {} to {} slots", topoId, count); - LOG.info("Reassign executors: {}", reExecs); + //Treat reassign as remove and add + numRemovedSlot += new HashSet<>(oldExecToSlot.values()).size() - commonSlotCount; + numRemovedExec += oldExecToSlot.size() - commonExecCount; + numAddedSlot += slots.size() - commonSlotCount; + numAddedExec += execToSlot.size() - commonExecCount; } } + LOG.debug("{} assignments unchanged: {}", difference.entriesInCommon().size(), difference.entriesInCommon().keySet()); } - return ret; + numAddedExecPerScheduling.update(numAddedExec); + numAddedSlotPerScheduling.update(numAddedSlot); + numRemovedExecPerScheduling.update(numRemovedExec); + numRemovedSlotPerScheduling.update(numRemovedSlot); + numNetExecIncreasePerScheduling.update(numAddedExec - numRemovedExec); + numNetSlotIncreasePerScheduling.update(numAddedSlot - numRemovedSlot); + + if (anyChanged) { + LOG.info("Fragmentation after scheduling is: {} MB, {} PCore CPUs", fragmentedMemory(), fragmentedCpu()); + nodeIdToResources.get().forEach((id, node) -> + LOG.info( + "Node Id: {} Total Mem: {}, Used Mem: {}, Available Mem: {}, Total CPU: {}, Used " + + "CPU: {}, Available CPU: {}, fragmented: {}", + id, node.getTotalMem(), node.getUsedMem(), node.getAvailableMem(), + node.getTotalCpu(), node.getUsedCpu(), node.getAvailableCpu(), isFragmented(node))); + } + return anyChanged; } private static List<List<Long>> changedExecutors(Map<List<Long>, NodeInfo> map, Map<List<Long>, @@ -780,7 +848,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { key.add(ni.get_node()); key.add(ni.get_port_iterator().next()); List<List<Long>> value = new ArrayList<>(entry.getValue()); - value.sort((a, b) -> a.get(0).compareTo(b.get(0))); + value.sort(Comparator.comparing(a -> a.get(0))); slotAssigned.put(key, value); } HashMap<List<Object>, List<List<Long>>> tmpNewSlotAssigned = newExecToNodePort == null ? new HashMap<>() : @@ -788,7 +856,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { HashMap<List<Object>, List<List<Long>>> newSlotAssigned = new HashMap<>(); for (Entry<List<Object>, List<List<Long>>> entry : tmpNewSlotAssigned.entrySet()) { List<List<Long>> value = new ArrayList<>(entry.getValue()); - value.sort((a, b) -> a.get(0).compareTo(b.get(0))); + value.sort(Comparator.comparing(a -> a.get(0))); newSlotAssigned.put(entry.getKey(), value); } Map<List<Object>, List<List<Long>>> diff = mapDiff(slotAssigned, newSlotAssigned); @@ -1217,7 +1285,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { return allNodeHost; } else { // rebalance - Map<String, String> ret = new HashMap(); + Map<String, String> ret = new HashMap<>(); for (Map.Entry<List<Long>, NodeInfo> entry : newExecutorNodePort.entrySet()) { NodeInfo newNodeInfo = entry.getValue(); NodeInfo oldNodeInfo = oldExecutorNodePort.get(entry.getKey()); @@ -1984,11 +2052,14 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { Cluster cluster = new Cluster(inimbus, supervisors, topoToSchedAssignment, topologies, conf); cluster.setStatusMap(idToSchedStatus.get()); - long beforeSchedule = System.currentTimeMillis(); + schedulingStartTimeNs.set(Time.nanoTime()); scheduler.schedule(topologies, cluster); - long scheduleTimeElapsedMs = System.currentTimeMillis() - beforeSchedule; - LOG.debug("Scheduling took {} ms for {} topologies", scheduleTimeElapsedMs, topologies.getTopologies().size()); - scheduleTopologyTimeMs.update(scheduleTimeElapsedMs); + //Get and set the start time before getting current time in order to avoid potential race with the longest-scheduling-time-ms gauge + final Long startTime = schedulingStartTimeNs.getAndSet(null); + long elapsed = Time.nanoTime() - startTime; + longestSchedulingTime.accumulateAndGet(elapsed, Math::max); + schedulingDuration.update(elapsed, TimeUnit.NANOSECONDS); + LOG.debug("Scheduling took {} ms for {} topologies", elapsed, topologies.getTopologies().size()); //merge with existing statuses idToSchedStatus.set(Utils.merge(idToSchedStatus.get(), cluster.getStatusMap())); @@ -2131,17 +2202,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } } // make the new assignments for topologies - Map<String, SchedulerAssignment> newSchedulerAssignments = null; synchronized (schedLock) { - newSchedulerAssignments = computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId); + Map<String, SchedulerAssignment> newSchedulerAssignments = + computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId); Map<String, Map<List<Long>, List<Object>>> topologyToExecutorToNodePort = - computeNewTopoToExecToNodePort(newSchedulerAssignments, existingAssignments); - for (String id : assignedTopologyIds) { - if (!topologyToExecutorToNodePort.containsKey(id)) { - topologyToExecutorToNodePort.put(id, null); - } - } + computeTopoToExecToNodePort(newSchedulerAssignments, assignedTopologyIds); Map<String, Map<WorkerSlot, WorkerResources>> newAssignedWorkerToResources = computeTopoToNodePortToResources(newSchedulerAssignments); int nowSecs = Time.currentTimeSecs(); @@ -2154,14 +2220,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { if (execToNodePort == null) { execToNodePort = new HashMap<>(); } - Assignment existingAssignment = existingAssignments.get(topoId); Set<String> allNodes = new HashSet<>(); - if (execToNodePort != null) { - for (List<Object> nodePort : execToNodePort.values()) { - allNodes.add((String) nodePort.get(0)); - } + for (List<Object> nodePort : execToNodePort.values()) { + allNodes.add((String) nodePort.get(0)); } Map<String, String> allNodeHost = new HashMap<>(); + Assignment existingAssignment = existingAssignments.get(topoId); if (existingAssignment != null) { allNodeHost.putAll(existingAssignment.get_node_host()); } @@ -2219,15 +2283,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { newAssignments.put(topoId, newAssignment); } - if (!newAssignments.equals(existingAssignments)) { + boolean assignmentChanged = auditAssignmentChanges(existingAssignments, newAssignments); + if (assignmentChanged) { LOG.debug("RESETTING id->resources and id->worker-resources cache!"); - LOG.info("Fragmentation after scheduling is: {} MB, {} PCore CPUs", fragmentedMemory(), fragmentedCpu()); - nodeIdToResources.get().forEach((id, node) -> - LOG.info( - "Node Id: {} Total Mem: {}, Used Mem: {}, Available Mem: {}, Total CPU: {}, Used " - + "CPU: {}, Available CPU: {}, fragmented: {}", - id, node.getTotalMem(), node.getUsedMem(), node.getAvailableMem(), - node.getTotalCpu(), node.getUsedCpu(), node.getAvailableCpu(), isFragmented(node))); idToResources.set(new HashMap<>()); idToWorkerResources.set(new HashMap<>()); } @@ -2826,21 +2884,27 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { .parallelStream() .mapToDouble(SupervisorResources::getTotalCpu) .sum()); - + StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> { + //We want to update longest scheduling time in real time in case scheduler get stuck + // Get current time before startTime to avoid potential race with scheduler's Timer + Long currTime = Time.nanoTime(); + Long startTime = schedulingStartTimeNs.get(); + return TimeUnit.NANOSECONDS.toMillis(startTime == null ? + longestSchedulingTime.get() : Math.max(currTime - startTime, longestSchedulingTime.get())); + }); + StormMetricsRegistry.registerMeter("nimbus:num-launched").mark(); StormMetricsRegistry.startMetricsReporters(conf); - if (clusterConsumerExceutors != null) { - timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)), - () -> { - try { - if (isLeader()) { - sendClusterMetricsToExecutors(); - } - } catch (Exception e) { - throw new RuntimeException(e); + timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)), + () -> { + try { + if (isLeader()) { + sendClusterMetricsToExecutors(); } - }); - } + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } catch (Exception e) { if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) { throw e; @@ -3689,7 +3753,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { beginFileUploadCalls.mark(); checkAuthorization(null, null, "fileUpload"); String fileloc = getInbox() + "/stormjar-" + Utils.uuid() + ".jar"; - uploaders.put(fileloc, Channels.newChannel(new FileOutputStream(fileloc))); + uploaders.put(fileloc, new TimedWritableByteChannel(Channels.newChannel(new FileOutputStream(fileloc)), fileUploadDuration)); LOG.info("Uploading file from client to {}", fileloc); return fileloc; } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java index 90d68dc..4619aeb 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java @@ -114,20 +114,17 @@ public class SupervisorUtils { * @param conf * @return * - * @throws Exception */ - public static Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map<String, Object> conf) throws Exception { + public static Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map<String, Object> conf) { return _instance.readWorkerHeartbeatsImpl(conf); } /** - * get worker heartbeat by workerId + * get worker heartbeat by workerId. * * @param conf * @param workerId * @return - * - * @throws IOException */ private static LSWorkerHeartbeat readWorkerHeartbeat(Map<String, Object> conf, String workerId) { return _instance.readWorkerHeartbeatImpl(conf, workerId); @@ -137,7 +134,7 @@ public class SupervisorUtils { return _instance.isWorkerHbTimedOutImpl(now, whb, conf); } - public Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map<String, Object> conf) throws Exception { + public Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map<String, Object> conf) { Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>(); Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf); http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java index 952d8d9..f12713b 100644 --- a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java +++ b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java @@ -52,8 +52,7 @@ public abstract class LocallyCachedBlob { private long lastUsed = Time.currentTimeMillis(); private CompletableFuture<Void> doneUpdating = null; - private static final Histogram fetchingRate = StormMetricsRegistry.registerHistogram( - "supervisor:blob-fetching-rate-MB/s", new ExponentiallyDecayingReservoir()); + private static final Histogram fetchingRate = StormMetricsRegistry.registerHistogram("supervisor:blob-fetching-rate-MB/s"); /** * Create a new LocallyCachedBlob. http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java index 602f53e..ea8867e 100644 --- a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java +++ b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java @@ -18,18 +18,21 @@ import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; import com.codahale.metrics.Metric; import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.MetricSet; import com.codahale.metrics.Reservoir; import com.codahale.metrics.Timer; import java.util.Map; +import com.google.common.annotations.VisibleForTesting; import org.apache.storm.daemon.metrics.MetricsUtils; import org.apache.storm.daemon.metrics.reporters.PreparableReporter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class StormMetricsRegistry extends MetricRegistry { - private static final StormMetricsRegistry REGISTRY = new StormMetricsRegistry(); + @VisibleForTesting + static final StormMetricsRegistry REGISTRY = new StormMetricsRegistry(); private static final Logger LOG = LoggerFactory.getLogger(StormMetricsRegistry.class); private StormMetricsRegistry() {/*Singleton pattern*/} @@ -54,6 +57,25 @@ public class StormMetricsRegistry extends MetricRegistry { REGISTRY.register(name, meter); } + public static void registerMetricSet(MetricSet metrics) { + REGISTRY.registerAll(metrics); + } + + public static void unregisterMetricSet(MetricSet metrics) { + unregisterMetricSet(null, metrics); + } + + public static void unregisterMetricSet(String prefix, MetricSet metrics) { + for (Map.Entry<String, Metric> entry : metrics.getMetrics().entrySet()) { + final String name = name(prefix, entry.getKey()); + if (entry.getValue() instanceof MetricSet) { + unregisterMetricSet(name, (MetricSet) entry.getValue()); + } else { + REGISTRY.remove(name); + } + } + } + public static Timer registerTimer(String name) { return REGISTRY.register(name, new Timer()); } @@ -84,6 +106,7 @@ public class StormMetricsRegistry extends MetricRegistry { */ @Override public <T extends Metric> T register(final String name, T metric) throws IllegalArgumentException { + assert !(metric instanceof MetricSet); try { return super.register(name, metric); } catch (IllegalArgumentException e) { http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java index e54509e..3783fdb 100644 --- a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java +++ b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java @@ -19,6 +19,8 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; import javax.security.auth.Subject; + +import com.codahale.metrics.Meter; import org.apache.commons.io.IOUtils; import org.apache.storm.Config; import org.apache.storm.blobstore.BlobStore; @@ -29,6 +31,7 @@ import org.apache.storm.daemon.nimbus.TopoCache; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.KeyNotFoundException; import org.apache.storm.generated.StormTopology; +import org.apache.storm.metric.StormMetricsRegistry; import org.apache.storm.security.auth.ReqContext; import org.apache.storm.shade.com.google.common.base.Joiner; import org.apache.storm.shade.com.google.common.collect.Sets; @@ -45,6 +48,8 @@ import org.slf4j.LoggerFactory; * A callback function when nimbus gains leadership. */ public class LeaderListenerCallback { + private static final Meter numGainedLeader = StormMetricsRegistry.registerMeter("nimbus:num-gained-leadership"); + private static final Meter numLostLeader = StormMetricsRegistry.registerMeter("nimbus:num-lost-leadership"); private static final Logger LOG = LoggerFactory.getLogger(LeaderListenerCallback.class); private static final String STORM_JAR_SUFFIX = "-stormjar.jar"; private static final String STORM_CODE_SUFFIX = "-stormcode.ser"; @@ -82,6 +87,7 @@ public class LeaderListenerCallback { * Invoke when gains leadership. */ public void leaderCallBack() { + numGainedLeader.mark(); //set up nimbus-info to zk setUpNimbusInfo(acls); //sync zk assignments/id-info to local @@ -131,6 +137,7 @@ public class LeaderListenerCallback { * Invoke when lost leadership. */ public void notLeaderCallback() { + numLostLeader.mark(); tc.clear(); } http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java index 3f48669..d014236 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java @@ -145,11 +145,7 @@ public class Cluster implements ISchedulingState { String nodeId = entry.getKey(); SupervisorDetails supervisor = entry.getValue(); String host = supervisor.getHost(); - List<String> ids = hostToId.get(host); - if (ids == null) { - ids = new ArrayList<>(); - hostToId.put(host, ids); - } + List<String> ids = hostToId.computeIfAbsent(host, k -> new ArrayList<>()); ids.add(nodeId); } this.conf = conf; @@ -173,11 +169,7 @@ public class Cluster implements ISchedulingState { for (Map.Entry<String, String> entry : resolvedSuperVisors.entrySet()) { String hostName = entry.getKey(); String rack = entry.getValue(); - List<String> nodesForRack = this.networkTopography.get(rack); - if (nodesForRack == null) { - nodesForRack = new ArrayList<>(); - this.networkTopography.put(rack, nodesForRack); - } + List<String> nodesForRack = this.networkTopography.computeIfAbsent(rack, k -> new ArrayList<>()); nodesForRack.add(hostName); } } else { http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java index 855cc96..18de717 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java @@ -18,6 +18,9 @@ package org.apache.storm.scheduler; +import java.util.Arrays; +import java.util.List; + public class ExecutorDetails { public final int startTask; public final int endTask; @@ -35,9 +38,13 @@ public class ExecutorDetails { return endTask; } + public List<Long> toList() { + return Arrays.asList((long) startTask, (long) endTask); + } + @Override public boolean equals(Object other) { - if (other == null || !(other instanceof ExecutorDetails)) { + if (!(other instanceof ExecutorDetails)) { return false; } http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-server/src/test/java/org/apache/storm/metric/StormMetricsRegistryTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/metric/StormMetricsRegistryTest.java b/storm-server/src/test/java/org/apache/storm/metric/StormMetricsRegistryTest.java new file mode 100644 index 0000000..5d9b3e4 --- /dev/null +++ b/storm-server/src/test/java/org/apache/storm/metric/StormMetricsRegistryTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.metric; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricSet; +import com.codahale.metrics.Timer; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +import static com.codahale.metrics.MetricRegistry.name; +import static org.junit.jupiter.api.Assertions.*; + +class StormMetricsRegistryTest { + private static final Logger LOG = LoggerFactory.getLogger(StormMetricsRegistryTest.class); + + private static final String OUTER_METER = "outerMeter"; + private static final String INNER_SET = "innerSet"; + private static final String OUTER_TIMER = "outerTimer"; + private static final String INNER_METER = "innerMeter"; + private static final String INNER_TIMER = "innerTimer"; + private static final MetricSet OUTER = newMetricSetInstance(); + + @Test + void registerMetricSet() { + Meter existingInnerMeter = StormMetricsRegistry.registerMeter(name(INNER_SET, INNER_METER)); + + LOG.info("register outer set"); + StormMetricsRegistry.registerMetricSet(OUTER); + assertSame(OUTER.getMetrics().get(OUTER_TIMER), StormMetricsRegistry.REGISTRY.getMetrics().get(OUTER_TIMER)); + assertSame(OUTER.getMetrics().get(OUTER_METER), StormMetricsRegistry.REGISTRY.getMetrics().get(OUTER_METER)); + assertSame(((MetricSet) OUTER.getMetrics().get(INNER_SET)).getMetrics().get(INNER_TIMER), + StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_TIMER))); + + assertNotSame(((MetricSet) OUTER.getMetrics().get(INNER_SET)).getMetrics().get(INNER_METER), + StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_METER))); + assertSame(existingInnerMeter, StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_METER))); + + //Ensure idempotency + LOG.info("twice register outer set"); + MetricSet newOuter = newMetricSetInstance(); + StormMetricsRegistry.registerMetricSet(newOuter); + assertSame(OUTER.getMetrics().get(OUTER_TIMER), StormMetricsRegistry.REGISTRY.getMetrics().get(OUTER_TIMER)); + assertSame(OUTER.getMetrics().get(OUTER_METER), StormMetricsRegistry.REGISTRY.getMetrics().get(OUTER_METER)); + assertSame(((MetricSet) OUTER.getMetrics().get(INNER_SET)).getMetrics().get(INNER_TIMER), + StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_TIMER))); + assertSame(existingInnerMeter, StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_METER))); + + LOG.info("name collision"); + assertThrows(IllegalArgumentException.class, () -> StormMetricsRegistry.registerGauge(name(INNER_SET, INNER_METER), () -> 0)); + } + + @Test + void unregisterMetricSet() { + StormMetricsRegistry.registerMetricSet(OUTER); + StormMetricsRegistry.unregisterMetricSet(OUTER); + assertTrue(StormMetricsRegistry.REGISTRY.getMetrics().isEmpty()); + + } + + private static MetricSet newMetricSetInstance() { + return new MetricSet() { + private final MetricSet inner = new MetricSet() { + private final Map<String, Metric> map = new HashMap<>(); + + { + map.put(INNER_METER, new Meter()); + map.put(INNER_TIMER, new Timer()); + } + + @Override + public Map<String, Metric> getMetrics() { + return map; + } + }; + private final Map<String, Metric> outerMap = new HashMap<>(); + + { + outerMap.put(OUTER_METER, new Meter()); + outerMap.put(INNER_SET, inner); + outerMap.put(OUTER_TIMER, new Timer()); + } + + @Override + public Map<String, Metric> getMetrics() { + return outerMap; + } + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java index 07ac14b..07b971c 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java @@ -21,6 +21,8 @@ package org.apache.storm.daemon.logviewer; import static org.apache.storm.DaemonConfig.UI_HEADER_BUFFER_BYTES; import com.codahale.metrics.Meter; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricSet; import com.google.common.annotations.VisibleForTesting; import java.io.File; @@ -31,6 +33,7 @@ import java.util.Map; import org.apache.storm.DaemonConfig; import org.apache.storm.daemon.logviewer.utils.DirectoryCleaner; +import org.apache.storm.daemon.logviewer.utils.ExceptionMeters; import org.apache.storm.daemon.logviewer.utils.LogCleaner; import org.apache.storm.daemon.logviewer.utils.WorkerLogs; import org.apache.storm.daemon.logviewer.webapp.LogviewerApplication; @@ -126,6 +129,7 @@ public class LogviewerServer implements AutoCloseable { void start() throws Exception { LOG.info("Starting Logviewer..."); if (httpServer != null) { + StormMetricsRegistry.registerMetricSet(ExceptionMeters::getMetrics); httpServer.start(); } } @@ -165,7 +169,7 @@ public class LogviewerServer implements AutoCloseable { try (LogviewerServer server = new LogviewerServer(conf); LogCleaner logCleaner = new LogCleaner(conf, workerLogs, directoryCleaner, logRootDir)) { - Utils.addShutdownHookWithForceKillIn1Sec(() -> server.close()); + Utils.addShutdownHookWithForceKillIn1Sec(server::close); logCleaner.start(); StormMetricsRegistry.startMetricsReporters(conf); server.start(); http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java index 32e79eb..089d965 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java @@ -37,11 +37,13 @@ import static java.util.stream.Collectors.toCollection; import static java.util.stream.Collectors.toList; import static org.apache.commons.lang.StringEscapeUtils.escapeHtml; +import com.codahale.metrics.Meter; import j2html.tags.DomContent; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.UncheckedIOException; @@ -61,6 +63,7 @@ import javax.ws.rs.core.Response; import org.apache.commons.lang.StringUtils; import org.apache.storm.daemon.logviewer.LogviewerConstant; import org.apache.storm.daemon.logviewer.utils.DirectoryCleaner; +import org.apache.storm.daemon.logviewer.utils.ExceptionMeters; import org.apache.storm.daemon.logviewer.utils.LogviewerResponseBuilder; import org.apache.storm.daemon.logviewer.utils.ResourceAuthorizer; import org.apache.storm.daemon.logviewer.utils.WorkerLogs; @@ -68,11 +71,13 @@ import org.apache.storm.daemon.ui.InvalidRequestException; import org.apache.storm.daemon.ui.UIHelpers; import org.apache.storm.daemon.utils.StreamUtil; import org.apache.storm.daemon.utils.UrlBuilder; +import org.apache.storm.metric.StormMetricsRegistry; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.ServerUtils; import org.jooq.lambda.Unchecked; public class LogviewerLogPageHandler { + private static final Meter numPageRead = StormMetricsRegistry.registerMeter("logviewer:num-page-read"); private final String logRoot; private final String daemonLogRoot; private final WorkerLogs workerLogs; @@ -152,7 +157,7 @@ public class LogviewerLogPageHandler { List<String> files; if (fileResults != null) { files = fileResults.stream() - .map(file -> WorkerLogs.getTopologyPortWorkerLog(file)) + .map(WorkerLogs::getTopologyPortWorkerLog) .sorted().collect(toList()); } else { files = new ArrayList<>(); @@ -162,11 +167,12 @@ public class LogviewerLogPageHandler { } /** - * Provides a worker log file to view. + * Provides a worker log file to view, starting from the specified position + * or default starting position of the most recent page. * * @param fileName file to view - * @param start start offset, can be null - * @param length length to read in this page, can be null + * @param start start offset, or null if the most recent page is desired + * @param length length to read in this page, or null if default page length is desired * @param grep search string if request is a result of the search, can be null * @param user username * @return HTML view page of worker log @@ -179,7 +185,6 @@ public class LogviewerLogPageHandler { File file = new File(rootDir, fileName).getCanonicalFile(); String path = file.getCanonicalPath(); - boolean isZipFile = path.endsWith(".gz"); File topoDir = file.getParentFile().getParentFile(); if (file.exists() && new File(rootDir).getCanonicalFile().equals(topoDir.getParentFile())) { @@ -193,24 +198,21 @@ public class LogviewerLogPageHandler { throw e.getCause(); } - List<String> filesStrWithoutFileParam = logFiles.stream().map(WorkerLogs::getTopologyPortWorkerLog) - .filter(fileStr -> !StringUtils.equals(fileName, fileStr)).collect(toList()); - - List<String> reorderedFilesStr = new ArrayList<>(); - reorderedFilesStr.addAll(filesStrWithoutFileParam); + List<String> reorderedFilesStr = logFiles.stream() + .map(WorkerLogs::getTopologyPortWorkerLog) + .filter(fileStr -> !StringUtils.equals(fileName, fileStr)) + .collect(toList()); reorderedFilesStr.add(fileName); length = length != null ? Math.min(10485760, length) : LogviewerConstant.DEFAULT_BYTES_PER_PAGE; - - String logString; - if (isTxtFile(fileName)) { - logString = escapeHtml(start != null ? pageFile(path, start, length) : pageFile(path, length)); - } else { - logString = escapeHtml("This is a binary file and cannot display! You may download the full file."); + final boolean isZipFile = path.endsWith(".gz"); + long fileLength = getFileLength(file, isZipFile); + if (start == null) { + start = Long.valueOf(fileLength - length).intValue(); } - long fileLength = isZipFile ? ServerUtils.zipFileSize(file) : file.length(); - start = start != null ? start : Long.valueOf(fileLength - length).intValue(); + String logString = isTxtFile(fileName) ? escapeHtml(pageFile(path, isZipFile, fileLength, start, length)) : + escapeHtml("This is a binary file and cannot display! You may download the full file."); List<DomContent> bodyContents = new ArrayList<>(); if (StringUtils.isNotEmpty(grep)) { @@ -254,8 +256,8 @@ public class LogviewerLogPageHandler { * Provides a daemon log file to view. * * @param fileName file to view - * @param start start offset, can be null - * @param length length to read in this page, can be null + * @param start start offset, or null if the most recent page is desired + * @param length length to read in this page, or null if default page length is desired * @param grep search string if request is a result of the search, can be null * @param user username * @return HTML view page of daemon log @@ -265,7 +267,6 @@ public class LogviewerLogPageHandler { String rootDir = daemonLogRoot; File file = new File(rootDir, fileName).getCanonicalFile(); String path = file.getCanonicalPath(); - boolean isZipFile = path.endsWith(".gz"); if (file.exists() && new File(rootDir).getCanonicalFile().equals(file.getParentFile())) { // all types of files included @@ -273,24 +274,21 @@ public class LogviewerLogPageHandler { .filter(File::isFile) .collect(toList()); - List<String> filesStrWithoutFileParam = logFiles.stream() - .map(File::getName).filter(fName -> !StringUtils.equals(fileName, fName)).collect(toList()); - - List<String> reorderedFilesStr = new ArrayList<>(); - reorderedFilesStr.addAll(filesStrWithoutFileParam); + List<String> reorderedFilesStr = logFiles.stream() + .map(File::getName) + .filter(fName -> !StringUtils.equals(fileName, fName)) + .collect(toList()); reorderedFilesStr.add(fileName); length = length != null ? Math.min(10485760, length) : LogviewerConstant.DEFAULT_BYTES_PER_PAGE; - - String logString; - if (isTxtFile(fileName)) { - logString = escapeHtml(start != null ? pageFile(path, start, length) : pageFile(path, length)); - } else { - logString = escapeHtml("This is a binary file and cannot display! You may download the full file."); + final boolean isZipFile = path.endsWith(".gz"); + long fileLength = getFileLength(file, isZipFile); + if (start == null) { + start = Long.valueOf(fileLength - length).intValue(); } - long fileLength = isZipFile ? ServerUtils.zipFileSize(file) : file.length(); - start = start != null ? start : Long.valueOf(fileLength - length).intValue(); + String logString = isTxtFile(fileName) ? escapeHtml(pageFile(path, isZipFile, fileLength, start, length)) : + escapeHtml("This is a binary file and cannot display! You may download the full file."); List<DomContent> bodyContents = new ArrayList<>(); if (StringUtils.isNotEmpty(grep)) { @@ -323,6 +321,18 @@ public class LogviewerLogPageHandler { } } + private long getFileLength(File file, boolean isZipFile) throws IOException { + try { + return isZipFile ? ServerUtils.zipFileSize(file) : file.length(); + } catch (FileNotFoundException e) { + ExceptionMeters.NUM_FILE_OPEN_EXCEPTIONS.mark(); + throw e; + } catch (IOException e) { + ExceptionMeters.NUM_FILE_READ_EXCEPTIONS.mark(); + throw e; + } + } + private DomContent logTemplate(List<DomContent> bodyContents, String fileName, String user) { List<DomContent> finalBodyContents = new ArrayList<>(); @@ -426,17 +436,8 @@ public class LogviewerLogPageHandler { return a(text).withHref(url).withClass("btn btn-default " + (enabled ? "enabled" : "disabled")); } - private String pageFile(String path, Integer tail) throws IOException, InvalidRequestException { - boolean isZipFile = path.endsWith(".gz"); - long fileLength = isZipFile ? ServerUtils.zipFileSize(new File(path)) : new File(path).length(); - long skip = fileLength - tail; - return pageFile(path, Long.valueOf(skip).intValue(), tail); - } - - private String pageFile(String path, Integer start, Integer length) throws IOException, InvalidRequestException { - boolean isZipFile = path.endsWith(".gz"); - long fileLength = isZipFile ? ServerUtils.zipFileSize(new File(path)) : new File(path).length(); - + private String pageFile(String path, boolean isZipFile, long fileLength, Integer start, Integer readLength) + throws IOException, InvalidRequestException { try (InputStream input = isZipFile ? new GZIPInputStream(new FileInputStream(path)) : new FileInputStream(path); ByteArrayOutputStream output = new ByteArrayOutputStream()) { if (start >= fileLength) { @@ -447,8 +448,8 @@ public class LogviewerLogPageHandler { } byte[] buffer = new byte[1024]; - while (output.size() < length) { - int size = input.read(buffer, 0, Math.min(1024, length - output.size())); + while (output.size() < readLength) { + int size = input.read(buffer, 0, Math.min(1024, readLength - output.size())); if (size > 0) { output.write(buffer, 0, size); } else { @@ -456,7 +457,14 @@ public class LogviewerLogPageHandler { } } + numPageRead.mark(); return output.toString(); + } catch (FileNotFoundException e) { + ExceptionMeters.NUM_FILE_OPEN_EXCEPTIONS.mark(); + throw e; + } catch (IOException e) { + ExceptionMeters.NUM_FILE_READ_EXCEPTIONS.mark(); + throw e; } } http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java index a26396c..bcde077 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java @@ -25,6 +25,10 @@ import static org.apache.storm.daemon.utils.ListFunctionalSupport.last; import static org.apache.storm.daemon.utils.ListFunctionalSupport.rest; import static org.apache.storm.daemon.utils.PathUtil.truncatePathToLastElements; +import com.codahale.metrics.ExponentiallyDecayingReservoir; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; @@ -46,6 +50,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.regex.Pattern; +import java.util.stream.Stream; import java.util.zip.GZIPInputStream; import javax.ws.rs.core.Response; @@ -56,12 +61,14 @@ import org.apache.storm.DaemonConfig; import org.apache.storm.daemon.common.JsonResponseBuilder; import org.apache.storm.daemon.logviewer.LogviewerConstant; import org.apache.storm.daemon.logviewer.utils.DirectoryCleaner; +import org.apache.storm.daemon.logviewer.utils.ExceptionMeters; import org.apache.storm.daemon.logviewer.utils.LogviewerResponseBuilder; import org.apache.storm.daemon.logviewer.utils.ResourceAuthorizer; import org.apache.storm.daemon.logviewer.utils.WorkerLogs; import org.apache.storm.daemon.ui.InvalidRequestException; import org.apache.storm.daemon.utils.StreamUtil; import org.apache.storm.daemon.utils.UrlBuilder; +import org.apache.storm.metric.StormMetricsRegistry; import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.ServerUtils; import org.apache.storm.utils.Utils; @@ -71,6 +78,9 @@ import org.slf4j.LoggerFactory; public class LogviewerLogSearchHandler { private static final Logger LOG = LoggerFactory.getLogger(LogviewerLogSearchHandler.class); + private static final Meter numDeepSearchNoResult = StormMetricsRegistry.registerMeter("logviewer:num-deep-search-no-result"); + private static final Histogram numFileScanned = StormMetricsRegistry.registerHistogram("logviewer:num-files-scanned-per-deep-search"); + private static final Meter numSearchRequestNoResult = StormMetricsRegistry.registerMeter("logviewer:num-search-request-no-result"); public static final int GREP_MAX_SEARCH_SIZE = 1024; public static final int GREP_BUF_SIZE = 2048; @@ -124,6 +134,8 @@ public class LogviewerLogSearchHandler { public Response searchLogFile(String fileName, String user, boolean isDaemon, String search, String numMatchesStr, String offsetStr, String callback, String origin) throws IOException, InvalidRequestException { + boolean noResult = true; + String rootDir = isDaemon ? daemonLogRoot : logRoot; File file = new File(rootDir, fileName).getCanonicalFile(); Response response; @@ -136,7 +148,9 @@ public class LogviewerLogSearchHandler { if (StringUtils.isNotEmpty(search) && search.getBytes("UTF-8").length <= GREP_MAX_SEARCH_SIZE) { Map<String, Object> entity = new HashMap<>(); entity.put("isDaemon", isDaemon ? "yes" : "no"); - entity.putAll(substringSearch(file, search, isDaemon, numMatchesInt, offsetInt)); + Map<String, Object> res = substringSearch(file, search, isDaemon, numMatchesInt, offsetInt); + entity.putAll(res); + noResult = ((List) res.get("matches")).isEmpty(); response = LogviewerResponseBuilder.buildSuccessJsonResponse(entity, callback, origin); } else { @@ -159,16 +173,20 @@ public class LogviewerLogSearchHandler { response = new JsonResponseBuilder().setData(entity).setCallback(callback).setStatus(404).build(); } + if (noResult) { + numSearchRequestNoResult.mark(); + } return response; } /** - * Deep search across worker log files in a topology. + * Advanced search across worker log files in a topology. * * @param topologyId topology ID * @param user username * @param search search string - * @param numMatchesStr the count of maximum matches + * @param numMatchesStr the count of maximum matches. Note that this number is with respect to + * each port, not to each log or each search request * @param portStr worker port, null or '*' if the request wants to search from all worker logs * @param fileOffsetStr index (offset) of the log files * @param offsetStr start offset for log file @@ -180,6 +198,9 @@ public class LogviewerLogSearchHandler { public Response deepSearchLogsForTopology(String topologyId, String user, String search, String numMatchesStr, String portStr, String fileOffsetStr, String offsetStr, Boolean searchArchived, String callback, String origin) { + int numMatchedFiles = 0; + int numScannedFiles = 0; + String rootDir = logRoot; Object returnValue; File topologyDir = new File(rootDir, topologyId); @@ -200,24 +221,24 @@ public class LogviewerLogSearchHandler { if (StringUtils.isEmpty(portStr) || portStr.equals("*")) { // check for all ports - List<List<File>> filteredLogs = portDirs.stream() - .map(portDir -> logsForPort(user, portDir)) - .filter(logs -> logs != null && !logs.isEmpty()) - .collect(toList()); + Stream<List<File>> portsOfLogs = portDirs.stream() + .map(portDir -> logsForPort(user, portDir)) + .filter(logs -> logs != null && !logs.isEmpty()); - if (BooleanUtils.isTrue(searchArchived)) { - returnValue = filteredLogs.stream() - .map(fl -> findNMatches(fl, numMatches, 0, 0, search)) - .collect(toList()); - } else { - returnValue = filteredLogs.stream() - .map(fl -> Collections.singletonList(first(fl))) - .map(fl -> findNMatches(fl, numMatches, 0, 0, search)) - .collect(toList()); + if (BooleanUtils.isNotTrue(searchArchived)) { + portsOfLogs = portsOfLogs.map(fl -> Collections.singletonList(first(fl))); } + + final List<Matched> matchedList = portsOfLogs + .map(logs -> findNMatches(logs, numMatches, 0, 0, search)) + .collect(toList()); + numMatchedFiles = matchedList.stream().mapToInt(match -> match.getMatches().size()).sum(); + numScannedFiles = matchedList.stream().mapToInt(match -> match.openedFiles).sum(); + returnValue = matchedList; } else { int port = Integer.parseInt(portStr); // check just the one port + @SuppressWarnings("unchecked") List<Integer> slotsPorts = (List<Integer>) stormConf.getOrDefault(DaemonConfig.SUPERVISOR_SLOTS_PORTS, new ArrayList<>()); boolean containsPort = slotsPorts.stream() @@ -232,17 +253,22 @@ public class LogviewerLogSearchHandler { returnValue = new ArrayList<>(); } else { List<File> filteredLogs = logsForPort(user, portDir); - if (BooleanUtils.isTrue(searchArchived)) { - returnValue = findNMatches(filteredLogs, numMatches, fileOffset, offset, search); - } else { - returnValue = findNMatches(Collections.singletonList(first(filteredLogs)), - numMatches, 0, offset, search); + if (BooleanUtils.isNotTrue(searchArchived)) { + filteredLogs = Collections.singletonList(first(filteredLogs)); + fileOffset = 0; } + returnValue = findNMatches(filteredLogs, numMatches, fileOffset, offset, search); + numMatchedFiles = ((Matched) returnValue).getMatches().size(); + numScannedFiles = ((Matched) returnValue).openedFiles; } } } } + if (numMatchedFiles == 0) { + numDeepSearchNoResult.mark(); + } + numFileScanned.update(numScannedFiles); return LogviewerResponseBuilder.buildSuccessJsonResponse(returnValue, callback, origin); } @@ -271,26 +297,21 @@ public class LogviewerLogSearchHandler { private Map<String,Object> substringSearch(File file, String searchString, boolean isDaemon, Integer numMatches, Integer startByteOffset) throws InvalidRequestException { - try { - if (StringUtils.isEmpty(searchString)) { - throw new IllegalArgumentException("Precondition fails: search string should not be empty."); - } - if (searchString.getBytes(StandardCharsets.UTF_8).length > GREP_MAX_SEARCH_SIZE) { - throw new IllegalArgumentException("Precondition fails: the length of search string should be less than " - + GREP_MAX_SEARCH_SIZE); - } + if (StringUtils.isEmpty(searchString)) { + throw new IllegalArgumentException("Precondition fails: search string should not be empty."); + } + if (searchString.getBytes(StandardCharsets.UTF_8).length > GREP_MAX_SEARCH_SIZE) { + throw new IllegalArgumentException("Precondition fails: the length of search string should be less than " + + GREP_MAX_SEARCH_SIZE); + } - boolean isZipFile = file.getName().endsWith(".gz"); - try (InputStream fis = Files.newInputStream(file.toPath()); - InputStream gzippedInputStream = isZipFile ? new GZIPInputStream(fis) : fis; - BufferedInputStream stream = new BufferedInputStream(gzippedInputStream)) { + boolean isZipFile = file.getName().endsWith(".gz"); + try (InputStream fis = Files.newInputStream(file.toPath())) { + try (InputStream gzippedInputStream = isZipFile ? new GZIPInputStream(fis) : fis; + BufferedInputStream stream = new BufferedInputStream(gzippedInputStream)) { - int fileLength; - if (isZipFile) { - fileLength = (int) ServerUtils.zipFileSize(file); - } else { - fileLength = (int) file.length(); - } + //It's more likely to be a file read exception here, so we don't differentiate + int fileLength = isZipFile ? (int) ServerUtils.zipFileSize(file) : (int) file.length(); ByteBuffer buf = ByteBuffer.allocate(GREP_BUF_SIZE); final byte[] bufArray = buf.array(); @@ -311,7 +332,7 @@ public class LogviewerLogSearchHandler { Arrays.fill(bufArray, (byte) 0); int totalBytesRead = 0; - int bytesRead = stream.read(bufArray, 0, Math.min((int) fileLength, GREP_BUF_SIZE)); + int bytesRead = stream.read(bufArray, 0, Math.min(fileLength, GREP_BUF_SIZE)); buf.limit(bytesRead); totalBytesRead += bytesRead; @@ -335,7 +356,7 @@ public class LogviewerLogSearchHandler { // buffer on the previous read. final int newBufOffset = Math.min(buf.limit(), GREP_MAX_SEARCH_SIZE) - searchBytes.length; - totalBytesRead = rotateGrepBuffer(buf, stream, totalBytesRead, file, fileLength); + totalBytesRead = rotateGrepBuffer(buf, stream, totalBytesRead, fileLength); if (totalBytesRead < 0) { throw new InvalidRequestException("Cannot search past the end of the file"); } @@ -358,8 +379,14 @@ public class LogviewerLogSearchHandler { } } return ret; + } catch (UnknownHostException | UnsupportedEncodingException e) { + throw new RuntimeException(e); + } catch (IOException e) { + ExceptionMeters.NUM_FILE_READ_EXCEPTIONS.mark(); + throw new RuntimeException(e); } } catch (IOException e) { + ExceptionMeters.NUM_FILE_OPEN_EXCEPTIONS.mark(); throw new RuntimeException(e); } } @@ -388,32 +415,46 @@ public class LogviewerLogSearchHandler { } } + /** + * Find the first N matches of target string in files. + * @param logs all candidate log files to search + * @param numMatches number of matches expected + * @param fileOffset number of log files to skip initially + * @param startByteOffset number of byte to be ignored in each log file + * @param targetStr searched string + * @return all matched results + */ @VisibleForTesting - Matched findNMatches(List<File> logs, int numMatches, int fileOffset, int offset, String search) { + Matched findNMatches(List<File> logs, int numMatches, int fileOffset, int startByteOffset, String targetStr) { logs = drop(logs, fileOffset); + LOG.debug("{} files to scan", logs.size()); List<Map<String, Object>> matches = new ArrayList<>(); int matchCount = 0; + int scannedFiles = 0; while (true) { if (logs.isEmpty()) { + //fileOffset = one past last scanned file break; } File firstLog = logs.get(0); - Map<String, Object> theseMatches; + Map<String, Object> matchInLog; try { LOG.debug("Looking through {}", firstLog); - theseMatches = substringSearch(firstLog, search, numMatches - matchCount, offset); + matchInLog = substringSearch(firstLog, targetStr, numMatches - matchCount, startByteOffset); + scannedFiles++; } catch (InvalidRequestException e) { LOG.error("Can't search past end of file.", e); - theseMatches = new HashMap<>(); + matchInLog = new HashMap<>(); } String fileName = WorkerLogs.getTopologyPortWorkerLog(firstLog); + //This section simply put the formatted log filename and corresponding port in the matching. final List<Map<String, Object>> newMatches = new ArrayList<>(matches); - Map<String, Object> currentFileMatch = new HashMap<>(theseMatches); + Map<String, Object> currentFileMatch = new HashMap<>(matchInLog); currentFileMatch.put("fileName", fileName); Path firstLogAbsPath; try { @@ -424,27 +465,27 @@ public class LogviewerLogSearchHandler { currentFileMatch.put("port", truncatePathToLastElements(firstLogAbsPath, 2).getName(0).toString()); newMatches.add(currentFileMatch); - int newCount = matchCount + ((List<?>)theseMatches.get("matches")).size(); - - //theseMatches is never empty! As guaranteed by the #get().size() method above + int newCount = matchCount + ((List<?>)matchInLog.get("matches")).size(); if (newCount == matchCount) { // matches and matchCount is not changed logs = rest(logs); - offset = 0; + startByteOffset = 0; fileOffset = fileOffset + 1; } else if (newCount >= numMatches) { matches = newMatches; + //fileOffset = the index of last scanned file break; } else { matches = newMatches; logs = rest(logs); - offset = 0; + startByteOffset = 0; fileOffset = fileOffset + 1; matchCount = newCount; } } - return new Matched(fileOffset, search, matches); + LOG.debug("scanned {} files", scannedFiles); + return new Matched(fileOffset, targetStr, matches, scannedFiles); } @@ -502,8 +543,7 @@ public class LogviewerLogSearchHandler { return new SubstringSearchResult(matches, newByteOffset, newBeforeBytes); } - private int rotateGrepBuffer(ByteBuffer buf, BufferedInputStream stream, int totalBytesRead, File file, - int fileLength) throws IOException { + private int rotateGrepBuffer(ByteBuffer buf, BufferedInputStream stream, int totalBytesRead, int fileLength) throws IOException { byte[] bufArray = buf.array(); // Copy the 2nd half of the buffer to the first half. @@ -513,7 +553,7 @@ public class LogviewerLogSearchHandler { Arrays.fill(bufArray, GREP_MAX_SEARCH_SIZE, bufArray.length, (byte) 0); // Fill the 2nd half with new bytes from the stream. - int bytesRead = stream.read(bufArray, GREP_MAX_SEARCH_SIZE, Math.min((int) fileLength, GREP_MAX_SEARCH_SIZE)); + int bytesRead = stream.read(bufArray, GREP_MAX_SEARCH_SIZE, Math.min(fileLength, GREP_MAX_SEARCH_SIZE)); buf.limit(GREP_MAX_SEARCH_SIZE + bytesRead); return totalBytesRead + bytesRead; } @@ -693,18 +733,21 @@ public class LogviewerLogSearchHandler { private int fileOffset; private String searchString; private List<Map<String, Object>> matches; + @JsonIgnore + private final int openedFiles; /** * Constructor. - * - * @param fileOffset offset (index) of the files + * @param fileOffset offset (index) of the files * @param searchString search string * @param matches map representing matched search result + * @param openedFiles number of files scanned, used for metrics only */ - public Matched(int fileOffset, String searchString, List<Map<String, Object>> matches) { + public Matched(int fileOffset, String searchString, List<Map<String, Object>> matches, int openedFiles) { this.fileOffset = fileOffset; this.searchString = searchString; this.matches = matches; + this.openedFiles = openedFiles; } public int getFileOffset() { http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DeletionMeta.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DeletionMeta.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DeletionMeta.java new file mode 100644 index 0000000..9e0afd9 --- /dev/null +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DeletionMeta.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.logviewer.utils; + +class DeletionMeta { + static final DeletionMeta EMPTY = new DeletionMeta(0, 0); + + final long deletedSize; + final int deletedFiles; + + DeletionMeta(long deletedSize, int deletedFiles) { + this.deletedSize = deletedSize; + this.deletedFiles = deletedFiles; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java index 310bc8e..293b2be 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java @@ -60,7 +60,12 @@ public class DirectoryCleaner { * @return DirectoryStream */ public DirectoryStream<Path> getStreamForDirectory(File dir) throws IOException { - return Files.newDirectoryStream(dir.toPath()); + try { + return Files.newDirectoryStream(dir.toPath()); + } catch (IOException e) { + ExceptionMeters.NUM_FILE_OPEN_EXCEPTIONS.mark(); + throw e; + } } /** @@ -74,11 +79,9 @@ public class DirectoryCleaner { * @param activeDirs only for global deletion, we want to skip the active logs in activeDirs * @return number of files deleted */ - public int deleteOldestWhileTooLarge(List<File> dirs, - long quota, boolean forPerDir, Set<String> activeDirs) throws IOException { + public DeletionMeta deleteOldestWhileTooLarge(List<File> dirs, + long quota, boolean forPerDir, Set<String> activeDirs) throws IOException { long totalSize = 0; - int deletedFiles = 0; - for (File dir : dirs) { try (DirectoryStream<Path> stream = getStreamForDirectory(dir)) { for (Path path : stream) { @@ -87,13 +90,14 @@ public class DirectoryCleaner { } } } - LOG.debug("totalSize: {} quota: {}", totalSize, quota); long toDeleteSize = totalSize - quota; if (toDeleteSize <= 0) { - return deletedFiles; + return DeletionMeta.EMPTY; } + int deletedFiles = 0; + long deletedSize = 0; // the oldest pq_size files in this directory will be placed in PQ, with the newest at the root PriorityQueue<File> pq = new PriorityQueue<>(PQ_SIZE, (f1, f2) -> f1.lastModified() > f2.lastModified() ? -1 : 1); int round = 0; @@ -134,6 +138,7 @@ public class DirectoryCleaner { Utils.forceDelete(file.getPath()); LOG.info("Delete file: {}, size: {}, lastModified: {}", canonicalPath, fileSize, lastModified); toDeleteSize -= fileSize; + deletedSize += fileSize; deletedFiles++; } catch (IOException e) { excluded.add(file); @@ -157,7 +162,7 @@ public class DirectoryCleaner { forPerDir ? "this directory" : "root directory", toDeleteSize * 1e-6); } } - return deletedFiles; + return new DeletionMeta(deletedSize, deletedFiles); } private boolean isFileEligibleToSkipDelete(boolean forPerDir, Set<String> activeDirs, File dir, File file) throws IOException { @@ -186,7 +191,11 @@ public class DirectoryCleaner { break; } } + } catch (IOException e) { + ExceptionMeters.NUM_FILE_OPEN_EXCEPTIONS.mark(); + throw e; } return files; } + } http://git-wip-us.apache.org/repos/asf/storm/blob/bf81b684/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/ExceptionMeters.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/ExceptionMeters.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/ExceptionMeters.java new file mode 100644 index 0000000..81aa222 --- /dev/null +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/ExceptionMeters.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package org.apache.storm.daemon.logviewer.utils; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricSet; + +import java.util.HashMap; +import java.util.Map; + +public enum ExceptionMeters { + //Operation level IO Exceptions + NUM_FILE_OPEN_EXCEPTIONS("logviewer:num-file-open-exceptions"), + NUM_FILE_READ_EXCEPTIONS("logviewer:num-file-read-exceptions"), + NUM_FILE_REMOVAL_EXCEPTIONS("logviewer:num-file-removal-exceptions"), + NUM_FILE_DOWNLOAD_EXCEPTIONS("logviewer:num-file-download-exceptions"), + NUM_SET_PERMISSION_EXCEPTIONS("logviewer:num-set-permission-exceptions"), + + //Routine level + NUM_CLEANUP_EXCEPTIONS("logviewer:num-other-cleanup-exceptions"), + NUM_READ_LOG_EXCEPTIONS("logviewer:num-read-log-exceptions"), + NUM_READ_DAEMON_LOG_EXCEPTIONS("logviewer:num-read-daemon-log-exceptions"), + NUM_LIST_LOG_EXCEPTIONS("logviewer:num-search-log-exceptions"), + NUM_LIST_DUMP_EXCEPTIONS("logviewer:num-list-dump-files-exceptions"), + NUM_DOWNLOAD_DUMP_EXCEPTIONS("logviewer:num-download-dump-exceptions"), + NUM_DOWNLOAD_LOG_EXCEPTIONS("logviewer:num-download-log-exceptions"), + NUM_DOWNLOAD_DAEMON_LOG_EXCEPTIONS("logviewer:num-download-daemon-log-exceptions"), + NUM_SEARCH_EXCEPTIONS("logviewer:num-search-exceptions"); + + private static final Map<String, Metric> metrics = new HashMap<>(); + + static { + for (ExceptionMeters e : ExceptionMeters.values()) { + metrics.put(e.name, e.meter); + } + } + + private final String name; + private final Meter meter; + + public static Map<String, Metric> getMetrics() { + return metrics; + } + + ExceptionMeters(String name) { + this.name = name; + meter = new Meter(); + } + + public void mark() { + this.meter.mark(); + } +}