petrov-mg commented on code in PR #13246:
URL: https://github.com/apache/ignite/pull/13246#discussion_r3472146967
##########
modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java:
##########
@@ -911,6 +921,113 @@ else if (grid(i0).localNode().order() == 1)
assertTrue(clientLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS));
}
+ /** */
+ @Test
+ public void testSendAttributesByCommunication() throws Exception {
+ byte attrId1 = 0;
+ byte attrId2 = DistributedOperationContextManager.MAX_DISTRIBUTED_ATTR_CNT - 1;
+
+ InetSocketAddressMessage dfltDistrAttr1Val = new InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80);
+ GridCacheVersion dfltDistrAttr2Val = new GridCacheVersion(1, 1, 1);
+
+ // Local attribute 1.
+ OperationContextAttribute.newInstance(1000);
+
+ // Distributed attribute 1.
+ OperationContextAttribute<InetSocketAddressMessage> dAttr0 = DistributedOperationContextManager.instance()
+ .createDistributedAttribute(attrId1, dfltDistrAttr1Val);
+
+ // Local attribute 2.
+ OperationContextAttribute.newInstance("locaAttr2");
+
+ // Distributed attribute 2.
+ OperationContextAttribute<GridCacheVersion> dAttr1 = DistributedOperationContextManager.instance()
+ .createDistributedAttribute(attrId2, dfltDistrAttr2Val);
+
+ startGrids(2);
+ startClientGrid(2);
+
+ InetSocketAddressMessage valToSend0 = new InetSocketAddressMessage(dfltDistrAttr1Val.address(), 443);
+ GridCacheVersion valToSend1 = new GridCacheVersion(2, 2, 2);
+
+ // Coordinator -> Server, Coordinator -> Client, Server -> Client, Client -> Server, etc.
+ for (int fromIdx = 0; fromIdx < 3; ++fromIdx) {
+ for (int toIdx = 0; toIdx < 3; ++toIdx) {
+ if (fromIdx == toIdx)
+ continue;
+
+ // One value.
+ try (Scope ignored = OperationContext.set(dAttr0, valToSend0)) {
+ checkOperationContextCommunicationTransmission(fromIdx, toIdx, dAttr0, null);
+ }
+
+ // A couple of values.
+ try (Scope ignored = OperationContext.set(dAttr0, valToSend0, dAttr1, valToSend1)) {
+ checkOperationContextCommunicationTransmission(fromIdx, toIdx, dAttr0, dAttr1);
+ }
+ }
+ }
+ }
+
+ /** */
+ private void checkOperationContextCommunicationTransmission(
+ int gridFromIdx,
+ int gridToIdx,
+ OperationContextAttribute<InetSocketAddressMessage> attr0,
+ @Nullable OperationContextAttribute<GridCacheVersion> attr1
+ ) throws InterruptedException {
+ Ignite from = grid(gridFromIdx);
+ Ignite to = grid(gridToIdx);
+
+ CountDownLatch rcvLatch = new CountDownLatch(2 + (attr1 != null ? 2 : 0));
+
+ InetSocketAddressMessage expVal0 = OperationContext.get(attr0);
+ GridCacheVersion expVal1 = attr1 == null ? null : OperationContext.get(attr1);
+
+ GridMessageListener lsnr = new GridMessageListener() {
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+ if (msg instanceof IgniteIoTestMessage) {
Review Comment:
Checking only `msg instanceof IgniteIoTestMessage` can lead to unexpected
errors or test flakiness, because `GridIoManager` automatically sends a
response to every `IgniteIoTestMessage` (see
[GridIoManager:495](https://github.com/apache/ignite/blob/18c177b264b972b64d6a34ffe685e7981d534180/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java#L495)),
and that response might be received while another check is in progress.
We can check the `IgniteIoTestMessage#request` flag or wait for the
completion of the future returned by the `sendIoTest` method.
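
A rough sketch of the first option, written without Ignite on the classpath. `TestMessage` is a hypothetical stand-in for `IgniteIoTestMessage` that models only the `request()` flag, and `RequestFilterSketch`, `onMessage` and `remainingAfter` are illustrative names, not part of the PR. The point is only that the latch is counted down for requests and left untouched by the automatic responses:

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class RequestFilterSketch {
    /** Hypothetical stand-in for IgniteIoTestMessage; only the request flag is modeled. */
    static final class TestMessage {
        private final boolean req;

        TestMessage(boolean req) {
            this.req = req;
        }

        /** Returns true for a request, false for an automatically generated response. */
        boolean request() {
            return req;
        }
    }

    /** Listener body: counts down only for requests, so responses cannot disturb the check. */
    static void onMessage(Object msg, CountDownLatch latch) {
        if (msg instanceof TestMessage && ((TestMessage) msg).request())
            latch.countDown();
    }

    /** Feeds the messages to the listener and returns how many counts are still pending. */
    static long remainingAfter(List<Object> msgs, int expected) {
        CountDownLatch latch = new CountDownLatch(expected);

        for (Object msg : msgs)
            onMessage(msg, latch);

        return latch.getCount();
    }

    public static void main(String[] args) {
        // Two requests with a stray response in between: the response is ignored.
        List<Object> msgs = List.of(new TestMessage(true), new TestMessage(false), new TestMessage(true));

        System.out.println(remainingAfter(msgs, 2)); // prints 0
    }
}
```

In the test itself the same condition would presumably go into the `GridMessageListener`, next to the existing `instanceof` check.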