rpuch commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1178781093
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -47,14 +48,41 @@ public interface MvPartitionStorage extends
ManuallyCloseable {
long REBALANCE_IN_PROGRESS = -1;
/**
- * Closure for executing write operations on the storage.
+ * Closure for executing write operations on the storage. All write
operations, such as
+ * {@link #addWrite(RowId, BinaryRow, UUID, UUID, int)} or {@link
#commitWrite(RowId, HybridTimestamp)},
+ * as well as {@link #scanVersions(RowId)}, and operations like {@link
#committedGroupConfiguration(byte[])}, must be executed inside
+ * of the write closure. Also, each operation that involves modifying rows
(and {@link #scanVersions(RowId)} must hold lock on
Review Comment:
```suggestion
* of the write closure. Also, each operation that involves modifying
rows (and {@link #scanVersions(RowId)}) must hold lock on
```
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -47,14 +48,41 @@ public interface MvPartitionStorage extends
ManuallyCloseable {
long REBALANCE_IN_PROGRESS = -1;
/**
- * Closure for executing write operations on the storage.
+ * Closure for executing write operations on the storage. All write
operations, such as
+ * {@link #addWrite(RowId, BinaryRow, UUID, UUID, int)} or {@link
#commitWrite(RowId, HybridTimestamp)},
+ * as well as {@link #scanVersions(RowId)}, and operations like {@link
#committedGroupConfiguration(byte[])}, must be executed inside
+ * of the write closure. Also, each operation that involves modifying rows
(and {@link #scanVersions(RowId)} must hold lock on
+ * corresponding row ID, by either calling {@link Locker#lock(RowId)} or
calling {@link Locker#tryLock(RowId)} and checking the
Review Comment:
```suggestion
* the corresponding row ID, by either calling {@link
Locker#lock(RowId)} or calling {@link Locker#tryLock(RowId)} and checking the
```
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -217,6 +245,25 @@ public interface MvPartitionStorage extends
ManuallyCloseable {
*/
@Nullable RowId closestRowId(RowId lowerBound) throws StorageException;
+ /**
+ * Returns the head of GC queue.
+ *
+ * @param lowWatermark Upper bound for commit timestamp of GC entry.
Review Comment:
Is it an inclusive or exclusive bound?
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -47,14 +48,41 @@ public interface MvPartitionStorage extends
ManuallyCloseable {
long REBALANCE_IN_PROGRESS = -1;
/**
- * Closure for executing write operations on the storage.
+ * Closure for executing write operations on the storage. All write
operations, such as
+ * {@link #addWrite(RowId, BinaryRow, UUID, UUID, int)} or {@link
#commitWrite(RowId, HybridTimestamp)},
+ * as well as {@link #scanVersions(RowId)}, and operations like {@link
#committedGroupConfiguration(byte[])}, must be executed inside
+ * of the write closure. Also, each operation that involves modifying rows
(and {@link #scanVersions(RowId)} must hold lock on
+ * corresponding row ID, by either calling {@link Locker#lock(RowId)} or
calling {@link Locker#tryLock(RowId)} and checking the
+ * result.
*
* @param <V> Type of the result returned from the closure.
*/
@SuppressWarnings("PublicInnerClass")
@FunctionalInterface
interface WriteClosure<V> {
- V execute() throws StorageException;
+ V execute(Locker locker) throws StorageException;
+ }
+
+ /**
+ * Parameter type for {@link WriteClosure#execute(Locker)}. Used to lock
row IDs before updating the data. All acquired locks are
+ * released automatically after {@code execute} call is completed.
+ */
+ @SuppressWarnings("PublicInnerClass")
+ interface Locker {
+ /**
+ * Locks passed row ID until the {@link WriteClosure#execute(Locker)}
is completed.
+ *
+ * @param rowId Row ID to lock.
+ */
+ void lock(RowId rowId);
+
+ /**
+ * Tries to lock passed row ID. If successful, lock will be released
when the {@link WriteClosure#execute(Locker)} is completed.
+ *
+ * @param rowId Row ID to lock.
+ * @return {@code true} if row ID has been locked successfully, or the
lock has already been held by current thread.
Review Comment:
```suggestion
* @return {@code true} if row ID has been locked successfully,
{@code false} if the lock has already been held by current thread.
```
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/LockHolder.java:
##########
@@ -36,7 +35,7 @@
class LockHolder<T extends Lock> {
private final T lock;
- private final AtomicInteger lockHolder = new AtomicInteger();
+ private int lockHoldersCount;
Review Comment:
Why is a non-volatile field OK now?
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java:
##########
@@ -153,69 +154,81 @@ boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch,
RowId rowId, HybridTimes
/**
* Polls an element for vacuum. See {@link
org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
*
- * @param batch Write batch.
* @param lowWatermark Low watermark.
- * @return Garbage collected element.
+ * @return Garbage collected element descriptor.
* @throws RocksDBException If failed to collect the garbage.
*/
- @Nullable BinaryRowAndRowId pollForVacuum(WriteBatchWithIndex batch,
HybridTimestamp lowWatermark) throws RocksDBException {
- ColumnFamilyHandle partCf = helper.partCf;
-
+ @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
// We retrieve the first element of the GC queue and seek for it in
the data CF.
// However, the element that we need to garbage collect is the next
(older one) element.
// First we check if there's anything to garbage collect. If the
element is a tombstone we remove it.
// If the next element exists, that should be the element that we want
to garbage collect.
- try (RocksIterator gcIt = db.newIterator(gcQueueCf,
helper.upperBoundReadOpts)) {
+ try (
+ RocksIterator foo = db.newIterator(gcQueueCf,
helper.upperBoundReadOpts);
Review Comment:
`foo` doesn't seem like a good name; how about `bareIt`?
##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/LockByRowIdTest.java:
##########
@@ -25,135 +25,97 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
import org.apache.ignite.internal.storage.RowId;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/**
- * Class for testing {@link ReentrantLockByRowId}.
+ * Class for testing {@link LockByRowId}.
*/
-public class ReentrantLockByRowIdTest {
- private ReentrantLockByRowId lockByRowId;
+public class LockByRowIdTest {
+ private LockByRowId lockByRowId;
@BeforeEach
void setUp() {
- lockByRowId = new ReentrantLockByRowId();
- }
-
- @AfterEach
- void tearDown() {
- lockByRowId.releaseAllLockByCurrentThread();
+ lockByRowId = new LockByRowId();
}
@Test
void testSimple() {
RowId rowId = new RowId(0);
- lockByRowId.acquireLock(rowId);
- lockByRowId.releaseLock(rowId);
+ lockByRowId.lock(rowId);
+ lockByRowId.unlockAll(rowId);
- assertEquals(1, lockByRowId.inLock(rowId, () -> 1));
+ assertEquals(1, inLock(rowId, () -> 1));
}
@Test
void testSimpleReEnter() {
RowId rowId = new RowId(0);
- lockByRowId.acquireLock(rowId);
- lockByRowId.acquireLock(rowId);
-
- lockByRowId.inLock(rowId, () -> {
- lockByRowId.acquireLock(rowId);
-
- lockByRowId.releaseLock(rowId);
-
- return null;
- });
+ lockByRowId.lock(rowId);
+ lockByRowId.lock(rowId);
- lockByRowId.releaseLock(rowId);
- lockByRowId.releaseLock(rowId);
+ lockByRowId.unlockAll(rowId);
}
@Test
void testReleaseError() {
- assertThrows(IllegalStateException.class, () ->
lockByRowId.releaseLock(new RowId(0)));
+ assertThrows(IllegalStateException.class, () ->
lockByRowId.unlockAll(new RowId(0)));
RowId rowId = new RowId(0);
- assertThat(runAsync(() -> lockByRowId.acquireLock(rowId)),
willCompleteSuccessfully());
+ assertThat(runAsync(() -> lockByRowId.lock(rowId)),
willCompleteSuccessfully());
- assertThrows(IllegalMonitorStateException.class, () ->
lockByRowId.releaseLock(rowId));
+ assertThrows(IllegalMonitorStateException.class, () ->
lockByRowId.unlockAll(rowId));
}
@Test
void testBlockSimple() {
RowId rowId = new RowId(0);
- lockByRowId.acquireLock(rowId);
- lockByRowId.acquireLock(rowId);
+ lockByRowId.lock(rowId);
+ lockByRowId.lock(rowId);
CompletableFuture<?> acquireLockFuture = runAsync(() -> {
- lockByRowId.acquireLock(rowId);
- lockByRowId.releaseLock(rowId);
+ lockByRowId.lock(rowId);
+ lockByRowId.unlockAll(rowId);
});
assertThat(acquireLockFuture, willTimeoutFast());
- lockByRowId.releaseLock(rowId);
-
- assertThat(acquireLockFuture, willTimeoutFast());
-
- lockByRowId.releaseLock(rowId);
+ lockByRowId.unlockAll(rowId);
assertThat(acquireLockFuture, willCompleteSuccessfully());
-
- lockByRowId.acquireLock(rowId);
}
@Test
void testBlockSupplier() {
Review Comment:
Why is this test needed? Now that `inLock()` has been removed from the
tested class, it seems to test the same logic as the preceding test.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -189,6 +198,8 @@ public void handleUpdateAll(
RowId rowId = new RowId(partitionId, entry.getKey());
BinaryRow row = entry.getValue() != null ? new
ByteBufferRow(entry.getValue()) : null;
+ locker.lock(rowId);
Review Comment:
Shouldn't we explicitly sort the row IDs before taking locks on them?
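To illustrate the ordering concern, here is a minimal, self-contained sketch of acquiring per-row locks in a fixed (sorted) order so that two writers touching the same rows cannot wait on each other in a cycle. It is not the PR's code: `SortedLockingSketch`, `lockAllSorted`, and the use of `Long` keys as a stand-in for `RowId` are all hypothetical, and it assumes the real row IDs have a total order that every caller agrees on.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/** Sketch only: Long keys are a hypothetical stand-in for RowId. */
public class SortedLockingSketch {
    /** Hypothetical per-key lock table. */
    static final Map<Long, ReentrantLock> locks = new ConcurrentHashMap<>();

    /** Locks every distinct key in ascending order and returns the acquisition order. */
    static List<Long> lockAllSorted(Collection<Long> rowIds) {
        List<Long> acquired = new ArrayList<>();

        // TreeSet both sorts the keys and drops duplicates.
        for (Long id : new TreeSet<>(rowIds)) {
            locks.computeIfAbsent(id, k -> new ReentrantLock()).lock();
            acquired.add(id);
        }

        return acquired;
    }

    /** Releases the locks in reverse acquisition order. */
    static void unlockAll(List<Long> acquired) {
        for (int i = acquired.size() - 1; i >= 0; i--) {
            locks.get(acquired.get(i)).unlock();
        }
    }

    public static void main(String[] args) {
        List<Long> order = lockAllSorted(List.of(3L, 1L, 2L));

        System.out.println(order); // prints [1, 2, 3]

        unlockAll(order);
    }
}
```

With a single agreed order, a thread that holds a lock on a "smaller" row never waits for a thread that holds a "larger" one and wants the smaller, which is the cycle that would otherwise cause a deadlock.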
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -451,7 +454,10 @@ void handleBuildIndexCommand(BuildIndexCommand cmd, long
commandIndex, long comm
return;
}
- storage.runConsistently(() -> {
+ storage.runConsistently(locker -> {
+ //TODO Assert order?
Review Comment:
Same as above (about TODO and ordering)
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/LockByRowId.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.util;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.internal.storage.RowId;
+
+/**
+ * {@link ReentrantLock} by row ID.
+ *
+ * <p>Allows synchronization of version chain update operations.
+ */
+public class LockByRowId {
+ private final ConcurrentMap<RowId, LockHolder<ReentrantLock>>
lockHolderByRowId = new ConcurrentHashMap<>();
+
+ /**
+ * Acquires the lock by row ID.
+ *
+ * @param rowId Row ID.
+ */
+ public void lock(RowId rowId) {
+ LockHolder<ReentrantLock> lockHolder =
lockHolderByRowId.compute(rowId, (id, holder) -> {
+ if (holder == null) {
+ holder = new LockHolder<>(new ReentrantLock());
+ }
+
+ if (!holder.getLock().isHeldByCurrentThread()) {
+ holder.incrementHolders();
+ }
+
+ return holder;
+ });
+
+ if (!lockHolder.getLock().isHeldByCurrentThread()) {
+ lockHolder.getLock().lock();
+ }
+ }
+
+ /**
+ * Tries to acquire the lock by row ID.
+ *
+ * @param rowId Row ID.
+ * @return {@code true} if lock has been acquired successfully.
+ */
+ public boolean tryLock(RowId rowId) {
+ boolean[] result = {false};
+
+ lockHolderByRowId.compute(rowId, (id, holder) -> {
+ if (holder == null) {
+ holder = new LockHolder<>(new ReentrantLock());
+
+ holder.incrementHolders();
+
+ // Locks immediately, because no one else have seen this
instance yet.
Review Comment:
```suggestion
// Locks immediately, because no one else has seen this
instance yet.
```
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -853,24 +859,22 @@ protected ReadResult decodeEntry(byte[] key, byte[]
value) {
@Override
public boolean hasNext() {
- return busy(() -> {
-
throwExceptionIfStorageInProgressOfRebalance(state.get(),
RocksDbMvPartitionStorage.this::createStorageInfo);
+ assert rowIsLocked(rowId);
- return super.hasNext();
- });
+ return super.hasNext();
}
@Override
public ReadResult next() {
- return busy(() -> {
-
throwExceptionIfStorageInProgressOfRebalance(state.get(),
RocksDbMvPartitionStorage.this::createStorageInfo);
+ assert rowIsLocked(rowId);
- return super.next();
- });
+ return super.next();
}
@Override
public void close() {
+ assert rowIsLocked(rowId);
Review Comment:
Do we really need to assert this on close?
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -47,14 +48,41 @@ public interface MvPartitionStorage extends
ManuallyCloseable {
long REBALANCE_IN_PROGRESS = -1;
/**
- * Closure for executing write operations on the storage.
+ * Closure for executing write operations on the storage. All write
operations, such as
+ * {@link #addWrite(RowId, BinaryRow, UUID, UUID, int)} or {@link
#commitWrite(RowId, HybridTimestamp)},
+ * as well as {@link #scanVersions(RowId)}, and operations like {@link
#committedGroupConfiguration(byte[])}, must be executed inside
+ * of the write closure. Also, each operation that involves modifying rows
(and {@link #scanVersions(RowId)} must hold lock on
+ * corresponding row ID, by either calling {@link Locker#lock(RowId)} or
calling {@link Locker#tryLock(RowId)} and checking the
+ * result.
*
* @param <V> Type of the result returned from the closure.
*/
@SuppressWarnings("PublicInnerClass")
@FunctionalInterface
interface WriteClosure<V> {
- V execute() throws StorageException;
+ V execute(Locker locker) throws StorageException;
+ }
+
+ /**
+ * Parameter type for {@link WriteClosure#execute(Locker)}. Used to lock
row IDs before updating the data. All acquired locks are
+ * released automatically after {@code execute} call is completed.
+ */
+ @SuppressWarnings("PublicInnerClass")
+ interface Locker {
+ /**
+ * Locks passed row ID until the {@link WriteClosure#execute(Locker)}
is completed.
Review Comment:
Should it be mentioned that it blocks until it can take a lock?
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java:
##########
@@ -136,19 +137,31 @@ public void afterCheckpointEnd(CheckpointProgress
progress) {
@Override
public <V> V runConsistently(WriteClosure<V> closure) throws
StorageException {
- return busy(() -> {
- throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(),
this::createStorageInfo);
+ LocalLocker locker = THREAD_LOCAL_LOCKER.get();
- checkpointTimeoutLock.checkpointReadLock();
+ if (locker != null) {
+ return closure.execute(locker);
+ } else {
+ return busy(() -> {
+
throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(),
this::createStorageInfo);
- try {
- return closure.execute();
- } finally {
- updateVersionChainLockByRowId.releaseAllLockByCurrentThread();
+ LocalLocker locker0 = new LocalLocker(lockByRowId);
- checkpointTimeoutLock.checkpointReadUnlock();
- }
- });
+ checkpointTimeoutLock.checkpointReadLock();
+
+ THREAD_LOCAL_LOCKER.set(locker0);
+
+ try {
+ return closure.execute(locker0);
+ } finally {
+ THREAD_LOCAL_LOCKER.set(null);
+
+ locker0.unlockAll();
Review Comment:
If `unlockAll()` fails for some reason, the checkpoint lock will not be
released. How about another `try/finally`?
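Roughly what I have in mind, as a self-contained sketch rather than the actual storage code: the static methods below are hypothetical stand-ins for `locker0.unlockAll()` and `checkpointTimeoutLock.checkpointReadUnlock()`, and the `events` list exists only to make the cleanup order visible.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: shows that a nested try/finally releases the outer lock even if the inner release throws. */
public class NestedFinallySketch {
    /** Records the order of the steps so the behaviour can be inspected. */
    static final List<String> events = new ArrayList<>();

    /** Hypothetical stand-in for locker0.unlockAll(); can be told to fail. */
    static void unlockAll(boolean fail) {
        events.add("unlockAll");

        if (fail) {
            throw new IllegalStateException("unlockAll failed");
        }
    }

    /** Hypothetical stand-in for checkpointTimeoutLock.checkpointReadUnlock(). */
    static void checkpointReadUnlock() {
        events.add("checkpointReadUnlock");
    }

    static String runConsistently(boolean failOnUnlock) {
        events.add("checkpointReadLock");

        try {
            return "result";
        } finally {
            try {
                unlockAll(failOnUnlock);
            } finally {
                // Runs even when unlockAll() throws.
                checkpointReadUnlock();
            }
        }
    }

    public static void main(String[] args) {
        try {
            runConsistently(true);
        } catch (IllegalStateException e) {
            // Expected: the failure from unlockAll() still propagates to the caller.
        }

        System.out.println(events); // prints [checkpointReadLock, unlockAll, checkpointReadUnlock]
    }
}
```

The exception from the inner release is not swallowed; it still reaches the caller, but only after the checkpoint read lock has been released.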
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -226,8 +273,32 @@ public interface MvPartitionStorage extends
ManuallyCloseable {
* {@code null} if there's no such value.
* @throws StorageException If failed to poll element for vacuum.
*/
+ //TODO IGNITE-19367 Remove this method and replace its usages with proper
batch removes.
+ @Deprecated
default @Nullable BinaryRowAndRowId pollForVacuum(HybridTimestamp
lowWatermark) {
- throw new UnsupportedOperationException("pollForVacuum");
+ while (true) {
+ BinaryRowAndRowId binaryRowAndRowId = runConsistently(locker -> {
+ GcEntry gcEntry = peek(lowWatermark);
+
+ if (gcEntry == null) {
+ return null;
+ }
+
+ locker.lock(gcEntry.getRowId());
Review Comment:
The schematic code in the Jira ticket suggests using `tryLock()` here; is
`lock()` OK?
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java:
##########
@@ -153,69 +154,81 @@ boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch,
RowId rowId, HybridTimes
/**
* Polls an element for vacuum. See {@link
org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
*
- * @param batch Write batch.
* @param lowWatermark Low watermark.
- * @return Garbage collected element.
+ * @return Garbage collected element descriptor.
* @throws RocksDBException If failed to collect the garbage.
*/
- @Nullable BinaryRowAndRowId pollForVacuum(WriteBatchWithIndex batch,
HybridTimestamp lowWatermark) throws RocksDBException {
- ColumnFamilyHandle partCf = helper.partCf;
-
+ @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
// We retrieve the first element of the GC queue and seek for it in
the data CF.
// However, the element that we need to garbage collect is the next
(older one) element.
// First we check if there's anything to garbage collect. If the
element is a tombstone we remove it.
// If the next element exists, that should be the element that we want
to garbage collect.
- try (RocksIterator gcIt = db.newIterator(gcQueueCf,
helper.upperBoundReadOpts)) {
+ try (
+ RocksIterator foo = db.newIterator(gcQueueCf,
helper.upperBoundReadOpts);
+ RocksIterator gcIt = helper.wrapIterator(foo, gcQueueCf)
+ ) {
gcIt.seek(helper.partitionStartPrefix());
if (invalid(gcIt)) {
// GC queue is empty.
return null;
}
- ByteBuffer gcKeyBuffer = readGcKey(gcIt);
+ ByteBuffer gcKeyBuffer = readGcKey(foo);
GcRowVersion gcRowVersion = toGcRowVersion(gcKeyBuffer);
- while (true) {
- if (gcRowVersion.getRowTimestamp().compareTo(lowWatermark) >
0) {
- // No elements to garbage collect.
- return null;
- }
+ if (gcRowVersion.getTimestamp().compareTo(lowWatermark) > 0) {
+ // No elements to garbage collect.
+ return null;
+ }
- // If no one has processed the head of the gc queue in
parallel, then we must release the lock after write batch in
- // WriteClosure#execute of MvPartitionStorage#runConsistently
so that the indexes can be deleted consistently.
- helper.lockByRowId.acquireLock(gcRowVersion.getRowId());
+ return gcRowVersion;
+ }
+ }
- // We must refresh the iterator to try to read the head of the
gc queue again and if someone deleted it in parallel,
- // then read the new head of the queue.
- refreshGcIterator(gcIt, gcKeyBuffer);
- if (invalid(gcIt)) {
- // GC queue is empty.
- return null;
- }
+ /**
+ * Polls an element for vacuum. See {@link
org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
+ *
+ * @param batch Write batch.
+ * @param lowWatermark Low watermark.
+ * @return Garbage collected element.
+ * @throws RocksDBException If failed to collect the garbage.
+ */
+ @Nullable BinaryRow vacuum(WriteBatchWithIndex batch, GcEntry entry)
throws RocksDBException {
+ assert entry instanceof GcRowVersion;
- gcKeyBuffer = readGcKey(gcIt);
+ ColumnFamilyHandle partCf = helper.partCf;
- GcRowVersion oldGcRowVersion = gcRowVersion;
+ // We retrieve the first element of the GC queue and seek for it in
the data CF.
+ // However, the element that we need to garbage collect is the next
(older one) element.
+ // First we check if there's anything to garbage collect. If the
element is a tombstone we remove it.
+ // If the next element exists, that should be the element that we want
to garbage collect.
+ try (RocksIterator gcIt = db.newIterator(gcQueueCf,
helper.upperBoundReadOpts)) {
+ gcIt.seek(helper.partitionStartPrefix());
- gcRowVersion = toGcRowVersion(gcKeyBuffer);
+ if (invalid(gcIt)) {
+ // GC queue is empty.
+ return null;
+ }
- // Someone has processed the element in parallel, so we need
to take a new head of the queue.
- if (!gcRowVersion.equals(oldGcRowVersion)) {
- helper.lockByRowId.releaseLock(oldGcRowVersion.getRowId());
+ ByteBuffer gcKeyBuffer = readGcKey(gcIt);
- continue;
- }
+ GcRowVersion gcRowVersion = toGcRowVersion(gcKeyBuffer);
- break;
+ // Someone has processed the element in parallel, so we need to
take a new head of the queue.
+ if (!gcRowVersion.equals(entry)) {
+ return null;
}
// Delete element from the GC queue.
batch.delete(gcQueueCf, gcKeyBuffer);
- try (RocksIterator it = db.newIterator(partCf,
helper.upperBoundReadOpts)) {
+ try (
+ RocksIterator foo = db.newIterator(partCf,
helper.upperBoundReadOpts);
Review Comment:
`foo` again
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -324,7 +324,10 @@ private void handleTxCleanupCommand(TxCleanupCommand cmd,
long commandIndex, lon
Set<RowId> pendingRowIds = txsPendingRowIds.getOrDefault(txId,
Collections.emptySet());
if (cmd.commit()) {
- storage.runConsistently(() -> {
+ storage.runConsistently(locker -> {
+ //TODO Split & sort.
Review Comment:
1. We should not have a TODO that does not reference a Jira ticket
2. Are you sure we can merge this PR without sorting the rowIds? It seems
pretty dangerous; we don't want deadlocks