petrov-mg commented on code in PR #13275:
URL: https://github.com/apache/ignite/pull/13275#discussion_r3475787321
##########
modules/core/src/main/java/org/apache/ignite/internal/thread/context/OperationContextDispatcher.java:
##########
@@ -37,69 +37,60 @@
* {@link OperationContextAttribute} instance that is consistent across all
cluster nodes.</p>
*
* <p>To enable propagation of an {@link OperationContextAttribute} value
across cluster nodes, the
- * attribute must be created using the {@link
#createDistributedAttribute(byte, Message)} method.
+ * attribute must be created using the {@link
#registerDistributedAttribute(byte, OperationContextAttribute)} method.
Review Comment:
`must be created using` -> `must be registered via`
##########
modules/core/src/main/java/org/apache/ignite/internal/thread/context/OperationContextDispatcher.java:
##########
@@ -37,69 +37,60 @@
* {@link OperationContextAttribute} instance that is consistent across all
cluster nodes.</p>
*
* <p>To enable propagation of an {@link OperationContextAttribute} value
across cluster nodes, the
- * attribute must be created using the {@link
#createDistributedAttribute(byte, Message)} method.
+ * attribute must be created using the {@link
#registerDistributedAttribute(byte, OperationContextAttribute)} method.
*
* <p> Note, that the maximum number of distributed attribute instances that
can be created is currently limited to
* {@link #MAX_DISTRIBUTED_ATTR_CNT} for implementation reasons.</p>
*
* @see OperationContext
- * @see DistributedOperationContextMessage
+ * @see OperationContextMessage
*/
-public class DistributedOperationContextManager {
- /** */
- private static final DistributedOperationContextManager INSTANCE = new
DistributedOperationContextManager();
-
+public class OperationContextDispatcher {
/** Maximal number of supported distributed attributes. */
static final byte MAX_DISTRIBUTED_ATTR_CNT = Byte.SIZE;
/** Registered distributed attributes by their cluster-wide id. */
- private final Map<Byte, OperationContextAttribute<Message>> attrs = new
ConcurrentSkipListMap<>();
+ private final Map<Byte, OperationContextAttribute<? extends Message>>
attrs = new ConcurrentSkipListMap<>();
- /** */
- public static DistributedOperationContextManager instance() {
- return INSTANCE;
- }
+ /** The initialization flag. */
Review Comment:
`Whether the registration of new distributed attributes is allowed.`
##########
modules/core/src/main/java/org/apache/ignite/internal/thread/context/OperationContextDispatcher.java:
##########
@@ -37,69 +37,60 @@
* {@link OperationContextAttribute} instance that is consistent across all
cluster nodes.</p>
*
* <p>To enable propagation of an {@link OperationContextAttribute} value
across cluster nodes, the
- * attribute must be created using the {@link
#createDistributedAttribute(byte, Message)} method.
+ * attribute must be created using the {@link
#registerDistributedAttribute(byte, OperationContextAttribute)} method.
*
* <p> Note, that the maximum number of distributed attribute instances that
can be created is currently limited to
* {@link #MAX_DISTRIBUTED_ATTR_CNT} for implementation reasons.</p>
*
* @see OperationContext
- * @see DistributedOperationContextMessage
+ * @see OperationContextMessage
*/
-public class DistributedOperationContextManager {
- /** */
- private static final DistributedOperationContextManager INSTANCE = new
DistributedOperationContextManager();
-
+public class OperationContextDispatcher {
/** Maximal number of supported distributed attributes. */
static final byte MAX_DISTRIBUTED_ATTR_CNT = Byte.SIZE;
/** Registered distributed attributes by their cluster-wide id. */
- private final Map<Byte, OperationContextAttribute<Message>> attrs = new
ConcurrentSkipListMap<>();
+ private final Map<Byte, OperationContextAttribute<? extends Message>>
attrs = new ConcurrentSkipListMap<>();
- /** */
- public static DistributedOperationContextManager instance() {
- return INSTANCE;
- }
+ /** The initialization flag. */
+ private volatile boolean initialized;
/**
- * Creates a new {@link OperationContext} attribute with the specified
distributed ID and initial value.
+ * Registers an attribute of {@link OperationContext} with the specified
distributed ID.
*
* <p>The distributed ID is used to consistently identify the attribute
across all nodes in the cluster.
- * It must be unique, and its value must be in the range from {@code 0}
(inclusive) to {@code Byte.SIZE} (exclusive).</p>
+ * It must be unique, and its value must be in the range [{@code 0} :
{@code Byte.SIZE}).</p>
*
- * <p>The value of the created attribute is automatically captured and
propagated between cluster nodes
+ * <p>A value of the attribute is automatically captured and propagated
between cluster nodes
* during message transmission.</p>
- *
- * @see OperationContextAttribute#newInstance(Object)
*/
- public <T extends Message> OperationContextAttribute<T>
createDistributedAttribute(byte id, @Nullable T initVal) {
- assert id >= 0 && id < MAX_DISTRIBUTED_ATTR_CNT : "Invalid distributed
attributed id [id=" + id + ']';
+ public <T extends Message> void registerDistributedAttribute(byte id,
OperationContextAttribute<T> attr) {
+ if (initialized)
+ throw new IgniteException("Initialization of distributed operation
context attributes has already finished.");
- return (OperationContextAttribute<T>)attrs.compute(id, (id0, attr0) ->
{
- if (attr0 != null)
- throw new IgniteException("Duplicated distributed attribute id
[id=" + id + ']');
+ assert id >= 0 && id < MAX_DISTRIBUTED_ATTR_CNT : "Invalid distributed
attributed id [id=" + id + ']';
- return OperationContextAttribute.newInstance(initVal);
- });
+ if (attrs.putIfAbsent(id, attr) != null)
+ throw new IgniteException("Duplicated distributed attribute id
[id=" + id + ']');
}
/**
- * Collects the values of all distributed {@link
OperationContextAttribute}s registered by this manager in a format
- * suitable for transmission between cluster nodes.
+ * Collects the values of all distributed {@link
OperationContextAttribute}s registered by this manager.
Review Comment:
`registered by this manager` -> `registered by this dispatcher`
##########
modules/core/src/main/java/org/apache/ignite/internal/thread/context/OperationContextDispatcher.java:
##########
@@ -37,69 +37,60 @@
* {@link OperationContextAttribute} instance that is consistent across all
cluster nodes.</p>
*
* <p>To enable propagation of an {@link OperationContextAttribute} value
across cluster nodes, the
- * attribute must be created using the {@link
#createDistributedAttribute(byte, Message)} method.
+ * attribute must be created using the {@link
#registerDistributedAttribute(byte, OperationContextAttribute)} method.
*
* <p> Note, that the maximum number of distributed attribute instances that
can be created is currently limited to
* {@link #MAX_DISTRIBUTED_ATTR_CNT} for implementation reasons.</p>
*
* @see OperationContext
- * @see DistributedOperationContextMessage
+ * @see OperationContextMessage
*/
-public class DistributedOperationContextManager {
- /** */
- private static final DistributedOperationContextManager INSTANCE = new
DistributedOperationContextManager();
-
+public class OperationContextDispatcher {
/** Maximal number of supported distributed attributes. */
static final byte MAX_DISTRIBUTED_ATTR_CNT = Byte.SIZE;
/** Registered distributed attributes by their cluster-wide id. */
- private final Map<Byte, OperationContextAttribute<Message>> attrs = new
ConcurrentSkipListMap<>();
+ private final Map<Byte, OperationContextAttribute<? extends Message>>
attrs = new ConcurrentSkipListMap<>();
- /** */
- public static DistributedOperationContextManager instance() {
- return INSTANCE;
- }
+ /** The initialization flag. */
+ private volatile boolean initialized;
/**
- * Creates a new {@link OperationContext} attribute with the specified
distributed ID and initial value.
+ * Registers an attribute of {@link OperationContext} with the specified
distributed ID.
*
* <p>The distributed ID is used to consistently identify the attribute
across all nodes in the cluster.
- * It must be unique, and its value must be in the range from {@code 0}
(inclusive) to {@code Byte.SIZE} (exclusive).</p>
+ * It must be unique, and its value must be in the range [{@code 0} :
{@code Byte.SIZE}).</p>
*
- * <p>The value of the created attribute is automatically captured and
propagated between cluster nodes
+ * <p>A value of the attribute is automatically captured and propagated
between cluster nodes
* during message transmission.</p>
- *
- * @see OperationContextAttribute#newInstance(Object)
*/
- public <T extends Message> OperationContextAttribute<T>
createDistributedAttribute(byte id, @Nullable T initVal) {
- assert id >= 0 && id < MAX_DISTRIBUTED_ATTR_CNT : "Invalid distributed
attributed id [id=" + id + ']';
+ public <T extends Message> void registerDistributedAttribute(byte id,
OperationContextAttribute<T> attr) {
+ if (initialized)
+ throw new IgniteException("Initialization of distributed operation
context attributes has already finished.");
Review Comment:
Let's remove the trailing period from the error message.
##########
modules/core/src/main/java/org/apache/ignite/internal/thread/context/OperationContextDispatcher.java:
##########
@@ -37,69 +37,60 @@
* {@link OperationContextAttribute} instance that is consistent across all
cluster nodes.</p>
*
* <p>To enable propagation of an {@link OperationContextAttribute} value
across cluster nodes, the
- * attribute must be created using the {@link
#createDistributedAttribute(byte, Message)} method.
+ * attribute must be created using the {@link
#registerDistributedAttribute(byte, OperationContextAttribute)} method.
*
* <p> Note, that the maximum number of distributed attribute instances that
can be created is currently limited to
* {@link #MAX_DISTRIBUTED_ATTR_CNT} for implementation reasons.</p>
*
* @see OperationContext
- * @see DistributedOperationContextMessage
+ * @see OperationContextMessage
*/
-public class DistributedOperationContextManager {
- /** */
- private static final DistributedOperationContextManager INSTANCE = new
DistributedOperationContextManager();
-
+public class OperationContextDispatcher {
/** Maximal number of supported distributed attributes. */
static final byte MAX_DISTRIBUTED_ATTR_CNT = Byte.SIZE;
/** Registered distributed attributes by their cluster-wide id. */
- private final Map<Byte, OperationContextAttribute<Message>> attrs = new
ConcurrentSkipListMap<>();
+ private final Map<Byte, OperationContextAttribute<? extends Message>>
attrs = new ConcurrentSkipListMap<>();
- /** */
- public static DistributedOperationContextManager instance() {
- return INSTANCE;
- }
+ /** The initialization flag. */
+ private volatile boolean initialized;
/**
- * Creates a new {@link OperationContext} attribute with the specified
distributed ID and initial value.
+ * Registers an attribute of {@link OperationContext} with the specified
distributed ID.
*
* <p>The distributed ID is used to consistently identify the attribute
across all nodes in the cluster.
- * It must be unique, and its value must be in the range from {@code 0}
(inclusive) to {@code Byte.SIZE} (exclusive).</p>
+ * It must be unique, and its value must be in the range [{@code 0} :
{@code Byte.SIZE}).</p>
*
- * <p>The value of the created attribute is automatically captured and
propagated between cluster nodes
+ * <p>A value of the attribute is automatically captured and propagated
between cluster nodes
Review Comment:
`A value of the attribute` -> `A value of the registered attribute`
##########
modules/core/src/main/java/org/apache/ignite/internal/thread/context/OperationContextDispatcher.java:
##########
@@ -37,69 +37,60 @@
* {@link OperationContextAttribute} instance that is consistent across all
cluster nodes.</p>
*
* <p>To enable propagation of an {@link OperationContextAttribute} value
across cluster nodes, the
- * attribute must be created using the {@link
#createDistributedAttribute(byte, Message)} method.
+ * attribute must be created using the {@link
#registerDistributedAttribute(byte, OperationContextAttribute)} method.
*
* <p> Note, that the maximum number of distributed attribute instances that
can be created is currently limited to
Review Comment:
`can be created` -> `can be registered`
##########
modules/core/src/main/java/org/apache/ignite/internal/thread/context/OperationContextDispatcher.java:
##########
@@ -146,8 +137,8 @@ public Scope restoreDistributedAttributes(@Nullable
DistributedOperationContextM
return updater.apply();
}
- /** For testing purposes mostly. */
- void clear() {
- attrs.clear();
+ /** Deprecated fuhrter filling of distributed attributes. */
Review Comment:
Suggested wording: `Restricts further registration of distributed attributes.`
##########
modules/core/src/main/java/org/apache/ignite/internal/thread/context/OperationContextDispatcher.java:
##########
@@ -146,8 +137,8 @@ public Scope restoreDistributedAttributes(@Nullable
DistributedOperationContextM
return updater.apply();
}
- /** For testing purposes mostly. */
- void clear() {
- attrs.clear();
+ /** Deprecated fuhrter filling of distributed attributes. */
+ public void initialized() {
Review Comment:
Let's rename it to something like `markInitialized`, `seal`, or
`finishRegistration`, and rename the associated variable accordingly.
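A minimal sketch of how the rename could read, assuming the `finishRegistration` option is picked. `SealableRegistry`, `registrationFinished`, `MAX_CNT`, and the JDK exception types are hypothetical stand-ins so that the snippet compiles without Ignite; the class in the PR throws `IgniteException` and stores `OperationContextAttribute` instances.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical, Ignite-free model of the dispatcher's registration lifecycle. */
final class SealableRegistry<V> {
    /** Mirrors MAX_DISTRIBUTED_ATTR_CNT = Byte.SIZE from the diff. */
    static final byte MAX_CNT = Byte.SIZE;

    /** Registered values by id. */
    private final Map<Byte, V> attrs = new ConcurrentSkipListMap<>();

    /** Whether the registration of new values has been finished. */
    private volatile boolean registrationFinished;

    /** Registers a value under the given id; fails once registration has been finished. */
    void register(byte id, V val) {
        if (registrationFinished)
            throw new IllegalStateException("Registration of distributed attributes has already finished");

        if (id < 0 || id >= MAX_CNT)
            throw new IllegalArgumentException("Invalid distributed attribute id [id=" + id + ']');

        if (attrs.putIfAbsent(id, val) != null)
            throw new IllegalStateException("Duplicated distributed attribute id [id=" + id + ']');
    }

    /** Restricts further registration. */
    void finishRegistration() {
        registrationFinished = true;
    }

    /** Returns the number of registered values. */
    int size() {
        return attrs.size();
    }
}
```

With this naming, the flag, its Javadoc, and the method that sets it all describe the same thing: registration is closed, rather than some unspecified "initialization" having happened.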
##########
modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java:
##########
@@ -831,36 +847,74 @@ public void testContextAwareDelayQueue() throws Exception
{
/** */
@Test
public void testSendAttributesByDiscovery() throws Exception {
- byte attrId1 = 0;
- byte attrId2 =
DistributedOperationContextManager.MAX_DISTRIBUTED_ATTR_CNT - 1;
+ doTestOperationContextAttributesPropagation(true);
+ }
- InetSocketAddressMessage dfltDistAttr1Val = new
InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80);
- GridCacheVersion dfltDistrAttr2Val = new GridCacheVersion(1, 1, 1);
+ /** */
+ @Test
+ public void testSendAttributesByCommunication() throws Exception {
+ doTestOperationContextAttributesPropagation(false);
+ }
+
+ /** */
+ private void doTestOperationContextAttributesPropagation(boolean
discovery) throws Exception {
+ OperationContextAttribute<InetSocketAddressMessage> dAttr1 =
+ OperationContextAttribute.newInstance(new
InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80));
+
+ OperationContextAttribute<GridCacheVersion> dAttr2 =
+ OperationContextAttribute.newInstance(new GridCacheVersion(1, 1,
1));
+
+ pluginProvider = new AbstractTestPluginProvider() {
Review Comment:
I suggest making this class static and declaring the attributes inside it.
Something like:
```
/** */
static class TestIgniteComponent extends AbstractTestPluginProvider {
    /** */
    public static OperationContextAttribute<InetSocketAddressMessage> ADDR_ATTR = OperationContextAttribute.newInstance(
        new InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80));

    /** */
    public static OperationContextAttribute<GridCacheVersion> CACHE_VER_ATTR = OperationContextAttribute.newInstance(
        new GridCacheVersion(1, 1, 1));

    /** {@inheritDoc} */
    @Override public String name() {
        return "TestDistributedOperationContextAttributesRegistrator";
    }

    /** {@inheritDoc} */
    @Override public void start(PluginContext ctx) {
        GridKernalContext kernalCtx = ((IgniteEx)ctx.grid()).context();

        kernalCtx.operationContextDispatcher().registerDistributedAttribute((byte)0, ADDR_ATTR);

        kernalCtx.operationContextDispatcher().registerDistributedAttribute(
            (byte)(OperationContextDispatcher.MAX_DISTRIBUTED_ATTR_CNT - 1),
            CACHE_VER_ATTR
        );
    }
}
```
##########
modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java:
##########
@@ -872,15 +926,10 @@ public void testSendAttributesByDiscovery() throws
Exception {
InetSocketAddressMessage receivedVal1 =
OperationContext.get(dAttr1);
GridCacheVersion receivedVal2 =
OperationContext.get(dAttr2);
- assertNotNull(receivedVal1);
- assertNotNull(receivedVal2);
-
- assertFalse(dfltDistAttr1Val.port() ==
receivedVal1.port());
- assertEquals(receivedVal1.port(), valToSend1.port());
- assertEquals(receivedVal1.address(),
valToSend1.address());
+ assertTrue(receivedVal1 != null && valToSend1.port()
== receivedVal1.port());
+ assertTrue(receivedVal1 != null &&
valToSend1.address().equals(receivedVal1.address()));
- assertFalse(dfltDistrAttr2Val.equals(receivedVal2));
- assertTrue(valToSend2.equals(receivedVal2));
+ assertEquals(valToSend2, receivedVal2);
if (grid(i0).localNode().isClient())
Review Comment:
Here we can just use `checkedNodes.add(grid(i0));`.
##########
modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java:
##########
@@ -831,36 +847,74 @@ public void testContextAwareDelayQueue() throws Exception
{
/** */
@Test
public void testSendAttributesByDiscovery() throws Exception {
- byte attrId1 = 0;
- byte attrId2 =
DistributedOperationContextManager.MAX_DISTRIBUTED_ATTR_CNT - 1;
+ doTestOperationContextAttributesPropagation(true);
+ }
- InetSocketAddressMessage dfltDistAttr1Val = new
InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80);
- GridCacheVersion dfltDistrAttr2Val = new GridCacheVersion(1, 1, 1);
+ /** */
+ @Test
+ public void testSendAttributesByCommunication() throws Exception {
+ doTestOperationContextAttributesPropagation(false);
+ }
+
+ /** */
+ private void doTestOperationContextAttributesPropagation(boolean
discovery) throws Exception {
+ OperationContextAttribute<InetSocketAddressMessage> dAttr1 =
+ OperationContextAttribute.newInstance(new
InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80));
+
+ OperationContextAttribute<GridCacheVersion> dAttr2 =
+ OperationContextAttribute.newInstance(new GridCacheVersion(1, 1,
1));
+
+ pluginProvider = new AbstractTestPluginProvider() {
+ @Override public String name() {
+ return "TestDistributedOperationContextAttributesRegistrator";
+ }
+
+ @Override public void start(PluginContext ctx) {
+
((IgniteEx)ctx.grid()).context().operationContextDispatcher().registerDistributedAttribute((byte)0,
dAttr1);
+
Review Comment:
Let's add a check to ensure that registering an attribute with the same
identifier fails.
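A rough, Ignite-free illustration of the behavior such a check would pin down. `DuplicateIdCheck` and its methods are hypothetical; the map and the `putIfAbsent` guard are modeled on the `registerDistributedAttribute` body in the diff, with `IllegalStateException` standing in for `IgniteException`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical stand-in for the dispatcher, reduced to the duplicate-id guard. */
final class DuplicateIdCheck {
    /** Registered attributes by id. */
    private final Map<Byte, Object> attrs = new ConcurrentSkipListMap<>();

    /** Registers an attribute; the second registration under the same id fails. */
    void register(byte id, Object attr) {
        if (attrs.putIfAbsent(id, attr) != null)
            throw new IllegalStateException("Duplicated distributed attribute id [id=" + id + ']');
    }

    /** Registers two attributes under one id and returns the failure message, or null if nothing failed. */
    static String secondRegistrationFailure(byte id) {
        DuplicateIdCheck dispatcher = new DuplicateIdCheck();

        dispatcher.register(id, "first");

        try {
            dispatcher.register(id, "second");

            return null;
        }
        catch (IllegalStateException e) {
            return e.getMessage();
        }
    }
}
```

In the PR's test, the same expectation would presumably be written with the `assertThrows` helper that the diff already uses for the check on registration after start.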
##########
modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java:
##########
Review Comment:
Here and in the places below, we can use:
```
assertTrue(waitForCondition(() -> checkedNodes.containsAll(G.allGrids()), getTestTimeout()));
checkedNodes.clear();
```
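The pattern above can be sketched without Ignite as follows. Everything here is an assumption made for illustration: `CheckedNodesSketch`, the local `waitForCondition` helper (a simple polling loop, not the test framework's implementation), and the use of plain strings and threads in place of nodes and listeners.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;

/** Hypothetical model of "collect checked nodes in a concurrent set, then wait for all of them". */
final class CheckedNodesSketch {
    /** Polls the condition until it holds or the timeout elapses. */
    static boolean waitForCondition(BooleanSupplier cond, long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;

        while (!cond.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0)
                return false;

            try {
                Thread.sleep(10);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();

                return false;
            }
        }

        return true;
    }

    /** Simulates one listener per node recording itself, then waits until every node is recorded. */
    static boolean allNodesChecked(List<String> nodes, long timeoutMs) {
        Set<String> checkedNodes = ConcurrentHashMap.newKeySet();

        for (String node : nodes)
            new Thread(() -> checkedNodes.add(node)).start();

        boolean done = waitForCondition(() -> checkedNodes.containsAll(nodes), timeoutMs);

        // Reset the set so that the next round starts from scratch.
        checkedNodes.clear();

        return done;
    }
}
```

Compared with one latch per node role, a single set needs no up-front count, and the same wait-then-clear pair can be reused for every round of checks.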
##########
modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java:
##########
@@ -831,36 +847,74 @@ public void testContextAwareDelayQueue() throws Exception
{
/** */
@Test
public void testSendAttributesByDiscovery() throws Exception {
- byte attrId1 = 0;
- byte attrId2 =
DistributedOperationContextManager.MAX_DISTRIBUTED_ATTR_CNT - 1;
+ doTestOperationContextAttributesPropagation(true);
+ }
- InetSocketAddressMessage dfltDistAttr1Val = new
InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80);
- GridCacheVersion dfltDistrAttr2Val = new GridCacheVersion(1, 1, 1);
+ /** */
+ @Test
+ public void testSendAttributesByCommunication() throws Exception {
+ doTestOperationContextAttributesPropagation(false);
+ }
+
+ /** */
+ private void doTestOperationContextAttributesPropagation(boolean
discovery) throws Exception {
+ OperationContextAttribute<InetSocketAddressMessage> dAttr1 =
+ OperationContextAttribute.newInstance(new
InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80));
+
+ OperationContextAttribute<GridCacheVersion> dAttr2 =
+ OperationContextAttribute.newInstance(new GridCacheVersion(1, 1,
1));
+
+ pluginProvider = new AbstractTestPluginProvider() {
+ @Override public String name() {
+ return "TestDistributedOperationContextAttributesRegistrator";
+ }
+
+ @Override public void start(PluginContext ctx) {
+
((IgniteEx)ctx.grid()).context().operationContextDispatcher().registerDistributedAttribute((byte)0,
dAttr1);
+
+
((IgniteEx)ctx.grid()).context().operationContextDispatcher().registerDistributedAttribute(
+ (byte)(OperationContextDispatcher.MAX_DISTRIBUTED_ATTR_CNT
- 1),
+ dAttr2
+ );
+ }
+ };
// Local attribute 1.
OperationContextAttribute.newInstance(1000);
- // Distributed attribute 1.
- OperationContextAttribute<InetSocketAddressMessage> dAttr1 =
DistributedOperationContextManager.instance()
- .createDistributedAttribute(attrId1, dfltDistAttr1Val);
+ startGrids(2);
+ startClientGrid(2);
+
+ assertThrows(
+ null,
+ () ->
grid(0).context().operationContextDispatcher().registerDistributedAttribute((byte)1,
null),
+ IgniteException.class,
+ "Initialization of distributed operation context attributes has
already finished"
+ );
// Local attribute 2.
OperationContextAttribute.newInstance("locaAttr2");
- // Distributed attribute 2.
- OperationContextAttribute<GridCacheVersion> dAttr2 =
DistributedOperationContextManager.instance()
- .createDistributedAttribute(attrId2, dfltDistrAttr2Val);
+ InetSocketAddressMessage valToSend1 = new
InetSocketAddressMessage(dAttr1.initialValue().address(), 443);
+ GridCacheVersion valToSend2 = new GridCacheVersion(2, 2, 2);
- startGrids(2);
- startClientGrid(2);
+ if (discovery)
+
doTestOperationContextAttributesPropagationThroughDiscovery(dAttr1, valToSend1,
dAttr2, valToSend2);
+ else
+
doTestOperationContextAttributesPropagationThroughCommunication(dAttr1,
valToSend1, dAttr2, valToSend2);
+ }
+ /** */
+ private void doTestOperationContextAttributesPropagationThroughDiscovery(
+ OperationContextAttribute<InetSocketAddressMessage> dAttr1,
+ InetSocketAddressMessage valToSend1,
+ OperationContextAttribute<GridCacheVersion> dAttr2,
+ GridCacheVersion valToSend2
+ ) throws Exception {
CountDownLatch coordLatch = new CountDownLatch(3);
Review Comment:
We can simplify this logic a bit by replacing the latches with
`Set<Ignite> checkedNodes = ConcurrentHashMap.newKeySet();`.
--
This is an automated message from the Apache Git Service.