[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-09-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r141120298
  
--- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java ---
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Holds metadata about app-specific types stored in LevelDB. Serves as a cache for data collected
+ * via reflection, to make it cheaper to access it multiple times.
+ *
+ * 
+ * The hierarchy of keys stored in LevelDB looks roughly like the following. This hierarchy ensures
+ * that iteration over indices is easy, and that updating values in the store is not overly
+ * expensive. Of note, indices choose using more disk space (one value per key) instead of keeping
+ * lists of pointers, which would be more expensive to update at runtime.
+ * 
+ *
+ * 
+ * Indentation defines when a sub-key lives under a parent key. In LevelDB, this means the full
+ * key would be the concatenation of everything up to that point in the hierarchy, with each
+ * component separated by a NULL byte.
+ * 
+ *
+ * 
+ * +TYPE_NAME
+ *   NATURAL_INDEX
+ *     +NATURAL_KEY
+ *     -
+ *   -NATURAL_INDEX
+ *   INDEX_NAME
+ *     +INDEX_VALUE
+ *       +NATURAL_KEY
+ *     -INDEX_VALUE
+ *     .INDEX_VALUE
+ *       CHILD_INDEX_NAME
+ *         +CHILD_INDEX_VALUE
+ *           NATURAL_KEY_OR_DATA
+ *         -
+ *   -INDEX_NAME
+ * 
+ *
+ * 
+ * Entity data (either the entity's natural key or a copy of the data) is stored in all keys
+ * that end with "+". A count of all objects that match a particular top-level index
+ * value is kept at the end marker ("-"). A count is also kept at the natural index's end
+ * marker, to make it easy to retrieve the number of all elements of a particular type.
+ * 
+ *
+ * 
+ * To illustrate, given a type "Foo", with a natural index and a second index called "bar", you'd
+ * have these keys and values in the store for two instances, one with natural key "key1" and the
+ * other "key2", both with value "yes" for "bar":
+ * 
+ *
+ * 
+ * Foo __main__ +key1   [data for instance 1]
+ * Foo __main__ +key2   [data for instance 2]
+ * Foo __main__ -       [count of all Foo]
+ * Foo bar +yes +key1   [instance 1 key or data, depending on index type]
--- End diff --

Because otherwise, when reading from the index, it's not that easy to parse the object's key from the LevelDB key so that you can retrieve the object itself.

You also have to store something; LevelDB doesn't allow you to store null.
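A toy Java sketch of the layout discussed in this thread, to show why keeping the natural key as the value of an index entry helps the reader. It is not Spark's LevelDBTypeInfo code: a `java.util.TreeMap<String, String>` stands in for LevelDB's sorted keys, a NUL character joins key components as the javadoc describes, and the names `IndexLayoutSketch`, `write`, and `readByBar` are hypothetical. Counts are kept at the `-` end markers, which sort after every `+` entry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of the key layout from the LevelDBTypeInfo javadoc (not Spark code).
// A TreeMap<String, String> stands in for LevelDB: keys are kept sorted, and key
// components are joined with a NUL character.
class IndexLayoutSketch {
  static final String SEP = "\u0000";

  static String key(String... parts) {
    return String.join(SEP, parts);
  }

  // Adds 1 to the count stored at an end marker ("-") key.
  private static void increment(TreeMap<String, String> db, String countKey) {
    db.merge(countKey, "1", (old, one) -> String.valueOf(Integer.parseInt(old) + 1));
  }

  // Writes one new entity of `type` with the given natural key and "bar" value.
  static void write(TreeMap<String, String> db, String type, String naturalKey,
                    String barValue, String data) {
    // Natural index: the entity data itself.
    db.put(key(type, "__main__", "+" + naturalKey), data);
    increment(db, key(type, "__main__", "-"));
    // Secondary index "bar": the value is the natural key (a non-null "pointer").
    db.put(key(type, "bar", "+" + barValue, "+" + naturalKey), naturalKey);
    increment(db, key(type, "bar", "+" + barValue, "-"));
  }

  // Returns the data of every entity whose "bar" field equals `value`.
  static List<String> readByBar(TreeMap<String, String> db, String type, String value) {
    String prefix = key(type, "bar", "+" + value, "+");
    List<String> result = new ArrayList<>();
    for (Map.Entry<String, String> e : db.tailMap(prefix, true).entrySet()) {
      if (!e.getKey().startsWith(prefix)) {
        break;  // reached the "-" count entry, or a different index value
      }
      // No need to parse the natural key out of e.getKey(): it is the stored value.
      result.add(db.get(key(type, "__main__", "+" + e.getValue())));
    }
    return result;
  }
}
```

Writing `key1` and `key2` with `bar = "yes"` yields the six entries from the javadoc example, and `readByBar` returns both data values. Storing the natural key a second time costs some space, but LevelDB needs a non-null value anyway, and the reader never has to split the full key.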


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-09-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r141119638
  
--- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java ---
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Holds metadata about app-specific types stored in LevelDB. Serves as a cache for data collected
+ * via reflection, to make it cheaper to access it multiple times.
+ *
+ * 
+ * The hierarchy of keys stored in LevelDB looks roughly like the following. This hierarchy ensures
+ * that iteration over indices is easy, and that updating values in the store is not overly
+ * expensive. Of note, indices choose using more disk space (one value per key) instead of keeping
+ * lists of pointers, which would be more expensive to update at runtime.
+ * 
+ *
+ * 
+ * Indentation defines when a sub-key lives under a parent key. In LevelDB, this means the full
+ * key would be the concatenation of everything up to that point in the hierarchy, with each
+ * component separated by a NULL byte.
+ * 
+ *
+ * 
+ * +TYPE_NAME
+ *   NATURAL_INDEX
+ *     +NATURAL_KEY
+ *     -
+ *   -NATURAL_INDEX
+ *   INDEX_NAME
+ *     +INDEX_VALUE
+ *       +NATURAL_KEY
+ *     -INDEX_VALUE
+ *     .INDEX_VALUE
+ *       CHILD_INDEX_NAME
+ *         +CHILD_INDEX_VALUE
+ *           NATURAL_KEY_OR_DATA
+ *         -
+ *   -INDEX_NAME
+ * 
+ *
+ * 
+ * Entity data (either the entity's natural key or a copy of the data) is stored in all keys
+ * that end with "+". A count of all objects that match a particular top-level index
+ * value is kept at the end marker ("-"). A count is also kept at the natural index's end
+ * marker, to make it easy to retrieve the number of all elements of a particular type.
+ * 
+ *
+ * 
+ * To illustrate, given a type "Foo", with a natural index and a second index called "bar", you'd
+ * have these keys and values in the store for two instances, one with natural key "key1" and the
+ * other "key2", both with value "yes" for "bar":
+ * 
+ *
+ * 
+ * Foo __main__ +key1   [data for instance 1]
+ * Foo __main__ +key2   [data for instance 2]
+ * Foo __main__ -       [count of all Foo]
+ * Foo bar +yes +key1   [instance 1 key or data, depending on index type]
+ * Foo bar +yes +key2   [instance 2 key or data, depending on index type]
+ * Foo bar +yes -       [count of all Foo with "bar=yes" ]
+ * 
+ *
+ * 
+ * Note that all indexed values are prepended with "+", even if the index itself does not have an
+ * explicit end marker. This allows for easily skipping to the end of an index by telling LevelDB
+ * to seek to the "phantom" end marker of the index. Throughout the code and comments, this part
+ * of the full LevelDB key is generally referred to as the "index value" of the entity.
+ * 
+ *
+ * 
+ * Child indices are stored after their parent index. In the example above, let's assume there is
+ * a child index "child", whose parent is "bar". If both instances have value "no" for this field,
+ * the data in the store would look something like the following:
+ * 
+ *
+ * 
+ * ...
+ * Foo bar +yes -
+ * Foo bar .yes .child +no +key1   [instance 1 key or data, depending on index type]
+ * Foo bar .yes .child +no +key2   [instance 2 key or data, depending on 

[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-09-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r141119266
  
--- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * Abstraction for a local key/value store for storing app data.
+ *
+ * 
+ * There are two main features provided by the implementations of this interface:
+ * 
+ *
+ * Serialization
+ *
+ * 
+ * If the underlying data store requires serialization, data will be serialized to and deserialized
+ * using a {@link KVStoreSerializer}, which can be customized by the application. The serializer is
+ * based on Jackson, so it supports all the Jackson annotations for controlling the serialization of
+ * app-defined types.
+ * 
+ *
+ * 
+ * Data is also automatically compressed to save disk space.
+ * 
+ *
+ * Automatic Key Management
--- End diff --

First, you do realize this PR was merged a long time ago, right?

Automatic means you don't have to manually create keys. You're writing objects to the store, not calling something like `.put(key, object)`.
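A minimal, self-contained Java sketch of what "automatic" key management means here, under stated assumptions: the `@NaturalKey` annotation, the `AutoKeyStore` class, and the `JobInfo` type are all hypothetical stand-ins, not the kvstore module's API. The caller hands over a whole object; the store finds the natural key by reflection and builds the storage key itself, so there is no `put(key, object)` call. The user still declares which field is the natural key, which matches the point raised in the question.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

// Hypothetical marker for the field that holds an object's natural key.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface NaturalKey {}

// Toy store: callers write whole objects and never build keys themselves.
class AutoKeyStore {
  private final Map<String, Object> data = new HashMap<>();

  void write(Object value) {
    data.put(storageKey(value.getClass(), naturalKeyOf(value)), value);
  }

  <T> T read(Class<T> klass, Object naturalKey) {
    return klass.cast(data.get(storageKey(klass, naturalKey)));
  }

  // The store, not the caller, derives the key: type name + NUL + natural key.
  private static String storageKey(Class<?> klass, Object naturalKey) {
    return klass.getName() + "\u0000" + naturalKey;
  }

  // Finds the annotated field via reflection and reads its value.
  private static Object naturalKeyOf(Object value) {
    for (Field f : value.getClass().getDeclaredFields()) {
      if (f.isAnnotationPresent(NaturalKey.class)) {
        try {
          f.setAccessible(true);
          return f.get(value);
        } catch (IllegalAccessException e) {
          throw new IllegalStateException(e);
        }
      }
    }
    throw new IllegalArgumentException("No @NaturalKey field in " + value.getClass());
  }
}

// Example app type: only the annotation says which field is the key.
class JobInfo {
  @NaturalKey final int id;
  final String status;

  JobInfo(int id, String status) {
    this.id = id;
    this.status = status;
  }
}
```

Usage is just `store.write(new JobInfo(7, "RUNNING"))` followed by `store.read(JobInfo.class, 7)`. Writing another `JobInfo` with the same `id` overwrites the earlier one, since the derived key is identical.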





[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-09-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r141084103
  
--- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java ---
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Holds metadata about app-specific types stored in LevelDB. Serves as a cache for data collected
+ * via reflection, to make it cheaper to access it multiple times.
+ *
+ * 
+ * The hierarchy of keys stored in LevelDB looks roughly like the following. This hierarchy ensures
+ * that iteration over indices is easy, and that updating values in the store is not overly
+ * expensive. Of note, indices choose using more disk space (one value per key) instead of keeping
+ * lists of pointers, which would be more expensive to update at runtime.
+ * 
+ *
+ * 
+ * Indentation defines when a sub-key lives under a parent key. In LevelDB, this means the full
+ * key would be the concatenation of everything up to that point in the hierarchy, with each
+ * component separated by a NULL byte.
+ * 
+ *
+ * 
+ * +TYPE_NAME
+ *   NATURAL_INDEX
+ *     +NATURAL_KEY
+ *     -
+ *   -NATURAL_INDEX
+ *   INDEX_NAME
+ *     +INDEX_VALUE
+ *       +NATURAL_KEY
+ *     -INDEX_VALUE
+ *     .INDEX_VALUE
+ *       CHILD_INDEX_NAME
+ *         +CHILD_INDEX_VALUE
+ *           NATURAL_KEY_OR_DATA
+ *         -
+ *   -INDEX_NAME
+ * 
+ *
+ * 
+ * Entity data (either the entity's natural key or a copy of the data) is stored in all keys
+ * that end with "+". A count of all objects that match a particular top-level index
+ * value is kept at the end marker ("-"). A count is also kept at the natural index's end
+ * marker, to make it easy to retrieve the number of all elements of a particular type.
+ * 
+ *
+ * 
+ * To illustrate, given a type "Foo", with a natural index and a second index called "bar", you'd
+ * have these keys and values in the store for two instances, one with natural key "key1" and the
+ * other "key2", both with value "yes" for "bar":
+ * 
+ *
+ * 
+ * Foo __main__ +key1   [data for instance 1]
+ * Foo __main__ +key2   [data for instance 2]
+ * Foo __main__ -       [count of all Foo]
+ * Foo bar +yes +key1   [instance 1 key or data, depending on index type]
+ * Foo bar +yes +key2   [instance 2 key or data, depending on index type]
+ * Foo bar +yes -       [count of all Foo with "bar=yes" ]
+ * 
+ *
+ * 
+ * Note that all indexed values are prepended with "+", even if the index itself does not have an
+ * explicit end marker. This allows for easily skipping to the end of an index by telling LevelDB
+ * to seek to the "phantom" end marker of the index. Throughout the code and comments, this part
+ * of the full LevelDB key is generally referred to as the "index value" of the entity.
+ * 
+ *
+ * 
+ * Child indices are stored after their parent index. In the example above, let's assume there is
+ * a child index "child", whose parent is "bar". If both instances have value "no" for this field,
+ * the data in the store would look something like the following:
+ * 
+ *
+ * 
+ * ...
+ * Foo bar +yes -
+ * Foo bar .yes .child +no +key1   [instance 1 key or data, depending on index type]
+ * Foo bar .yes .child +no +key2   [instance 2 key or data, depending on 

[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-09-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r141082210
  
--- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java ---
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Holds metadata about app-specific types stored in LevelDB. Serves as a cache for data collected
+ * via reflection, to make it cheaper to access it multiple times.
+ *
+ * 
+ * The hierarchy of keys stored in LevelDB looks roughly like the following. This hierarchy ensures
+ * that iteration over indices is easy, and that updating values in the store is not overly
+ * expensive. Of note, indices choose using more disk space (one value per key) instead of keeping
+ * lists of pointers, which would be more expensive to update at runtime.
+ * 
+ *
+ * 
+ * Indentation defines when a sub-key lives under a parent key. In LevelDB, this means the full
+ * key would be the concatenation of everything up to that point in the hierarchy, with each
+ * component separated by a NULL byte.
+ * 
+ *
+ * 
+ * +TYPE_NAME
+ *   NATURAL_INDEX
+ *     +NATURAL_KEY
+ *     -
+ *   -NATURAL_INDEX
+ *   INDEX_NAME
+ *     +INDEX_VALUE
+ *       +NATURAL_KEY
+ *     -INDEX_VALUE
+ *     .INDEX_VALUE
+ *       CHILD_INDEX_NAME
+ *         +CHILD_INDEX_VALUE
+ *           NATURAL_KEY_OR_DATA
+ *         -
+ *   -INDEX_NAME
+ * 
+ *
+ * 
+ * Entity data (either the entity's natural key or a copy of the data) is stored in all keys
+ * that end with "+". A count of all objects that match a particular top-level index
+ * value is kept at the end marker ("-"). A count is also kept at the natural index's end
+ * marker, to make it easy to retrieve the number of all elements of a particular type.
+ * 
+ *
+ * 
+ * To illustrate, given a type "Foo", with a natural index and a second index called "bar", you'd
+ * have these keys and values in the store for two instances, one with natural key "key1" and the
+ * other "key2", both with value "yes" for "bar":
+ * 
+ *
+ * 
+ * Foo __main__ +key1   [data for instance 1]
+ * Foo __main__ +key2   [data for instance 2]
+ * Foo __main__ -       [count of all Foo]
+ * Foo bar +yes +key1   [instance 1 key or data, depending on index type]
--- End diff --

Since `key1` is already in the key, why do we still need to put the key in the value region?





[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-09-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r141080917
  
--- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java ---
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Holds metadata about app-specific types stored in LevelDB. Serves as a cache for data collected
+ * via reflection, to make it cheaper to access it multiple times.
+ *
+ * 
+ * The hierarchy of keys stored in LevelDB looks roughly like the following. This hierarchy ensures
+ * that iteration over indices is easy, and that updating values in the store is not overly
+ * expensive. Of note, indices choose using more disk space (one value per key) instead of keeping
+ * lists of pointers, which would be more expensive to update at runtime.
+ * 
+ *
+ * 
+ * Indentation defines when a sub-key lives under a parent key. In LevelDB, this means the full
+ * key would be the concatenation of everything up to that point in the hierarchy, with each
+ * component separated by a NULL byte.
+ * 
+ *
+ * 
+ * +TYPE_NAME
+ *   NATURAL_INDEX
+ *     +NATURAL_KEY
+ *     -
+ *   -NATURAL_INDEX
+ *   INDEX_NAME
+ *     +INDEX_VALUE
+ *       +NATURAL_KEY
--- End diff --

Is this the so-called pointer? For a secondary index, the value is the natural key, which can be used to get the actual value (the app data).





[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-09-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r141079536
  
--- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * Abstraction for a local key/value store for storing app data.
+ *
+ * 
+ * There are two main features provided by the implementations of this interface:
+ * 
+ *
+ * Serialization
+ *
+ * 
+ * If the underlying data store requires serialization, data will be serialized to and deserialized
+ * using a {@link KVStoreSerializer}, which can be customized by the application. The serializer is
+ * based on Jackson, so it supports all the Jackson annotations for controlling the serialization of
+ * app-defined types.
+ * 
+ *
+ * 
+ * Data is also automatically compressed to save disk space.
+ * 
+ *
+ * Automatic Key Management
--- End diff --

It would be better to have some examples, as I'm confused here. What does `Automatic` mean? I thought users have to provide a natural index.





[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-09-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r140827076
  
--- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreSerializer.java ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Serializer used to translate between app-defined types and the LevelDB store.
+ *
+ * 
+ * The serializer is based on Jackson, so values are written as JSON. It also allows "naked strings"
+ * and integers to be written as values directly, which will be written as UTF-8 strings.
+ * 
+ */
+public class KVStoreSerializer {
+
+  /**
+   * Object mapper used to process app-specific types. If an application requires a specific
+   * configuration of the mapper, it can subclass this serializer and add custom configuration
+   * to this object.
+   */
+  protected final ObjectMapper mapper;
+
+  public KVStoreSerializer() {
+    this.mapper = new ObjectMapper();
+  }
+
+  public final byte[] serialize(Object o) throws Exception {
+    if (o instanceof String) {
--- End diff --

Why does that matter? Objects are written as JSON; it's not like the goal there is efficiency.





[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-09-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r140776929
  
--- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreSerializer.java ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Serializer used to translate between app-defined types and the LevelDB store.
+ *
+ * 
+ * The serializer is based on Jackson, so values are written as JSON. It also allows "naked strings"
+ * and integers to be written as values directly, which will be written as UTF-8 strings.
+ * 
+ */
+public class KVStoreSerializer {
+
+  /**
+   * Object mapper used to process app-specific types. If an application requires a specific
+   * configuration of the mapper, it can subclass this serializer and add custom configuration
+   * to this object.
+   */
+  protected final ObjectMapper mapper;
+
+  public KVStoreSerializer() {
+    this.mapper = new ObjectMapper();
+  }
+
+  public final byte[] serialize(Object o) throws Exception {
+    if (o instanceof String) {
--- End diff --

Yeah, but then we don't get the efficient UTF-8 encoding, do we?
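A stdlib-only Java sketch of the dispatch discussed in this thread, to show what the String branch saves. Assumptions: `SerializerSketch` is a hypothetical class; a `Function<Object, String>` stands in for Jackson's `ObjectMapper` so the example compiles without Jackson; treating both `Integer` and `Long` as "integers" is a guess; and only the non-string branch is GZIP-compressed here, since the quoted diff does not show how the real serializer handles compression per branch. A naked string goes out as its raw UTF-8 bytes, while the same string as a JSON string literal would add two quote characters plus any escaping.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Function;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import static java.nio.charset.StandardCharsets.UTF_8;

class SerializerSketch {
  // Hypothetical stand-in for Jackson, so this sketch compiles on its own.
  private final Function<Object, String> toJson;

  SerializerSketch(Function<Object, String> toJson) {
    this.toJson = toJson;
  }

  byte[] serialize(Object o) {
    if (o instanceof String) {
      // Naked string: raw UTF-8 bytes, no JSON quotes or escaping.
      return ((String) o).getBytes(UTF_8);
    } else if (o instanceof Integer || o instanceof Long) {
      // Naked integer (assumed to mean int and long): its decimal text as UTF-8.
      return o.toString().getBytes(UTF_8);
    } else {
      // Everything else ("the else"): JSON text, then GZIP-compressed.
      return gzip(toJson.apply(o).getBytes(UTF_8));
    }
  }

  static byte[] gzip(byte[] in) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    // Closing the GZIP stream writes the trailer before the buffer is read.
    try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
      out.write(in);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  static byte[] gunzip(byte[] in) {
    try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(in))) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] buf = new byte[4096];
      int n;
      while ((n = gz.read(buf)) != -1) {
        out.write(buf, 0, n);
      }
      return out.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

For `"hello"` the String branch writes 5 bytes, while the JSON literal `"\"hello\""` is 7 bytes. The saving is small per value, which is consistent with the reply that efficiency is not the main goal for JSON-encoded objects.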





[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-09-21 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r140290617
  
--- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreSerializer.java ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Serializer used to translate between app-defined types and the LevelDB store.
+ *
+ * 
+ * The serializer is based on Jackson, so values are written as JSON. It also allows "naked strings"
+ * and integers to be written as values directly, which will be written as UTF-8 strings.
+ * 
+ */
+public class KVStoreSerializer {
+
+  /**
+   * Object mapper used to process app-specific types. If an application requires a specific
+   * configuration of the mapper, it can subclass this serializer and add custom configuration
+   * to this object.
+   */
+  protected final ObjectMapper mapper;
+
+  public KVStoreSerializer() {
+    this.mapper = new ObjectMapper();
+  }
+
+  public final byte[] serialize(Object o) throws Exception {
+    if (o instanceof String) {
--- End diff --

Classes are handled by Jackson (the "else").





[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-09-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r140167530
  
--- Diff: 
common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreSerializer.java ---
@@ -0,0 +1,86 @@
+
+package org.apache.spark.kvstore;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Serializer used to translate between app-defined types and the LevelDB store.
+ *
+ * 
+ * The serializer is based on Jackson, so values are written as JSON. It also allows "naked strings"
+ * and integers to be written as values directly, which will be written as UTF-8 strings.
+ * 
+ */
+public class KVStoreSerializer {
+
+  /**
+   * Object mapper used to process app-specific types. If an application requires a specific
+   * configuration of the mapper, it can subclass this serializer and add custom configuration
+   * to this object.
+   */
+  protected final ObjectMapper mapper;
+
+  public KVStoreSerializer() {
+    this.mapper = new ObjectMapper();
+  }
+
+  public final byte[] serialize(Object o) throws Exception {
+    if (o instanceof String) {
--- End diff --

This only handles top-level strings, right? What about string fields in a class?


---




[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-06-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/17902


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---




[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-06-02 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r119904509
  
--- Diff: 
common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java ---
@@ -0,0 +1,494 @@
+
+package org.apache.spark.kvstore;
+
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public abstract class DBIteratorSuite {
+
+  private static final int MIN_ENTRIES = 42;
+  private static final int MAX_ENTRIES = 1024;
+  private static final Random RND = new Random();
+
+  private static List<CustomType1> allEntries;
+  private static List<CustomType1> clashingEntries;
+  private static KVStore db;
+
+  private static interface BaseComparator extends Comparator<CustomType1> {
+    /**
+     * Returns a comparator that falls back to natural order if this comparator's ordering
+     * returns equality for two elements. Used to mimic how the index sorts things internally.
+     */
+    default BaseComparator fallback() {
+      return (t1, t2) -> {
+        int result = BaseComparator.this.compare(t1, t2);
+        if (result != 0) {
+          return result;
+        }
+
+        return t1.key.compareTo(t2.key);
+      };
+    }
+
+    /** Reverses the order of this comparator. */
+    default BaseComparator reverse() {
+      return (t1, t2) -> -BaseComparator.this.compare(t1, t2);
+    }
+  }
+
+  private static final BaseComparator NATURAL_ORDER = (t1, t2) -> t1.key.compareTo(t2.key);
+  private static final BaseComparator REF_INDEX_ORDER = (t1, t2) -> t1.id.compareTo(t2.id);
+  private static final BaseComparator COPY_INDEX_ORDER = (t1, t2) -> t1.name.compareTo(t2.name);
+  private static final BaseComparator NUMERIC_INDEX_ORDER = (t1, t2) -> t1.num - t2.num;
+  private static final BaseComparator CHILD_INDEX_ORDER = (t1, t2) -> t1.child.compareTo(t2.child);
+
+  /**
+   * Implementations should override this method; it is called only once, before all tests are
+   * run. Any state can be safely stored in static variables and cleaned up in a @AfterClass
+   * handler.
+   */
+  protected abstract KVStore createStore() throws Exception;
+
+  @AfterClass
+  public static void cleanupData() throws Exception {
+    allEntries = null;
+    db = null;
+  }
+
+  @Before
+  public void setup() throws Exception {
+    if (db != null) {
--- End diff --

For later debugging, it would be helpful to log the random generator's seed here.
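One possible shape for this, sketched under assumptions: choose the seed once, log it, and build the `Random` from it so a failing run can be replayed. The `test.seed` system property and the helper names are hypothetical; the suite does not define them.

```java
import java.util.Random;

/** Hypothetical helper: pick a seed, log it, and create a reproducible Random from it. */
final class SeededRandomSketch {

  // Hypothetical property name; lets a failed run be replayed with -Dtest.seed=<value>.
  static final String SEED_PROPERTY = "test.seed";

  static long chooseSeed() {
    String override = System.getProperty(SEED_PROPERTY);
    return override != null ? Long.parseLong(override) : System.nanoTime();
  }

  static Random newLoggedRandom(long seed) {
    // Log the seed up front so it appears in the output even if a later assertion fails.
    System.err.println("Random seed for this run: " + seed
        + " (replay with -D" + SEED_PROPERTY + "=" + seed + ")");
    return new Random(seed);
  }

  public static void main(String[] args) {
    Random rnd = newLoggedRandom(chooseSeed());
    System.out.println(rnd.nextInt(1024));
  }
}
```

In the suite, `RND` could then be initialized with something like `newLoggedRandom(chooseSeed())` instead of `new Random()`, so every run's log carries the value needed to reproduce it.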


---



[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-06-01 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r119713952
  
--- Diff: 
common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java ---
@@ -0,0 +1,502 @@
+
+package org.apache.spark.kvstore;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Holds metadata about app-specific types stored in LevelDB. Serves as a cache for data collected
+ * via reflection, to make it cheaper to access it multiple times.
+ *
+ * 
+ * The hierarchy of keys stored in LevelDB looks roughly like the following. This hierarchy ensures
+ * that iteration over indices is easy, and that updating values in the store is not overly
+ * expensive. Of note, indices choose using more disk space (one value per key) instead of keeping
+ * lists of pointers, which would be more expensive to update at runtime.
+ * 
+ *
+ * 
+ * Indentation defines when a sub-key lives under a parent key. In LevelDB, this means the full
+ * key would be the concatenation of everything up to that point in the hierarchy, with each
+ * component separated by a NULL byte.
+ * 
+ *
+ * 
+ * +TYPE_NAME
+ *   NATURAL_INDEX
+ * +NATURAL_KEY
+ * -
+ *   -NATURAL_INDEX
+ *   INDEX_NAME
+ * +INDEX_VALUE
+ *   +NATURAL_KEY
+ * -INDEX_VALUE
+ * .INDEX_VALUE
+ *   CHILD_INDEX_NAME
+ * +CHILD_INDEX_VALUE
+ *   NATURAL_KEY_OR_DATA
+ * -
+ *   -INDEX_NAME
+ * 
+ *
+ * 
+ * Entity data (either the entity's natural key or a copy of the data) is stored in all keys
+ * that end with "+". A count of all objects that match a particular top-level index
+ * value is kept at the end marker ("-"). A count is also kept at the natural index's end
+ * marker, to make it easy to retrieve the number of all elements of a particular type.
+ * 
+ *
+ * 
+ * To illustrate, given a type "Foo", with a natural index and a second index called "bar", you'd
+ * have these keys and values in the store for two instances, one with natural key "key1" and the
+ * other "key2", both with value "yes" for "bar":
+ * 
+ *
+ * 
+ * Foo __main__ +key1   [data for instance 1]
+ * Foo __main__ +key2   [data for instance 2]
--- End diff --

Yeah, it's a little confusing because I try to use "key" only for the full key in LevelDB. Let me look at the comments and at least try to make them consistent.
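To make the terminology concrete: under this convention the individual pieces (type name, index name, `+key1`, ...) are components, and only their concatenation is the LevelDB "key". Following the Javadoc's statement that components are separated by a NULL byte, a hypothetical helper that builds such a key might look like this (a sketch, not the PR's code):

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch: the full LevelDB key is the components joined by NULL bytes. */
final class FullKeySketch {

  static byte[] fullKey(String... components) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (int i = 0; i < components.length; i++) {
      if (i > 0) {
        out.write(0); // NULL byte separating one component from the next
      }
      byte[] part = components[i].getBytes(StandardCharsets.UTF_8);
      out.write(part, 0, part.length);
    }
    return out.toByteArray();
  }

  public static void main(String[] args) {
    byte[] key = fullKey("Foo", "__main__", "+key1");
    System.out.println(key.length);
  }
}
```

`main` prints 18: components of 3, 8 and 5 bytes plus two separators.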


---



[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-06-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r119652958
  
--- Diff: 
common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java ---
@@ -0,0 +1,500 @@
+
+package org.apache.spark.kvstore;
+
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public abstract class DBIteratorSuite {
+
+  private static final int MIN_ENTRIES = 42;
+  private static final int MAX_ENTRIES = 1024;
+  private static final Random RND = new Random();
+
+  private static List<CustomType1> allEntries;
+  private static List<CustomType1> clashingEntries;
+  private static KVStore db;
+
+  private static interface BaseComparator extends Comparator<CustomType1> {
+    /**
+     * Returns a comparator that falls back to natural order if this comparator's ordering
+     * returns equality for two elements. Used to mimic how the index sorts things internally.
+     */
+    default BaseComparator fallback() {
+      return (t1, t2) -> {
+        int result = BaseComparator.this.compare(t1, t2);
+        if (result != 0) {
+          return result;
+        }
+
+        return t1.key.compareTo(t2.key);
+      };
+    }
+
+    /** Reverses the order of this comparator. */
+    default BaseComparator reverse() {
+      return (t1, t2) -> -BaseComparator.this.compare(t1, t2);
+    }
+  }
+
+  private static final BaseComparator NATURAL_ORDER = (t1, t2) -> t1.key.compareTo(t2.key);
+  private static final BaseComparator REF_INDEX_ORDER = (t1, t2) -> t1.id.compareTo(t2.id);
+  private static final BaseComparator COPY_INDEX_ORDER = (t1, t2) -> t1.name.compareTo(t2.name);
+  private static final BaseComparator NUMERIC_INDEX_ORDER = (t1, t2) -> t1.num - t2.num;
+  private static final BaseComparator CHILD_INDEX_ORDER = (t1, t2) -> t1.child.compareTo(t2.child);
+
+  /**
+   * Implementations should override this method; it is called only once, before all tests are
+   * run. Any state can be safely stored in static variables and cleaned up in a @AfterClass
+   * handler.
+   */
+  protected abstract KVStore createStore() throws Exception;
+
+  @AfterClass
+  public static void cleanupData() throws Exception {
+    allEntries = null;
+    db = null;
+  }
+
+  @Before
+  public void setup() throws Exception {
+    if (db != null) {
+      return;
+    }
+
+    db = createStore();
+
+    int count = RND.nextInt(MAX_ENTRIES) + MIN_ENTRIES;
+
+    // Instead of generating sequential IDs, generate random unique IDs to avoid the insertion
+    // order matching the natural ordering. Just in case.
+    boolean[] usedIDs = new boolean[count];
+
+    allEntries = new ArrayList<>(count);
+    for (int i = 0; i < count; i++) {
+      CustomType1 t = new CustomType1();
+
+      int id;
+      do {
+        id = RND.nextInt(count);
+      } while (usedIDs[id]);
--- End diff --

I know this doesn't really matter, but this is O(n^2); listing the IDs and then using
https://docs.oracle.com/javase/8/docs/api/java/util/Collections.html#shuffle-java.util.List-java.util.Random-
would be O(n).
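A sketch of that suggestion, with hypothetical names: list the IDs `0..count-1` once and shuffle them with `Collections.shuffle(List, Random)`, which runs in linear time. (Strictly, the rejection loop's expected number of draws is n·H(n), roughly n ln n, rather than quadratic, but the shuffle is still simpler and linear.)

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Hypothetical helper: a random permutation of 0..count-1, built in linear time. */
final class ShuffledIdsSketch {

  static List<Integer> randomUniqueIds(int count, Random rnd) {
    List<Integer> ids = new ArrayList<>(count);
    for (int i = 0; i < count; i++) {
      ids.add(i);
    }
    // Linear-time shuffle; with a fair source, all permutations are approximately equally likely.
    Collections.shuffle(ids, rnd);
    return ids;
  }

  public static void main(String[] args) {
    System.out.println(randomUniqueIds(10, new Random()));
  }
}
```

The loop in `setup()` could then take `ids.get(i)` as the ID for entry `i` and drop the `usedIDs` array; passing the suite's `RND` keeps runs reproducible if its seed is fixed.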


---

[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-06-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r119657514
  
--- Diff: 
common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBTypeInfoSuite.java ---
@@ -0,0 +1,192 @@
+
+package org.apache.spark.kvstore;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class LevelDBTypeInfoSuite {
+
+  @Test
+  public void testIndexAnnotation() throws Exception {
+    KVTypeInfo ti = new KVTypeInfo(CustomType1.class);
+    assertEquals(5, ti.indices().count());
+
+    CustomType1 t1 = new CustomType1();
+    t1.key = "key";
+    t1.id = "id";
+    t1.name = "name";
+    t1.num = 42;
+    t1.child = "child";
+
+    assertEquals(t1.key, ti.getIndexValue(KVIndex.NATURAL_INDEX_NAME, t1));
+    assertEquals(t1.id, ti.getIndexValue("id", t1));
+    assertEquals(t1.name, ti.getIndexValue("name", t1));
+    assertEquals(t1.num, ti.getIndexValue("int", t1));
+    assertEquals(t1.child, ti.getIndexValue("child", t1));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testNoNaturalIndex() throws Exception {
+    newTypeInfo(NoNaturalIndex.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testDuplicateIndex() throws Exception {
+    newTypeInfo(DuplicateIndex.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testEmptyIndexName() throws Exception {
+    newTypeInfo(EmptyIndexName.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testIllegalIndexName() throws Exception {
+    newTypeInfo(IllegalIndexName.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testIllegalIndexMethod() throws Exception {
+    newTypeInfo(IllegalIndexMethod.class);
+  }
+
+  @Test
+  public void testKeyClashes() throws Exception {
+    LevelDBTypeInfo ti = newTypeInfo(CustomType1.class);
+
+    CustomType1 t1 = new CustomType1();
+    t1.key = "key1";
+    t1.name = "a";
+
+    CustomType1 t2 = new CustomType1();
+    t2.key = "key2";
+    t2.name = "aa";
+
+    CustomType1 t3 = new CustomType1();
+    t3.key = "key3";
+    t3.name = "aaa";
+
+    // Make sure entries with conflicting names are sorted correctly.
+    assertBefore(ti.index("name").entityKey(null, t1), ti.index("name").entityKey(null, t2));
+    assertBefore(ti.index("name").entityKey(null, t1), ti.index("name").entityKey(null, t3));
+    assertBefore(ti.index("name").entityKey(null, t2), ti.index("name").entityKey(null, t3));
+  }
+
+  @Test
+  public void testNumEncoding() throws Exception {
+    LevelDBTypeInfo.Index idx = newTypeInfo(CustomType1.class).indices().iterator().next();
+
+    assertEquals("+=0001", new String(idx.toKey(1), UTF_8));
+    assertEquals("+=0010", new String(idx.toKey(16), UTF_8));
+    assertEquals("+=7fff", new String(idx.toKey(Integer.MAX_VALUE), UTF_8));
+
+    assertBefore(idx.toKey(1), idx.toKey(2));
+    assertBefore(idx.toKey(-1), idx.toKey(2));
+    assertBefore(idx.toKey(-11), idx.toKey(2));
+    assertBefore(idx.toKey(-11), idx.toKey(-1));
+    assertBefore(idx.toKey(1), idx.toKey(11));
+    assertBefore(idx.toKey(Integer.MIN_VALUE), idx.toKey(Integer.MAX_VALUE));
+
+    assertBefore(idx.toKey(1L), idx.toKey(2L));
+    assertBefore(idx.toKey(-1L), idx.toKey(2L));
+    assertBefore(idx.toKey(Long.MIN_VALUE), idx.toKey(Long.MAX_VALUE));
+
+    assertBefore(idx.toKey((short) 1), idx.toKey((short) 2));
+    assertBefore(idx.toKey((short) -1), idx.toKey((short) 2));
+    assertBefore(idx.toKey(Short.MIN_VALUE), idx.toKey(Short.MAX_VALUE));
+
+    assertBefore(idx.toKey((byte) 

[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-06-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r119651685
  
--- Diff: 
common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java ---
@@ -0,0 +1,502 @@
+
+package org.apache.spark.kvstore;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Holds metadata about app-specific types stored in LevelDB. Serves as a cache for data collected
+ * via reflection, to make it cheaper to access it multiple times.
+ *
+ * 
+ * The hierarchy of keys stored in LevelDB looks roughly like the following. This hierarchy ensures
+ * that iteration over indices is easy, and that updating values in the store is not overly
+ * expensive. Of note, indices choose using more disk space (one value per key) instead of keeping
+ * lists of pointers, which would be more expensive to update at runtime.
+ * 
+ *
+ * 
+ * Indentation defines when a sub-key lives under a parent key. In LevelDB, this means the full
+ * key would be the concatenation of everything up to that point in the hierarchy, with each
+ * component separated by a NULL byte.
+ * 
+ *
+ * 
+ * +TYPE_NAME
+ *   NATURAL_INDEX
+ * +NATURAL_KEY
+ * -
+ *   -NATURAL_INDEX
+ *   INDEX_NAME
+ * +INDEX_VALUE
+ *   +NATURAL_KEY
+ * -INDEX_VALUE
+ * .INDEX_VALUE
+ *   CHILD_INDEX_NAME
+ * +CHILD_INDEX_VALUE
+ *   NATURAL_KEY_OR_DATA
+ * -
+ *   -INDEX_NAME
+ * 
+ *
+ * 
+ * Entity data (either the entity's natural key or a copy of the data) is stored in all keys
+ * that end with "+". A count of all objects that match a particular top-level index
+ * value is kept at the end marker ("-"). A count is also kept at the natural index's end
+ * marker, to make it easy to retrieve the number of all elements of a particular type.
+ * 
+ *
+ * 
+ * To illustrate, given a type "Foo", with a natural index and a second index called "bar", you'd
+ * have these keys and values in the store for two instances, one with natural key "key1" and the
+ * other "key2", both with value "yes" for "bar":
+ * 
+ *
+ * 
+ * Foo __main__ +key1   [data for instance 1]
+ * Foo __main__ +key2   [data for instance 2]
+ * Foo __main__ -   [count of all Foo]
+ * Foo bar +yes +key1   [instance 1 key or data, depending on index type]
+ * Foo bar +yes +key2   [instance 2 key or data, depending on index type]
+ * Foo bar +yes -   [count of all Foo with "bar=yes" ]
+ * 
+ *
+ * 
+ * Note that all indexed values are prepended with "+", even if the index itself does not have an
+ * explicit end marker. This allows for easily skipping to the end of an index by telling LevelDB
+ * to seek to the "phantom" end marker of the index.
+ * 
+ *
+ * 
+ * Child indices are stored after their parent index. In the example above, let's assume there is
+ * a child index "child", whose parent is "bar". If both instances have value "no" for this field,
+ * the data in the store would look something like the following:
+ * 
+ *
+ * 
+ * ...
+ * Foo bar +yes -
+ * Foo bar .yes .child +no +key1   [instance 1 key or data, depending on index type]
+ * Foo bar .yes .child +no +key2   [instance 2 key or data, depending on index type]
+ * ...
+ * 
+ */
+class LevelDBTypeInfo {
+
+  static final byte[] END_MARKER = new byte[] { '-' };
+  

[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-06-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r119649224
  
--- Diff: 
common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java ---
@@ -0,0 +1,502 @@
+
+package org.apache.spark.kvstore;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Holds metadata about app-specific types stored in LevelDB. Serves as a cache for data collected
+ * via reflection, to make it cheaper to access it multiple times.
+ *
+ * 
+ * The hierarchy of keys stored in LevelDB looks roughly like the following. This hierarchy ensures
+ * that iteration over indices is easy, and that updating values in the store is not overly
+ * expensive. Of note, indices choose using more disk space (one value per key) instead of keeping
+ * lists of pointers, which would be more expensive to update at runtime.
+ * 
+ *
+ * 
+ * Indentation defines when a sub-key lives under a parent key. In LevelDB, this means the full
+ * key would be the concatenation of everything up to that point in the hierarchy, with each
+ * component separated by a NULL byte.
+ * 
+ *
+ * 
+ * +TYPE_NAME
+ *   NATURAL_INDEX
+ * +NATURAL_KEY
+ * -
+ *   -NATURAL_INDEX
+ *   INDEX_NAME
+ * +INDEX_VALUE
+ *   +NATURAL_KEY
+ * -INDEX_VALUE
+ * .INDEX_VALUE
+ *   CHILD_INDEX_NAME
+ * +CHILD_INDEX_VALUE
+ *   NATURAL_KEY_OR_DATA
+ * -
+ *   -INDEX_NAME
+ * 
+ *
+ * 
+ * Entity data (either the entity's natural key or a copy of the data) is stored in all keys
+ * that end with "+". A count of all objects that match a particular top-level index
+ * value is kept at the end marker ("-"). A count is also kept at the natural index's end
+ * marker, to make it easy to retrieve the number of all elements of a particular type.
+ * 
+ *
+ * 
+ * To illustrate, given a type "Foo", with a natural index and a second index called "bar", you'd
+ * have these keys and values in the store for two instances, one with natural key "key1" and the
+ * other "key2", both with value "yes" for "bar":
+ * 
+ *
+ * 
+ * Foo __main__ +key1   [data for instance 1]
+ * Foo __main__ +key2   [data for instance 2]
--- End diff --

One thing I had trouble keeping straight as I read through this was the difference between an index "key" and an index "value". Normally I think of "value" as what you are calling "data" here. It seems like you are calling just the final component ("+key1") the index value, while the key refers to the entire thing ("Foo __main__ +key1").

Is that right?

It might also help to add comments on `entityKey()` and `getValue()`.
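A small in-memory model may help with the key/value distinction. It is a sketch that assumes the layout and NULL-byte separators described in the Javadoc above, uses a `TreeMap<String, String>` in place of LevelDB, and uses hypothetical helper names. The map key is the full key (e.g. `Foo\0bar\0+yes\0+key1`); the map value is the stored data. Because '+' (0x2B) sorts before '-' (0x2D), a prefix scan returns all indexed entries first and the count stored under the end marker last.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical model of the documented layout. Strings stand in for byte[]; for ASCII keys,
 * String ordering matches a bytewise comparator such as LevelDB's default one.
 */
final class KeyLayoutSketch {

  /** Full key: components joined with NULL separators. */
  static String key(String... components) {
    return String.join("\0", components);
  }

  static TreeMap<String, String> exampleStore() {
    TreeMap<String, String> store = new TreeMap<>();
    store.put(key("Foo", "__main__", "+key1"), "[data for instance 1]");
    store.put(key("Foo", "__main__", "+key2"), "[data for instance 2]");
    store.put(key("Foo", "__main__", "-"), "2"); // count of all Foo
    store.put(key("Foo", "bar", "+yes", "+key1"), "key1"); // natural key (or data copy)
    store.put(key("Foo", "bar", "+yes", "+key2"), "key2");
    store.put(key("Foo", "bar", "+yes", "-"), "2"); // count of Foo with bar=yes
    return store;
  }

  /** Full keys under a prefix, in store order. */
  static List<String> keysUnder(TreeMap<String, String> store, String prefix) {
    List<String> out = new ArrayList<>();
    for (Map.Entry<String, String> e : store.tailMap(prefix, true).entrySet()) {
      if (!e.getKey().startsWith(prefix)) {
        break; // keys are sorted, so nothing after this point can match the prefix
      }
      out.add(e.getKey());
    }
    return out;
  }

  public static void main(String[] args) {
    TreeMap<String, String> store = exampleStore();
    for (String k : keysUnder(store, key("Foo", "bar", "+yes") + "\0")) {
      System.out.println(k.replace('\0', ' ') + " -> " + store.get(k));
    }
  }
}
```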


---



[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-05-31 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r119461586
  
--- Diff: 
common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java ---
@@ -0,0 +1,303 @@
+
+package org.apache.spark.kvstore;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Implementation of KVStore that uses LevelDB as the underlying data store.
+ */
+public class LevelDB implements KVStore {
+
+  @VisibleForTesting
+  static final long STORE_VERSION = 1L;
+
+  @VisibleForTesting
+  static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8);
+
+  /** DB key where app metadata is stored. */
+  private static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8);
+
+  /** DB key where type aliases are stored. */
+  private static final byte[] TYPE_ALIASES_KEY = "__types__".getBytes(UTF_8);
+
+  final AtomicReference<DB> _db;
+  final KVStoreSerializer serializer;
+
+  private final ConcurrentMap typeAliases;
+  private final ConcurrentMap types;
+
+  public LevelDB(File path) throws Exception {
+    this(path, new KVStoreSerializer());
+  }
+
+  public LevelDB(File path, KVStoreSerializer serializer) throws Exception {
+    this.serializer = serializer;
+    this.types = new ConcurrentHashMap<>();
+
+    Options options = new Options();
+    options.createIfMissing(!path.exists());
+    this._db = new AtomicReference<>(JniDBFactory.factory.open(path, options));
+
+    byte[] versionData = db().get(STORE_VERSION_KEY);
+    if (versionData != null) {
+      long version = serializer.deserializeLong(versionData);
+      if (version != STORE_VERSION) {
+        throw new UnsupportedStoreVersionException();
+      }
+    } else {
+      db().put(STORE_VERSION_KEY, serializer.serialize(STORE_VERSION));
+    }
+
+    Map aliases;
+    try {
+      aliases = get(TYPE_ALIASES_KEY, TypeAliases.class).aliases;
+    } catch (NoSuchElementException e) {
+      aliases = new HashMap<>();
+    }
+    typeAliases = new ConcurrentHashMap<>(aliases);
+  }
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    try {
+      return get(METADATA_KEY, klass);
+    } catch (NoSuchElementException nsee) {
+      return null;
+    }
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    if (value != null) {
+      put(METADATA_KEY, value);
+    } else {
+      db().delete(METADATA_KEY);
+    }
+  }
+
+  <T> T get(byte[] key, Class<T> klass) throws Exception {
+    byte[] data = db().get(key);
+    if (data == null) {
+      throw new NoSuchElementException(new String(key, UTF_8));
+    }
+    return serializer.deserialize(data, klass);
+  }
+
+  private void put(byte[] key, Object value) throws Exception {
+    Preconditions.checkArgument(value != null, "Null values are not allowed.");
+    db().put(key, serializer.serialize(value));
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws

[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-05-31 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r119459850
  
--- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java ---
@@ -0,0 +1,154 @@
+package org.apache.spark.kvstore;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Wrapper around types managed in a KVStore, providing easy access to their indexed fields.
+ */
+public class KVTypeInfo {
+
+  private final Class<?> type;
+  private final Map<String, KVIndex> indices;
+  private final Map<String, Accessor> accessors;
+
+  public KVTypeInfo(Class<?> type) throws Exception {
+    this.type = type;
+    this.accessors = new HashMap<>();
+    this.indices = new HashMap<>();
+
+    for (Field f : type.getFields()) {
+      KVIndex idx = f.getAnnotation(KVIndex.class);
+      if (idx != null) {
+        checkIndex(idx, indices);
+        indices.put(idx.value(), idx);
+        accessors.put(idx.value(), new FieldAccessor(f));
+      }
+    }
+
+    for (Method m : type.getMethods()) {
+      KVIndex idx = m.getAnnotation(KVIndex.class);
+      if (idx != null) {
+        checkIndex(idx, indices);
+        Preconditions.checkArgument(m.getParameterTypes().length == 0,
+          "Annotated method %s::%s should not have any parameters.", type.getName(), m.getName());
+        indices.put(idx.value(), idx);
+        accessors.put(idx.value(), new MethodAccessor(m));
+      }
+    }
+
+    Preconditions.checkArgument(indices.containsKey(KVIndex.NATURAL_INDEX_NAME),
+        "No natural index defined for type %s.", type.getName());
+    Preconditions.checkArgument(indices.get(KVIndex.NATURAL_INDEX_NAME).parent().isEmpty(),
+        "Natural index of %s cannot have a parent.", type.getName());
+
+    for (KVIndex idx : indices.values()) {
+      if (!idx.parent().isEmpty()) {
+        KVIndex parent = indices.get(idx.parent());
+        Preconditions.checkArgument(parent != null,
+          "Cannot find parent %s of index %s.", idx.parent(), idx.value());
+        Preconditions.checkArgument(parent.parent().isEmpty(),
+          "Parent index %s of index %s cannot be itself a child index.", idx.parent(), idx.value());
+      }
+    }
+  }
+
+  private void checkIndex(KVIndex idx, Map<String, KVIndex> indices) {
+    Preconditions.checkArgument(idx.value() != null && !idx.value().isEmpty(),
+      "No name provided for index in type %s.", type.getName());
+    Preconditions.checkArgument(
+      !idx.value().startsWith("_") || idx.value().equals(KVIndex.NATURAL_INDEX_NAME),
+      "Index name %s (in type %s) is not allowed.", idx.value(), type.getName());
+    Preconditions.checkArgument(idx.parent().isEmpty() || !idx.parent().equals(idx.value()),
+      "Index %s cannot be parent of itself.", idx.value());
+    Preconditions.checkArgument(!indices.containsKey(idx.value()),
+      "Duplicate index %s for type %s.", idx.value(), type.getName());
+  }
+
+  public Class<?> getType() {
+    return type;
+  }
+
+  public Object getIndexValue(String indexName, Object instance) throws Exception {
+    return getAccessor(indexName).get(instance);
+  }
+
+  public Stream<KVIndex> indices() {
+    return indices.values().stream();
+  }
+
+  Accessor getAccessor(String indexName) {
+    Accessor a = accessors.get(indexName);
+    Preconditions.checkArgument(a != null, "No index %s.", indexName);
+    return a;
+  }
+
+  Accessor getParentAccessor(String indexName) {
+    KVIndex index = indices.get(indexName);
+

[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-05-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r119408662
  
--- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java ---
@@ -0,0 +1,126 @@
+package org.apache.spark.kvstore;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A configurable view that allows iterating over values in a {@link KVStore}.
+ *
+ * 
+ * The different methods can be used to configure the behavior of the iterator. Calling the same
+ * method multiple times is allowed; the most recent value will be used.
+ * 
+ *
+ * 
+ * The iterators returns by this view are of type {@link KVStoreIterator}; they auto-close
+ * when used in a for loop that exhausts their contents, but when used manually, they need
+ * to be closed explicitly unless all elements are read.
+ * 
+ */
+public abstract class KVStoreView<T> implements Iterable<T> {
+
+  final Class<T> type;
+
+  boolean ascending = true;
+  String index = KVIndex.NATURAL_INDEX_NAME;
+  Object first = null;
+  Object last = null;
+  Object parent = null;
+  long skip = 0L;
+  long max = Long.MAX_VALUE;
+
+  public KVStoreView(Class<T> type) {
+    this.type = type;
+  }
+
+  /**
+   * Reverses the order of iteration. By default, iterates in ascending order.
+   */
+  public KVStoreView<T> reverse() {
+    ascending = !ascending;
+    return this;
+  }
+
+  /**
+   * Iterates according to the given index.
+   */
+  public KVStoreView<T> index(String name) {
+    this.index = Preconditions.checkNotNull(name);
+    return this;
+  }
+
+  /**
+   * Defines the value of the parent index when iterating over a child index. Only elements that
+   * match the parent index's value will be included in the iteration.
+   *
+   * 
+   * Required for iterating over child indices, will generate an error if iterating over a
+   * parent-less index.
+   * 
+   */
+  public KVStoreView<T> parent(Object value) {
+    this.parent = value;
+    return this;
+  }
+
+  /**
+   * Iterates starting at the given value of the chosen index.
+   */
+  public KVStoreView<T> first(Object value) {
+    this.first = value;
+    return this;
+  }
+
+  /**
+   * Stops iteration at the given value of the chosen index.
--- End diff --

It would be nice to clarify whether the matching element is included or not.
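The two possible readings only differ when an element's index value is exactly equal to the `last()` bound. A minimal sketch of that difference, using a `java.util.TreeMap` as a hypothetical stand-in for a sorted index (this is not how the kvstore iterates, and `BoundedRangeDemo` is an illustrative name):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for a sorted index: maps index values to stored values.
class BoundedRangeDemo {

  // Returns the values whose keys lie between first and last. The first bound is always
  // inclusive here; the last bound is inclusive only when lastInclusive is true.
  static List<String> range(NavigableMap<Integer, String> index, int first, int last,
      boolean lastInclusive) {
    return new ArrayList<>(index.subMap(first, true, last, lastInclusive).values());
  }

  public static void main(String[] args) {
    NavigableMap<Integer, String> index = new TreeMap<>();
    for (int i = 1; i <= 5; i++) {
      index.put(i, "v" + i);
    }
    System.out.println(range(index, 2, 4, true));   // [v2, v3, v4]
    System.out.println(range(index, 2, 4, false));  // [v2, v3]
  }
}
```

Either choice is workable; what matters for callers is that the Javadoc states which one `last()` implements, so they know whether to pass the last value they want or the one after it.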


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-05-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r119409655
  
--- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java ---
@@ -0,0 +1,154 @@
+package org.apache.spark.kvstore;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Wrapper around types managed in a KVStore, providing easy access to their indexed fields.
+ */
+public class KVTypeInfo {
+
+  private final Class<?> type;
+  private final Map<String, KVIndex> indices;
+  private final Map<String, Accessor> accessors;
+
+  public KVTypeInfo(Class<?> type) throws Exception {
+    this.type = type;
+    this.accessors = new HashMap<>();
+    this.indices = new HashMap<>();
+
+    for (Field f : type.getFields()) {
+      KVIndex idx = f.getAnnotation(KVIndex.class);
+      if (idx != null) {
+        checkIndex(idx, indices);
+        indices.put(idx.value(), idx);
+        accessors.put(idx.value(), new FieldAccessor(f));
+      }
+    }
+
+    for (Method m : type.getMethods()) {
+      KVIndex idx = m.getAnnotation(KVIndex.class);
+      if (idx != null) {
+        checkIndex(idx, indices);
+        Preconditions.checkArgument(m.getParameterTypes().length == 0,
+          "Annotated method %s::%s should not have any parameters.", type.getName(), m.getName());
+        indices.put(idx.value(), idx);
+        accessors.put(idx.value(), new MethodAccessor(m));
+      }
+    }
+
+    Preconditions.checkArgument(indices.containsKey(KVIndex.NATURAL_INDEX_NAME),
+        "No natural index defined for type %s.", type.getName());
+    Preconditions.checkArgument(indices.get(KVIndex.NATURAL_INDEX_NAME).parent().isEmpty(),
+        "Natural index of %s cannot have a parent.", type.getName());
+
+    for (KVIndex idx : indices.values()) {
+      if (!idx.parent().isEmpty()) {
+        KVIndex parent = indices.get(idx.parent());
+        Preconditions.checkArgument(parent != null,
+          "Cannot find parent %s of index %s.", idx.parent(), idx.value());
+        Preconditions.checkArgument(parent.parent().isEmpty(),
+          "Parent index %s of index %s cannot be itself a child index.", idx.parent(), idx.value());
+      }
+    }
+  }
+
+  private void checkIndex(KVIndex idx, Map<String, KVIndex> indices) {
+    Preconditions.checkArgument(idx.value() != null && !idx.value().isEmpty(),
+      "No name provided for index in type %s.", type.getName());
+    Preconditions.checkArgument(
+      !idx.value().startsWith("_") || idx.value().equals(KVIndex.NATURAL_INDEX_NAME),
+      "Index name %s (in type %s) is not allowed.", idx.value(), type.getName());
+    Preconditions.checkArgument(idx.parent().isEmpty() || !idx.parent().equals(idx.value()),
+      "Index %s cannot be parent of itself.", idx.value());
+    Preconditions.checkArgument(!indices.containsKey(idx.value()),
+      "Duplicate index %s for type %s.", idx.value(), type.getName());
+  }
+
+  public Class<?> getType() {
+    return type;
+  }
+
+  public Object getIndexValue(String indexName, Object instance) throws Exception {
+    return getAccessor(indexName).get(instance);
+  }
+
+  public Stream<KVIndex> indices() {
+    return indices.values().stream();
+  }
+
+  Accessor getAccessor(String indexName) {
+    Accessor a = accessors.get(indexName);
+    Preconditions.checkArgument(a != null, "No index %s.", indexName);
+    return a;
+  }
+
+  Accessor getParentAccessor(String indexName) {
+    KVIndex index = indices.get(indexName);
+

[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-05-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r119411351
  
--- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java ---
@@ -0,0 +1,122 @@
+package org.apache.spark.kvstore;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Abstraction for a local key/value store for storing app data.
+ *
+ * 
+ * There are two main features provided by the implementations of this interface:
+ * 
+ *
+ * Serialization
+ *
+ * 
+ * Data will be serialized to and deserialized from the underlying data store using a
+ * {@link KVStoreSerializer}, which can be customized by the application. The serializer is
+ * based on Jackson, so it supports all the Jackson annotations for controlling the serialization
+ * of app-defined types.
+ * 
+ *
+ * 
+ * Data is also automatically compressed to save disk space.
+ * 
+ *
+ * Automatic Key Management
+ *
+ * 
+ * When using the built-in key management, the implementation will automatically create unique
+ * keys for each type written to the store. Keys are based on the type name, and always start
+ * with the "+" prefix character (so that it's easy to use both manual and automatic key
+ * management APIs without conflicts).
+ * 
+ *
+ * 
+ * Another feature of automatic key management is indexing; by annotating fields or methods of
+ * objects written to the store with {@link KVIndex}, indices are created to sort the data
+ * by the values of those properties. This makes it possible to provide sorting without having
+ * to load all instances of those types from the store.
+ * 
+ *
+ * 
+ * KVStore instances are thread-safe for both reads and writes.
+ * 
+ */
+public interface KVStore extends Closeable {
+
+  /**
+   * Returns app-specific metadata from the store, or null if it's not currently set.
+   *
+   * 
+   * The metadata type is application-specific. This is a convenience method so that applications
+   * don't need to define their own keys for this information.
+   * 
+   */
+  <T> T getMetadata(Class<T> klass) throws Exception;
+
+  /**
+   * Writes the given value in the store metadata key.
+   */
+  void setMetadata(Object value) throws Exception;
+
+  /**
+   * Read a specific instance of an object.
+   */
+  <T> T read(Class<T> klass, Object naturalKey) throws Exception;
--- End diff --

Add that the key cannot be null, and that if the key does not exist, you throw a
NoSuchElementException (otherwise I might think you'd return null).
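The contract requested here matches what the package-private `get()` helper in `LevelDB.java` already does: it throws `NoSuchElementException` when the key is absent instead of returning `null`. A minimal in-memory sketch of that contract; `InMemoryReader` is hypothetical, and rejecting null keys with `NullPointerException` (via `Objects.requireNonNull`) is an assumption, since the patch's `put()` helper uses Guava's `Preconditions.checkArgument` for null values instead:

```java
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory store illustrating the read() contract under discussion.
class InMemoryReader {
  private final Map<Object, Object> data = new ConcurrentHashMap<>();

  void write(Object naturalKey, Object value) {
    data.put(Objects.requireNonNull(naturalKey, "Null keys are not allowed."),
        Objects.requireNonNull(value, "Null values are not allowed."));
  }

  /**
   * Reads the instance stored under the given natural key.
   *
   * @throws NullPointerException if naturalKey is null.
   * @throws NoSuchElementException if no instance exists for naturalKey.
   */
  <T> T read(Class<T> klass, Object naturalKey) {
    Objects.requireNonNull(naturalKey, "Null keys are not allowed.");
    Object value = data.get(naturalKey);
    if (value == null) {
      // Absence is reported as an exception, never as a null return value.
      throw new NoSuchElementException(String.valueOf(naturalKey));
    }
    return klass.cast(value);
  }
}
```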





[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-05-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r119420304
  
--- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java ---
@@ -0,0 +1,303 @@
+package org.apache.spark.kvstore;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Implementation of KVStore that uses LevelDB as the underlying data store.
+ */
+public class LevelDB implements KVStore {
+
+  @VisibleForTesting
+  static final long STORE_VERSION = 1L;
+
+  @VisibleForTesting
+  static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8);
+
+  /** DB key where app metadata is stored. */
+  private static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8);
+
+  /** DB key where type aliases are stored. */
+  private static final byte[] TYPE_ALIASES_KEY = "__types__".getBytes(UTF_8);
+
+  final AtomicReference<DB> _db;
+  final KVStoreSerializer serializer;
+
+  private final ConcurrentMap<String, byte[]> typeAliases;
--- End diff --

This `typeAliases` thing is pretty confusing. IIUC, the idea is to replace a long, fully qualified
type name with a shorter numeric ID, and this holds the mapping? I'd include a comment about it,
and maybe even rename it to `typetoID`.
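Under that reading, the map interns each fully qualified class name as a short ID that then prefixes every key written for the type. A minimal sketch of the idea; `TypeIdRegistry`, the counter-based IDs, and the `<id>:<naturalKey>` key layout are hypothetical and not taken from the patch. The patch also stores its aliases under `TYPE_ALIASES_KEY` (`__types__`), while this sketch keeps them in memory only:

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical registry mapping fully qualified class names to short numeric IDs.
class TypeIdRegistry {
  private final ConcurrentMap<String, Integer> typeToId = new ConcurrentHashMap<>();
  private final AtomicInteger nextId = new AtomicInteger();

  // Returns the existing ID for the type, or atomically assigns the next free one.
  int idFor(Class<?> type) {
    return typeToId.computeIfAbsent(type.getName(), name -> nextId.getAndIncrement());
  }

  // Builds "<id>:<naturalKey>" instead of "<long class name>:<naturalKey>".
  byte[] keyFor(Class<?> type, Object naturalKey) {
    String key = idFor(type) + ":" + naturalKey;
    return key.getBytes(StandardCharsets.UTF_8);
  }
}
```

The saving comes from replacing a prefix such as `org.apache.spark.status.api.v1.JobData` with one or two bytes on every key; the cost is that the mapping must be persisted so IDs stay stable across restarts.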





[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-05-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r119407982
  
--- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java ---
@@ -0,0 +1,126 @@
+package org.apache.spark.kvstore;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A configurable view that allows iterating over values in a {@link KVStore}.
+ *
+ * 
+ * The different methods can be used to configure the behavior of the iterator. Calling the same
+ * method multiple times is allowed; the most recent value will be used.
+ * 
+ *
+ * 
+ * The iterators returns by this view are of type {@link KVStoreIterator}; they auto-close
--- End diff --

typo: returned
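The Javadoc paragraph quoted above describes iterators that release their resources on their own once exhausted, but must be closed explicitly otherwise. A minimal sketch of that behaviour; `AutoClosingIterator` is a hypothetical class, not the patch's `KVStoreIterator`:

```java
import java.io.Closeable;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical iterator that closes itself once it has been fully consumed.
class AutoClosingIterator<T> implements Iterator<T>, Closeable {
  private final Iterator<T> delegate;
  private boolean closed = false;

  AutoClosingIterator(Iterator<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public boolean hasNext() {
    if (closed) {
      return false;
    }
    boolean more = delegate.hasNext();
    if (!more) {
      close();  // Exhausted: release resources without caller involvement.
    }
    return more;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return delegate.next();
  }

  @Override
  public void close() {
    // A real store iterator would release its native DB iterator here.
    closed = true;
  }

  boolean isClosed() {
    return closed;
  }
}
```

A caller that stops early, for example with `break`, never reaches the exhaustion check, so it has to call `close()` itself, typically through try-with-resources.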





[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-05-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r119412048
  
--- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java ---
@@ -0,0 +1,303 @@
+package org.apache.spark.kvstore;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Implementation of KVStore that uses LevelDB as the underlying data store.
+ */
+public class LevelDB implements KVStore {
+
+  @VisibleForTesting
+  static final long STORE_VERSION = 1L;
+
+  @VisibleForTesting
+  static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8);
+
+  /** DB key where app metadata is stored. */
+  private static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8);
+
+  /** DB key where type aliases are stored. */
+  private static final byte[] TYPE_ALIASES_KEY = "__types__".getBytes(UTF_8);
+
+  final AtomicReference<DB> _db;
+  final KVStoreSerializer serializer;
+
+  private final ConcurrentMap<String, byte[]> typeAliases;
+  private final ConcurrentMap<Class<?>, LevelDBTypeInfo> types;
+
+  public LevelDB(File path) throws Exception {
+    this(path, new KVStoreSerializer());
+  }
+
+  public LevelDB(File path, KVStoreSerializer serializer) throws Exception {
+    this.serializer = serializer;
+    this.types = new ConcurrentHashMap<>();
+
+    Options options = new Options();
+    options.createIfMissing(!path.exists());
+    this._db = new AtomicReference<>(JniDBFactory.factory.open(path, options));
+
+    byte[] versionData = db().get(STORE_VERSION_KEY);
+    if (versionData != null) {
+      long version = serializer.deserializeLong(versionData);
+      if (version != STORE_VERSION) {
+        throw new UnsupportedStoreVersionException();
+      }
+    } else {
+      db().put(STORE_VERSION_KEY, serializer.serialize(STORE_VERSION));
+    }
+
+    Map<String, byte[]> aliases;
+    try {
+      aliases = get(TYPE_ALIASES_KEY, TypeAliases.class).aliases;
+    } catch (NoSuchElementException e) {
+      aliases = new HashMap<>();
+    }
+    typeAliases = new ConcurrentHashMap<>(aliases);
+  }
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    try {
+      return get(METADATA_KEY, klass);
+    } catch (NoSuchElementException nsee) {
+      return null;
+    }
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    if (value != null) {
+      put(METADATA_KEY, value);
+    } else {
+      db().delete(METADATA_KEY);
+    }
+  }
+
+  <T> T get(byte[] key, Class<T> klass) throws Exception {
+    byte[] data = db().get(key);
+    if (data == null) {
+      throw new NoSuchElementException(new String(key, UTF_8));
+    }
+    return serializer.deserialize(data, klass);
+  }
+
+  private void put(byte[] key, Object value) throws Exception {
+    Preconditions.checkArgument(value != null, "Null values are not allowed.");
+    db().put(key, serializer.serialize(value));
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws

[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-05-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r118762497
  
--- Diff: common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java ---
@@ -0,0 +1,291 @@
+package org.apache.spark.kvstore;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.apache.commons.io.FileUtils;
+import org.iq80.leveldb.DBIterator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class LevelDBSuite {
+
+  private LevelDB db;
+  private File dbpath;
+
+  @After
+  public void cleanup() throws Exception {
+    if (db != null) {
+      db.close();
+    }
+    if (dbpath != null) {
+      FileUtils.deleteQuietly(dbpath);
+    }
+  }
+
+  @Before
+  public void setup() throws Exception {
+    dbpath = File.createTempFile("test.", ".ldb");
+    dbpath.delete();
+    db = new LevelDB(dbpath);
+  }
+
+  @Test
+  public void testReopenAndVersionCheckDb() throws Exception {
+    db.close();
+    db = null;
+    assertTrue(dbpath.exists());
+
+    db = new LevelDB(dbpath);
+    assertEquals(LevelDB.STORE_VERSION,
+      db.serializer.deserializeLong(db.db().get(LevelDB.STORE_VERSION_KEY)));
+    db.db().put(LevelDB.STORE_VERSION_KEY, db.serializer.serialize(LevelDB.STORE_VERSION + 1));
--- End diff --

Because the test is explicitly testing a mismatch in the version.
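The constructor logic this test exercises writes the version on first open and refuses to open a store whose stored version differs, which is why the test has to write `STORE_VERSION + 1` to reach the failure path. A minimal sketch, assuming a plain `Map` instead of LevelDB, `ByteBuffer` instead of `KVStoreSerializer`, and `IllegalStateException` instead of `UnsupportedStoreVersionException`; `VersionCheck` is hypothetical:

```java
import java.nio.ByteBuffer;
import java.util.Map;

// Hypothetical reduction of the store-version check to an in-memory map.
class VersionCheck {
  static final long STORE_VERSION = 1L;
  static final String VERSION_KEY = "__version__";

  static byte[] encode(long v) {
    return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
  }

  // Writes the version on first open; throws if a different version is already stored.
  static void open(Map<String, byte[]> db) {
    byte[] stored = db.get(VERSION_KEY);
    if (stored == null) {
      db.put(VERSION_KEY, encode(STORE_VERSION));
    } else if (ByteBuffer.wrap(stored).getLong() != STORE_VERSION) {
      throw new IllegalStateException("Unsupported store version.");
    }
  }
}
```

Reopening with the same version succeeds silently, so only a deliberately mismatched value exercises the rejection branch.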





[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-05-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r118762391
  
--- Diff: common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java ---
@@ -0,0 +1,498 @@
+package org.apache.spark.kvstore;
+
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public abstract class DBIteratorSuite {
+
+  private static final int MIN_ENTRIES = 42;
+  private static final int MAX_ENTRIES = 1024;
+  private static final Random RND = new Random();
+
+  private static List<CustomType1> allEntries;
+  private static List<CustomType1> clashingEntries;
+  private static KVStore db;
+
+  private static interface BaseComparator extends Comparator<CustomType1> {
+    /**
+     * Returns a comparator that falls back to natural order if this comparator's ordering
+     * returns equality for two elements. Used to mimic how the index sorts things internally.
+     */
+    default BaseComparator fallback() {
+      return (t1, t2) -> {
+        int result = BaseComparator.this.compare(t1, t2);
+        if (result != 0) {
+          return result;
+        }
+
+        return t1.key.compareTo(t2.key);
+      };
+    }
+
+    /** Reverses the order of this comparator. */
+    default BaseComparator reverse() {
+      return (t1, t2) -> -BaseComparator.this.compare(t1, t2);
+    }
+  }
+
+  private static final BaseComparator NATURAL_ORDER = (t1, t2) -> t1.key.compareTo(t2.key);
+  private static final BaseComparator REF_INDEX_ORDER = (t1, t2) -> t1.id.compareTo(t2.id);
+  private static final BaseComparator COPY_INDEX_ORDER = (t1, t2) -> t1.name.compareTo(t2.name);
+  private static final BaseComparator NUMERIC_INDEX_ORDER = (t1, t2) -> t1.num - t2.num;
+  private static final BaseComparator CHILD_INDEX_ORDER = (t1, t2) -> t1.child.compareTo(t2.child);
+
+  /**
+   * Implementations should override this method; it is called only once, before all tests are
+   * run. Any state can be safely stored in static variables and cleaned up in a @AfterClass
+   * handler.
+   */
+  protected abstract KVStore createStore() throws Exception;
+
+  @AfterClass
+  public static void cleanupData() throws Exception {
+    allEntries = null;
+    db = null;
+  }
+
+  @Before
+  public void setup() throws Exception {
+    if (db != null) {
+      return;
+    }
+
+    db = createStore();
+
+    int count = RND.nextInt(MAX_ENTRIES) + MIN_ENTRIES;
+
+    // Instead of generating sequential IDs, generate random unique IDs to avoid the insertion
+    // order matching the natural ordering. Just in case.
+    boolean[] usedIDs = new boolean[count];
+
+    allEntries = new ArrayList<>(count);
+    for (int i = 0; i < count; i++) {
+      CustomType1 t = new CustomType1();
+
+      int id;
+      do {
+        id = RND.nextInt(count);
+      } while (usedIDs[id]);
+
+      usedIDs[id] = true;
+      t.key = "key" + id;
+      t.id = "id" + i;
+      t.name = "name" + RND.nextInt(MAX_ENTRIES);
+      t.num = RND.nextInt(MAX_ENTRIES);
+      t.child = "child" + (i % MIN_ENTRIES);
+      allEntries.add(t);
+      db.write(t);
+    }
+
+    // Pick the first generated value, and forcefully create a few entries that will clash
+    // with the indexed 

[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-05-26 Thread jsoltren
Github user jsoltren commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r118757899
  
--- Diff: 
common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBSuite.java ---
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.apache.commons.io.FileUtils;
+import org.iq80.leveldb.DBIterator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class LevelDBSuite {
+
+  private LevelDB db;
+  private File dbpath;
+
+  @After
+  public void cleanup() throws Exception {
+    if (db != null) {
+      db.close();
+    }
+    if (dbpath != null) {
+      FileUtils.deleteQuietly(dbpath);
+    }
+  }
+
+  @Before
+  public void setup() throws Exception {
+    dbpath = File.createTempFile("test.", ".ldb");
+    dbpath.delete();
+    db = new LevelDB(dbpath);
+  }
+
+  @Test
+  public void testReopenAndVersionCheckDb() throws Exception {
+    db.close();
+    db = null;
+    assertTrue(dbpath.exists());
+
+    db = new LevelDB(dbpath);
+    assertEquals(LevelDB.STORE_VERSION,
+      db.serializer.deserializeLong(db.db().get(LevelDB.STORE_VERSION_KEY)));
+    db.db().put(LevelDB.STORE_VERSION_KEY, db.serializer.serialize(LevelDB.STORE_VERSION + 1));
--- End diff --

Why the + 1 here?



[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-05-26 Thread jsoltren
Github user jsoltren commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r118757763
  

[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-05-08 Thread vanzin
GitHub user vanzin opened a pull request:

https://github.com/apache/spark/pull/17902

[SPARK-20641][core] Add key-value store abstraction and LevelDB implementation.

This change adds an abstraction and LevelDB implementation for a key-value
store that will be used to store UI and SHS data.

The interface is described in KVStore.java (see javadoc). Specifics
of the LevelDB implementation are discussed in the javadocs of both
LevelDB.java and LevelDBTypeInfo.java.
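A small typed interface with an in-memory backend is one way to picture a key-value store abstraction of this kind. The sketch below is hypothetical. `SimpleKVStore`, `InMemoryKVStore` and the explicit `register` step, which supplies a natural-key function per type, are illustrative assumptions. They are not the KVStore API added by this PR, which is documented in KVStore.java.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical minimal interface; NOT the KVStore API from this PR.
interface SimpleKVStore {
  <T> void register(Class<T> type, Function<T, String> naturalKey);
  void write(Object value);
  <T> T read(Class<T> type, String naturalKey);
  void delete(Class<?> type, String naturalKey);
  long count(Class<?> type);
}

// In-memory sketch: one sorted map per registered type, keyed by natural key.
// Not thread-safe; a real store also needs persistence and secondary indices.
class InMemoryKVStore implements SimpleKVStore {
  private final Map<Class<?>, Function<Object, String>> keyFns = new HashMap<>();
  private final Map<Class<?>, TreeMap<String, Object>> data = new HashMap<>();

  @Override
  public <T> void register(Class<T> type, Function<T, String> naturalKey) {
    // Wrap the typed key function so all of them fit in a single map.
    keyFns.put(type, v -> naturalKey.apply(type.cast(v)));
    data.putIfAbsent(type, new TreeMap<>());
  }

  @Override
  public void write(Object value) {
    Function<Object, String> keyFn = keyFns.get(value.getClass());
    if (keyFn == null) {
      throw new IllegalArgumentException("Unregistered type: " + value.getClass().getName());
    }
    // Writing an existing natural key replaces the stored value.
    data.get(value.getClass()).put(keyFn.apply(value), value);
  }

  @Override
  public <T> T read(Class<T> type, String naturalKey) {
    TreeMap<String, Object> entries = data.get(type);
    Object value = entries == null ? null : entries.get(naturalKey);
    if (value == null) {
      throw new NoSuchElementException(type.getName() + ": " + naturalKey);
    }
    return type.cast(value);
  }

  @Override
  public void delete(Class<?> type, String naturalKey) {
    TreeMap<String, Object> entries = data.get(type);
    if (entries != null) {
      entries.remove(naturalKey);
    }
  }

  @Override
  public long count(Class<?> type) {
    TreeMap<String, Object> entries = data.get(type);
    return entries == null ? 0 : entries.size();
  }
}
```

Callers work only with typed objects and natural keys here. How the real LevelDB backend lays out its keys is covered in LevelDBTypeInfo.java.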

Included also are a few small benchmarks just to get some idea of
latency. Because they're too slow for regular unit test runs, they're
disabled by default.

Tested with the included unit tests, and also as part of the overall feature
implementation (including running SHS with hundreds of apps).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vanzin/spark shs-ng/M1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17902.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17902


commit f3b7e0bb9c141058fdbcf202a4b8a47a25237613
Author: Marcelo Vanzin 
Date:   2016-10-03T19:09:18Z

SHS-NG M1: Add KVStore abstraction, LevelDB implementation.

The interface is described in KVIndex.java (see javadoc). Specifics
of the LevelDB implementation are discussed in the javadocs of both
LevelDB.java and LevelDBTypeInfo.java.

Included also are a few small benchmarks just to get some idea of
latency. Because they're too slow for regular unit test runs, they're
disabled by default.

commit 52ed2b45c09e7104e4fef5adcf78025f53b7a8e0
Author: Marcelo Vanzin 
Date:   2016-11-01T18:34:25Z

SHS-NG M1: Add support for arrays when indexing.

This is needed because some UI types have compound keys.
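Indexing a compound key can be pictured as flattening an array-valued index into one sortable key. This is an illustrative assumption. `CompoundKeys`, the NUL separator and the use of `toString()` for each element are hypothetical, not how LevelDBTypeInfo encodes keys. The sketch uses `java.lang.reflect.Array`, so primitive arrays such as `int[]` work as well.

```java
import java.lang.reflect.Array;
import java.util.StringJoiner;

// Hypothetical helper that flattens (possibly nested) array index values into one key.
final class CompoundKeys {
  // Assumed separator; a real encoding must guarantee it never appears in element values.
  static final char SEPARATOR = 0;

  private CompoundKeys() {}

  static String toKey(Object value) {
    if (value == null) {
      throw new IllegalArgumentException("Index values must not be null.");
    }
    if (value.getClass().isArray()) {
      StringJoiner joiner = new StringJoiner(String.valueOf(SEPARATOR));
      // Array.getLength/Array.get also work on primitive arrays, boxing each element.
      for (int i = 0; i < Array.getLength(value); i++) {
        joiner.add(toKey(Array.get(value, i)));
      }
      return joiner.toString();
    }
    return value.toString();
  }
}
```

A real encoding would also have to make numeric elements sort numerically. As strings, "10" sorts before "9".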

commit 4112afe723f85412035ad3a9c4801b583e74f876
Author: Marcelo Vanzin 
Date:   2016-11-03T22:18:24Z

SHS-NG M1: Fix counts in LevelDB when updating entries.

Also add unit test. When updating, the code needs to keep track of
the aggregated delta to be added to each count stored in the db,
instead of reading the count from the db for each update.
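The fix can be illustrated with a batch that accumulates a delta per count key and applies it once at commit time. This sketch rests on assumptions: `CountingBatch` is hypothetical, and a plain `Map` stands in for LevelDB and its write batch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical write batch; the "db" Map stands in for counts stored in LevelDB.
final class CountingBatch {
  private final Map<String, Long> db;
  // Aggregated delta per count key, collected while the batch is being built.
  private final Map<String, Long> deltas = new HashMap<>();

  CountingBatch(Map<String, Long> db) {
    this.db = db;
  }

  void updateCount(String countKey, long delta) {
    // Accumulate instead of reading the stored count for each update.
    deltas.merge(countKey, delta, Long::sum);
  }

  void commit() {
    // Each count is read once and written once, with the combined delta applied.
    for (Map.Entry<String, Long> e : deltas.entrySet()) {
      db.put(e.getKey(), db.getOrDefault(e.getKey(), 0L) + e.getValue());
    }
    deltas.clear();
  }
}
```

Suppose each update instead read the stored count (5) and buffered `count + delta`. Updates of +1, +1 and -1 in one batch would each see the pre-batch value. The last buffered write would win, leaving 4 instead of 6.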

commit 718cabd098dd6a534e7952066cd43f89f6875a14
Author: Marcelo Vanzin 
Date:   2017-03-18T03:17:04Z

SHS-NG M1: Try to prevent db use after close.

Using the db after it is closed causes JVM crashes in the leveldb library,
so try to avoid it; if there are still issues, we'll need locking.
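A common pattern here keeps the native handle in an `AtomicReference` that `close()` clears. Later calls then fail fast with an exception instead of reaching native code. `GuardedDb` and `NativeHandle` below are hypothetical stand-ins, not the PR's LevelDB class.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a native database handle.
interface NativeHandle extends AutoCloseable {
  byte[] get(byte[] key);

  @Override
  void close();
}

// Wraps the handle so that use after close() throws instead of touching native code.
final class GuardedDb implements AutoCloseable {
  private final AtomicReference<NativeHandle> handle;

  GuardedDb(NativeHandle h) {
    handle = new AtomicReference<>(h);
  }

  NativeHandle db() {
    NativeHandle h = handle.get();
    if (h == null) {
      throw new IllegalStateException("DB is closed.");
    }
    return h;
  }

  @Override
  public void close() {
    // getAndSet guarantees the native close runs at most once, even if close() races.
    NativeHandle h = handle.getAndSet(null);
    if (h != null) {
      h.close();
    }
  }
}
```

This only narrows the window. A thread that fetched the handle just before `close()` can still use it afterwards, which is why the commit message anticipates locking.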

commit 45a027fd5e32421b57846236180d6012ee72e69b
Author: Marcelo Vanzin 
Date:   2017-03-24T20:19:07Z

SHS-NG M1: Use Java 8 lambdas.

Also rename LevelDBIteratorSuite to work around some super weird
issue with sbt.

commit e592bf69b94c3308d194c2cb678be133931b95b5
Author: Marcelo Vanzin 
Date:   2017-03-25T00:24:08Z

SHS-NG M1: Compress values stored in LevelDB.

LevelDB has built-in support for snappy compression, but it seems
to be buggy in the leveldb-jni library; the compression threads
don't seem to run by default, and when you enable them, there are
weird issues when stopping the DB.

So just do compression manually using the JRE libraries; it's probably
a little slower but it saves a good chunk of disk space.
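The `java.util.zip` streams are one JRE option for manual compression. The commit message does not say which codec was used. This sketch assumes GZIP, and `ValueCompression` is a hypothetical helper. Values would be compressed before being written to LevelDB and decompressed after being read.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper: GZIP-compress values before storing, decompress after reading.
final class ValueCompression {
  private ValueCompression() {}

  static byte[] compress(byte[] raw) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    // Closing the GZIP stream writes the trailer, so read the bytes only afterwards.
    try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
      out.write(raw);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  static byte[] decompress(byte[] compressed) {
    try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] buffer = new byte[4096];
      int n;
      while ((n = in.read(buffer)) != -1) {
        out.write(buffer, 0, n);
      }
      return out.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```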

commit 889963f2ffbcb628f9e53e7142fd37931ba09a54
Author: Marcelo Vanzin 
Date:   2017-03-25T01:24:58Z

SHS-NG M1: Use type aliases as keys in LevelDB.

The type name gets repeated a lot in the store, so using it as the prefix
for every key causes disk usage to grow unnecessarily. Instead, create a
short alias for the type and keep a mapping of aliases to known types in
a map in memory; the map is also saved to the database so it can be read
later.
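The aliasing scheme can be sketched as a map from type name to a short generated alias. A snapshot of the map can be saved to the store and restored on reopen. `TypeAliases`, the numeric aliases and the `/` separator in `key()` are assumptions for illustration, not the PR's actual key format.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry of short aliases used as key prefixes instead of full type names.
final class TypeAliases {
  private final Map<String, String> aliasByType = new HashMap<>();
  private int next = 0;

  // Restores aliases previously saved in the store, e.g. when reopening it.
  TypeAliases(Map<String, String> saved) {
    aliasByType.putAll(saved);
  }

  synchronized String aliasFor(Class<?> type) {
    String alias = aliasByType.get(type.getName());
    if (alias == null) {
      // Generate a short numeric alias not already taken by a restored mapping.
      do {
        alias = Integer.toString(next++);
      } while (aliasByType.containsValue(alias));
      aliasByType.put(type.getName(), alias);
    }
    return alias;
  }

  // Prefixes a natural key with the short alias rather than the full class name.
  String key(Class<?> type, String naturalKey) {
    return aliasFor(type) + "/" + naturalKey;
  }

  // Copy of the mapping, meant to be persisted alongside the data.
  synchronized Map<String, String> snapshot() {
    return new HashMap<>(aliasByType);
  }
}
```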

commit 84ab160699ef8dad4df1fa4cbba29deec7c92c06
Author: Marcelo Vanzin 
Date:   2017-04-03T18:35:50Z

SHS-NG M1: Separate index introspection from storage.

The new KVTypeInfo class can help with writing different implementations
of KVStore without duplicating logic from LevelDBTypeInfo.
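Introspection of this kind usually means scanning a class once, via reflection, for annotated fields and caching accessors, so that each store implementation can reuse the result. In this hypothetical sketch, the `@Index` annotation and the `TypeIntrospection` class are illustrative stand-ins, not KVTypeInfo's actual API.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical marker for indexed fields; must be retained at runtime for reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Index {
  String value();
}

// Scans a type once and caches its indexed fields, independent of any storage backend.
final class TypeIntrospection {
  private final Class<?> type;
  private final Map<String, Field> indices = new HashMap<>();

  TypeIntrospection(Class<?> type) {
    this.type = type;
    for (Field field : type.getDeclaredFields()) {
      Index idx = field.getAnnotation(Index.class);
      if (idx == null) {
        continue;
      }
      if (indices.containsKey(idx.value())) {
        throw new IllegalArgumentException(
            "Duplicate index '" + idx.value() + "' in " + type.getName());
      }
      field.setAccessible(true);
      indices.put(idx.value(), field);
    }
  }

  Set<String> indexNames() {
    return Collections.unmodifiableSet(indices.keySet());
  }

  Object getIndexValue(String name, Object instance) {
    Field field = indices.get(name);
    if (field == null) {
      throw new IllegalArgumentException("No index '" + name + "' in " + type.getName());
    }
    try {
      return field.get(instance);
    } catch (IllegalAccessException e) {
      throw new IllegalStateException(e);
    }
  }
}
```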

commit 7b870212e80e70b8c3f3eb4279e3bb9ec0125d2d
Author: Marcelo Vanzin 
Date:   2017-04-26T18:54:33Z

SHS-NG M1: Remove unused methods from KVStore.

Turns out I ended up not using the raw storage methods in KVStore, so
this change removes them to simplify the API and save some code.

commit 5197c218525db2ad849dfe77d83dddf2311bb5ad
Author: Marcelo Vanzin 
Date:   2017-05-05T21:36:00Z

SHS-NG M1: Add "max" and "last" to kvstore iterators.

This makes it easier for callers to control the end of iteration,
making it easier to write Scala code