wernerdv commented on code in PR #12508:
URL: https://github.com/apache/ignite/pull/12508#discussion_r2533245812
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java:
##########
@@ -396,98 +445,104 @@ public void rebalanced(boolean rebalanced) {
flags = rebalanced ? (byte)(flags | REBALANCED_FLAG_MASK) :
(byte)(flags & ~REBALANCED_FLAG_MASK);
}
- /** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
- super.prepareMarshal(ctx);
-
- boolean marshal = (!F.isEmpty(parts) && partsBytes == null) ||
- (!F.isEmpty(errs) && errsBytes == null);
-
- if (marshal) {
- // Reserve at least 2 threads for system operations.
- int parallelismLvl = U.availableThreadCount(ctx.kernalContext(),
GridIoPolicy.SYSTEM_POOL, 2);
+ /**
+ * @return Topology version.
+ */
+ @Override public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
- Collection<Object> objectsToMarshall = new ArrayList<>();
+ /**
+ * @param topVer Topology version.
+ */
+ public void topologyVersion(AffinityTopologyVersion topVer) {
+ this.topVer = topVer;
+ }
- if (!F.isEmpty(parts) && partsBytes == null)
- objectsToMarshall.add(parts);
+ /**
+ * @return Duplicated partitions data.
+ */
+ public Map<Integer, Integer> duplicatedPartitionsData() {
+ return dupPartsData;
+ }
- if (!F.isEmpty(errs) && errsBytes == null)
- objectsToMarshall.add(errs);
+ /**
+ * @param dupPartsData Duplicated partitions data.
+ */
+ public void duplicatedPartitionsData(Map<Integer, Integer> dupPartsData) {
+ this.dupPartsData = dupPartsData;
+ }
- Collection<byte[]> marshalled = U.doInParallel(
- parallelismLvl,
- ctx.kernalContext().pools().getSystemExecutorService(),
- objectsToMarshall,
- new IgniteThrowableFunction<Object, byte[]>() {
- @Override public byte[] apply(Object payload) throws
IgniteCheckedException {
- byte[] marshalled = U.marshal(ctx, payload);
+ /**
+ * @return Partitions update counters.
+ */
+ public IgniteDhtPartitionCountersMap partitionCounters() {
+ return partCntrs;
+ }
- if (compressed())
- marshalled = U.zip(marshalled,
ctx.gridConfig().getNetworkCompressionLevel());
+ /**
+ * @param partCntrs Partitions update counters.
+ */
+ public void partitionCounters(IgniteDhtPartitionCountersMap partCntrs) {
+ this.partCntrs = partCntrs;
+ }
- return marshalled;
- }
- });
+ /**
+ * @return Partitions that must be cleared and re-loaded.
+ */
+ public IgniteDhtPartitionsToReloadMap partitionsToReload() {
+ return partsToReload;
+ }
- Iterator<byte[]> iter = marshalled.iterator();
+ /**
+ * @param partsToReload Partitions that must be cleared and re-loaded.
+ */
+ public void partitionsToReload(IgniteDhtPartitionsToReloadMap
partsToReload) {
+ this.partsToReload = partsToReload;
+ }
- if (!F.isEmpty(parts) && partsBytes == null)
- partsBytes = iter.next();
+ /**
+ * @return Rebalanced flags.
+ */
+ public byte rebalancedFlags() {
+ return flags;
+ }
- if (!F.isEmpty(errs) && errsBytes == null)
- errsBytes = iter.next();
- }
+ /**
+ * @param flags Rebalanced flags.
+ */
+ public void rebalancedFlags(byte flags) {
+ this.flags = flags;
}
/**
- * @return Topology version.
+ * @return Lost partitions.
*/
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
+ public Map<Integer, int[]> lostPartitions() {
+ return lostParts;
}
/**
- * @param topVer Topology version.
+ * @param lostParts Lost partitions.
*/
- public void topologyVersion(AffinityTopologyVersion topVer) {
- this.topVer = topVer;
+ public void lostPartitions(Map<Integer, int[]> lostParts) {
+ this.lostParts = lostParts;
}
/** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext<?, ?> ctx,
ClassLoader ldr) throws IgniteCheckedException {
- super.finishUnmarshal(ctx, ldr);
-
- ClassLoader clsLdr = U.resolveClassLoader(ldr, ctx.gridConfig());
-
- Collection<byte[]> objectsToUnmarshall = new ArrayList<>();
-
- // Reserve at least 2 threads for system operations.
- int parallelismLvl = U.availableThreadCount(ctx.kernalContext(),
GridIoPolicy.SYSTEM_POOL, 2);
-
- if (partsBytes != null && parts == null)
- objectsToUnmarshall.add(partsBytes);
-
- if (errsBytes != null && errs == null)
- objectsToUnmarshall.add(errsBytes);
+ @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
- Collection<Object> unmarshalled = U.doInParallel(
- parallelismLvl,
- ctx.kernalContext().pools().getSystemExecutorService(),
- objectsToUnmarshall,
- new IgniteThrowableFunction<byte[], Object>() {
- @Override public Object apply(byte[] binary) throws
IgniteCheckedException {
- return compressed()
- ? U.unmarshalZip(ctx.marshaller(), binary, clsLdr)
- : U.unmarshal(ctx, binary, clsLdr);
- }
- }
- );
+ if (!F.isEmpty(parts) && sendParts == null)
+ sendParts = new HashMap<>(parts);
+ }
- Iterator<Object> iter = unmarshalled.iterator();
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext<?, ?> ctx,
ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
- if (partsBytes != null && parts == null) {
- parts = (Map<Integer, GridDhtPartitionFullMap>)iter.next();
+ if (sendParts != null && parts == null) {
+ parts = sendParts;
Review Comment:
Outdated
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java:
##########
@@ -25,27 +25,43 @@
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.NotNull;
/**
* Full partition map from all nodes.
*/
public class GridDhtPartitionFullMap
- extends HashMap<UUID, GridDhtPartitionMap> implements
Comparable<GridDhtPartitionFullMap>, Externalizable {
+ extends HashMap<UUID, GridDhtPartitionMap> implements
Comparable<GridDhtPartitionFullMap>, Externalizable, Message {
Review Comment:
Outdated
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]