dcapwell commented on code in PR #47: URL: https://github.com/apache/cassandra-accord/pull/47#discussion_r1203029087
########## accord-core/src/main/java/accord/impl/AbstractConfigurationService.java: ########## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.impl; + +import java.util.ArrayList; +import java.util.List; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.primitives.Ints; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.api.ConfigurationService; +import accord.local.Node; +import accord.topology.Topology; +import accord.utils.Invariants; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; + +public abstract class AbstractConfigurationService<EpochState extends AbstractConfigurationService.AbstractEpochState, + EpochHistory extends AbstractConfigurationService.AbstractEpochHistory<EpochState>> + implements ConfigurationService +{ + private static final Logger logger = LoggerFactory.getLogger(AbstractConfigurationService.class); + + protected final Node.Id localId; + + protected final EpochHistory epochs = createEpochHistory(); + + protected final List<Listener> listeners = new ArrayList<>(); + + public abstract static class AbstractEpochState + { + protected final long epoch; + protected 
final AsyncResult.Settable<Topology> received = AsyncResults.settable(); + protected final AsyncResult.Settable<Void> acknowledged = AsyncResults.settable(); + protected AsyncResult<Void> reads = null; + + protected Topology topology = null; + + public AbstractEpochState(long epoch) + { + this.epoch = epoch; + } + + public long epoch() + { + return epoch; + } + + @Override + public String toString() + { + return "EpochState{" + epoch + '}'; + } + } + + @VisibleForTesting + public abstract static class AbstractEpochHistory<EpochState extends AbstractEpochState> + { + // TODO (low priority): move pendingEpochs / FetchTopology into here? + private List<EpochState> epochs = new ArrayList<>(); + + protected long lastReceived = 0; + protected long lastAcknowledged = 0; + + protected abstract EpochState createEpochState(long epoch); + + long minEpoch() + { + return epochs.isEmpty() ? 0L : epochs.get(0).epoch; + } + + long maxEpoch() + { + int size = epochs.size(); + return size == 0 ? 0L : epochs.get(size - 1).epoch; + } + + @VisibleForTesting + EpochState atIndex(int idx) + { + return epochs.get(idx); + } + + @VisibleForTesting + int size() + { + return epochs.size(); + } + + EpochState getOrCreate(long epoch) + { + Invariants.checkArgument(epoch > 0); + if (epochs.isEmpty()) + { + EpochState state = createEpochState(epoch); + epochs.add(state); + return state; + } + + long minEpoch = minEpoch(); + if (epoch < minEpoch) + { + int prepend = Ints.checkedCast(minEpoch - epoch); + List<EpochState> next = new ArrayList<>(epochs.size() + prepend); + for (long addEpoch=epoch; addEpoch<minEpoch; addEpoch++) + next.add(createEpochState(addEpoch)); + next.addAll(epochs); + epochs = next; + minEpoch = minEpoch(); + Invariants.checkState(minEpoch == epoch); + } + long maxEpoch = maxEpoch(); + int idx = Ints.checkedCast(epoch - minEpoch); + + // add any missing epochs + for (long addEpoch = maxEpoch + 1; addEpoch <= epoch; addEpoch++) + epochs.add(createEpochState(addEpoch)); + + 
return epochs.get(idx); + } + + public void receive(Topology topology) + { + long epoch = topology.epoch(); + Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || lastReceived == 0); + lastReceived = epoch; + EpochState state = getOrCreate(epoch); + if (state != null) + { + state.topology = topology; + state.received.setSuccess(topology); + } + } + + AsyncResult<Topology> receiveFuture(long epoch) + { + return getOrCreate(epoch).received; + } + + Topology topologyFor(long epoch) + { + return getOrCreate(epoch).topology; + } + + public void acknowledge(EpochReady ready) + { + long epoch = ready.epoch; + Invariants.checkState(lastAcknowledged == epoch - 1 || epoch == 0 || lastAcknowledged == 0); + lastAcknowledged = epoch; + EpochState state = getOrCreate(epoch); + state.reads = ready.reads; + state.acknowledged.setSuccess(null); + } + + AsyncResult<Void> acknowledgeFuture(long epoch) + { + return getOrCreate(epoch).acknowledged; + } + + void truncateUntil(long epoch) + { + Invariants.checkArgument(epoch <= maxEpoch()); + long minEpoch = minEpoch(); + int toTrim = Ints.checkedCast(epoch - minEpoch); + if (toTrim <=0) + return; + + epochs = new ArrayList<>(epochs.subList(toTrim, epochs.size())); + } + } + + public AbstractConfigurationService(Node.Id localId) + { + this.localId = localId; + } + + protected abstract EpochHistory createEpochHistory(); + + protected EpochState getOrCreateEpochState(long epoch) + { + return epochs.getOrCreate(epoch); + } + + @Override + public synchronized void registerListener(Listener listener) + { + listeners.add(listener); + } + + @Override + public synchronized Topology currentTopology() + { + return epochs.topologyFor(epochs.lastReceived); + } + + @Override + public synchronized Topology getTopologyForEpoch(long epoch) + { + return epochs.topologyFor(epoch); + } + + protected abstract void fetchTopologyInternal(long epoch); + + @Override + public synchronized void fetchTopologyForEpoch(long epoch) + { + if (epoch <= 
epochs.lastReceived) + return; + + fetchTopologyInternal(epoch); + } + + protected abstract void localSyncComplete(Topology topology); + + @Override + public synchronized void acknowledgeEpoch(EpochReady ready) + { + ready.metadata.addCallback(() -> epochs.acknowledge(ready)); + ready.coordination.addCallback(() -> localSyncComplete(epochs.getOrCreate(ready.epoch).topology)); + } + + protected void topologyUpdatePreListenerNotify(Topology topology) {} + protected void topologyUpdatePostListenerNotify(Topology topology) {} + + public synchronized AsyncResult<Void> reportTopology(Topology topology, boolean startSync) + { + long lastReceived = epochs.lastReceived; + if (topology.epoch() <= lastReceived) + return AsyncResults.success(null); + + if (lastReceived > 0 && topology.epoch() > lastReceived + 1) + { + fetchTopologyForEpoch(lastReceived + 1); + epochs.receiveFuture(lastReceived + 1).addCallback(() -> reportTopology(topology, startSync)); Review Comment: if `lastReceived = 3` and `epoch() = 5`, then this would "fetch" `epoch = 4` but not `epoch = 5`? ########## accord-core/src/main/java/accord/impl/AbstractConfigurationService.java: ########## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package accord.impl; + +import java.util.ArrayList; +import java.util.List; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.primitives.Ints; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.api.ConfigurationService; +import accord.local.Node; +import accord.topology.Topology; +import accord.utils.Invariants; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; + +public abstract class AbstractConfigurationService<EpochState extends AbstractConfigurationService.AbstractEpochState, + EpochHistory extends AbstractConfigurationService.AbstractEpochHistory<EpochState>> + implements ConfigurationService +{ + private static final Logger logger = LoggerFactory.getLogger(AbstractConfigurationService.class); + + protected final Node.Id localId; + + protected final EpochHistory epochs = createEpochHistory(); + + protected final List<Listener> listeners = new ArrayList<>(); + + public abstract static class AbstractEpochState + { + protected final long epoch; + protected final AsyncResult.Settable<Topology> received = AsyncResults.settable(); + protected final AsyncResult.Settable<Void> acknowledged = AsyncResults.settable(); + protected AsyncResult<Void> reads = null; + + protected Topology topology = null; + + public AbstractEpochState(long epoch) + { + this.epoch = epoch; + } + + public long epoch() + { + return epoch; + } + + @Override + public String toString() + { + return "EpochState{" + epoch + '}'; + } + } + + @VisibleForTesting + public abstract static class AbstractEpochHistory<EpochState extends AbstractEpochState> + { + // TODO (low priority): move pendingEpochs / FetchTopology into here? + private List<EpochState> epochs = new ArrayList<>(); + + protected long lastReceived = 0; + protected long lastAcknowledged = 0; + + protected abstract EpochState createEpochState(long epoch); + + long minEpoch() + { + return epochs.isEmpty() ? 
0L : epochs.get(0).epoch; + } + + long maxEpoch() + { + int size = epochs.size(); + return size == 0 ? 0L : epochs.get(size - 1).epoch; + } + + @VisibleForTesting + EpochState atIndex(int idx) + { + return epochs.get(idx); + } + + @VisibleForTesting + int size() + { + return epochs.size(); + } + + EpochState getOrCreate(long epoch) + { + Invariants.checkArgument(epoch > 0); + if (epochs.isEmpty()) + { + EpochState state = createEpochState(epoch); + epochs.add(state); + return state; + } + + long minEpoch = minEpoch(); + if (epoch < minEpoch) + { + int prepend = Ints.checkedCast(minEpoch - epoch); + List<EpochState> next = new ArrayList<>(epochs.size() + prepend); + for (long addEpoch=epoch; addEpoch<minEpoch; addEpoch++) + next.add(createEpochState(addEpoch)); + next.addAll(epochs); + epochs = next; + minEpoch = minEpoch(); + Invariants.checkState(minEpoch == epoch); + } + long maxEpoch = maxEpoch(); + int idx = Ints.checkedCast(epoch - minEpoch); + + // add any missing epochs + for (long addEpoch = maxEpoch + 1; addEpoch <= epoch; addEpoch++) + epochs.add(createEpochState(addEpoch)); + + return epochs.get(idx); + } + + public void receive(Topology topology) + { + long epoch = topology.epoch(); + Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || lastReceived == 0); + lastReceived = epoch; + EpochState state = getOrCreate(epoch); + if (state != null) + { + state.topology = topology; + state.received.setSuccess(topology); + } + } + + AsyncResult<Topology> receiveFuture(long epoch) + { + return getOrCreate(epoch).received; + } + + Topology topologyFor(long epoch) + { + return getOrCreate(epoch).topology; + } + + public void acknowledge(EpochReady ready) + { + long epoch = ready.epoch; + Invariants.checkState(lastAcknowledged == epoch - 1 || epoch == 0 || lastAcknowledged == 0); + lastAcknowledged = epoch; + EpochState state = getOrCreate(epoch); + state.reads = ready.reads; + state.acknowledged.setSuccess(null); + } + + AsyncResult<Void> 
acknowledgeFuture(long epoch) + { + return getOrCreate(epoch).acknowledged; + } + + void truncateUntil(long epoch) + { + Invariants.checkArgument(epoch <= maxEpoch()); + long minEpoch = minEpoch(); + int toTrim = Ints.checkedCast(epoch - minEpoch); + if (toTrim <=0) + return; + + epochs = new ArrayList<>(epochs.subList(toTrim, epochs.size())); + } + } + + public AbstractConfigurationService(Node.Id localId) + { + this.localId = localId; + } + + protected abstract EpochHistory createEpochHistory(); + + protected EpochState getOrCreateEpochState(long epoch) + { + return epochs.getOrCreate(epoch); + } + + @Override + public synchronized void registerListener(Listener listener) + { + listeners.add(listener); + } + + @Override + public synchronized Topology currentTopology() + { + return epochs.topologyFor(epochs.lastReceived); + } + + @Override + public synchronized Topology getTopologyForEpoch(long epoch) + { + return epochs.topologyFor(epoch); + } + + protected abstract void fetchTopologyInternal(long epoch); + + @Override + public synchronized void fetchTopologyForEpoch(long epoch) + { + if (epoch <= epochs.lastReceived) + return; + + fetchTopologyInternal(epoch); + } + + protected abstract void localSyncComplete(Topology topology); + + @Override + public synchronized void acknowledgeEpoch(EpochReady ready) + { + ready.metadata.addCallback(() -> epochs.acknowledge(ready)); + ready.coordination.addCallback(() -> localSyncComplete(epochs.getOrCreate(ready.epoch).topology)); + } + + protected void topologyUpdatePreListenerNotify(Topology topology) {} + protected void topologyUpdatePostListenerNotify(Topology topology) {} + + public synchronized AsyncResult<Void> reportTopology(Topology topology, boolean startSync) + { + long lastReceived = epochs.lastReceived; + if (topology.epoch() <= lastReceived) + return AsyncResults.success(null); + + if (lastReceived > 0 && topology.epoch() > lastReceived + 1) + { + fetchTopologyForEpoch(lastReceived + 1); + 
epochs.receiveFuture(lastReceived + 1).addCallback(() -> reportTopology(topology, startSync)); + return AsyncResults.success(null); Review Comment: this API returns `AsyncResult<Void>`, but every return is synchronous and always a success... why is this async? Shouldn't the result be based on the epoch being acked? ########## accord-core/src/main/java/accord/impl/AbstractConfigurationService.java: ########## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package accord.impl; + +import java.util.ArrayList; +import java.util.List; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.primitives.Ints; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.api.ConfigurationService; +import accord.local.Node; +import accord.topology.Topology; +import accord.utils.Invariants; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; + +public abstract class AbstractConfigurationService<EpochState extends AbstractConfigurationService.AbstractEpochState, + EpochHistory extends AbstractConfigurationService.AbstractEpochHistory<EpochState>> + implements ConfigurationService +{ + private static final Logger logger = LoggerFactory.getLogger(AbstractConfigurationService.class); + + protected final Node.Id localId; + + protected final EpochHistory epochs = createEpochHistory(); + + protected final List<Listener> listeners = new ArrayList<>(); + + public abstract static class AbstractEpochState + { + protected final long epoch; + protected final AsyncResult.Settable<Topology> received = AsyncResults.settable(); + protected final AsyncResult.Settable<Void> acknowledged = AsyncResults.settable(); + protected AsyncResult<Void> reads = null; + + protected Topology topology = null; + + public AbstractEpochState(long epoch) + { + this.epoch = epoch; + } + + public long epoch() + { + return epoch; + } + + @Override + public String toString() + { + return "EpochState{" + epoch + '}'; + } + } + + @VisibleForTesting + public abstract static class AbstractEpochHistory<EpochState extends AbstractEpochState> + { + // TODO (low priority): move pendingEpochs / FetchTopology into here? + private List<EpochState> epochs = new ArrayList<>(); + + protected long lastReceived = 0; + protected long lastAcknowledged = 0; + + protected abstract EpochState createEpochState(long epoch); + + long minEpoch() + { + return epochs.isEmpty() ? 
0L : epochs.get(0).epoch; + } + + long maxEpoch() + { + int size = epochs.size(); + return size == 0 ? 0L : epochs.get(size - 1).epoch; + } + + @VisibleForTesting + EpochState atIndex(int idx) + { + return epochs.get(idx); + } + + @VisibleForTesting + int size() + { + return epochs.size(); + } + + EpochState getOrCreate(long epoch) + { + Invariants.checkArgument(epoch > 0); + if (epochs.isEmpty()) + { + EpochState state = createEpochState(epoch); + epochs.add(state); + return state; + } + + long minEpoch = minEpoch(); + if (epoch < minEpoch) + { + int prepend = Ints.checkedCast(minEpoch - epoch); + List<EpochState> next = new ArrayList<>(epochs.size() + prepend); + for (long addEpoch=epoch; addEpoch<minEpoch; addEpoch++) + next.add(createEpochState(addEpoch)); + next.addAll(epochs); + epochs = next; + minEpoch = minEpoch(); + Invariants.checkState(minEpoch == epoch); + } + long maxEpoch = maxEpoch(); + int idx = Ints.checkedCast(epoch - minEpoch); + + // add any missing epochs + for (long addEpoch = maxEpoch + 1; addEpoch <= epoch; addEpoch++) + epochs.add(createEpochState(addEpoch)); + + return epochs.get(idx); + } + + public void receive(Topology topology) + { + long epoch = topology.epoch(); + Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || lastReceived == 0); + lastReceived = epoch; + EpochState state = getOrCreate(epoch); + if (state != null) + { + state.topology = topology; + state.received.setSuccess(topology); + } + } + + AsyncResult<Topology> receiveFuture(long epoch) + { + return getOrCreate(epoch).received; + } + + Topology topologyFor(long epoch) + { + return getOrCreate(epoch).topology; + } + + public void acknowledge(EpochReady ready) + { + long epoch = ready.epoch; + Invariants.checkState(lastAcknowledged == epoch - 1 || epoch == 0 || lastAcknowledged == 0); + lastAcknowledged = epoch; + EpochState state = getOrCreate(epoch); + state.reads = ready.reads; + state.acknowledged.setSuccess(null); + } + + AsyncResult<Void> 
acknowledgeFuture(long epoch) + { + return getOrCreate(epoch).acknowledged; + } + + void truncateUntil(long epoch) + { + Invariants.checkArgument(epoch <= maxEpoch()); + long minEpoch = minEpoch(); + int toTrim = Ints.checkedCast(epoch - minEpoch); + if (toTrim <=0) + return; + + epochs = new ArrayList<>(epochs.subList(toTrim, epochs.size())); + } + } + + public AbstractConfigurationService(Node.Id localId) + { + this.localId = localId; + } + + protected abstract EpochHistory createEpochHistory(); + + protected EpochState getOrCreateEpochState(long epoch) + { + return epochs.getOrCreate(epoch); + } + + @Override + public synchronized void registerListener(Listener listener) + { + listeners.add(listener); + } + + @Override + public synchronized Topology currentTopology() + { + return epochs.topologyFor(epochs.lastReceived); + } + + @Override + public synchronized Topology getTopologyForEpoch(long epoch) + { + return epochs.topologyFor(epoch); + } + + protected abstract void fetchTopologyInternal(long epoch); + + @Override + public synchronized void fetchTopologyForEpoch(long epoch) + { + if (epoch <= epochs.lastReceived) + return; + + fetchTopologyInternal(epoch); + } + + protected abstract void localSyncComplete(Topology topology); + + @Override + public synchronized void acknowledgeEpoch(EpochReady ready) + { + ready.metadata.addCallback(() -> epochs.acknowledge(ready)); + ready.coordination.addCallback(() -> localSyncComplete(epochs.getOrCreate(ready.epoch).topology)); + } + + protected void topologyUpdatePreListenerNotify(Topology topology) {} + protected void topologyUpdatePostListenerNotify(Topology topology) {} + + public synchronized AsyncResult<Void> reportTopology(Topology topology, boolean startSync) + { + long lastReceived = epochs.lastReceived; + if (topology.epoch() <= lastReceived) + return AsyncResults.success(null); + + if (lastReceived > 0 && topology.epoch() > lastReceived + 1) + { + fetchTopologyForEpoch(lastReceived + 1); + 
epochs.receiveFuture(lastReceived + 1).addCallback(() -> reportTopology(topology, startSync)); + return AsyncResults.success(null); + } + + long lastAcked = epochs.lastAcknowledged; + if (lastAcked > 0 && topology.epoch() > lastAcked + 1) + { + epochs.acknowledgeFuture(lastAcked + 1).addCallback(() -> reportTopology(topology, startSync)); + return AsyncResults.success(null); + } + logger.trace("Epoch {} received by {}", topology.epoch(), localId); + + epochs.receive(topology); + topologyUpdatePreListenerNotify(topology); + for (Listener listener : listeners) + listener.onTopologyUpdate(topology, startSync); + topologyUpdatePostListenerNotify(topology); + return AsyncResults.success(null); + } + + public synchronized AsyncResult<Void> reportTopology(Topology topology) + { + return reportTopology(topology, true); + } + + protected void remoteSyncCompletePreListenerNotify(Node.Id node, long epoch) {} + + public synchronized void remoteSyncComplete(Node.Id node, long epoch) + { + remoteSyncCompletePreListenerNotify(node, epoch); + for (Listener listener : listeners) + listener.onRemoteSyncComplete(node, epoch); + } + + protected void truncateTopologiesPreListenerNotify(long epoch) {} + protected void truncateTopologiesPostListenerNotify(long epoch) {} + + public synchronized void truncateTopologiesUntil(long epoch) + { + truncateTopologiesPreListenerNotify(epoch); + for (Listener listener : listeners) + listener.truncateTopologyUntil(epoch); + truncateTopologiesPostListenerNotify(epoch); + epochs.truncateUntil(epoch); + } + + public synchronized AsyncResult<Void> epochReady(long epoch) + { + EpochState state = epochs.getOrCreate(epoch); + if (state.reads != null) + return state.reads; + + return state.acknowledged.flatMap(r -> state.reads).beginAsResult(); Review Comment: maybe return `AsyncChain` to avoid `beginAsResult`? 
########## accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.impl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; + +import accord.api.Data; +import accord.api.DataStore; +import accord.coordinate.FetchCoordinator; +import accord.local.CommandStore; +import accord.local.Node; +import accord.local.PreLoadContext; +import accord.local.Status; +import accord.messages.Callback; +import accord.messages.MessageType; +import accord.messages.ReadData; +import accord.messages.WaitAndReadData; +import accord.primitives.PartialDeps; +import accord.primitives.PartialTxn; +import accord.primitives.Ranges; +import accord.primitives.SyncPoint; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import accord.utils.Invariants; +import accord.utils.async.AsyncChains; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; + +import static accord.primitives.Routables.Slice.Minimal; + +public abstract class AbstractFetchCoordinator extends FetchCoordinator Review Comment: this class 
looks like a copy/paste of `accord.impl.list.ListStore.SyncCoordinator`? ########## accord-core/src/main/java/accord/impl/AbstractConfigurationService.java: ########## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.impl; + +import java.util.ArrayList; +import java.util.List; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.primitives.Ints; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.api.ConfigurationService; +import accord.local.Node; +import accord.topology.Topology; +import accord.utils.Invariants; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; + +public abstract class AbstractConfigurationService<EpochState extends AbstractConfigurationService.AbstractEpochState, + EpochHistory extends AbstractConfigurationService.AbstractEpochHistory<EpochState>> + implements ConfigurationService +{ + private static final Logger logger = LoggerFactory.getLogger(AbstractConfigurationService.class); + + protected final Node.Id localId; + + protected final EpochHistory epochs = createEpochHistory(); + + protected final List<Listener> listeners = new ArrayList<>(); + + public abstract static 
class AbstractEpochState + { + protected final long epoch; + protected final AsyncResult.Settable<Topology> received = AsyncResults.settable(); + protected final AsyncResult.Settable<Void> acknowledged = AsyncResults.settable(); + protected AsyncResult<Void> reads = null; + + protected Topology topology = null; + + public AbstractEpochState(long epoch) + { + this.epoch = epoch; + } + + public long epoch() + { + return epoch; + } + + @Override + public String toString() + { + return "EpochState{" + epoch + '}'; + } + } + + @VisibleForTesting + public abstract static class AbstractEpochHistory<EpochState extends AbstractEpochState> + { + // TODO (low priority): move pendingEpochs / FetchTopology into here? + private List<EpochState> epochs = new ArrayList<>(); + + protected long lastReceived = 0; + protected long lastAcknowledged = 0; + + protected abstract EpochState createEpochState(long epoch); + + long minEpoch() + { + return epochs.isEmpty() ? 0L : epochs.get(0).epoch; + } + + long maxEpoch() + { + int size = epochs.size(); + return size == 0 ? 
0L : epochs.get(size - 1).epoch; + } + + @VisibleForTesting + EpochState atIndex(int idx) + { + return epochs.get(idx); + } + + @VisibleForTesting + int size() + { + return epochs.size(); + } + + EpochState getOrCreate(long epoch) + { + Invariants.checkArgument(epoch > 0); + if (epochs.isEmpty()) + { + EpochState state = createEpochState(epoch); + epochs.add(state); + return state; + } + + long minEpoch = minEpoch(); + if (epoch < minEpoch) + { + int prepend = Ints.checkedCast(minEpoch - epoch); + List<EpochState> next = new ArrayList<>(epochs.size() + prepend); + for (long addEpoch=epoch; addEpoch<minEpoch; addEpoch++) + next.add(createEpochState(addEpoch)); + next.addAll(epochs); + epochs = next; + minEpoch = minEpoch(); + Invariants.checkState(minEpoch == epoch); + } + long maxEpoch = maxEpoch(); + int idx = Ints.checkedCast(epoch - minEpoch); + + // add any missing epochs + for (long addEpoch = maxEpoch + 1; addEpoch <= epoch; addEpoch++) + epochs.add(createEpochState(addEpoch)); + + return epochs.get(idx); + } + + public void receive(Topology topology) + { + long epoch = topology.epoch(); + Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || lastReceived == 0); + lastReceived = epoch; + EpochState state = getOrCreate(epoch); + if (state != null) + { + state.topology = topology; + state.received.setSuccess(topology); + } + } + + AsyncResult<Topology> receiveFuture(long epoch) + { + return getOrCreate(epoch).received; + } + + Topology topologyFor(long epoch) + { + return getOrCreate(epoch).topology; + } + + public void acknowledge(EpochReady ready) + { + long epoch = ready.epoch; + Invariants.checkState(lastAcknowledged == epoch - 1 || epoch == 0 || lastAcknowledged == 0); + lastAcknowledged = epoch; + EpochState state = getOrCreate(epoch); + state.reads = ready.reads; + state.acknowledged.setSuccess(null); + } + + AsyncResult<Void> acknowledgeFuture(long epoch) + { + return getOrCreate(epoch).acknowledged; + } + + void truncateUntil(long epoch) + { 
+ Invariants.checkArgument(epoch <= maxEpoch()); + long minEpoch = minEpoch(); + int toTrim = Ints.checkedCast(epoch - minEpoch); + if (toTrim <=0) + return; + + epochs = new ArrayList<>(epochs.subList(toTrim, epochs.size())); + } + } + + public AbstractConfigurationService(Node.Id localId) + { + this.localId = localId; + } + + protected abstract EpochHistory createEpochHistory(); + + protected EpochState getOrCreateEpochState(long epoch) + { + return epochs.getOrCreate(epoch); + } + + @Override + public synchronized void registerListener(Listener listener) + { + listeners.add(listener); + } + + @Override + public synchronized Topology currentTopology() + { + return epochs.topologyFor(epochs.lastReceived); + } + + @Override + public synchronized Topology getTopologyForEpoch(long epoch) + { + return epochs.topologyFor(epoch); + } + + protected abstract void fetchTopologyInternal(long epoch); + + @Override + public synchronized void fetchTopologyForEpoch(long epoch) + { + if (epoch <= epochs.lastReceived) + return; + + fetchTopologyInternal(epoch); + } + + protected abstract void localSyncComplete(Topology topology); + + @Override + public synchronized void acknowledgeEpoch(EpochReady ready) + { + ready.metadata.addCallback(() -> epochs.acknowledge(ready)); + ready.coordination.addCallback(() -> localSyncComplete(epochs.getOrCreate(ready.epoch).topology)); + } + + protected void topologyUpdatePreListenerNotify(Topology topology) {} + protected void topologyUpdatePostListenerNotify(Topology topology) {} + + public synchronized AsyncResult<Void> reportTopology(Topology topology, boolean startSync) Review Comment: why have `boolean startSync`? ########## accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.impl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; + +import accord.api.Data; +import accord.api.DataStore; +import accord.coordinate.FetchCoordinator; +import accord.local.CommandStore; +import accord.local.Node; +import accord.local.PreLoadContext; +import accord.local.Status; +import accord.messages.Callback; +import accord.messages.MessageType; +import accord.messages.ReadData; +import accord.messages.WaitAndReadData; +import accord.primitives.PartialDeps; +import accord.primitives.PartialTxn; +import accord.primitives.Ranges; +import accord.primitives.SyncPoint; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import accord.utils.Invariants; +import accord.utils.async.AsyncChains; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; + +import static accord.primitives.Routables.Slice.Minimal; + +public abstract class AbstractFetchCoordinator extends FetchCoordinator +{ + static class FetchResult extends AsyncResults.SettableResult<Ranges> implements DataStore.FetchResult + { + final AbstractFetchCoordinator coordinator; + + FetchResult(AbstractFetchCoordinator coordinator) + { + this.coordinator = coordinator; + } + + @Override + public void abort(Ranges abort) + { + coordinator.abort(abort); + } + } + + static class Key 
+ { + final Node.Id id; + final Ranges ranges; + + Key(Node.Id id, Ranges ranges) + { + this.id = id; + this.ranges = ranges; + } + + @Override + public int hashCode() + { + return id.hashCode() + ranges.hashCode(); + } Review Comment: formatting ########## accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package accord.impl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; + +import accord.api.Data; +import accord.api.DataStore; +import accord.coordinate.FetchCoordinator; +import accord.local.CommandStore; +import accord.local.Node; +import accord.local.PreLoadContext; +import accord.local.Status; +import accord.messages.Callback; +import accord.messages.MessageType; +import accord.messages.ReadData; +import accord.messages.WaitAndReadData; +import accord.primitives.PartialDeps; +import accord.primitives.PartialTxn; +import accord.primitives.Ranges; +import accord.primitives.SyncPoint; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import accord.utils.Invariants; +import accord.utils.async.AsyncChains; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; + +import static accord.primitives.Routables.Slice.Minimal; + +public abstract class AbstractFetchCoordinator extends FetchCoordinator +{ + static class FetchResult extends AsyncResults.SettableResult<Ranges> implements DataStore.FetchResult + { + final AbstractFetchCoordinator coordinator; + + FetchResult(AbstractFetchCoordinator coordinator) + { + this.coordinator = coordinator; + } + + @Override + public void abort(Ranges abort) + { + coordinator.abort(abort); + } + } + + static class Key + { + final Node.Id id; + final Ranges ranges; + + Key(Node.Id id, Ranges ranges) + { + this.id = id; + this.ranges = ranges; + } + + @Override + public int hashCode() + { + return id.hashCode() + ranges.hashCode(); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) return true; + if (!(obj instanceof Key)) return false; + Key that = (Key) obj; + return id.equals(that.id) && ranges.equals(that.ranges); + } + } + + final DataStore.FetchRanges fetchRanges; + final CommandStore commandStore; + final Map<Key, DataStore.StartingRangeFetch> inflight = new 
HashMap<>(); + final FetchResult result = new FetchResult(this); + final List<AsyncResult<Void>> persisting = new ArrayList<>(); + + protected AbstractFetchCoordinator(Node node, Ranges ranges, SyncPoint syncPoint, DataStore.FetchRanges fetchRanges, CommandStore commandStore) + { + super(node, ranges, syncPoint, fetchRanges); + this.fetchRanges = fetchRanges; + this.commandStore = commandStore; + } + + protected abstract PartialTxn rangeReadTxn(Ranges ranges); + + protected abstract void onReadOk(Node.Id from, CommandStore commandStore, Data data, Ranges ranges); + + @Override + public void contact(Node.Id to, Ranges ranges) + { + Key key = new Key(to, ranges); + inflight.put(key, starting(to, ranges)); + Ranges ownedRanges = ownedRangesForNode(to); + Invariants.checkArgument(ownedRanges.containsAll(ranges)); + PartialDeps partialDeps = syncPoint.waitFor.slice(ownedRanges, ranges); + node.send(to, new FetchRequest(syncPoint.sourceEpoch(), syncPoint.syncId, ranges, partialDeps, rangeReadTxn(ranges)), new Callback<ReadData.ReadReply>() + { + @Override + public void onSuccess(Node.Id from, ReadData.ReadReply reply) + { + if (!reply.isOk()) + { + fail(to, new RuntimeException(reply.toString())); + inflight.remove(key).cancel(); + switch ((ReadData.ReadNack) reply) + { + default: throw new AssertionError("Unhandled enum"); + case Invalid: + case Redundant: + case NotCommitted: + throw new AssertionError(); + case Error: + // TODO (required): ensure errors are propagated to coordinators and can be logged + } + return; + } + + FetchResponse ok = (FetchResponse) reply; + Ranges received; + if (ok.unavailable != null) + { + unavailable(to, ok.unavailable); + if (ok.data == null) + { + inflight.remove(key).cancel(); + return; + } + received = ranges.difference(ok.unavailable); + } + else + { + received = ranges; + } + + // TODO (now): make sure it works if invoked in either order + inflight.remove(key).started(ok.maxApplied); + onReadOk(to, commandStore, ok.data, received); 
+ // received must be invoked after submitting the persistence future, as it triggers onDone + // which creates a ReducingFuture over {@code persisting} + } + + @Override + public void onFailure(Node.Id from, Throwable failure) + { + inflight.remove(key).cancel(); + fail(from, failure); + } + + @Override + public void onCallbackFailure(Node.Id from, Throwable failure) + { + // TODO (soon) + failure.printStackTrace(); + } + }); + } + + @Override + protected synchronized void success(Node.Id to, Ranges ranges) + { + super.success(to, ranges); + } + + @Override + protected synchronized void fail(Node.Id to, Ranges ranges, Throwable failure) + { + super.fail(to, ranges, failure); + } Review Comment: why override to add `synchronized`? ########## accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package accord.impl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; + +import accord.api.Data; +import accord.api.DataStore; +import accord.coordinate.FetchCoordinator; +import accord.local.CommandStore; +import accord.local.Node; +import accord.local.PreLoadContext; +import accord.local.Status; +import accord.messages.Callback; +import accord.messages.MessageType; +import accord.messages.ReadData; +import accord.messages.WaitAndReadData; +import accord.primitives.PartialDeps; +import accord.primitives.PartialTxn; +import accord.primitives.Ranges; +import accord.primitives.SyncPoint; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import accord.utils.Invariants; +import accord.utils.async.AsyncChains; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; + +import static accord.primitives.Routables.Slice.Minimal; + +public abstract class AbstractFetchCoordinator extends FetchCoordinator +{ + static class FetchResult extends AsyncResults.SettableResult<Ranges> implements DataStore.FetchResult + { + final AbstractFetchCoordinator coordinator; + + FetchResult(AbstractFetchCoordinator coordinator) + { + this.coordinator = coordinator; + } + + @Override + public void abort(Ranges abort) + { + coordinator.abort(abort); + } + } + + static class Key + { + final Node.Id id; + final Ranges ranges; + + Key(Node.Id id, Ranges ranges) + { + this.id = id; + this.ranges = ranges; + } + + @Override + public int hashCode() + { + return id.hashCode() + ranges.hashCode(); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) return true; + if (!(obj instanceof Key)) return false; + Key that = (Key) obj; + return id.equals(that.id) && ranges.equals(that.ranges); + } + } + + final DataStore.FetchRanges fetchRanges; + final CommandStore commandStore; + final Map<Key, DataStore.StartingRangeFetch> inflight = new 
HashMap<>(); + final FetchResult result = new FetchResult(this); + final List<AsyncResult<Void>> persisting = new ArrayList<>(); + + protected AbstractFetchCoordinator(Node node, Ranges ranges, SyncPoint syncPoint, DataStore.FetchRanges fetchRanges, CommandStore commandStore) + { + super(node, ranges, syncPoint, fetchRanges); + this.fetchRanges = fetchRanges; + this.commandStore = commandStore; + } + + protected abstract PartialTxn rangeReadTxn(Ranges ranges); + + protected abstract void onReadOk(Node.Id from, CommandStore commandStore, Data data, Ranges ranges); + + @Override + public void contact(Node.Id to, Ranges ranges) + { + Key key = new Key(to, ranges); + inflight.put(key, starting(to, ranges)); + Ranges ownedRanges = ownedRangesForNode(to); + Invariants.checkArgument(ownedRanges.containsAll(ranges)); + PartialDeps partialDeps = syncPoint.waitFor.slice(ownedRanges, ranges); + node.send(to, new FetchRequest(syncPoint.sourceEpoch(), syncPoint.syncId, ranges, partialDeps, rangeReadTxn(ranges)), new Callback<ReadData.ReadReply>() + { + @Override + public void onSuccess(Node.Id from, ReadData.ReadReply reply) + { + if (!reply.isOk()) + { + fail(to, new RuntimeException(reply.toString())); + inflight.remove(key).cancel(); + switch ((ReadData.ReadNack) reply) + { + default: throw new AssertionError("Unhandled enum"); + case Invalid: + case Redundant: Review Comment: trying to wrap my head around this... doesn't this just mean concurrent txn (original coordinator + recovery coordinator)? So this is just saying that "we are reusing message reply, and these states are not valid for us" right? ########## accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.impl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; + +import accord.api.Data; +import accord.api.DataStore; +import accord.coordinate.FetchCoordinator; +import accord.local.CommandStore; +import accord.local.Node; +import accord.local.PreLoadContext; +import accord.local.Status; +import accord.messages.Callback; +import accord.messages.MessageType; +import accord.messages.ReadData; +import accord.messages.WaitAndReadData; +import accord.primitives.PartialDeps; +import accord.primitives.PartialTxn; +import accord.primitives.Ranges; +import accord.primitives.SyncPoint; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import accord.utils.Invariants; +import accord.utils.async.AsyncChains; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; + +import static accord.primitives.Routables.Slice.Minimal; + +public abstract class AbstractFetchCoordinator extends FetchCoordinator +{ + static class FetchResult extends AsyncResults.SettableResult<Ranges> implements DataStore.FetchResult + { + final AbstractFetchCoordinator coordinator; + + FetchResult(AbstractFetchCoordinator coordinator) + { + this.coordinator = coordinator; + } + + @Override + public void abort(Ranges abort) + { + coordinator.abort(abort); + } + } + + static class Key 
+ { + final Node.Id id; + final Ranges ranges; + + Key(Node.Id id, Ranges ranges) + { + this.id = id; + this.ranges = ranges; + } + + @Override + public int hashCode() + { + return id.hashCode() + ranges.hashCode(); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) return true; + if (!(obj instanceof Key)) return false; + Key that = (Key) obj; + return id.equals(that.id) && ranges.equals(that.ranges); + } + } + + final DataStore.FetchRanges fetchRanges; + final CommandStore commandStore; + final Map<Key, DataStore.StartingRangeFetch> inflight = new HashMap<>(); + final FetchResult result = new FetchResult(this); + final List<AsyncResult<Void>> persisting = new ArrayList<>(); + + protected AbstractFetchCoordinator(Node node, Ranges ranges, SyncPoint syncPoint, DataStore.FetchRanges fetchRanges, CommandStore commandStore) + { + super(node, ranges, syncPoint, fetchRanges); + this.fetchRanges = fetchRanges; + this.commandStore = commandStore; + } + + protected abstract PartialTxn rangeReadTxn(Ranges ranges); + + protected abstract void onReadOk(Node.Id from, CommandStore commandStore, Data data, Ranges ranges); + + @Override + public void contact(Node.Id to, Ranges ranges) + { + Key key = new Key(to, ranges); + inflight.put(key, starting(to, ranges)); + Ranges ownedRanges = ownedRangesForNode(to); + Invariants.checkArgument(ownedRanges.containsAll(ranges)); + PartialDeps partialDeps = syncPoint.waitFor.slice(ownedRanges, ranges); + node.send(to, new FetchRequest(syncPoint.sourceEpoch(), syncPoint.syncId, ranges, partialDeps, rangeReadTxn(ranges)), new Callback<ReadData.ReadReply>() + { + @Override + public void onSuccess(Node.Id from, ReadData.ReadReply reply) + { + if (!reply.isOk()) + { + fail(to, new RuntimeException(reply.toString())); + inflight.remove(key).cancel(); + switch ((ReadData.ReadNack) reply) + { + default: throw new AssertionError("Unhandled enum"); + case Invalid: + case Redundant: + case NotCommitted: + throw new 
AssertionError(); Review Comment: nit, can you show the enum value in the error msg? ########## accord-core/src/main/java/accord/local/CommandStores.java: ########## @@ -358,7 +358,7 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top bootstrapUpdates.add(shard.store.interruptBootstraps(epoch, newRanges.currentRanges())); } // TODO (desired): only sync affected shards - if (epoch > 1) + if (epoch > 1 && startSync) Review Comment: rewritten in https://issues.apache.org/jira/browse/CASSANDRA-18519, the logic is now ``` Map<Boolean, Ranges> partitioned = add.partitioningBy(range -> shouldBootstrap(node, prev.local, newLocalTopology, range)); if (partitioned.containsKey(true)) bootstrapUpdates.add(shardHolder.store.bootstrapper(node, partitioned.get(true), newLocalTopology.epoch())); if (partitioned.containsKey(false)) bootstrapUpdates.add(() -> shardHolder.store.initialise(epoch, partitioned.get(false))); ``` ########## accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package accord.impl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; + +import accord.api.Data; +import accord.api.DataStore; +import accord.coordinate.FetchCoordinator; +import accord.local.CommandStore; +import accord.local.Node; +import accord.local.PreLoadContext; +import accord.local.Status; +import accord.messages.Callback; +import accord.messages.MessageType; +import accord.messages.ReadData; +import accord.messages.WaitAndReadData; +import accord.primitives.PartialDeps; +import accord.primitives.PartialTxn; +import accord.primitives.Ranges; +import accord.primitives.SyncPoint; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import accord.utils.Invariants; +import accord.utils.async.AsyncChains; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; + +import static accord.primitives.Routables.Slice.Minimal; + +public abstract class AbstractFetchCoordinator extends FetchCoordinator +{ + static class FetchResult extends AsyncResults.SettableResult<Ranges> implements DataStore.FetchResult + { + final AbstractFetchCoordinator coordinator; + + FetchResult(AbstractFetchCoordinator coordinator) + { + this.coordinator = coordinator; + } + + @Override + public void abort(Ranges abort) + { + coordinator.abort(abort); + } + } + + static class Key + { + final Node.Id id; + final Ranges ranges; + + Key(Node.Id id, Ranges ranges) + { + this.id = id; + this.ranges = ranges; + } + + @Override + public int hashCode() + { + return id.hashCode() + ranges.hashCode(); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) return true; + if (!(obj instanceof Key)) return false; + Key that = (Key) obj; + return id.equals(that.id) && ranges.equals(that.ranges); + } + } + + final DataStore.FetchRanges fetchRanges; + final CommandStore commandStore; + final Map<Key, DataStore.StartingRangeFetch> inflight = new 
HashMap<>(); + final FetchResult result = new FetchResult(this); + final List<AsyncResult<Void>> persisting = new ArrayList<>(); + + protected AbstractFetchCoordinator(Node node, Ranges ranges, SyncPoint syncPoint, DataStore.FetchRanges fetchRanges, CommandStore commandStore) + { + super(node, ranges, syncPoint, fetchRanges); + this.fetchRanges = fetchRanges; + this.commandStore = commandStore; + } + + protected abstract PartialTxn rangeReadTxn(Ranges ranges); + + protected abstract void onReadOk(Node.Id from, CommandStore commandStore, Data data, Ranges ranges); + + @Override + public void contact(Node.Id to, Ranges ranges) + { + Key key = new Key(to, ranges); + inflight.put(key, starting(to, ranges)); + Ranges ownedRanges = ownedRangesForNode(to); + Invariants.checkArgument(ownedRanges.containsAll(ranges)); + PartialDeps partialDeps = syncPoint.waitFor.slice(ownedRanges, ranges); + node.send(to, new FetchRequest(syncPoint.sourceEpoch(), syncPoint.syncId, ranges, partialDeps, rangeReadTxn(ranges)), new Callback<ReadData.ReadReply>() + { + @Override + public void onSuccess(Node.Id from, ReadData.ReadReply reply) + { + if (!reply.isOk()) + { + fail(to, new RuntimeException(reply.toString())); + inflight.remove(key).cancel(); + switch ((ReadData.ReadNack) reply) + { + default: throw new AssertionError("Unhandled enum"); + case Invalid: + case Redundant: + case NotCommitted: + throw new AssertionError(); + case Error: + // TODO (required): ensure errors are propagated to coordinators and can be logged + } + return; + } + + FetchResponse ok = (FetchResponse) reply; + Ranges received; + if (ok.unavailable != null) + { + unavailable(to, ok.unavailable); + if (ok.data == null) + { + inflight.remove(key).cancel(); + return; + } + received = ranges.difference(ok.unavailable); + } + else + { + received = ranges; + } + + // TODO (now): make sure it works if invoked in either order + inflight.remove(key).started(ok.maxApplied); + onReadOk(to, commandStore, ok.data, received); 
+ // received must be invoked after submitting the persistence future, as it triggers onDone + // which creates a ReducingFuture over {@code persisting} + } + + @Override + public void onFailure(Node.Id from, Throwable failure) + { + inflight.remove(key).cancel(); + fail(from, failure); + } + + @Override + public void onCallbackFailure(Node.Id from, Throwable failure) + { + // TODO (soon) + failure.printStackTrace(); + } + }); + } + + @Override + protected synchronized void success(Node.Id to, Ranges ranges) + { + super.success(to, ranges); + } + + @Override + protected synchronized void fail(Node.Id to, Ranges ranges, Throwable failure) + { + super.fail(to, ranges, failure); + } + + public FetchResult result() + { + return result; + } + + @Override + protected void onDone(Ranges success, Throwable failure) + { + if (success.isEmpty()) result.setFailure(failure); + else if (persisting.isEmpty()) result.setSuccess(null); + else AsyncChains.reduce(persisting, (a, b)-> null) + .begin((s, f) -> { + if (f == null) result.setSuccess(ranges); + else result.setFailure(f); + }); + } + + @Override + public void start() + { + super.start(); + } Review Comment: formatting ########## accord-core/src/main/java/accord/topology/TopologyManager.java: ########## @@ -174,6 +176,16 @@ public long nextEpoch() return current().epoch + 1; } + public long minEpoch() + { + return currentEpoch - epochs.length + 1; Review Comment: if empty isn't this `0 - 0 + 1 = 1`, and should be `0`? 
########## accord-core/src/main/java/accord/topology/TopologyManager.java: ########## @@ -270,6 +284,22 @@ public void onEpochSyncComplete(Id node, long epoch) epochs.syncComplete(node, epoch); } + public synchronized void truncateTopologyUntil(long epoch) + { + Epochs current = epochs; + checkArgument(current.epoch() >= epoch); Review Comment: ```suggestion checkArgument(current.epoch() >= epoch, "Unable to truncate; epoch %d is > current epoch %d", epoch , current.epoch()); ``` ########## accord-core/src/main/java/accord/topology/TopologyManager.java: ########## @@ -223,14 +237,14 @@ public TopologyManager(TopologySorter.Supplier sorter, Id node) { this.sorter = sorter; this.node = node; - this.epochs = new Epochs(new EpochState[0]); + this.epochs = Epochs.EMPTY; } public synchronized void onTopologyUpdate(Topology topology) { Epochs current = epochs; - checkArgument(topology.epoch == current.nextEpoch()); + checkArgument(topology.epoch == current.nextEpoch() || epochs == Epochs.EMPTY); Review Comment: useful error msg would be nice =D ########## accord-core/src/main/java/accord/topology/TopologyManager.java: ########## @@ -270,6 +284,22 @@ public void onEpochSyncComplete(Id node, long epoch) epochs.syncComplete(node, epoch); } + public synchronized void truncateTopologyUntil(long epoch) + { + Epochs current = epochs; + checkArgument(current.epoch() >= epoch); + + if (current.minEpoch() >= epoch) + return; + + int newLen = current.epochs.length - (int) (epoch - current.minEpoch()); + Invariants.checkState(current.epochs[newLen - 1].syncComplete()); Review Comment: ```suggestion Invariants.checkState(current.epochs[newLen - 1].syncComplete(), "Epoch %d's sync is not complete", current.epochs[newLen - 1].epoch); ``` ########## accord-core/src/main/java/accord/local/CommandStores.java: ########## @@ -358,7 +358,7 @@ private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Top bootstrapUpdates.add(shard.store.interruptBootstraps(epoch, 
newRanges.currentRanges())); } // TODO (desired): only sync affected shards - if (epoch > 1) + if (epoch > 1 && startSync) Review Comment: I feel the `startSync` is to work around the bug where we don't want to bootstrap when a keyspace is added? If so I added a new `accord.local.CommandStores#shouldBootstrap` for that... One issue to deal with is that we could have a new keyspace at the same time we see a range movement; so we need to bootstrap the range movement but not the new keyspace ########## accord-core/src/main/java/accord/local/Node.java: ########## @@ -150,9 +149,10 @@ public Node(Id id, MessageSink messageSink, ConfigurationService configService, configService.registerListener(this); } + // FIXME: remove, only used by Maelstrom Review Comment: can we delete Maelstrom!??!?!1 ########## accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package accord.impl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; + +import accord.api.Data; +import accord.api.DataStore; +import accord.coordinate.FetchCoordinator; +import accord.local.CommandStore; +import accord.local.Node; +import accord.local.PreLoadContext; +import accord.local.Status; +import accord.messages.Callback; +import accord.messages.MessageType; +import accord.messages.ReadData; +import accord.messages.WaitAndReadData; +import accord.primitives.PartialDeps; +import accord.primitives.PartialTxn; +import accord.primitives.Ranges; +import accord.primitives.SyncPoint; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import accord.utils.Invariants; +import accord.utils.async.AsyncChains; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; + +import static accord.primitives.Routables.Slice.Minimal; + +public abstract class AbstractFetchCoordinator extends FetchCoordinator +{ + static class FetchResult extends AsyncResults.SettableResult<Ranges> implements DataStore.FetchResult + { + final AbstractFetchCoordinator coordinator; + + FetchResult(AbstractFetchCoordinator coordinator) + { + this.coordinator = coordinator; + } + + @Override + public void abort(Ranges abort) + { + coordinator.abort(abort); + } + } + + static class Key + { + final Node.Id id; + final Ranges ranges; + + Key(Node.Id id, Ranges ranges) + { + this.id = id; + this.ranges = ranges; + } + + @Override + public int hashCode() + { + return id.hashCode() + ranges.hashCode(); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) return true; + if (!(obj instanceof Key)) return false; + Key that = (Key) obj; + return id.equals(that.id) && ranges.equals(that.ranges); + } + } + + final DataStore.FetchRanges fetchRanges; + final CommandStore commandStore; + final Map<Key, DataStore.StartingRangeFetch> inflight = new 
HashMap<>(); + final FetchResult result = new FetchResult(this); + final List<AsyncResult<Void>> persisting = new ArrayList<>(); + + protected AbstractFetchCoordinator(Node node, Ranges ranges, SyncPoint syncPoint, DataStore.FetchRanges fetchRanges, CommandStore commandStore) + { + super(node, ranges, syncPoint, fetchRanges); + this.fetchRanges = fetchRanges; + this.commandStore = commandStore; + } + + protected abstract PartialTxn rangeReadTxn(Ranges ranges); + + protected abstract void onReadOk(Node.Id from, CommandStore commandStore, Data data, Ranges ranges); + + @Override + public void contact(Node.Id to, Ranges ranges) + { + Key key = new Key(to, ranges); + inflight.put(key, starting(to, ranges)); + Ranges ownedRanges = ownedRangesForNode(to); + Invariants.checkArgument(ownedRanges.containsAll(ranges)); + PartialDeps partialDeps = syncPoint.waitFor.slice(ownedRanges, ranges); + node.send(to, new FetchRequest(syncPoint.sourceEpoch(), syncPoint.syncId, ranges, partialDeps, rangeReadTxn(ranges)), new Callback<ReadData.ReadReply>() + { + @Override + public void onSuccess(Node.Id from, ReadData.ReadReply reply) + { + if (!reply.isOk()) + { + fail(to, new RuntimeException(reply.toString())); + inflight.remove(key).cancel(); + switch ((ReadData.ReadNack) reply) + { + default: throw new AssertionError("Unhandled enum"); + case Invalid: + case Redundant: + case NotCommitted: + throw new AssertionError(); + case Error: + // TODO (required): ensure errors are propagated to coordinators and can be logged + } + return; + } + + FetchResponse ok = (FetchResponse) reply; + Ranges received; + if (ok.unavailable != null) + { + unavailable(to, ok.unavailable); + if (ok.data == null) + { + inflight.remove(key).cancel(); + return; + } + received = ranges.difference(ok.unavailable); + } + else + { + received = ranges; + } + + // TODO (now): make sure it works if invoked in either order + inflight.remove(key).started(ok.maxApplied); + onReadOk(to, commandStore, ok.data, received); 
+ // received must be invoked after submitting the persistence future, as it triggers onDone + // which creates a ReducingFuture over {@code persisting} + } + + @Override + public void onFailure(Node.Id from, Throwable failure) + { + inflight.remove(key).cancel(); + fail(from, failure); + } + + @Override + public void onCallbackFailure(Node.Id from, Throwable failure) + { + // TODO (soon) + failure.printStackTrace(); + } + }); + } + + @Override + protected synchronized void success(Node.Id to, Ranges ranges) + { + super.success(to, ranges); + } + + @Override + protected synchronized void fail(Node.Id to, Ranges ranges, Throwable failure) + { + super.fail(to, ranges, failure); + } + + public FetchResult result() + { + return result; + } + + @Override + protected void onDone(Ranges success, Throwable failure) + { + if (success.isEmpty()) result.setFailure(failure); + else if (persisting.isEmpty()) result.setSuccess(null); + else AsyncChains.reduce(persisting, (a, b)-> null) Review Comment: ```suggestion else AsyncChains.reduce(persisting, (a, b) -> null) ``` ########## accord-core/src/main/java/accord/topology/TopologyManager.java: ########## @@ -270,6 +284,22 @@ public void onEpochSyncComplete(Id node, long epoch) epochs.syncComplete(node, epoch); } + public synchronized void truncateTopologyUntil(long epoch) + { + Epochs current = epochs; + checkArgument(current.epoch() >= epoch); + + if (current.minEpoch() >= epoch) + return; + + int newLen = current.epochs.length - (int) (epoch - current.minEpoch()); + Invariants.checkState(current.epochs[newLen - 1].syncComplete()); + + EpochState[] nextEpochs = new EpochState[newLen]; + System.arraycopy(current.epochs, 0, nextEpochs, 0, newLen); + epochs = new Epochs(nextEpochs, current.pendingSyncComplete, current.futureEpochFutures); Review Comment: nit: `java.util.Arrays#copyOfRange(U[], int, int, java.lang.Class<? extends T[]>)`? 
########## accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.impl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; + +import accord.api.Data; +import accord.api.DataStore; +import accord.coordinate.FetchCoordinator; +import accord.local.CommandStore; +import accord.local.Node; +import accord.local.PreLoadContext; +import accord.local.Status; +import accord.messages.Callback; +import accord.messages.MessageType; +import accord.messages.ReadData; +import accord.messages.WaitAndReadData; +import accord.primitives.PartialDeps; +import accord.primitives.PartialTxn; +import accord.primitives.Ranges; +import accord.primitives.SyncPoint; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import accord.utils.Invariants; +import accord.utils.async.AsyncChains; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; + +import static accord.primitives.Routables.Slice.Minimal; + +public abstract class AbstractFetchCoordinator extends FetchCoordinator +{ + static class 
FetchResult extends AsyncResults.SettableResult<Ranges> implements DataStore.FetchResult + { + final AbstractFetchCoordinator coordinator; + + FetchResult(AbstractFetchCoordinator coordinator) + { + this.coordinator = coordinator; + } + + @Override + public void abort(Ranges abort) + { + coordinator.abort(abort); + } + } + + static class Key + { + final Node.Id id; + final Ranges ranges; + + Key(Node.Id id, Ranges ranges) + { + this.id = id; + this.ranges = ranges; + } + + @Override + public int hashCode() + { + return id.hashCode() + ranges.hashCode(); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) return true; + if (!(obj instanceof Key)) return false; + Key that = (Key) obj; + return id.equals(that.id) && ranges.equals(that.ranges); + } + } + + final DataStore.FetchRanges fetchRanges; + final CommandStore commandStore; + final Map<Key, DataStore.StartingRangeFetch> inflight = new HashMap<>(); + final FetchResult result = new FetchResult(this); + final List<AsyncResult<Void>> persisting = new ArrayList<>(); + + protected AbstractFetchCoordinator(Node node, Ranges ranges, SyncPoint syncPoint, DataStore.FetchRanges fetchRanges, CommandStore commandStore) + { + super(node, ranges, syncPoint, fetchRanges); + this.fetchRanges = fetchRanges; + this.commandStore = commandStore; + } + + protected abstract PartialTxn rangeReadTxn(Ranges ranges); + + protected abstract void onReadOk(Node.Id from, CommandStore commandStore, Data data, Ranges ranges); + + @Override + public void contact(Node.Id to, Ranges ranges) + { + Key key = new Key(to, ranges); + inflight.put(key, starting(to, ranges)); + Ranges ownedRanges = ownedRangesForNode(to); + Invariants.checkArgument(ownedRanges.containsAll(ranges)); + PartialDeps partialDeps = syncPoint.waitFor.slice(ownedRanges, ranges); + node.send(to, new FetchRequest(syncPoint.sourceEpoch(), syncPoint.syncId, ranges, partialDeps, rangeReadTxn(ranges)), new Callback<ReadData.ReadReply>() + { + @Override + 
public void onSuccess(Node.Id from, ReadData.ReadReply reply) + { + if (!reply.isOk()) + { + fail(to, new RuntimeException(reply.toString())); + inflight.remove(key).cancel(); + switch ((ReadData.ReadNack) reply) + { + default: throw new AssertionError("Unhandled enum"); + case Invalid: + case Redundant: + case NotCommitted: + throw new AssertionError(); + case Error: + // TODO (required): ensure errors are propagated to coordinators and can be logged + } + return; + } + + FetchResponse ok = (FetchResponse) reply; + Ranges received; + if (ok.unavailable != null) + { + unavailable(to, ok.unavailable); + if (ok.data == null) + { + inflight.remove(key).cancel(); + return; + } + received = ranges.difference(ok.unavailable); + } + else + { + received = ranges; + } + + // TODO (now): make sure it works if invoked in either order + inflight.remove(key).started(ok.maxApplied); + onReadOk(to, commandStore, ok.data, received); + // received must be invoked after submitting the persistence future, as it triggers onDone + // which creates a ReducingFuture over {@code persisting} + } + + @Override + public void onFailure(Node.Id from, Throwable failure) + { + inflight.remove(key).cancel(); + fail(from, failure); + } + + @Override + public void onCallbackFailure(Node.Id from, Throwable failure) + { + // TODO (soon) + failure.printStackTrace(); + } + }); + } + + @Override + protected synchronized void success(Node.Id to, Ranges ranges) + { + super.success(to, ranges); + } + + @Override + protected synchronized void fail(Node.Id to, Ranges ranges, Throwable failure) + { + super.fail(to, ranges, failure); + } + + public FetchResult result() + { + return result; + } + + @Override + protected void onDone(Ranges success, Throwable failure) + { + if (success.isEmpty()) result.setFailure(failure); + else if (persisting.isEmpty()) result.setSuccess(null); + else AsyncChains.reduce(persisting, (a, b)-> null) + .begin((s, f) -> { + if (f == null) result.setSuccess(ranges); + else 
result.setFailure(f); + }); + } + + @Override + public void start() + { + super.start(); + } + + void abort(Ranges abort) + { + // TODO (required, later): implement abort + } + + public static class FetchRequest extends WaitAndReadData + { + public final PartialDeps partialDeps; + private transient Timestamp maxApplied; + + public FetchRequest(long sourceEpoch, TxnId syncId, Ranges ranges, PartialDeps partialDeps, PartialTxn partialTxn) + { + super(ranges, sourceEpoch, Status.Applied, partialDeps, Timestamp.MAX, syncId, partialTxn); + this.partialDeps = partialDeps; + } + + @Override + protected void readComplete(CommandStore commandStore, Data result, Ranges unavailable) + { + commandStore.execute(PreLoadContext.empty(), safeStore -> { Review Comment: I know this is copy/paste, but we have `accord.local.AgentExecutor#execute` now, so we could avoid the `PreLoadContext.empty()` and `.begin(node.agent());` boilerplate ########## accord-core/src/test/java/accord/messages/PreAcceptTest.java: ########## @@ -196,7 +196,8 @@ void multiKeyTimestampUpdate() throws ExecutionException messageSink.assertHistorySizes(0, 1); Assertions.assertEquals(ID3, messageSink.responses.get(0).to); PartialDeps expectedDeps = new PartialDeps(Ranges.of(range(0, 12)), KeyDeps.NONE, RangeDeps.NONE); - Assertions.assertEquals(new PreAccept.PreAcceptOk(txnId2, Timestamp.fromValues(1, 110, ID1), expectedDeps), + Timestamp expectedTs = Timestamp.fromValues(1, 110, ID1).withExtraFlags(txnId2.flags()); Review Comment: What change caused this? I get the following locally (I only changed this test, nothing else) ``` Expected :PreAcceptOk{txnId:[1,50,2,3], witnessedAt:[1,110,2,1], deps:[Range(0, 12]]:{}, {}} Actual :PreAcceptOk{txnId:[1,50,2,3], witnessedAt:[1,110,0,1], deps:[Range(0, 12]]:{}, {}} ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

