[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2504 ---
[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2504#discussion_r162768150 --- Diff: pom.xml --- @@ -324,6 +324,7 @@ 0.9.12 2.3.5 2.3.0 +5.8.6 --- End diff -- Actually I don't have experience with RocksDB. If you see RocksDB 5.8.6 is running properly, let's just use the version. ---
[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2504#discussion_r162768100 --- Diff: storm-server/pom.xml --- @@ -64,6 +64,10 @@ auto-service true + +org.rocksdb +rocksdbjni --- End diff -- Good. We are good to go then. ---
[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2504#discussion_r162768064 --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java --- @@ -0,0 +1,639 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore.rocksdb; + +import com.codahale.metrics.Meter; +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.storm.DaemonConfig; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.metricstore.AggLevel; +import org.apache.storm.metricstore.FilterOptions; +import org.apache.storm.metricstore.Metric; +import org.apache.storm.metricstore.MetricException; +import org.apache.storm.metricstore.MetricStore; +import org.apache.storm.utils.ObjectReader; +import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.IndexType; +import org.rocksdb.Options; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class RocksDbStore implements MetricStore, AutoCloseable { +private static final Logger LOG = LoggerFactory.getLogger(RocksDbStore.class); +private static final int MAX_QUEUE_CAPACITY = 4000; +static final int INVALID_METADATA_STRING_ID = 0; +RocksDB db; +private ReadOnlyStringMetadataCache readOnlyStringMetadataCache = null; +private BlockingQueue queue = new LinkedBlockingQueue(MAX_QUEUE_CAPACITY); +private RocksDbMetricsWriter metricsWriter = null; +private MetricsCleaner metricsCleaner = null; +private Meter failureMeter = null; + +interface RocksDbScanCallback { +boolean cb(RocksDbKey key, RocksDbValue val); // return false to stop scan +} + +/** + * Create metric store instance using the configurations provided via the config map. + * + * @param config Storm config map + * @throws MetricException on preparation error + */ +public void prepare(Map config) throws MetricException { +validateConfig(config); + +this.failureMeter = StormMetricsRegistry.registerMeter("RocksDB:metric-failures"); + +RocksDB.loadLibrary(); +boolean createIfMissing = ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING), false); + +try (Options options = new Options().setCreateIfMissing(createIfMissing)) { +// use the hash index for prefix searches +BlockBasedTableConfig tfc = new BlockBasedTableConfig(); +tfc.setIndexType(IndexType.kHashSearch); +options.setTableFormatConfig(tfc); +options.useCappedPrefixExtractor(RocksDbKey.KEY_SIZE); + +String path = getRocksDbAbsoluteDir(config); +LOG.info("Opening RocksDB from {}", path); +db = RocksDB.open(options, path); +} catch (RocksDBException e) { +String message = "Error opening RockDB database"; +LOG.error(message, e); +throw new MetricException(message, e); +} + +// create thread to delete old metrics and metadata +Integer retentionHours = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS).toString()); +Integer deletionPeriod = 0; +if (config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS)) { +deletionPeriod =
[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB
Github user agresch commented on a diff in the pull request: https://github.com/apache/storm/pull/2504#discussion_r162668430 --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java --- @@ -0,0 +1,639 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore.rocksdb; + +import com.codahale.metrics.Meter; +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.storm.DaemonConfig; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.metricstore.AggLevel; +import org.apache.storm.metricstore.FilterOptions; +import org.apache.storm.metricstore.Metric; +import org.apache.storm.metricstore.MetricException; +import org.apache.storm.metricstore.MetricStore; +import org.apache.storm.utils.ObjectReader; +import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.IndexType; +import org.rocksdb.Options; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class RocksDbStore implements MetricStore, AutoCloseable { +private static final Logger LOG = LoggerFactory.getLogger(RocksDbStore.class); +private static final int MAX_QUEUE_CAPACITY = 4000; +static final int INVALID_METADATA_STRING_ID = 0; +RocksDB db; +private ReadOnlyStringMetadataCache readOnlyStringMetadataCache = null; +private BlockingQueue queue = new LinkedBlockingQueue(MAX_QUEUE_CAPACITY); +private RocksDbMetricsWriter metricsWriter = null; +private MetricsCleaner metricsCleaner = null; +private Meter failureMeter = null; + +interface RocksDbScanCallback { +boolean cb(RocksDbKey key, RocksDbValue val); // return false to stop scan +} + +/** + * Create metric store instance using the configurations provided via the config map. + * + * @param config Storm config map + * @throws MetricException on preparation error + */ +public void prepare(Map config) throws MetricException { +validateConfig(config); + +this.failureMeter = StormMetricsRegistry.registerMeter("RocksDB:metric-failures"); + +RocksDB.loadLibrary(); +boolean createIfMissing = ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING), false); + +try (Options options = new Options().setCreateIfMissing(createIfMissing)) { +// use the hash index for prefix searches +BlockBasedTableConfig tfc = new BlockBasedTableConfig(); +tfc.setIndexType(IndexType.kHashSearch); +options.setTableFormatConfig(tfc); +options.useCappedPrefixExtractor(RocksDbKey.KEY_SIZE); + +String path = getRocksDbAbsoluteDir(config); +LOG.info("Opening RocksDB from {}", path); +db = RocksDB.open(options, path); +} catch (RocksDBException e) { +String message = "Error opening RockDB database"; +LOG.error(message, e); +throw new MetricException(message, e); +} + +// create thread to delete old metrics and metadata +Integer retentionHours = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS).toString()); +Integer deletionPeriod = 0; +if (config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS)) { +deletionPeriod =
[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB
Github user agresch commented on a diff in the pull request: https://github.com/apache/storm/pull/2504#discussion_r162634637 --- Diff: storm-server/pom.xml --- @@ -64,6 +64,10 @@ auto-service true + +org.rocksdb +rocksdbjni --- End diff -- I tested on a mac and a RHEL vm. The rocksDB jar also contains a win64 jni dll. If an error is thrown creating the metrics store, everything should be treated as a noop. ---
[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB
Github user agresch commented on a diff in the pull request: https://github.com/apache/storm/pull/2504#discussion_r162630427 --- Diff: pom.xml --- @@ -324,6 +324,7 @@ 0.9.12 2.3.5 2.3.0 +5.8.6 --- End diff -- No. It was the latest version at the time. Open to advice/suggestions. 5.8.7 and 5.9.2 now also exist. ---
[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2504#discussion_r162323436 --- Diff: storm-server/pom.xml --- @@ -64,6 +64,10 @@ auto-service true + +org.rocksdb +rocksdbjni --- End diff -- Just curious: I don't have experience with rocksdb and since it leverages JNI, I would like to be clear about how it works with multiple OS. ---
[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2504#discussion_r162320163 --- Diff: storm-client/src/jvm/org/apache/storm/security/auth/authorizer/SimpleACLAuthorizer.java --- @@ -35,6 +35,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + --- End diff -- nit: unnecessary two empty lines ---
[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2504#discussion_r162543795 --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/MetricsCleaner.java --- @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore.rocksdb; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Meter; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.metricstore.FilterOptions; +import org.apache.storm.metricstore.MetricException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class for removing expired metrics and unused metadata from the RocksDB store. + */ +public class MetricsCleaner implements Runnable, AutoCloseable { +private static final Logger LOG = LoggerFactory.getLogger(MetricsCleaner.class); +private static long DEFAULT_SLEEP_MS = 4L * 60L * 60L * 1000L; +private RocksDbStore store; +private long retentionHours; +private volatile boolean shutdown = false; +private long sleepMs = DEFAULT_SLEEP_MS; +private Meter failureMeter; +private long purgeTimestamp = 0L; + +MetricsCleaner(RocksDbStore store, int retentionHours, int hourlyPeriod, Meter failureMeter) { +this.store = store; +this.retentionHours = retentionHours; +if (hourlyPeriod > 0) { +this.sleepMs = hourlyPeriod * 60L * 60L * 1000L; +} +this.failureMeter = failureMeter; + +Gauge gauge = new Gauge() { +@Override +public Long getValue() { +return purgeTimestamp; +} +}; + StormMetricsRegistry.registerProvidedGauge("MetricsCleaner:purgeTimestamp", gauge); +} + +@Override +public void close() { +shutdown = true; +} + +@Override +public void run() { +while (true) { +if (shutdown) { --- End diff -- Is this a reason to have separate if statement instead of `while (!shutdown)`? ---
[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2504#discussion_r162544306 --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java --- @@ -0,0 +1,323 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore.rocksdb; + +import com.codahale.metrics.Meter; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.ListIterator; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.http.annotation.NotThreadSafe; +import org.apache.storm.metricstore.AggLevel; +import org.apache.storm.metricstore.Metric; +import org.apache.storm.metricstore.MetricException; +import org.rocksdb.FlushOptions; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class designed to perform all metrics inserts into RocksDB. Metrics are processed from a blocking queue. Inserts + * to RocksDB are done using a single thread to simplify design (such as looking up existing metric data for aggregation, + * and fetching/evicting metadata from the cache). + * + * A writable LRU StringMetadataCache is used to minimize looking up metadata string Ids. As entries are added to the full cache, older + * entries are evicted from the cache and need to be written to the database. This happens as the handleEvictedMetadata() + * method callback. + * + * The following issues would need to be addressed to implement a multithreaded metrics writer: + * + * Generation of unique unused IDs for new metadata strings needs to be thread safe. + * Ensuring newly created metadata strings are seen by all threads. + * Maintaining a properly cached state of metadata for multiple writers. The current LRU cache + * evicts data as new metadata is added. + * Processing the aggregation of a metric requires fetching and updating previous aggregates. A multithreaded + * design would need to ensure two metrics were not updating an aggregated metric at the same time. + * Investigate performance of multiple threads inserting into RocksDB versus a single ordered insert. + * + */ +@NotThreadSafe +public class RocksDbMetricsWriter implements Runnable, AutoCloseable { +private static final Logger LOG = LoggerFactory.getLogger(RocksDbMetricsWriter.class); +private RocksDbStore store; +private BlockingQueue queue; +private WritableStringMetadataCache stringMetadataCache; +private Set unusedIds = new HashSet<>(); +private TreeMapinsertBatch = new TreeMap<>(); // RocksDB should insert in sorted key order +private WriteOptions writeOpts = new WriteOptions(); +private volatile boolean shutdown = false; +private Meter failureMeter; +private ArrayList aggBuckets = new ArrayList<>(); + +/** + * Constructor for the RocksDbMetricsWriter. + * + * @param store The RocksDB store + * @param queue The queue to receive metrics for insertion + */ +RocksDbMetricsWriter(RocksDbStore store, BlockingQueue queue, Meter failureMeter) { +this.store = store; +this.queue = queue; +this.failureMeter = failureMeter; + +aggBuckets.add(AggLevel.AGG_LEVEL_1_MIN); +aggBuckets.add(AggLevel.AGG_LEVEL_10_MIN); +aggBuckets.add(AggLevel.AGG_LEVEL_60_MIN); +} + +/** + * Init routine called once the Metadata cache has been created. + * + * @throws MetricException on cache error + */ +void init() throws MetricException { +
[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2504#discussion_r162506645 --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/FilterOptions.java --- @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * FilterOptions provides a method to select various filtering options for doing a scan of the metrics database. + */ +public class FilterOptions { +private static final String componentId = "componentId"; +private static final String topologyId = "topologyId"; +private static final String startTime = "startTime"; +private static final String endTime = "endTime"; +private static final String metricName = "metricName"; +private static final String executorId = "executorId"; +private static final String hostId = "hostId"; +private static final String port = "port"; +private static final String streamId = "streamId"; +private Mapoptions = new HashMap<>(); --- End diff -- Any reason to store options into map with unnecessary explicit type conversion instead of defining elements as fields? ---
[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2504#discussion_r162505874 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java --- @@ -693,4 +701,44 @@ public long getMemoryReservationMb() { public String getWorkerId() { return _workerId; } + +/** + * Send worker metrics to Nimbus. + */ +void processMetrics() { +try { +if (_usedMemory.get(_port) != null) { +// Make sure we don't process too frequently. +long nextMetricProcessTime = this.lastMetricProcessTime + 60L * 1000L; +long currentTimeMsec = System.currentTimeMillis(); +if (currentTimeMsec < nextMetricProcessTime) { +return; +} + +String hostname = Utils.hostname(); + +// create metric for memory +String metricName = "UsedMemory"; --- End diff -- minor: maybe better to have constants for strings here, since they're likely to be reused while retrieving information. ---
[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2504#discussion_r162545064 --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java --- @@ -0,0 +1,639 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore.rocksdb; + +import com.codahale.metrics.Meter; +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.storm.DaemonConfig; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.metricstore.AggLevel; +import org.apache.storm.metricstore.FilterOptions; +import org.apache.storm.metricstore.Metric; +import org.apache.storm.metricstore.MetricException; +import org.apache.storm.metricstore.MetricStore; +import org.apache.storm.utils.ObjectReader; +import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.IndexType; +import org.rocksdb.Options; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class RocksDbStore implements MetricStore, AutoCloseable { +private static final Logger LOG = LoggerFactory.getLogger(RocksDbStore.class); +private static final int MAX_QUEUE_CAPACITY = 4000; +static final int INVALID_METADATA_STRING_ID = 0; +RocksDB db; +private ReadOnlyStringMetadataCache readOnlyStringMetadataCache = null; +private BlockingQueue queue = new LinkedBlockingQueue(MAX_QUEUE_CAPACITY); +private RocksDbMetricsWriter metricsWriter = null; +private MetricsCleaner metricsCleaner = null; +private Meter failureMeter = null; + +interface RocksDbScanCallback { +boolean cb(RocksDbKey key, RocksDbValue val); // return false to stop scan +} + +/** + * Create metric store instance using the configurations provided via the config map. + * + * @param config Storm config map + * @throws MetricException on preparation error + */ +public void prepare(Map config) throws MetricException { +validateConfig(config); + +this.failureMeter = StormMetricsRegistry.registerMeter("RocksDB:metric-failures"); + +RocksDB.loadLibrary(); +boolean createIfMissing = ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING), false); + +try (Options options = new Options().setCreateIfMissing(createIfMissing)) { +// use the hash index for prefix searches +BlockBasedTableConfig tfc = new BlockBasedTableConfig(); +tfc.setIndexType(IndexType.kHashSearch); +options.setTableFormatConfig(tfc); +options.useCappedPrefixExtractor(RocksDbKey.KEY_SIZE); + +String path = getRocksDbAbsoluteDir(config); +LOG.info("Opening RocksDB from {}", path); +db = RocksDB.open(options, path); +} catch (RocksDBException e) { +String message = "Error opening RockDB database"; +LOG.error(message, e); +throw new MetricException(message, e); +} + +// create thread to delete old metrics and metadata +Integer retentionHours = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS).toString()); +Integer deletionPeriod = 0; +if (config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS)) { +deletionPeriod =
[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2504#discussion_r162322132 --- Diff: storm-client/src/storm.thrift --- @@ -836,3 +837,24 @@ exception HBAuthorizationException { exception HBExecutionException { 1: required string msg; } + +struct WorkerMetricFields { --- End diff -- Looks like WorkerMetricFields and WorkerMetricList, WorkerMetrics are used "before", forward reference. We encountered the issue because of forward reference of thrift definition. https://issues.apache.org/jira/browse/STORM-1842 Could you rearrange the order so that all the structs are defined earlier before using them? ---
[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2504#discussion_r162319898 --- Diff: pom.xml --- @@ -324,6 +324,7 @@ 0.9.12 2.3.5 2.3.0 +5.8.6 --- End diff -- Just curious: is there some reason to pick 5.8.6? ---
[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2504#discussion_r162323041 --- Diff: storm-client/src/storm.thrift --- @@ -836,3 +837,24 @@ exception HBAuthorizationException { exception HBExecutionException { 1: required string msg; } + +struct WorkerMetricFields { --- End diff -- minor: `WorkerMetricPoint` might be clearer than `WorkerMetricFields`. ---
[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2504#discussion_r162322576 --- Diff: storm-client/src/storm.thrift --- @@ -836,3 +837,24 @@ exception HBAuthorizationException { exception HBExecutionException { 1: required string msg; } + +struct WorkerMetricFields { --- End diff -- You could check importing ttypes.py to see it works fine. If current patch doesn't break ttypes.py please ignore my comment. ---
[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB
Github user agresch commented on a diff in the pull request: https://github.com/apache/storm/pull/2504#discussion_r162349715 --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java --- @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore.rocksdb; + +import com.codahale.metrics.Meter; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.ListIterator; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.storm.metricstore.AggLevel; +import org.apache.storm.metricstore.Metric; +import org.apache.storm.metricstore.MetricException; +import org.rocksdb.FlushOptions; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class designed to perform all metrics inserts into RocksDB. Metrics are processed from the a blocking queue. + * + * A writable LRU StringMetadataCache is used to minimize looking up metadata string Ids. As entries are added to the full cache, older + * entries are evicted from the cache and need to be written to the database. This happens as the handleEvictedMetadata() + * method callback. + */ +public class RocksDbMetricsWriter implements Runnable, AutoCloseable { +private static final Logger LOG = LoggerFactory.getLogger(RocksDbMetricsWriter.class); +private RocksDbStore store; +private BlockingQueue queue; +private WritableStringMetadataCache stringMetadataCache; +private Set unusedIds = new HashSet<>(); +private TreeMapinsertBatch = new TreeMap<>(); // RocksDB should insert in sorted key order --- End diff -- I tried to do this on line 43. I will expand that description and can add thoughts on future multithreading design. ---
[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2504#discussion_r162319081 --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java --- @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore.rocksdb; + +import com.codahale.metrics.Meter; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.ListIterator; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.storm.metricstore.AggLevel; +import org.apache.storm.metricstore.Metric; +import org.apache.storm.metricstore.MetricException; +import org.rocksdb.FlushOptions; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class designed to perform all metrics inserts into RocksDB. Metrics are processed from the a blocking queue. + * + * A writable LRU StringMetadataCache is used to minimize looking up metadata string Ids. As entries are added to the full cache, older + * entries are evicted from the cache and need to be written to the database. This happens as the handleEvictedMetadata() + * method callback. + */ +public class RocksDbMetricsWriter implements Runnable, AutoCloseable { +private static final Logger LOG = LoggerFactory.getLogger(RocksDbMetricsWriter.class); +private RocksDbStore store; +private BlockingQueue queue; +private WritableStringMetadataCache stringMetadataCache; +private Set unusedIds = new HashSet<>(); +private TreeMapinsertBatch = new TreeMap<>(); // RocksDB should insert in sorted key order --- End diff -- Maybe better to describe the intention, especially regarding thread-safety. ---
[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB
Github user agresch commented on a diff in the pull request: https://github.com/apache/storm/pull/2504#discussion_r160804460 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java --- @@ -936,6 +936,9 @@ static DynamicState handleRunning(DynamicState dynamicState, StaticState staticS } dynamicState = dynamicState.withProfileActions(mod, modPending); } + +dynamicState.container.processMetrics(); --- End diff -- Yes, will fix. ---
[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB
Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2504#discussion_r160797290 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java --- @@ -936,6 +936,9 @@ static DynamicState handleRunning(DynamicState dynamicState, StaticState staticS } dynamicState = dynamicState.withProfileActions(mod, modPending); } + +dynamicState.container.processMetrics(); --- End diff -- This is something I just noticed. `handleRunning` should be called about once a second. I don't think we want to send metrics that frequently. Most of the metrics are on a 30 second to 1 min frequency. I think standardizing on 1 min for us is going to make storing and querying them a lot simpler. Could we have a small amount of state that we can use to throttle this to once a min? ---
[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB
Github user agresch commented on a diff in the pull request: https://github.com/apache/storm/pull/2504#discussion_r160776702 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -1081,6 +1088,14 @@ public Nimbus(Mapconf, INimbus inimbus, IStormClusterState stor BlobStore blobStore, TopoCache topoCache, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper) throws Exception { this.conf = conf; + +this.metricsStore = null; --- End diff -- Re-verified that a null MetricStore should be safe. ---