http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
new file mode 100644
index 0000000..73dc0be
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.DBOptions;
+
+/**
+ * The {@code PredefinedOptions} are configuration settings for the {@link RocksDBStateBackend}.
+ * The various pre-defined choices are configurations that have been empirically
+ * determined to be beneficial for performance under different settings.
+ *
+ * <p>Some of these settings are based on experiments by the Flink community, some follow
+ * guides from the RocksDB project.
+ */
+public enum PredefinedOptions {
+
+       /**
+        * Default options for all settings, except that writes are not forced to the
+        * disk.
+        *
+        * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery,
+        * there is no need to sync data to stable storage.
+        */
+       DEFAULT {
+
+               @Override
+               public DBOptions createDBOptions() {
+                       return new DBOptions()
+                                       .setUseFsync(false);
+               }
+
+               @Override
+               public ColumnFamilyOptions createColumnOptions() {
+                       return new ColumnFamilyOptions();
+               }
+
+       },
+
+       /**
+        * Pre-defined options for regular spinning hard disks.
+        *
+        * <p>This constant configures RocksDB with some options that lead empirically
+        * to better performance when the machines executing the system use
+        * regular spinning hard disks.
+        *
+        * <p>The following options are set:
+        * <ul>
+        *     <li>setCompactionStyle(CompactionStyle.LEVEL)</li>
+        *     <li>setLevelCompactionDynamicLevelBytes(true)</li>
+        *     <li>setIncreaseParallelism(4)</li>
+        *     <li>setUseFsync(false)</li>
+        *     <li>setMaxOpenFiles(-1)</li>
+        * </ul>
+        *
+        * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery,
+        * there is no need to sync data to stable storage.
+        */
+       SPINNING_DISK_OPTIMIZED {
+
+               @Override
+               public DBOptions createDBOptions() {
+
+                       return new DBOptions()
+                                       .setIncreaseParallelism(4)
+                                       .setUseFsync(false)
+                                       .setMaxOpenFiles(-1);
+               }
+
+               @Override
+               public ColumnFamilyOptions createColumnOptions() {
+                       return new ColumnFamilyOptions()
+                                       .setCompactionStyle(CompactionStyle.LEVEL)
+                                       .setLevelCompactionDynamicLevelBytes(true);
+               }
+       },
+
+       /**
+        * Pre-defined options for better performance on regular spinning hard disks,
+        * at the cost of a higher memory consumption.
+        *
+        * <p><b>NOTE: These settings will cause RocksDB to consume a lot of memory for
+        * block caching and compactions. If you experience out-of-memory problems related to
+        * RocksDB, consider switching back to {@link #SPINNING_DISK_OPTIMIZED}.</b></p>
+        *
+        * <p>The following options are set:
+        * <ul>
+        *     <li>setCompactionStyle(CompactionStyle.LEVEL)</li>
+        *     <li>setLevelCompactionDynamicLevelBytes(true)</li>
+        *     <li>setTargetFileSizeBase(256 MBytes)</li>
+        *     <li>setMaxBytesForLevelBase(1 GByte)</li>
+        *     <li>setWriteBufferSize(64 MBytes)</li>
+        *     <li>setIncreaseParallelism(4)</li>
+        *     <li>setMinWriteBufferNumberToMerge(3)</li>
+        *     <li>setMaxWriteBufferNumber(4)</li>
+        *     <li>setUseFsync(false)</li>
+        *     <li>setMaxOpenFiles(-1)</li>
+        *     <li>BlockBasedTableConfig.setBlockCacheSize(256 MBytes)</li>
+        *     <li>BlockBasedTableConfig.setBlockSize(128 KBytes)</li>
+        *     <li>BlockBasedTableConfig.setFilter(new BloomFilter())</li>
+        * </ul>
+        *
+        * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery,
+        * there is no need to sync data to stable storage.
+        */
+       SPINNING_DISK_OPTIMIZED_HIGH_MEM {
+
+               @Override
+               public DBOptions createDBOptions() {
+
+                       return new DBOptions()
+                                       .setIncreaseParallelism(4)
+                                       .setUseFsync(false)
+                                       .setMaxOpenFiles(-1);
+               }
+
+               @Override
+               public ColumnFamilyOptions createColumnOptions() {
+
+                       final long blockCacheSize = 256 * 1024 * 1024;
+                       final long blockSize = 128 * 1024;
+                       final long targetFileSize = 256 * 1024 * 1024;
+                       final long writeBufferSize = 64 * 1024 * 1024;
+
+                       return new ColumnFamilyOptions()
+                                       .setCompactionStyle(CompactionStyle.LEVEL)
+                                       .setLevelCompactionDynamicLevelBytes(true)
+                                       .setTargetFileSizeBase(targetFileSize)
+                                       .setMaxBytesForLevelBase(4 * targetFileSize)
+                                       .setWriteBufferSize(writeBufferSize)
+                                       .setMinWriteBufferNumberToMerge(3)
+                                       .setMaxWriteBufferNumber(4)
+                                       .setTableFormatConfig(
+                                                       new BlockBasedTableConfig()
+                                                                       .setBlockCacheSize(blockCacheSize)
+                                                                       .setBlockSize(blockSize)
+                                                                       .setFilter(new BloomFilter())
+                                       );
+               }
+       },
+
+       /**
+        * Pre-defined options for Flash SSDs.
+        *
+        * <p>This constant configures RocksDB with some options that lead empirically
+        * to better performance when the machines executing the system use SSDs.
+        *
+        * <p>The following options are set:
+        * <ul>
+        *     <li>setIncreaseParallelism(4)</li>
+        *     <li>setUseFsync(false)</li>
+        *     <li>setMaxOpenFiles(-1)</li>
+        * </ul>
+        *
+        * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery,
+        * there is no need to sync data to stable storage.
+        */
+       FLASH_SSD_OPTIMIZED {
+
+               @Override
+               public DBOptions createDBOptions() {
+                       return new DBOptions()
+                                       .setIncreaseParallelism(4)
+                                       .setUseFsync(false)
+                                       .setMaxOpenFiles(-1);
+               }
+
+               @Override
+               public ColumnFamilyOptions createColumnOptions() {
+                       return new ColumnFamilyOptions();
+               }
+       };
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Creates the {@link DBOptions} for this pre-defined setting.
+        *
+        * @return The pre-defined options object.
+        */
+       public abstract DBOptions createDBOptions();
+
+       /**
+        * Creates the {@link org.rocksdb.ColumnFamilyOptions} for this pre-defined setting.
+        *
+        * @return The pre-defined options object.
+        */
+       public abstract ColumnFamilyOptions createColumnOptions();
+
+}
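
For reference, a minimal usage sketch for these profiles (not part of this commit; the checkpoint URI and the chosen profile are placeholder assumptions):

    import java.io.IOException;

    import org.apache.flink.contrib.streaming.state.PredefinedOptions;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class PredefinedOptionsExample {

        public static void main(String[] args) throws IOException {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Placeholder checkpoint URI; point this at a real (distributed) filesystem.
            RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");

            // Pick one of the empirically tuned profiles defined above.
            backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);

            env.setStateBackend(backend);
        }
    }

A profile only pre-sets the DBOptions/ColumnFamilyOptions returned by createDBOptions()/createColumnOptions(); individual settings can still be overridden afterwards, e.g. through RocksDBStateBackend#setOptions(OptionsFactory).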

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
new file mode 100644
index 0000000..2c07814
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * An {@link AggregatingState} implementation that stores state in RocksDB.
+ *
+ * @param <K> The type of the key
+ * @param <N> The type of the namespace
+ * @param <T> The type of the values that are aggregated into the state
+ * @param <ACC> The type of the value stored in the state (the accumulator type)
+ * @param <R> The type of the value returned from the state
+ */
+public class RocksDBAggregatingState<K, N, T, ACC, R>
+       extends AbstractRocksDBState<K, N, AggregatingState<T, R>, AggregatingStateDescriptor<T, ACC, R>, ACC>
+       implements InternalAggregatingState<N, T, R> {
+
+       /** Serializer for the values. */
+       private final TypeSerializer<ACC> valueSerializer;
+
+       /** User-specified aggregation function. */
+       private final AggregateFunction<T, ACC, R> aggFunction;
+
+       /**
+        * We disable writes to the write-ahead-log here. We can't have these in the base class
+        * because JNI segfaults for some reason if they are.
+        */
+       private final WriteOptions writeOptions;
+
+       /**
+        * Creates a new {@code RocksDBAggregatingState}.
+        *
+        * @param namespaceSerializer
+        *             The serializer for the namespace.
+        * @param stateDesc
+        *             The state identifier for the state. This contains the state name and aggregation function.
+        */
+       public RocksDBAggregatingState(
+                       ColumnFamilyHandle columnFamily,
+                       TypeSerializer<N> namespaceSerializer,
+                       AggregatingStateDescriptor<T, ACC, R> stateDesc,
+                       RocksDBKeyedStateBackend<K> backend) {
+
+               super(columnFamily, namespaceSerializer, stateDesc, backend);
+
+               this.valueSerializer = stateDesc.getSerializer();
+               this.aggFunction = stateDesc.getAggregateFunction();
+
+               writeOptions = new WriteOptions();
+               writeOptions.setDisableWAL(true);
+       }
+
+       @Override
+       public R get() throws IOException {
+               try {
+                       // prepare the current key and namespace for RocksDB lookup
+                       writeCurrentKeyWithGroupAndNamespace();
+                       final byte[] key = keySerializationStream.toByteArray();
+
+                       // get the current value
+                       final byte[] valueBytes = backend.db.get(columnFamily, key);
+
+                       if (valueBytes == null) {
+                               return null;
+                       }
+
+                       ACC accumulator = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+                       return aggFunction.getResult(accumulator);
+               }
+               catch (IOException | RocksDBException e) {
+                       throw new IOException("Error while retrieving value from RocksDB", e);
+               }
+       }
+
+       @Override
+       public void add(T value) throws IOException {
+               try {
+                       // prepare the current key and namespace for RocksDB lookup
+                       writeCurrentKeyWithGroupAndNamespace();
+                       final byte[] key = keySerializationStream.toByteArray();
+                       keySerializationStream.reset();
+
+                       // get the current value
+                       final byte[] valueBytes = backend.db.get(columnFamily, key);
+
+                       // deserialize the current accumulator, or create a blank one
+                       ACC accumulator = valueBytes == null ?
+                                       aggFunction.createAccumulator() :
+                                       valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+
+                       // aggregate the value into the accumulator
+                       accumulator = aggFunction.add(value, accumulator);
+
+                       // serialize the new accumulator
+                       final DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
+                       valueSerializer.serialize(accumulator, out);
+
+                       // write the new value to RocksDB
+                       backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
+               }
+               catch (IOException | RocksDBException e) {
+                       throw new IOException("Error while adding value to RocksDB", e);
+               }
+       }
+
+       @Override
+       public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+               if (sources == null || sources.isEmpty()) {
+                       return;
+               }
+
+               // cache key and namespace
+               final K key = backend.getCurrentKey();
+               final int keyGroup = backend.getCurrentKeyGroupIndex();
+
+               try {
+                       ACC current = null;
+
+                       // merge the sources to the target
+                       for (N source : sources) {
+                               if (source != null) {
+                                       writeKeyWithGroupAndNamespace(
+                                                       keyGroup, key, source,
+                                                       keySerializationStream, keySerializationDataOutputView);
+
+                                       final byte[] sourceKey = keySerializationStream.toByteArray();
+                                       final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
+                                       backend.db.delete(columnFamily, sourceKey);
+
+                                       if (valueBytes != null) {
+                                               ACC value = valueSerializer.deserialize(
+                                                               new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+
+                                               if (current != null) {
+                                                       current = aggFunction.merge(current, value);
+                                               }
+                                               else {
+                                                       current = value;
+                                               }
+                                       }
+                               }
+                       }
+
+                       // if something came out of merging the sources, merge it or write it to the target
+                       if (current != null) {
+                               // create the target full-binary-key
+                               writeKeyWithGroupAndNamespace(
+                                               keyGroup, key, target,
+                                               keySerializationStream, keySerializationDataOutputView);
+
+                               final byte[] targetKey = keySerializationStream.toByteArray();
+                               final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey);
+
+                               if (targetValueBytes != null) {
+                                       // target also had a value, merge
+                                       ACC value = valueSerializer.deserialize(
+                                                       new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(targetValueBytes)));
+
+                                       current = aggFunction.merge(current, value);
+                               }
+
+                               // serialize the resulting value
+                               keySerializationStream.reset();
+                               valueSerializer.serialize(current, keySerializationDataOutputView);
+
+                               // write the resulting value
+                               backend.db.put(columnFamily, writeOptions, targetKey, keySerializationStream.toByteArray());
+                       }
+               }
+               catch (Exception e) {
+                       throw new Exception("Error while merging state in RocksDB", e);
+               }
+       }
+}
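
To make the type parameters concrete, here is an illustrative (hypothetical, not part of this commit) AggregateFunction and descriptor that this state class could back: T = Long inputs, ACC = Tuple2<Long, Long> as a (sum, count) accumulator, R = Double result.

    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class AverageAggregate implements AggregateFunction<Long, Tuple2<Long, Long>, Double> {

        @Override
        public Tuple2<Long, Long> createAccumulator() {
            // the blank accumulator created in add() when no prior value exists
            return Tuple2.of(0L, 0L);
        }

        @Override
        public Tuple2<Long, Long> add(Long value, Tuple2<Long, Long> acc) {
            return Tuple2.of(acc.f0 + value, acc.f1 + 1);
        }

        @Override
        public Double getResult(Tuple2<Long, Long> acc) {
            // what RocksDBAggregatingState#get() returns to the user
            return acc.f1 == 0 ? 0.0 : ((double) acc.f0) / acc.f1;
        }

        @Override
        public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
            // used by mergeNamespaces(), e.g. when session windows merge
            return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
        }
    }

    // Descriptor wiring the function to keyed state (the name "running-average" is arbitrary):
    AggregatingStateDescriptor<Long, Tuple2<Long, Long>, Double> descriptor =
            new AggregatingStateDescriptor<>(
                    "running-average",
                    new AverageAggregate(),
                    TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));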

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
new file mode 100644
index 0000000..479565e
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.internal.InternalFoldingState;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+
+/**
+ * {@link FoldingState} implementation that stores state in RocksDB.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <T> The type of the values that can be folded into the state.
+ * @param <ACC> The type of the value in the folding state.
+ *
+ * @deprecated will be removed in a future version
+ */
+@Deprecated
+public class RocksDBFoldingState<K, N, T, ACC>
+       extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, ACC>
+       implements InternalFoldingState<N, T, ACC> {
+
+       /** Serializer for the values. */
+       private final TypeSerializer<ACC> valueSerializer;
+
+       /** User-specified fold function. */
+       private final FoldFunction<T, ACC> foldFunction;
+
+       /**
+        * We disable writes to the write-ahead-log here. We can't have these in the base class
+        * because JNI segfaults for some reason if they are.
+        */
+       private final WriteOptions writeOptions;
+
+       /**
+        * Creates a new {@code RocksDBFoldingState}.
+        *
+        * @param namespaceSerializer The serializer for the namespace.
+        * @param stateDesc The state identifier for the state. This contains the name
+        *                     and can create a default state value.
+        */
+       public RocksDBFoldingState(ColumnFamilyHandle columnFamily,
+                       TypeSerializer<N> namespaceSerializer,
+                       FoldingStateDescriptor<T, ACC> stateDesc,
+                       RocksDBKeyedStateBackend<K> backend) {
+
+               super(columnFamily, namespaceSerializer, stateDesc, backend);
+
+               this.valueSerializer = stateDesc.getSerializer();
+               this.foldFunction = stateDesc.getFoldFunction();
+
+               writeOptions = new WriteOptions();
+               writeOptions.setDisableWAL(true);
+       }
+
+       @Override
+       public ACC get() {
+               try {
+                       writeCurrentKeyWithGroupAndNamespace();
+                       byte[] key = keySerializationStream.toByteArray();
+                       byte[] valueBytes = backend.db.get(columnFamily, key);
+                       if (valueBytes == null) {
+                               return null;
+                       }
+                       return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+               } catch (IOException | RocksDBException e) {
+                       throw new RuntimeException("Error while retrieving data from RocksDB", e);
+               }
+       }
+
+       @Override
+       public void add(T value) throws IOException {
+               try {
+                       writeCurrentKeyWithGroupAndNamespace();
+                       byte[] key = keySerializationStream.toByteArray();
+                       byte[] valueBytes = backend.db.get(columnFamily, key);
+                       DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
+                       if (valueBytes == null) {
+                               keySerializationStream.reset();
+                               valueSerializer.serialize(foldFunction.fold(stateDesc.getDefaultValue(), value), out);
+                               backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
+                       } else {
+                               ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+                               ACC newValue = foldFunction.fold(oldValue, value);
+                               keySerializationStream.reset();
+                               valueSerializer.serialize(newValue, out);
+                               backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
+                       }
+               } catch (Exception e) {
+                       throw new RuntimeException("Error while adding data to RocksDB", e);
+               }
+       }
+
+}
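
For contrast, an illustrative (hypothetical) descriptor for this deprecated state kind, folding Long inputs into a running Long sum; new code should prefer AggregatingState, which additionally supports merging:

    import org.apache.flink.api.common.state.FoldingStateDescriptor;

    FoldingStateDescriptor<Long, Long> descriptor = new FoldingStateDescriptor<>(
            "running-sum",                          // state name (arbitrary)
            0L,                                     // initial value, see stateDesc.getDefaultValue() above
            (Long acc, Long value) -> acc + value,  // FoldFunction<Long, Long>: fold(accumulator, value)
            Long.class);                            // type of the folded state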
