SammyVimes commented on a change in pull request #105:
URL: https://github.com/apache/ignite-3/pull/105#discussion_r619136915
##########
File path:
modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
##########
@@ -17,18 +17,80 @@
package org.apache.ignite.internal.vault;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.vault.common.VaultEntry;
+import org.apache.ignite.internal.vault.impl.VaultServiceImpl;
+import org.apache.ignite.internal.vault.service.VaultService;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+
/**
- * VaultManager is responsible for handling VaultService lifecycle and
providing interface for managing local keys.
+ * VaultManager is responsible for handling {@link VaultService} lifecycle
+ * and providing interface for managing local keys.
*/
public class VaultManager {
+ private VaultService vaultService;
Review comment:
JavaDoc is absent here
##########
File path: modules/vault/pom.xml
##########
@@ -33,4 +33,24 @@
<artifactId>ignite-vault</artifactId>
<version>3.0.0-SNAPSHOT</version>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.jetbrains</groupId>
+ <artifactId>annotations</artifactId>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
Review comment:
```suggestion
<version>${project.version}</version>
```
##########
File path:
modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultWatch.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.common;
+
+import java.util.Comparator;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Watch for vault entries.
+ * Could be specified by range of keys.
+ * If value of key in range is changed, then corresponding listener will be
triggered.
+ */
+public class VaultWatch {
+ /** Comparator for {@code ByteArray} values. */
+ private static final Comparator<ByteArray> CMP = ByteArray::compare;
+
+ /** Start key of range (inclusive) */
+ @Nullable
+ private ByteArray startKey;
+
+ /** End key of range (inclusive) */
+ @Nullable
+ private ByteArray endKey;
+
+ /** Listener for vault's values updates. */
+ private VaultListener listener;
+
+ /**
+ * @param listener Listener.
+ */
+ public VaultWatch(VaultListener listener) {
+ this.listener = listener;
+ }
+
+ /**
+ * Start key of range (inclusive).
+ * If value of key in range is changed, then corresponding listener will
be triggered.
+ *
+ * @param startKey Start key represented as {@code ByteArray}.
+ */
+ public void startKey(ByteArray startKey) {
+ this.startKey = startKey;
+ }
+
+ /**
+ * End key of range (inclusive).
+ * If value of key in range is changed, then corresponding listener will
be triggered.
+ *
+ * @param endKey End key represented as {@code ByteArray}.
+ */
+ public void endKey(ByteArray endKey) {
+ this.endKey = endKey;
+ }
+
+ /**
+ * Notifies specified listener if {@code val} of key in range was changed.
+ *
+ * @param val Vault entry.
+ */
+ public void notify(VaultEntry val) {
+ if (startKey != null && CMP.compare(val.key(), startKey) < 0)
Review comment:
I wonder if the comparator works well with little endianness
##########
File path:
modules/vault/src/main/java/org/apache/ignite/internal/vault/common/WatcherImpl.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.common;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implementation of vault {@link Watcher}.
+ */
+public class WatcherImpl implements Watcher {
+ /** Queue for changed vault entries. */
+ private final BlockingQueue<VaultEntry> queue = new
LinkedBlockingQueue<>();
+
+ /** Registered vault watches. */
+ private final Map<IgniteUuid, VaultWatch> watches = new HashMap<>();
Review comment:
Maybe ConcurrentHashMap instead of HashMap with lock? Not sure, though
##########
File path:
modules/vault/src/test/java/org/apache/ignite/internal/vault/impl/VaultBaseContractsTest.java
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.vault.common.VaultEntry;
+import org.apache.ignite.internal.vault.common.VaultWatch;
+import org.apache.ignite.internal.vault.service.VaultService;
+import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Test for base vault contracts.
+ */
+public class VaultBaseContractsTest {
+ /** Vault. */
+ private VaultService storage;
+
+ /**
+ * Instantiate vault.
+ */
+ @BeforeEach
+ public void setUp() {
+ storage = new VaultServiceImpl();
+ }
+
+ /**
+ * put contract
+ */
+ @Test
+ public void put() throws ExecutionException, InterruptedException {
+ ByteArray key = getKey(1);
+ byte[] val = getValue(key, 1);
+
+ assertNull(storage.get(key).get().value());
+
+ storage.put(key, val);
+
+ VaultEntry v = storage.get(key).get();
+
+ assertFalse(v.empty());
+ assertEquals(val, v.value());
+
+ storage.put(key, val);
+
+ v = storage.get(key).get();
+
+ assertFalse(v.empty());
+ assertEquals(val, v.value());
+ }
+
+ /**
+ * remove contract.
+ */
+ @Test
+ public void remove() throws ExecutionException, InterruptedException {
+ ByteArray key = getKey(1);
+ byte[] val = getValue(key, 1);
+
+ assertNull(storage.get(key).get().value());
+
+ // Remove non-existent value.
+ storage.remove(key);
+
+ assertNull(storage.get(key).get().value());
+
+ storage.put(key, val);
+
+ VaultEntry v = storage.get(key).get();
+
+ assertFalse(v.empty());
+ assertEquals(val, v.value());
+
+ // Remove existent value.
+ storage.remove(key);
+
+ v = storage.get(key).get();
+
+ assertNull(v.value());
+ }
+
+ /**
+ * range contract.
+ */
+ @Test
+ public void range() throws ExecutionException, InterruptedException {
+ ByteArray key;
+
+ Map<ByteArray, byte[]> values = new HashMap<>();
+
+ for (int i = 0; i < 10; i++) {
+ key = getKey(i);
+
+ values.put(key, getValue(key, i));
+
+ assertNull(storage.get(key).get().value());
+ }
+
+ values.forEach((k, v) -> storage.put(k, v));
+
+ for (Map.Entry<ByteArray, byte[]> entry : values.entrySet())
+ assertEquals(entry.getValue(),
storage.get(entry.getKey()).get().value());
+
+ Iterator<VaultEntry> it = storage.range(getKey(3), getKey(7));
+
+ List<VaultEntry> rangeRes = new ArrayList<>();
+
+ it.forEachRemaining(rangeRes::add);
+
+ assertEquals(4, rangeRes.size());
+
+ //Check that we have exact range from "key3" to "key6"
+ for (int i = 3; i < 7; i++)
+ assertEquals(values.get(getKey(i)), rangeRes.get(i - 3).value());
+ }
+
+ /**
+ * watch contract.
+ */
+ @Test
+ public void watch() throws ExecutionException, InterruptedException {
+ ByteArray key;
+
+ Map<ByteArray, byte[]> values = new HashMap<>();
+
+ for (int i = 0; i < 10; i++) {
+ key = getKey(i);
+
+ values.put(key, getValue(key, i));
+ }
+
+ values.forEach((k, v) -> storage.put(k, v));
+
+ for (Map.Entry<ByteArray, byte[]> entry : values.entrySet())
+ assertEquals(entry.getValue(),
storage.get(entry.getKey()).get().value());
+
+ AtomicInteger counter = new AtomicInteger();
+
+ VaultWatch vaultWatch = new VaultWatch(changedValue ->
counter.incrementAndGet());
+
+ vaultWatch.startKey(getKey(3));
+ vaultWatch.endKey(getKey(7));
+
+ storage.watch(vaultWatch);
+
+ for (int i = 3; i < 7; i++)
+ storage.put(getKey(i), ("new" + i).getBytes());
+
+ Thread.sleep(500);
Review comment:
Maybe replace counter and sleep with CountDownLatch?
##########
File path:
modules/vault/src/main/java/org/apache/ignite/internal/vault/common/WatcherImpl.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.common;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implementation of vault {@link Watcher}.
+ */
+public class WatcherImpl implements Watcher {
+ /** Queue for changed vault entries. */
+ private final BlockingQueue<VaultEntry> queue = new
LinkedBlockingQueue<>();
+
+ /** Registered vault watches. */
+ private final Map<IgniteUuid, VaultWatch> watches = new HashMap<>();
+
+ /** Flag for indicating if watcher is stopped. */
+ private volatile boolean stop;
+
+ /** Mutex. */
+ private final Object mux = new Object();
+
+ /** Execution service which runs thread for processing changed vault
entries. */
+ private final ExecutorService exec;
+
+ /**
+ * Default constructor.
+ */
+ public WatcherImpl() {
+ exec = Executors.newFixedThreadPool(1);
+
+ exec.execute(new WatcherWorker());
+ }
+
+ /** {@inheritDoc} */
+ @Override public CompletableFuture<IgniteUuid> register(@NotNull
VaultWatch vaultWatch) {
+ synchronized (mux) {
+ IgniteUuid key = new IgniteUuid(UUID.randomUUID(), 0);
+
+ watches.put(key, vaultWatch);
+
+ return CompletableFuture.completedFuture(key);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void notify(@NotNull VaultEntry val) {
+ queue.offer(val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel(@NotNull IgniteUuid uuid) {
+ synchronized (mux) {
+ watches.remove(uuid);
+ }
+ }
+
+ /**
+ * Shuts down the watcher.
+ */
+ public void shutdown() {
+ stop = true;
+
+ if (exec != null) {
+ List<Runnable> tasks = exec.shutdownNow();
+
+ if (!tasks.isEmpty())
+ System.out.println("Runnable tasks outlived thread pool
executor service");
Review comment:
System.out is used here instead of a logger
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org