sk0x50 commented on a change in pull request #105: URL: https://github.com/apache/ignite-3/pull/105#discussion_r619195970
########## File path: modules/core/src/main/java/org/apache/ignite/lang/ByteArray.java ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.lang; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import org.jetbrains.annotations.NotNull; + +/** + * A class for handling byte array. + */ +public final class ByteArray implements Comparable<ByteArray> { + /** Byte-wise representation of the {@code ByteArray}. */ + @NotNull + private final byte[] arr; + + /** + * Constructs {@code ByteArray} instance from the given byte array. <em>Note:</em> copy of the given byte array will not be + * created in order to avoid redundant memory consumption. + * + * @param arr Byte array. Can't be {@code null}. + */ + public ByteArray(@NotNull byte[] arr) { + this.arr = arr; + } + + /** + * Constructs {@code ByteArray} instance from the given string. + * + * @param s The string {@code ByteArray} representation. Can't be {@code null}. + */ + public static ByteArray fromString(@NotNull String s) { + return new ByteArray(s.getBytes(StandardCharsets.UTF_8)); + } + + /** + * Returns the {@code ByteArray} as byte array. + * + * @return Bytes of the {@code ByteArray}. + */ + public byte[] bytes() { + return arr; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) return true; + + if (o == null || getClass() != o.getClass()) return false; + + ByteArray byteArray = (ByteArray)o; + + return Arrays.equals(arr, byteArray.arr); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return Arrays.hashCode(arr); + } + + /** {@inheritDoc} */ + @Override public int compareTo(@NotNull ByteArray other) { + return Arrays.compare(this.arr, other.arr); + } + + /** + * Compares two {@code ByteArray} values. + * The value returned is identical to what would be returned by: + * <pre> + * x.compareTo(y) + * </pre> + * + * where x and y are {@code ByteArray}'s + */ + public static int compare(ByteArray x, ByteArray y) { Review comment: Does it make sense to add `@NotNull` annotations? ########## File path: modules/core/src/main/java/org/apache/ignite/lang/ByteArray.java ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.lang; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import org.jetbrains.annotations.NotNull; + +/** + * A class for handling byte array. + */ +public final class ByteArray implements Comparable<ByteArray> { + /** Byte-wise representation of the {@code ByteArray}. */ + @NotNull + private final byte[] arr; + + /** + * Constructs {@code ByteArray} instance from the given byte array. <em>Note:</em> copy of the given byte array will not be + * created in order to avoid redundant memory consumption. + * + * @param arr Byte array. Can't be {@code null}. + */ + public ByteArray(@NotNull byte[] arr) { + this.arr = arr; + } + + /** + * Constructs {@code ByteArray} instance from the given string. Review comment: Perhaps, it would be nice to mention that the `UTF_8` charset is used in order to create an internal byte array. WDYT? ########## File path: modules/vault/src/test/java/org/apache/ignite/internal/vault/impl/VaultBaseContractsTest.java ########## @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.vault.impl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.vault.common.VaultEntry; +import org.apache.ignite.internal.vault.common.VaultWatch; +import org.apache.ignite.internal.vault.service.VaultService; +import org.apache.ignite.lang.ByteArray; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; + +/** + * Test for base vault contracts. + */ +public class VaultBaseContractsTest { + /** Vault. */ + private VaultService storage; + + /** + * Instantiate vault. + */ + @BeforeEach + public void setUp() { + storage = new VaultServiceImpl(); + } + + /** + * put contract + */ + @Test + public void put() throws ExecutionException, InterruptedException { + ByteArray key = getKey(1); + byte[] val = getValue(key, 1); + + assertNull(storage.get(key).get().value()); + + storage.put(key, val); + + VaultEntry v = storage.get(key).get(); + + assertFalse(v.empty()); + assertEquals(val, v.value()); + + storage.put(key, val); + + v = storage.get(key).get(); + + assertFalse(v.empty()); + assertEquals(val, v.value()); + } + + /** + * remove contract. + */ + @Test + public void remove() throws ExecutionException, InterruptedException { + ByteArray key = getKey(1); + byte[] val = getValue(key, 1); + + assertNull(storage.get(key).get().value()); + + // Remove non-existent value. + storage.remove(key); + + assertNull(storage.get(key).get().value()); + + storage.put(key, val); + + VaultEntry v = storage.get(key).get(); + + assertFalse(v.empty()); + assertEquals(val, v.value()); + + // Remove existent value. + storage.remove(key); + + v = storage.get(key).get(); + + assertNull(v.value()); + } + + /** + * range contract. + */ + @Test + public void range() throws ExecutionException, InterruptedException { + ByteArray key; + + Map<ByteArray, byte[]> values = new HashMap<>(); + + for (int i = 0; i < 10; i++) { + key = getKey(i); + + values.put(key, getValue(key, i)); + + assertNull(storage.get(key).get().value()); + } + + values.forEach((k, v) -> storage.put(k, v)); + + for (Map.Entry<ByteArray, byte[]> entry : values.entrySet()) + assertEquals(entry.getValue(), storage.get(entry.getKey()).get().value()); + + Iterator<VaultEntry> it = storage.range(getKey(3), getKey(7)); + + List<VaultEntry> rangeRes = new ArrayList<>(); + + it.forEachRemaining(rangeRes::add); + + assertEquals(4, rangeRes.size()); + + //Check that we have exact range from "key3" to "key6" + for (int i = 3; i < 7; i++) + assertEquals(values.get(getKey(i)), rangeRes.get(i - 3).value()); + } + + /** + * watch contract. + */ + @Test + public void watch() throws ExecutionException, InterruptedException { + ByteArray key; + + Map<ByteArray, byte[]> values = new HashMap<>(); + + for (int i = 0; i < 10; i++) { + key = getKey(i); + + values.put(key, getValue(key, i)); + } + + values.forEach((k, v) -> storage.put(k, v)); + + for (Map.Entry<ByteArray, byte[]> entry : values.entrySet()) + assertEquals(entry.getValue(), storage.get(entry.getKey()).get().value()); + + AtomicInteger counter = new AtomicInteger(); + + VaultWatch vaultWatch = new VaultWatch(changedValue -> counter.incrementAndGet()); + + vaultWatch.startKey(getKey(3)); + vaultWatch.endKey(getKey(7)); + + storage.watch(vaultWatch); + + for (int i = 3; i < 7; i++) + storage.put(getKey(i), ("new" + i).getBytes()); + + Thread.sleep(500); Review comment: +1 I think we should avoid `Thread.sleep()` in tests. ########## File path: modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultEntry.java ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.vault.common; + +import java.io.Serializable; +import org.apache.ignite.lang.ByteArray; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Representation of vault entry. + */ +public class VaultEntry implements Entry, Serializable { + /** Key. */ + private ByteArray key; + + /** Value. */ + private byte[] val; + + /** + * Constructs {@code VaultEntry} instance from the given key and value. + * + * @param key Key as a {@code ByteArray}. + * @param val Value as a {@code byte[]}. + */ + public VaultEntry(ByteArray key, byte[] val) { Review comment: It seems that `key` must not be `null` (in accordance with the `key()` method). It would be nice to note this fact in Javadoc and/or add annotations. ########## File path: modules/vault/src/main/java/org/apache/ignite/internal/vault/common/WatcherImpl.java ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.vault.common; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.NotNull; + +/** + * Implementation of vault {@link Watcher}. + */ +public class WatcherImpl implements Watcher { + /** Queue for changed vault entries. */ + private final BlockingQueue<VaultEntry> queue = new LinkedBlockingQueue<>(); + + /** Registered vault watches. */ + private final Map<IgniteUuid, VaultWatch> watches = new HashMap<>(); + + /** Flag for indicating if watcher is stopped. */ + private volatile boolean stop; + + /** Mutex. */ + private final Object mux = new Object(); + + /** Execution service which runs thread for processing changed vault entries. */ + private final ExecutorService exec; + + /** + * Default constructor. + */ + public WatcherImpl() { + exec = Executors.newFixedThreadPool(1); + + exec.execute(new WatcherWorker()); + } + + /** {@inheritDoc} */ + @Override public CompletableFuture<IgniteUuid> register(@NotNull VaultWatch vaultWatch) { + synchronized (mux) { + IgniteUuid key = new IgniteUuid(UUID.randomUUID(), 0); + + watches.put(key, vaultWatch); + + return CompletableFuture.completedFuture(key); + } + } + + /** {@inheritDoc} */ + @Override public void notify(@NotNull VaultEntry val) { Review comment: Should we notify a user in some way if the worker is already stopped? ########## File path: modules/vault/src/main/java/org/apache/ignite/internal/vault/impl/VaultServiceImpl.java ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.vault.impl; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.vault.common.*; +import org.apache.ignite.internal.vault.service.VaultService; +import org.apache.ignite.lang.ByteArray; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.NotNull; + +/** + * Simple in-memory representation of vault. Only for test purposes. + */ +public class VaultServiceImpl implements VaultService { + /** Map to store values. */ + private TreeMap<ByteArray, byte[]> storage = new TreeMap<>(); + + /** + * Special key for vault where applied revision for {@code putAll} operation is stored. Review comment: `/** Special key for vault where applied revision for {@code putAll} operation is stored. */` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
