[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

2018-01-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2504


---


[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

2018-01-19 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2504#discussion_r162768150
  
--- Diff: pom.xml ---
@@ -324,6 +324,7 @@
 0.9.12
 2.3.5
 2.3.0
+5.8.6
--- End diff --

Actually I don't have experience with RocksDB. If you see that RocksDB 5.8.6 
runs properly, let's just use that version.


---


[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

2018-01-19 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2504#discussion_r162768100
  
--- Diff: storm-server/pom.xml ---
@@ -64,6 +64,10 @@
 auto-service
 true
 
+
+org.rocksdb
+rocksdbjni
--- End diff --

Good. We are good to go then.


---


[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

2018-01-19 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2504#discussion_r162768064
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
 ---
@@ -0,0 +1,639 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import com.codahale.metrics.Meter;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.metricstore.AggLevel;
+import org.apache.storm.metricstore.FilterOptions;
+import org.apache.storm.metricstore.Metric;
+import org.apache.storm.metricstore.MetricException;
+import org.apache.storm.metricstore.MetricStore;
+import org.apache.storm.utils.ObjectReader;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.IndexType;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RocksDbStore implements MetricStore, AutoCloseable {
+private static final Logger LOG = 
LoggerFactory.getLogger(RocksDbStore.class);
+private static final int MAX_QUEUE_CAPACITY = 4000;
+static final int INVALID_METADATA_STRING_ID = 0;
+RocksDB db;
+private ReadOnlyStringMetadataCache readOnlyStringMetadataCache = null;
+private BlockingQueue queue = new 
LinkedBlockingQueue(MAX_QUEUE_CAPACITY);
+private RocksDbMetricsWriter metricsWriter = null;
+private MetricsCleaner metricsCleaner = null;
+private Meter failureMeter = null;
+
+interface RocksDbScanCallback {
+boolean cb(RocksDbKey key, RocksDbValue val);  // return false to 
stop scan
+}
+
+/**
+ * Create metric store instance using the configurations provided via 
the config map.
+ *
+ * @param config Storm config map
+ * @throws MetricException on preparation error
+ */
+public void prepare(Map config) throws MetricException {
+validateConfig(config);
+
+this.failureMeter = 
StormMetricsRegistry.registerMeter("RocksDB:metric-failures");
+
+RocksDB.loadLibrary();
+boolean createIfMissing = 
ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING),
 false);
+
+try (Options options = new 
Options().setCreateIfMissing(createIfMissing)) {
+// use the hash index for prefix searches
+BlockBasedTableConfig tfc = new BlockBasedTableConfig();
+tfc.setIndexType(IndexType.kHashSearch);
+options.setTableFormatConfig(tfc);
+options.useCappedPrefixExtractor(RocksDbKey.KEY_SIZE);
+
+String path = getRocksDbAbsoluteDir(config);
+LOG.info("Opening RocksDB from {}", path);
+db = RocksDB.open(options, path);
+} catch (RocksDBException e) {
+String message = "Error opening RockDB database";
+LOG.error(message, e);
+throw new MetricException(message, e);
+}
+
+// create thread to delete old metrics and metadata
+Integer retentionHours = 
Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS).toString());
+Integer deletionPeriod = 0;
+if 
(config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS)) {
+deletionPeriod = 


---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

2018-01-19 Thread agresch
Github user agresch commented on a diff in the pull request:

https://github.com/apache/storm/pull/2504#discussion_r162668430
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
 ---
@@ -0,0 +1,639 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import com.codahale.metrics.Meter;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.metricstore.AggLevel;
+import org.apache.storm.metricstore.FilterOptions;
+import org.apache.storm.metricstore.Metric;
+import org.apache.storm.metricstore.MetricException;
+import org.apache.storm.metricstore.MetricStore;
+import org.apache.storm.utils.ObjectReader;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.IndexType;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RocksDbStore implements MetricStore, AutoCloseable {
+private static final Logger LOG = 
LoggerFactory.getLogger(RocksDbStore.class);
+private static final int MAX_QUEUE_CAPACITY = 4000;
+static final int INVALID_METADATA_STRING_ID = 0;
+RocksDB db;
+private ReadOnlyStringMetadataCache readOnlyStringMetadataCache = null;
+private BlockingQueue queue = new 
LinkedBlockingQueue(MAX_QUEUE_CAPACITY);
+private RocksDbMetricsWriter metricsWriter = null;
+private MetricsCleaner metricsCleaner = null;
+private Meter failureMeter = null;
+
+interface RocksDbScanCallback {
+boolean cb(RocksDbKey key, RocksDbValue val);  // return false to 
stop scan
+}
+
+/**
+ * Create metric store instance using the configurations provided via 
the config map.
+ *
+ * @param config Storm config map
+ * @throws MetricException on preparation error
+ */
+public void prepare(Map config) throws MetricException {
+validateConfig(config);
+
+this.failureMeter = 
StormMetricsRegistry.registerMeter("RocksDB:metric-failures");
+
+RocksDB.loadLibrary();
+boolean createIfMissing = 
ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING),
 false);
+
+try (Options options = new 
Options().setCreateIfMissing(createIfMissing)) {
+// use the hash index for prefix searches
+BlockBasedTableConfig tfc = new BlockBasedTableConfig();
+tfc.setIndexType(IndexType.kHashSearch);
+options.setTableFormatConfig(tfc);
+options.useCappedPrefixExtractor(RocksDbKey.KEY_SIZE);
+
+String path = getRocksDbAbsoluteDir(config);
+LOG.info("Opening RocksDB from {}", path);
+db = RocksDB.open(options, path);
+} catch (RocksDBException e) {
+String message = "Error opening RockDB database";
+LOG.error(message, e);
+throw new MetricException(message, e);
+}
+
+// create thread to delete old metrics and metadata
+Integer retentionHours = 
Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS).toString());
+Integer deletionPeriod = 0;
+if 
(config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS)) {
+deletionPeriod = 


---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

2018-01-19 Thread agresch
Github user agresch commented on a diff in the pull request:

https://github.com/apache/storm/pull/2504#discussion_r162634637
  
--- Diff: storm-server/pom.xml ---
@@ -64,6 +64,10 @@
 auto-service
 true
 
+
+org.rocksdb
+rocksdbjni
--- End diff --

I tested on a Mac and a RHEL VM. The RocksDB jar also contains a win64 JNI 
dll. If an error is thrown while creating the metrics store, everything should 
be treated as a no-op.
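
As an illustration of that "fall back to a no-op" behavior, here is a minimal 
sketch; the factory method and the NoOpMetricStore class are hypothetical and 
only stand in for whatever the patch actually does on failure:

```java
import java.util.Map;

// Hypothetical sketch: if the store cannot be created (e.g. the RocksDB JNI
// library is unavailable on the platform), fall back to a store that drops metrics.
public class MetricStoreFactorySketch {

    interface MetricStore extends AutoCloseable {
        void insert(String metricName, double value) throws Exception;
    }

    static class NoOpMetricStore implements MetricStore {
        @Override
        public void insert(String metricName, double value) {
            // silently drop the metric
        }

        @Override
        public void close() {
        }
    }

    static MetricStore create(Map<String, Object> conf) {
        try {
            return openRocksDbBackedStore(conf);   // may throw if the native library fails to load
        } catch (Exception e) {
            // keep the daemon running; metrics are simply not recorded
            return new NoOpMetricStore();
        }
    }

    private static MetricStore openRocksDbBackedStore(Map<String, Object> conf) throws Exception {
        // placeholder for the real RocksDB-backed store from the patch
        throw new UnsupportedOperationException("RocksDB store unavailable");
    }
}
```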


---


[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

2018-01-19 Thread agresch
Github user agresch commented on a diff in the pull request:

https://github.com/apache/storm/pull/2504#discussion_r162630427
  
--- Diff: pom.xml ---
@@ -324,6 +324,7 @@
 0.9.12
 2.3.5
 2.3.0
+5.8.6
--- End diff --

No, it was simply the latest version at the time. Open to advice/suggestions; 
5.8.7 and 5.9.2 now also exist.


---


[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

2018-01-18 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2504#discussion_r162323436
  
--- Diff: storm-server/pom.xml ---
@@ -64,6 +64,10 @@
 auto-service
 true
 
+
+org.rocksdb
+rocksdbjni
--- End diff --

Just curious: I don't have experience with RocksDB, and since it leverages 
JNI, I would like to be clear about how it works across multiple operating systems.


---


[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

2018-01-18 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2504#discussion_r162320163
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/security/auth/authorizer/SimpleACLAuthorizer.java
 ---
@@ -35,6 +35,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
--- End diff --

nit: two unnecessary empty lines


---


[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

2018-01-18 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2504#discussion_r162543795
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/MetricsCleaner.java
 ---
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.metricstore.FilterOptions;
+import org.apache.storm.metricstore.MetricException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for removing expired metrics and unused metadata from the RocksDB 
store.
+ */
+public class MetricsCleaner implements Runnable, AutoCloseable {
+private static final Logger LOG = 
LoggerFactory.getLogger(MetricsCleaner.class);
+private static long DEFAULT_SLEEP_MS = 4L * 60L * 60L * 1000L;
+private RocksDbStore store;
+private long retentionHours;
+private volatile boolean shutdown = false;
+private long sleepMs = DEFAULT_SLEEP_MS;
+private Meter failureMeter;
+private long purgeTimestamp = 0L;
+
+MetricsCleaner(RocksDbStore store, int retentionHours, int 
hourlyPeriod, Meter failureMeter) {
+this.store = store;
+this.retentionHours = retentionHours;
+if (hourlyPeriod > 0) {
+this.sleepMs = hourlyPeriod * 60L * 60L * 1000L;
+}
+this.failureMeter = failureMeter;
+
+Gauge gauge = new Gauge() {
+@Override
+public Long getValue() {
+return purgeTimestamp;
+}
+};
+
StormMetricsRegistry.registerProvidedGauge("MetricsCleaner:purgeTimestamp", 
gauge);
+}
+
+@Override
+public void close() {
+shutdown = true;
+}
+
+@Override
+public void run() {
+while (true) {
+if (shutdown) {
--- End diff --

Is there a reason to have a separate if statement instead of `while (!shutdown)`?
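
For reference, a minimal sketch of the `while (!shutdown)` shape being suggested; 
purgeMetrics() is a hypothetical stand-in for the cleanup work, and the field 
names simply follow the quoted MetricsCleaner code:

```java
// Sketch: the shutdown flag becomes the loop condition instead of a separate if/break.
public class CleanerLoopSketch implements Runnable, AutoCloseable {
    private volatile boolean shutdown = false;
    private long sleepMs = 4L * 60L * 60L * 1000L;

    @Override
    public void run() {
        while (!shutdown) {
            try {
                Thread.sleep(sleepMs);
                purgeMetrics();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    private void purgeMetrics() {
        // delete expired metrics and unused metadata here
    }

    @Override
    public void close() {
        shutdown = true;
    }
}
```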


---


[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

2018-01-18 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2504#discussion_r162544306
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java
 ---
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import com.codahale.metrics.Meter;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.http.annotation.NotThreadSafe;
+import org.apache.storm.metricstore.AggLevel;
+import org.apache.storm.metricstore.Metric;
+import org.apache.storm.metricstore.MetricException;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class designed to perform all metrics inserts into RocksDB.  Metrics 
are processed from a blocking queue.  Inserts
+ * to RocksDB are done using a single thread to simplify design (such as 
looking up existing metric data for aggregation,
+ * and fetching/evicting metadata from the cache).
+ * 
+ * A writable LRU StringMetadataCache is used to minimize looking up 
metadata string Ids.  As entries are added to the full cache, older
+ * entries are evicted from the cache and need to be written to the 
database.  This happens as the handleEvictedMetadata()
+ * method callback.
+ * 
+ * The following issues would need to be addressed to implement a 
multithreaded metrics writer:
+ * 
+ * Generation of unique unused IDs for new metadata strings needs 
to be thread safe.
+ * Ensuring newly created metadata strings are seen by all 
threads.
+ * Maintaining a properly cached state of metadata for multiple 
writers.  The current LRU cache
+ * evicts data as new metadata is added.
+ * Processing the aggregation of a metric requires fetching and 
updating previous aggregates.  A multithreaded
+ * design would need to ensure two metrics were not updating an 
aggregated metric at the same time.
+ * Investigate performance of multiple threads inserting into 
RocksDB versus a single ordered insert.
+ * 
+ */
+@NotThreadSafe
+public class RocksDbMetricsWriter implements Runnable, AutoCloseable {
+private static final Logger LOG = 
LoggerFactory.getLogger(RocksDbMetricsWriter.class);
+private RocksDbStore store;
+private BlockingQueue queue;
+private WritableStringMetadataCache stringMetadataCache;
+private Set unusedIds = new HashSet<>();
+private TreeMap insertBatch = new 
TreeMap<>(); // RocksDB should insert in sorted key order
+private WriteOptions writeOpts = new WriteOptions();
+private volatile boolean shutdown = false;
+private Meter failureMeter;
+private ArrayList aggBuckets = new ArrayList<>();
+
+/**
+ * Constructor for the RocksDbMetricsWriter.
+ *
+ * @param store   The RocksDB store
+ * @param queue   The queue to receive metrics for insertion
+ */
+RocksDbMetricsWriter(RocksDbStore store, BlockingQueue queue, Meter 
failureMeter)  {
+this.store = store;
+this.queue = queue;
+this.failureMeter = failureMeter;
+
+aggBuckets.add(AggLevel.AGG_LEVEL_1_MIN);
+aggBuckets.add(AggLevel.AGG_LEVEL_10_MIN);
+aggBuckets.add(AggLevel.AGG_LEVEL_60_MIN);
+}
+
+/**
+ * Init routine called once the Metadata cache has been created.
+ *
+ * @throws MetricException  on cache error
+ */
+void init() throws MetricException {
+


---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

2018-01-18 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2504#discussion_r162506645
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/metricstore/FilterOptions.java ---
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * FilterOptions provides a method to select various filtering options for 
doing a scan of the metrics database.
+ */
+public class FilterOptions {
+private static final String componentId = "componentId";
+private static final String topologyId = "topologyId";
+private static final String startTime = "startTime";
+private static final String endTime = "endTime";
+private static final String metricName = "metricName";
+private static final String executorId = "executorId";
+private static final String hostId = "hostId";
+private static final String port = "port";
+private static final String streamId = "streamId";
+private Map options = new HashMap<>();
--- End diff --

Any reason to store the options in a map, with unnecessary explicit type 
conversions, instead of defining them as fields?
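
For comparison, a minimal sketch of the fields-based alternative; only a few of 
the options are shown, and the getters/setters are illustrative rather than the 
patch's final API:

```java
// Sketch: typed fields avoid the string keys and the explicit casts needed
// when every option lives in a Map<String, Object>.
public class FilterOptionsSketch {
    private String topologyId;
    private String componentId;
    private long startTime = 0L;
    private long endTime = -1L;   // -1 meaning "no upper bound" is an assumption

    public String getTopologyId() { return topologyId; }
    public void setTopologyId(String topologyId) { this.topologyId = topologyId; }

    public String getComponentId() { return componentId; }
    public void setComponentId(String componentId) { this.componentId = componentId; }

    public long getStartTime() { return startTime; }
    public void setStartTime(long startTime) { this.startTime = startTime; }

    public long getEndTime() { return endTime; }
    public void setEndTime(long endTime) { this.endTime = endTime; }
}
```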


---


[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

2018-01-18 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2504#discussion_r162505874
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java ---
@@ -693,4 +701,44 @@ public long getMemoryReservationMb() {
 public String getWorkerId() {
 return _workerId;
 }
+
+/**
+ * Send worker metrics to Nimbus.
+ */
+void processMetrics() {
+try {
+if (_usedMemory.get(_port) != null) {
+// Make sure we don't process too frequently.
+long nextMetricProcessTime = this.lastMetricProcessTime + 
60L * 1000L;
+long currentTimeMsec = System.currentTimeMillis();
+if (currentTimeMsec < nextMetricProcessTime) {
+return;
+}
+
+String hostname = Utils.hostname();
+
+// create metric for memory
+String metricName = "UsedMemory";
--- End diff --

minor: it might be better to define constants for these strings, since they're 
likely to be reused when retrieving the information.
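
As a sketch of the constants idea (the holder class name is an assumption for 
illustration; only the metric name visible in the quoted diff is shown):

```java
// Sketch: a single place for the worker metric names, so the supervisor code that
// records "UsedMemory" and any code that later queries it share the same constant.
public final class WorkerMetricNames {
    public static final String USED_MEMORY = "UsedMemory";

    private WorkerMetricNames() {
        // constants only
    }
}
```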


---


[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

2018-01-18 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2504#discussion_r162545064
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
 ---
@@ -0,0 +1,639 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import com.codahale.metrics.Meter;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.metricstore.AggLevel;
+import org.apache.storm.metricstore.FilterOptions;
+import org.apache.storm.metricstore.Metric;
+import org.apache.storm.metricstore.MetricException;
+import org.apache.storm.metricstore.MetricStore;
+import org.apache.storm.utils.ObjectReader;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.IndexType;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RocksDbStore implements MetricStore, AutoCloseable {
+private static final Logger LOG = 
LoggerFactory.getLogger(RocksDbStore.class);
+private static final int MAX_QUEUE_CAPACITY = 4000;
+static final int INVALID_METADATA_STRING_ID = 0;
+RocksDB db;
+private ReadOnlyStringMetadataCache readOnlyStringMetadataCache = null;
+private BlockingQueue queue = new 
LinkedBlockingQueue(MAX_QUEUE_CAPACITY);
+private RocksDbMetricsWriter metricsWriter = null;
+private MetricsCleaner metricsCleaner = null;
+private Meter failureMeter = null;
+
+interface RocksDbScanCallback {
+boolean cb(RocksDbKey key, RocksDbValue val);  // return false to 
stop scan
+}
+
+/**
+ * Create metric store instance using the configurations provided via 
the config map.
+ *
+ * @param config Storm config map
+ * @throws MetricException on preparation error
+ */
+public void prepare(Map config) throws MetricException {
+validateConfig(config);
+
+this.failureMeter = 
StormMetricsRegistry.registerMeter("RocksDB:metric-failures");
+
+RocksDB.loadLibrary();
+boolean createIfMissing = 
ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING),
 false);
+
+try (Options options = new 
Options().setCreateIfMissing(createIfMissing)) {
+// use the hash index for prefix searches
+BlockBasedTableConfig tfc = new BlockBasedTableConfig();
+tfc.setIndexType(IndexType.kHashSearch);
+options.setTableFormatConfig(tfc);
+options.useCappedPrefixExtractor(RocksDbKey.KEY_SIZE);
+
+String path = getRocksDbAbsoluteDir(config);
+LOG.info("Opening RocksDB from {}", path);
+db = RocksDB.open(options, path);
+} catch (RocksDBException e) {
+String message = "Error opening RockDB database";
+LOG.error(message, e);
+throw new MetricException(message, e);
+}
+
+// create thread to delete old metrics and metadata
+Integer retentionHours = 
Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS).toString());
+Integer deletionPeriod = 0;
+if 
(config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS)) {
+deletionPeriod = 


---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

2018-01-18 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2504#discussion_r162322132
  
--- Diff: storm-client/src/storm.thrift ---
@@ -836,3 +837,24 @@ exception HBAuthorizationException {
 exception HBExecutionException {
   1: required string msg;
 }
+
+struct WorkerMetricFields {
--- End diff --

Looks like WorkerMetricFields, WorkerMetricList, and WorkerMetrics are used 
before they are defined, i.e. via forward reference.

We previously hit an issue caused by a forward reference in the thrift 
definition: https://issues.apache.org/jira/browse/STORM-1842

Could you rearrange the order so that all the structs are defined before they 
are used?


---


[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

2018-01-18 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2504#discussion_r162319898
  
--- Diff: pom.xml ---
@@ -324,6 +324,7 @@
 0.9.12
 2.3.5
 2.3.0
+5.8.6
--- End diff --

Just curious: is there some reason to pick 5.8.6?


---


[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

2018-01-18 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2504#discussion_r162323041
  
--- Diff: storm-client/src/storm.thrift ---
@@ -836,3 +837,24 @@ exception HBAuthorizationException {
 exception HBExecutionException {
   1: required string msg;
 }
+
+struct WorkerMetricFields {
--- End diff --

minor: `WorkerMetricPoint` might be clearer than `WorkerMetricFields`.


---


[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

2018-01-18 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2504#discussion_r162322576
  
--- Diff: storm-client/src/storm.thrift ---
@@ -836,3 +837,24 @@ exception HBAuthorizationException {
 exception HBExecutionException {
   1: required string msg;
 }
+
+struct WorkerMetricFields {
--- End diff --

You could check that importing ttypes.py works fine. If the current patch 
doesn't break ttypes.py, please ignore my comment.


---


[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

2018-01-18 Thread agresch
Github user agresch commented on a diff in the pull request:

https://github.com/apache/storm/pull/2504#discussion_r162349715
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java
 ---
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import com.codahale.metrics.Meter;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.storm.metricstore.AggLevel;
+import org.apache.storm.metricstore.Metric;
+import org.apache.storm.metricstore.MetricException;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class designed to perform all metrics inserts into RocksDB.  Metrics 
are processed from the a blocking queue.
+ * 
+ * A writable LRU StringMetadataCache is used to minimize looking up 
metadata string Ids.  As entries are added to the full cache, older
+ * entries are evicted from the cache and need to be written to the 
database.  This happens as the handleEvictedMetadata()
+ * method callback.
+ */
+public class RocksDbMetricsWriter implements Runnable, AutoCloseable {
+private static final Logger LOG = 
LoggerFactory.getLogger(RocksDbMetricsWriter.class);
+private RocksDbStore store;
+private BlockingQueue queue;
+private WritableStringMetadataCache stringMetadataCache;
+private Set unusedIds = new HashSet<>();
+private TreeMap insertBatch = new 
TreeMap<>(); // RocksDB should insert in sorted key order
--- End diff --

I tried to do this on line 43.  I will expand that description and can add 
thoughts on future multithreading design.


---


[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

2018-01-18 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2504#discussion_r162319081
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java
 ---
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import com.codahale.metrics.Meter;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.storm.metricstore.AggLevel;
+import org.apache.storm.metricstore.Metric;
+import org.apache.storm.metricstore.MetricException;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class designed to perform all metrics inserts into RocksDB.  Metrics 
are processed from the a blocking queue.
+ * 
+ * A writable LRU StringMetadataCache is used to minimize looking up 
metadata string Ids.  As entries are added to the full cache, older
+ * entries are evicted from the cache and need to be written to the 
database.  This happens as the handleEvictedMetadata()
+ * method callback.
+ */
+public class RocksDbMetricsWriter implements Runnable, AutoCloseable {
+private static final Logger LOG = 
LoggerFactory.getLogger(RocksDbMetricsWriter.class);
+private RocksDbStore store;
+private BlockingQueue queue;
+private WritableStringMetadataCache stringMetadataCache;
+private Set unusedIds = new HashSet<>();
+private TreeMap insertBatch = new 
TreeMap<>(); // RocksDB should insert in sorted key order
--- End diff --

Maybe better to describe the intention, especially regarding thread-safety.


---


[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

2018-01-10 Thread agresch
Github user agresch commented on a diff in the pull request:

https://github.com/apache/storm/pull/2504#discussion_r160804460
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java ---
@@ -936,6 +936,9 @@ static DynamicState handleRunning(DynamicState 
dynamicState, StaticState staticS
 }
 dynamicState = dynamicState.withProfileActions(mod, 
modPending);
 }
+
+dynamicState.container.processMetrics();
--- End diff --

Yes, will fix.


---


[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

2018-01-10 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2504#discussion_r160797290
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java ---
@@ -936,6 +936,9 @@ static DynamicState handleRunning(DynamicState 
dynamicState, StaticState staticS
 }
 dynamicState = dynamicState.withProfileActions(mod, 
modPending);
 }
+
+dynamicState.container.processMetrics();
--- End diff --

This is something I just noticed. `handleRunning` should be called about once a 
second, and I don't think we want to send metrics that frequently. Most of the 
metrics are on a 30-second to 1-minute frequency. I think standardizing on 1 
minute is going to make storing and querying them a lot simpler. Could we keep a 
small amount of state that lets us throttle this to once a minute?
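
The Container.processMetrics() diff quoted earlier in this digest shows one way 
to do this with a timestamp field; as a standalone sketch of that small piece of 
state (class and method names are illustrative only):

```java
// Sketch: remember the last send time and skip calls that arrive less than a minute later.
public class MetricSendThrottle {
    private static final long MIN_INTERVAL_MS = 60L * 1000L;
    private long lastSendTimeMs = 0L;

    /** Returns true when at least a minute has passed since the last accepted send. */
    public boolean shouldSend(long nowMs) {
        if (nowMs < lastSendTimeMs + MIN_INTERVAL_MS) {
            return false;
        }
        lastSendTimeMs = nowMs;
        return true;
    }
}
```

`handleRunning` (or the container it delegates to) would call 
`shouldSend(System.currentTimeMillis())` and return early when it is false.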


---


[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

2018-01-10 Thread agresch
Github user agresch commented on a diff in the pull request:

https://github.com/apache/storm/pull/2504#discussion_r160776702
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -1081,6 +1088,14 @@ public Nimbus(Map conf, INimbus 
inimbus, IStormClusterState stor
 BlobStore blobStore, TopoCache topoCache, ILeaderElector 
leaderElector, IGroupMappingServiceProvider groupMapper)
 throws Exception {
 this.conf = conf;
+
+this.metricsStore = null;
--- End diff --

Re-verified that a null MetricStore should be safe.
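
For illustration, a minimal sketch of what "safe with a null store" means in 
practice; the class, interface, and method names here are hypothetical:

```java
// Sketch: callers null-check the optional store, so a null MetricStore simply means
// worker metrics are not persisted.
public class MetricsSinkSketch {
    /** Hypothetical minimal store interface for the sketch. */
    public interface MetricStore {
        void insert(String metricName, double value);
    }

    private final MetricStore metricsStore;   // may be null

    public MetricsSinkSketch(MetricStore metricsStore) {
        this.metricsStore = metricsStore;
    }

    public void record(String metricName, double value) {
        if (metricsStore == null) {
            return;   // metrics storage disabled; drop silently
        }
        metricsStore.insert(metricName, value);
    }
}
```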


---