dcapwell commented on code in PR #3777:
URL: https://github.com/apache/cassandra/pull/3777#discussion_r1915642875
##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -1245,9 +1280,34 @@ public static void mutateWithTriggers(List<? extends
IMutation> mutations,
public static void dispatchMutationsWithRetryOnDifferentSystem(List<?
extends IMutation> mutations, ConsistencyLevel consistencyLevel,
Dispatcher.RequestTime requestTime)
{
+ Epoch lastEpoch = null;
while (true)
{
+ if (lastEpoch != null)
+ {
+ long timeout = requestTime.computeTimeout(nanoTime(),
DatabaseDescriptor.getWriteRpcTimeout(NANOSECONDS));
+ Epoch lastEpochFinal = lastEpoch;
+
ClusterMetadataService.instance().log().highestPending().ifPresent(epoch ->
+ {
+ try
+ {
+
ClusterMetadataService.instance().awaitAtLeast(epoch, timeout, NANOSECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
Review Comment:
```suggestion
Thread.currentThread().interrupt();
throw new UncheckedInterruptedException(e);
```
##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -366,9 +374,36 @@ public static RowIterator cas(String keyspaceName,
}
ConsensusAttemptResult lastAttemptResult;
+ Epoch lastEpoch = null;
do
{
+ if (lastEpoch != null)
+ {
+
+ long timeout = requestTime.computeTimeout(nanoTime(),
DatabaseDescriptor.getTransactionTimeout(NANOSECONDS));
+ Epoch lastEpochFinal = lastEpoch;
+
ClusterMetadataService.instance().log().highestPending().ifPresent(epoch ->
+ {
+ try
+ {
+
ClusterMetadataService.instance().awaitAtLeast(epoch, timeout, NANOSECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
Review Comment:
```suggestion
Thread.currentThread().interrupt();
throw new UncheckedInterruptedException(e);
```
##########
src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java:
##########
@@ -324,39 +337,130 @@ List<TxnWrite.Fragment> createWriteFragments(ClientState
state, QueryOptions opt
return fragments;
}
- AccordUpdate createUpdate(ClientState state, QueryOptions options,
Map<Integer, NamedSelect> autoReads, Consumer<Key> keyConsumer)
+ AccordUpdate createUpdate(ClusterMetadata cm, ClientState state,
QueryOptions options, Map<Integer, NamedSelect> autoReads, Set<Key> keys)
{
- return new TxnUpdate(createWriteFragments(state, options, autoReads,
keyConsumer), createCondition(options), null, false);
+ checkArgument(keys.isEmpty(), "Construct update before reads so the
key set can be used to determine commit consistency level");
+ List<TxnWrite.Fragment> writeFragments = createWriteFragments(state,
options, autoReads, keys);
+ ConsistencyLevel commitCL = consistencyLevelForAccordCommit(cm, keys,
options.getConsistency());
+ return new TxnUpdate(writeFragments, createCondition(options),
commitCL, false);
}
Keys toKeys(SortedSet<Key> keySet)
{
return new Keys(keySet);
}
+ private static TransactionalMode transactionalModeForSingleKey(Keys keys)
+ {
+ return Schema.instance.getTableMetadata(((AccordRoutableKey)
keys.get(0)).table()).params.transactionalMode;
+ }
+
+ private ConsistencyLevel consistencyLevelForAccordRead(ClusterMetadata cm,
Set<Key> keys, ConsistencyLevel consistencyLevel)
+ {
+ // Write transactions are read/write so it creates a read and ends up
needing a consistency level
Review Comment:
don't we use an empty read?
```
@Override
public Txn emptySystemTxn(Kind kind, Routable.Domain domain)
{
return new Txn.InMemory(kind, domain == Key ? Keys.EMPTY :
Ranges.EMPTY, TxnRead.EMPTY, TxnQuery.UNSAFE_EMPTY, null);
}
```
`TxnRead.EMPTY,` would imply write only
##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -1503,13 +1610,31 @@ public static void mutateAtomically(List<Mutation>
mutations,
if (accordResult != null)
{
IAccordService accord = AccordService.instance();
- TxnResult.Kind kind =
accord.getTxnResult(accordResult, true, consistencyLevel, requestTime).kind();
+ TxnResult.Kind kind =
accord.getTxnResult(accordResult).kind();
if (kind == retry_new_protocol && failure == null)
continue;
Tracing.trace("Successfully wrote Accord mutations");
cleanup.ackMutation();
}
}
+ catch (TopologyMismatch e)
+ {
+ // Don't suppress existing failure
+ if (failure == null)
+ {
+ // For now assuming topology mismatch is caused by a
race misrouting
+ Tracing.trace("Accord returned topology mismatch,
retrying: " + e.getMessage());
+ logger.debug("Accord returned topology mismatch,
retrying: " + e.getMessage());
+ continue;
Review Comment:
stated above. This event isn't a safe one to retry on; it can lead to an
infinite loop
##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -1395,11 +1477,36 @@ public static void mutateAtomically(List<Mutation>
mutations,
ReplicaPlan.ForWrite batchlogReplicaPlan =
ReplicaPlans.forBatchlogWrite(ClusterMetadata.current(), batchConsistencyLevel
== ConsistencyLevel.ANY);
final TimeUUID batchUUID = nextTimeUUID();
boolean wroteToBatchLog = false;
+ Epoch lastEpoch = null;
while (true)
{
+ if (lastEpoch != null)
Review Comment:
this logic is very much copy/paste — can we refactor it into a reusable
function? The only difference is an on-timeout throw handler
```
awaitPending(() -> {
doFallibleWriteWithMetricTracking(() -> {throw new
WriteTimeoutException(WriteType.BATCH, consistencyLevel, 0, 0, "Timed out
waiting for updated cluster metadata");}, consistencyLevel);
});
```
##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -1293,12 +1353,34 @@ public static void
dispatchMutationsWithRetryOnDifferentSystem(List<? extends IM
if (accordResult != null)
{
IAccordService accord = AccordService.instance();
- TxnResult.Kind kind =
accord.getTxnResult(accordResult, true, consistencyLevel, requestTime).kind();
- if (kind == retry_new_protocol)
+ TxnResult.Kind kind =
accord.getTxnResult(accordResult).kind();
+ if (kind == retry_new_protocol && failure == null)
+ {
+ Tracing.trace("Accord returned retry new
protocol");
+ logger.debug("Retrying mutations on different
system because some mutations were misrouted according to Accord");
continue;
+ }
Tracing.trace("Successfully wrote Accord mutations");
}
}
+ catch (TopologyMismatch e)
+ {
+ // Don't suppress existing failure
+ if (failure == null)
+ {
+ // For now assuming topology mismatch is caused by a
race misrouting
+ Tracing.trace("Accord returned topology mismatch,
retrying: " + e.getMessage());
+ logger.debug("Accord returned topology mismatch,
retrying: " + e.getMessage());
+ continue;
Review Comment:
this feels very dangerous without a retry limit....
```
insert into tbl ... IF NOT EXISTS;
drop table tbl;
```
this causes a race condition where we loop infinitely, blocking this thread
for life....
`TopologyMismatch` was never expected to be a retryable event, it was a
die-in-a-fire event.
##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -1395,11 +1477,36 @@ public static void mutateAtomically(List<Mutation>
mutations,
ReplicaPlan.ForWrite batchlogReplicaPlan =
ReplicaPlans.forBatchlogWrite(ClusterMetadata.current(), batchConsistencyLevel
== ConsistencyLevel.ANY);
final TimeUUID batchUUID = nextTimeUUID();
boolean wroteToBatchLog = false;
+ Epoch lastEpoch = null;
while (true)
{
+ if (lastEpoch != null)
+ {
+ long timeout = requestTime.computeTimeout(nanoTime(),
DatabaseDescriptor.getNativeTransportTimeout(NANOSECONDS));
+ Epoch lastEpochFinal = lastEpoch;
+
ClusterMetadataService.instance().log().highestPending().ifPresent(epoch ->
+ {
+ try
+ {
+
ClusterMetadataService.instance().awaitAtLeast(epoch, timeout, NANOSECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
Review Comment:
same as above, not properly handling interrupt.
##########
src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java:
##########
@@ -320,6 +330,17 @@ public UnfilteredRowIterator
applyToPartition(UnfilteredRowIterator partition)
return Transformation.apply(iterator, new Logging());
}
+ public static void log(UnfilteredPartitionIterator partitions, String id,
boolean fullDetails)
Review Comment:
and why do we care?
```
public class AccordInteropRead extends ReadData
{
private static final boolean LOG_READ_RESULTS = false;
```
this is only done if that is `true`, which is blocked... there are no docs, so
it's one of those things that is likely to get dropped over time as it's dead
code (and our style guide is explicit about blocking dead code). The API is
dangerous to call outside of tests, and we now have to have double read logic
in prod code that we hope never gets triggered...
##########
src/java/org/apache/cassandra/dht/NormalizedRanges.java:
##########
@@ -201,8 +204,7 @@ else if (!bRMin && aRange.left.compareTo(bRange.right) >= 0)
@VisibleForTesting
public NormalizedRanges<T> invert()
{
- if (isEmpty())
- return this;
+ checkState(!isEmpty());
Review Comment:
I'm ok with lowering the visibility... but can you also explain this
assumption in the code as well?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]