nizhikov commented on code in PR #9907:
URL: https://github.com/apache/ignite/pull/9907#discussion_r875722071
##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractReadRepairTest.java:
##########
@@ -237,429 +256,74 @@ else if (e.irreparableKeys().contains(key) ||
(e.repairableKeys() != null && e.r
ClusterNode node = evtEntryInfo.getKey();
CacheConsistencyViolationEvent.EntryInfo info =
evtEntryInfo.getValue();
+ Object val = info.getValue();
+
if (info.isCorrect())
- assertEquals(fixed, info.getValue());
+ assertEquals(repairedBin, val);
if (info.isPrimary()) {
- assertEquals(primary, info.getValue());
+ assertEquals(primary, val);
assertEquals(node, primaryNode(key,
DEFAULT_CACHE_NAME).cluster().localNode());
}
}
}
- int expectedFixedCnt = inconsistent.size() -
- (e != null ? (e.repairableKeys() != null ?
e.repairableKeys().size() : 0) + e.irreparableKeys().size() : 0);
-
- assertEquals(expectedFixedCnt, evtFixed.size());
-
- assertTrue(evtDeq.isEmpty());
- }
-
- /**
- *
- */
- protected void prepareAndCheck(
- Ignite initiator,
- Integer cnt,
- boolean raw,
- boolean async,
- boolean misses,
- boolean nulls,
- Consumer<ReadRepairData> c)
- throws Exception {
- IgniteCache<Integer, Integer> cache =
initiator.getOrCreateCache(DEFAULT_CACHE_NAME);
-
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
- for (int i = 0; i < rnd.nextInt(1, 10); i++) {
- ReadRepairStrategy[] strategies = ReadRepairStrategy.values();
-
- ReadRepairStrategy strategy =
strategies[rnd.nextInt(strategies.length)];
-
- Map<Integer, InconsistentMapping> results = new TreeMap<>(); //
Sorted to avoid warning.
-
- try {
- for (int j = 0; j < cnt; j++) {
- int curKey = iterableKey.incrementAndGet();
-
- InconsistentMapping res =
setDifferentValuesForSameKey(curKey, misses, nulls, strategy);
-
- results.put(curKey, res);
- }
-
- for (Ignite node : G.allGrids()) { // Check that cache filled
properly.
- Map<Integer, Integer> all =
- node.<Integer,
Integer>getOrCreateCache(DEFAULT_CACHE_NAME).getAll(results.keySet());
-
- for (Map.Entry<Integer, Integer> entry : all.entrySet()) {
- Integer key = entry.getKey();
- Integer val = entry.getValue();
-
- T2<Integer, GridCacheVersion> valVer =
results.get(key).mapping.get(node);
-
- Integer exp;
-
- if (valVer != null)
- exp = valVer.get1(); // Should read from itself
(backup or primary).
- else
- exp = results.get(key).primary; // Or read from
primary (when not a partition owner).
-
- assertEquals(exp, val);
- }
- }
-
- c.accept(new ReadRepairData(cache, results, raw, async,
strategy));
- }
- catch (Throwable th) {
- StringBuilder sb = new StringBuilder();
-
- sb.append("Read Repair test failed [")
- .append("cache=").append(cache.getName())
- .append(", strategy=").append(strategy)
- .append("]\n");
-
- for (Map.Entry<Integer, InconsistentMapping> entry :
results.entrySet()) {
- sb.append("Key: ").append(entry.getKey()).append("\n");
-
- InconsistentMapping mapping = entry.getValue();
-
- sb.append(" Random data [primary=").append(mapping.primary)
- .append(", fixed=").append(mapping.fixed)
- .append(", repairable=").append(mapping.repairable)
- .append(", consistent=").append(mapping.consistent)
- .append("]\n");
-
- sb.append(" Distribution: \n");
-
- for (Map.Entry<Ignite, T2<Integer, GridCacheVersion>> dist
: mapping.mapping.entrySet()) {
- sb.append(" Node:
").append(dist.getKey().name()).append("\n");
- sb.append(" Value:
").append(dist.getValue().get1()).append("\n");
- sb.append(" Version:
").append(dist.getValue().get2()).append("\n");
- }
-
- sb.append("\n");
- }
-
- throw new Exception(sb.toString(), th);
- }
- }
- }
-
- /**
- *
- */
- private InconsistentMapping setDifferentValuesForSameKey(int key, boolean
misses, boolean nulls,
- ReadRepairStrategy strategy) throws Exception {
- List<Ignite> nodes = new ArrayList<>();
- Map<Ignite, T2<Integer, GridCacheVersion>> mapping = new HashMap<>();
-
- Ignite primary = primaryNode(key, DEFAULT_CACHE_NAME);
-
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
- if (rnd.nextBoolean()) { // Reversed order.
- nodes.addAll(backupNodes(key, DEFAULT_CACHE_NAME));
- nodes.add(primary);
- }
- else {
- nodes.add(primary);
- nodes.addAll(backupNodes(key, DEFAULT_CACHE_NAME));
- }
-
- if (rnd.nextBoolean()) // Random order.
- Collections.shuffle(nodes);
-
- IgniteInternalCache<Integer, Integer> internalCache =
(grid(1)).cachex(DEFAULT_CACHE_NAME);
-
- GridCacheVersionManager mgr =
((GridCacheAdapter)internalCache.cache()).context().shared().versions();
-
- int incVal = 0;
- Integer primVal = null;
- Collection<T2<Integer, GridCacheVersion>> vals = new ArrayList<>();
-
- if (misses) {
- List<Ignite> keeped = nodes.subList(0, rnd.nextInt(1,
nodes.size()));
-
- nodes.stream()
- .filter(node -> !keeped.contains(node))
- .forEach(node -> {
- T2<Integer, GridCacheVersion> nullT2 = new T2<>(null,
null);
-
- vals.add(nullT2);
- mapping.put(node, nullT2);
- }); // Recording nulls (missed values).
-
- nodes = keeped;
- }
-
- boolean rmvd = false;
-
- boolean incVer = rnd.nextBoolean();
-
- GridCacheVersion ver = null;
-
- for (Ignite node : nodes) {
- IgniteInternalCache<Integer, Integer> cache =
((IgniteEx)node).cachex(DEFAULT_CACHE_NAME);
-
- GridCacheAdapter<Integer, Integer> adapter =
(GridCacheAdapter)cache.cache();
-
- GridCacheEntryEx entry = adapter.entryEx(key);
-
- if (ver == null || incVer)
- ver =
mgr.next(entry.context().kernalContext().discovery().topologyVersion()); //
Incremental version.
-
- boolean rmv = nulls && (!rmvd || rnd.nextBoolean());
-
- Integer val = rmv ? null : rnd.nextBoolean()/*increment or same as
previously*/ ? ++incVal : incVal;
-
- T2<Integer, GridCacheVersion> valVer = new T2<>(val, val != null ?
ver : null);
-
- vals.add(valVer);
- mapping.put(node, valVer);
-
- GridKernalContext kctx = ((IgniteEx)node).context();
-
- byte[] bytes =
kctx.cacheObjects().marshal(entry.context().cacheObjectContext(), rmv ? -1 :
val); // Incremental value.
-
- try {
- kctx.cache().context().database().checkpointReadLock();
-
- boolean init = entry.initialValue(
- new CacheObjectImpl(null, bytes),
- ver,
- 0,
- 0,
- false,
- AffinityTopologyVersion.NONE,
- GridDrType.DR_NONE,
- false,
- false);
-
- if (rmv) {
- if (cache.configuration().getAtomicityMode() == ATOMIC)
- entry.innerUpdate(
- ver,
- ((IgniteEx)node).localNode().id(),
- ((IgniteEx)node).localNode().id(),
- GridCacheOperation.DELETE,
- null,
- null,
- false,
- false,
- false,
- false,
- null,
- false,
- false,
- false,
- false,
- AffinityTopologyVersion.NONE,
- null,
- GridDrType.DR_NONE,
- 0,
- 0,
- null,
- false,
- false,
- null,
- null,
- null,
- null,
- false);
- else
- entry.innerRemove(
- null,
- ((IgniteEx)node).localNode().id(),
- ((IgniteEx)node).localNode().id(),
- false,
- false,
- false,
- false,
- false,
- null,
- AffinityTopologyVersion.NONE,
- CU.empty0(),
- GridDrType.DR_NONE,
- null,
- null,
- null,
- 1L);
-
- rmvd = true;
-
- assertFalse(entry.hasValue());
- }
- else
- assertTrue(entry.hasValue());
-
- assertTrue("iterableKey " + key + " already inited", init);
-
- if (node.equals(primary))
- primVal = val;
- }
- finally {
-
((IgniteEx)node).context().cache().context().database().checkpointReadUnlock();
- }
- }
-
- assertEquals(vals.size(), mapping.size());
- assertEquals(vals.size(),
- internalCache.configuration().getCacheMode() == REPLICATED ?
serverNodesCount() : backupsCount() + 1);
-
- if (!misses && !nulls)
- assertTrue(primVal != null); // Primary value set.
+ int irreparableCnt = 0;
+ int repairableCnt = 0;
- Integer fixed;
-
- boolean consistent;
- boolean repairable;
-
- if (vals.stream().distinct().count() == 1) { // Consistent value.
- consistent = true;
- repairable = true;
- fixed = vals.iterator().next().getKey();
+ if (e != null) {
+ irreparableCnt = e.irreparableKeys().size();
+ repairableCnt = e.repairableKeys() != null ?
e.repairableKeys().size() : 0;
}
- else {
- consistent = false;
- repairable = atomicityMode() != ATOMIC; // Currently, Atomic
caches can not be repaired.
-
- switch (strategy) {
- case LWW:
- if (misses || rmvd || !incVer) {
- repairable = false;
- fixed = Integer.MIN_VALUE; // Should never be returned.
- }
- else
- fixed = incVal;
-
- break;
-
- case PRIMARY:
- fixed = primVal;
-
- break;
-
- case RELATIVE_MAJORITY:
- fixed = Integer.MIN_VALUE; // Should never be returned.
-
- Map<T2<Integer, GridCacheVersion>, Integer> counts = new
HashMap<>();
-
- for (T2<Integer, GridCacheVersion> val : vals) {
- counts.putIfAbsent(val, 0);
-
- counts.compute(val, (k, v) -> v + 1);
- }
-
- int[] sorted =
counts.values().stream().sorted(Comparator.reverseOrder()).mapToInt(v ->
v).toArray();
-
- int max = sorted[0];
-
- if (sorted.length > 1 && sorted[1] == max)
- repairable = false;
-
- if (repairable)
- for (Map.Entry<T2<Integer, GridCacheVersion>, Integer>
count : counts.entrySet())
- if (count.getValue().equals(max)) {
- fixed = count.getKey().getKey();
-
- break;
- }
-
- break;
+ if (repairableCnt > 0)
+ // Mentioned when pessimistic tx read-repair get contains
irreparable entries,
+ // and it's impossible to repair repairable entries during this
call.
+ assertEquals(TRANSACTIONAL, atomicityMode());
- case REMOVE:
- fixed = null;
+ int expectedRepairedCnt = inconsistent.size() - (irreparableCnt +
repairableCnt);
- break;
+ assertEquals(expectedRepairedCnt, evtRepaired.size());
- case CHECK_ONLY:
- repairable = false;
-
- fixed = Integer.MIN_VALUE; // Should never be returned.
-
- break;
-
- default:
- throw new
UnsupportedOperationException(strategy.toString());
- }
- }
-
- return new InconsistentMapping(mapping, primVal, fixed, repairable,
consistent);
+ assertTrue(evtDeq.isEmpty());
}
/**
- *
+ * @param binary With keep binary.
+ * @param obj Object.
*/
- protected static final class ReadRepairData {
- /** Initiator's cache. */
- final IgniteCache<Integer, Integer> cache;
-
- /** Generated data across topology per key mapping. */
- final Map<Integer, InconsistentMapping> data;
-
- /** Raw read flag. True means required GetEntry() instead of get(). */
- final boolean raw;
-
- /** Async read flag. */
- final boolean async;
-
- /** Strategy. */
- final ReadRepairStrategy strategy;
-
- /**
- *
- */
- public ReadRepairData(
- IgniteCache<Integer, Integer> cache,
- Map<Integer, InconsistentMapping> data,
- boolean raw,
- boolean async,
- ReadRepairStrategy strategy) {
- this.cache = cache;
- this.data = data;
- this.raw = raw;
- this.async = async;
- this.strategy = strategy;
- }
+ protected static Object unwrapBinaryIfNeeded(boolean binary, Object obj) {
+ if (binary && obj instanceof BinaryObject /*Integer is still Integer
at withKeepBinary read*/) {
+ BinaryObject valObj = (BinaryObject)obj;
- /**
- *
- */
- boolean repairable() {
- return data.values().stream().allMatch(mapping ->
mapping.repairable);
+ return valObj != null ? valObj.deserialize() : null;
}
+ else
+ return obj;
}
/**
*
*/
- protected static final class InconsistentMapping {
- /** Value per node. */
- final Map<Ignite, T2<Integer, GridCacheVersion>> mapping;
-
- /** Primary node's value. */
- final Integer primary;
-
- /** Expected fix result. */
- final Integer fixed;
-
- /** Inconsistency can be repaired using the specified strategy. */
- final boolean repairable;
-
- /** Consistent value. */
- final boolean consistent;
-
- /**
- *
- */
- public InconsistentMapping(Map<Ignite, T2<Integer, GridCacheVersion>>
mapping, Integer primary, Integer fixed,
- boolean repairable, boolean consistent) {
- this.mapping = new HashMap<>(mapping);
- this.primary = primary;
- this.fixed = fixed;
- this.repairable = repairable;
- this.consistent = consistent;
- }
+ protected void generateAndCheck(
+ Ignite initiator,
+ int cnt,
+ boolean raw,
+ boolean async,
+ boolean misses,
+ boolean nulls,
+ boolean binary,
+ ReadRepairStrategy strategy,
Review Comment:
`strategy` is always null and can be omitted.
Can we create a special `generate` method for the cases when `strategy` is
null?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]