This is an automated email from the ASF dual-hosted git repository. agoncharuk pushed a commit to branch ignite-2.9-revert-12568 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit ed52559eb95c913e4b6ebc1b334f60c27ddbac26 Author: Alexey Goncharuk <alexey.goncha...@gmail.com> AuthorDate: Mon Aug 31 19:24:24 2020 +0300 Revert "IGNITE-12568 MessageFactory is refactored in order to detect registration of message with the same direct type" This reverts commit 65c30ec6 --- .../managers/communication/GridIoManager.java | 4 +- .../communication/GridIoMessageFactory.java | 1127 ++++++++++++++++---- .../communication/IgniteMessageFactoryImpl.java | 166 --- .../communication/IgniteMessageFactory.java | 39 - .../extensions/communication/MessageFactory.java | 3 - .../communication/MessageFactoryProvider.java | 46 - .../tcp/TcpCommunicationMetricsListener.java | 16 +- .../org.apache.ignite.plugin.PluginProvider | 1 - .../GridManagerLocalMessageListenerSelfTest.java | 14 +- .../GridCommunicationSendMessageSelfTest.java | 19 +- .../IgniteMessageFactoryImplTest.java | 198 ---- .../MessageDirectTypeIdConflictTest.java | 210 ---- .../GridCacheConditionalDeploymentSelfTest.java | 22 +- ...niteCacheContinuousQueryImmutableEntryTest.java | 8 +- .../GridAbstractCommunicationSelfTest.java | 17 +- .../communication/GridCacheMessageSelfTest.java | 61 +- .../tcp/GridTcpCommunicationSpiAbstractTest.java | 16 +- ...pCommunicationSpiConcurrentConnectSelfTest.java | 21 +- ...idTcpCommunicationSpiMultithreadedSelfTest.java | 9 +- ...GridTcpCommunicationSpiRecoveryAckSelfTest.java | 17 +- .../GridTcpCommunicationSpiRecoverySelfTest.java | 13 +- ...TcpCommunicationRecoveryAckClosureSelfTest.java | 19 +- .../tcp/TcpCommunicationStatisticsTest.java | 35 +- .../ignite/testframework/GridSpiTestContext.java | 3 +- .../ignite/testsuites/IgniteCacheTestSuite.java | 5 - .../ignite/util/GridMessageCollectionTest.java | 5 +- .../h2/twostep/msg/GridH2ValueMessageFactory.java | 129 ++- 27 files changed, 1220 insertions(+), 1003 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 552613f..28c5881 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -517,8 +517,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa List<MessageFactory> compMsgs = new ArrayList<>(); - compMsgs.add(new GridIoMessageFactory()); - for (IgniteComponentType compType : IgniteComponentType.values()) { MessageFactory f = compType.messageFactory(); @@ -529,7 +527,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (!compMsgs.isEmpty()) msgs = F.concat(msgs, compMsgs.toArray(new MessageFactory[compMsgs.size()])); - msgFactory = new IgniteMessageFactoryImpl(msgs); + msgFactory = new GridIoMessageFactory(msgs); if (log.isDebugEnabled()) log.debug(startInfo()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 84a84cd..69685ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -17,6 +17,9 @@ package org.apache.ignite.internal.managers.communication; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.ignite.IgniteException; import org.apache.ignite.internal.GridJobCancelRequest; import org.apache.ignite.internal.GridJobExecuteRequest; import org.apache.ignite.internal.GridJobExecuteResponse; @@ -179,9 +182,9 @@ import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.GridMessageCollection; import org.apache.ignite.internal.util.UUIDCollectionMessage; import org.apache.ignite.internal.util.distributed.SingleNodeMessage; -import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory; +import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; +import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.spi.collision.jobstealing.JobStealingRequest; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.communication.tcp.internal.TcpInverseConnectionResponseMessage; @@ -194,198 +197,940 @@ import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMess /** * Message factory implementation. */ -public class GridIoMessageFactory implements MessageFactoryProvider { - /** {@inheritDoc} */ - @Override public void registerAll(IgniteMessageFactory factory) { - // -54 is reserved for SQL. - // -46 ... -51 - snapshot messages. - factory.register((short)-61, IgniteDiagnosticMessage::new); - factory.register((short)-53, SchemaOperationStatusMessage::new); - factory.register((short)-52, GridIntList::new); - factory.register((short)-51, NearCacheUpdates::new); - factory.register((short)-50, GridNearAtomicCheckUpdateRequest::new); - factory.register((short)-49, UpdateErrors::new); - factory.register((short)-48, GridDhtAtomicNearResponse::new); - factory.register((short)-45, GridChangeGlobalStateMessageResponse::new); - factory.register((short)-44, HandshakeMessage2::new); - factory.register((short)-43, IgniteIoTestMessage::new); - factory.register((short)-36, GridDhtAtomicSingleUpdateRequest::new); - factory.register((short)-27, GridDhtTxOnePhaseCommitAckRequest::new); - factory.register((short)-26, TxLockList::new); - factory.register((short)-25, TxLock::new); - factory.register((short)-24, TxLocksRequest::new); - factory.register((short)-23, TxLocksResponse::new); - factory.register(TcpCommunicationSpi.NODE_ID_MSG_TYPE, NodeIdMessage::new); - factory.register(TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE, RecoveryLastReceivedMessage::new); - factory.register(TcpCommunicationSpi.HANDSHAKE_MSG_TYPE, HandshakeMessage::new); - factory.register(TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE, HandshakeWaitMessage::new); - factory.register((short)0, GridJobCancelRequest::new); - factory.register((short)1, GridJobExecuteRequest::new); - factory.register((short)2, GridJobExecuteResponse::new); - factory.register((short)3, GridJobSiblingsRequest::new); - factory.register((short)4, GridJobSiblingsResponse::new); - factory.register((short)5, GridTaskCancelRequest::new); - factory.register((short)6, GridTaskSessionRequest::new); - factory.register((short)7, GridCheckpointRequest::new); - factory.register((short)8, GridIoMessage::new); - factory.register((short)9, GridIoUserMessage::new); - factory.register((short)10, GridDeploymentInfoBean::new); - factory.register((short)11, GridDeploymentRequest::new); - factory.register((short)12, GridDeploymentResponse::new); - factory.register((short)13, GridEventStorageMessage::new); - factory.register((short)16, GridCacheTxRecoveryRequest::new); - factory.register((short)17, GridCacheTxRecoveryResponse::new); - factory.register((short)20, GridCacheTtlUpdateRequest::new); - factory.register((short)21, GridDistributedLockRequest::new); - factory.register((short)22, GridDistributedLockResponse::new); - factory.register((short)23, GridDistributedTxFinishRequest::new); - factory.register((short)24, GridDistributedTxFinishResponse::new); - factory.register((short)25, GridDistributedTxPrepareRequest::new); - factory.register((short)26, GridDistributedTxPrepareResponse::new); - factory.register((short)27, GridDistributedUnlockRequest::new); - factory.register((short)28, GridDhtAffinityAssignmentRequest::new); - factory.register((short)29, GridDhtAffinityAssignmentResponse::new); - factory.register((short)30, GridDhtLockRequest::new); - factory.register((short)31, GridDhtLockResponse::new); - factory.register((short)32, GridDhtTxFinishRequest::new); - factory.register((short)33, GridDhtTxFinishResponse::new); - factory.register((short)34, GridDhtTxPrepareRequest::new); - factory.register((short)35, GridDhtTxPrepareResponse::new); - factory.register((short)36, GridDhtUnlockRequest::new); - factory.register((short)37, GridDhtAtomicDeferredUpdateResponse::new); - factory.register((short)38, GridDhtAtomicUpdateRequest::new); - factory.register((short)39, GridDhtAtomicUpdateResponse::new); - factory.register((short)40, GridNearAtomicFullUpdateRequest::new); - factory.register((short)41, GridNearAtomicUpdateResponse::new); - factory.register((short)42, GridDhtForceKeysRequest::new); - factory.register((short)43, GridDhtForceKeysResponse::new); - factory.register((short)44, GridDhtPartitionDemandLegacyMessage::new); - factory.register((short)45, GridDhtPartitionDemandMessage::new); - factory.register((short)46, GridDhtPartitionsFullMessage::new); - factory.register((short)47, GridDhtPartitionsSingleMessage::new); - factory.register((short)48, GridDhtPartitionsSingleRequest::new); - factory.register((short)49, GridNearGetRequest::new); - factory.register((short)50, GridNearGetResponse::new); - factory.register((short)51, GridNearLockRequest::new); - factory.register((short)52, GridNearLockResponse::new); - factory.register((short)53, GridNearTxFinishRequest::new); - factory.register((short)54, GridNearTxFinishResponse::new); - factory.register((short)55, GridNearTxPrepareRequest::new); - factory.register((short)56, GridNearTxPrepareResponse::new); - factory.register((short)57, GridNearUnlockRequest::new); - factory.register((short)58, GridCacheQueryRequest::new); - factory.register((short)59, GridCacheQueryResponse::new); - factory.register((short)61, GridContinuousMessage::new); - factory.register((short)62, DataStreamerRequest::new); - factory.register((short)63, DataStreamerResponse::new); - factory.register((short)76, GridTaskResultRequest::new); - factory.register((short)77, GridTaskResultResponse::new); - factory.register((short)78, MissingMappingRequestMessage::new); - factory.register((short)79, MissingMappingResponseMessage::new); - factory.register((short)80, MetadataRequestMessage::new); - factory.register((short)81, MetadataResponseMessage::new); - factory.register((short)82, JobStealingRequest::new); - factory.register((short)84, GridByteArrayList::new); - factory.register((short)85, GridLongList::new); - factory.register((short)86, GridCacheVersion::new); - factory.register((short)87, GridDhtPartitionExchangeId::new); - factory.register((short)88, GridCacheReturn::new); - factory.register((short)89, CacheObjectImpl::new); - factory.register((short)90, KeyCacheObjectImpl::new); - factory.register((short)91, GridCacheEntryInfo::new); - factory.register((short)92, CacheEntryInfoCollection::new); - factory.register((short)93, CacheInvokeDirectResult::new); - factory.register((short)94, IgniteTxKey::new); - factory.register((short)95, DataStreamerEntry::new); - factory.register((short)96, CacheContinuousQueryEntry::new); - factory.register((short)97, CacheEvictionEntry::new); - factory.register((short)98, CacheEntryPredicateContainsValue::new); - factory.register((short)99, CacheEntrySerializablePredicate::new); - factory.register((short)100, IgniteTxEntry::new); - factory.register((short)101, TxEntryValueHolder::new); - factory.register((short)102, CacheVersionedValue::new); - factory.register((short)103, GridCacheRawVersionedEntry::new); - factory.register((short)104, GridCacheVersionEx::new); - factory.register((short)105, CacheObjectByteArrayImpl::new); - factory.register((short)106, GridQueryCancelRequest::new); - factory.register((short)107, GridQueryFailResponse::new); - factory.register((short)108, GridQueryNextPageRequest::new); - factory.register((short)109, GridQueryNextPageResponse::new); - factory.register((short)111, AffinityTopologyVersion::new); - factory.register((short)112, GridCacheSqlQuery::new); - factory.register((short)113, BinaryObjectImpl::new); - factory.register((short)114, GridDhtPartitionSupplyMessage::new); - factory.register((short)115, UUIDCollectionMessage::new); - factory.register((short)116, GridNearSingleGetRequest::new); - factory.register((short)117, GridNearSingleGetResponse::new); - factory.register((short)118, CacheContinuousQueryBatchAck::new); - factory.register((short)119, BinaryEnumObjectImpl::new); - - // [120..123] - DR - factory.register((short)124, GridMessageCollection::new); - factory.register((short)125, GridNearAtomicSingleUpdateRequest::new); - factory.register((short)126, GridNearAtomicSingleUpdateInvokeRequest::new); - factory.register((short)127, GridNearAtomicSingleUpdateFilterRequest::new); - factory.register((short)128, CacheGroupAffinityMessage::new); - factory.register((short)129, WalStateAckMessage::new); - factory.register((short)130, UserManagementOperationFinishedMessage::new); - factory.register((short)131, UserAuthenticateRequestMessage::new); - factory.register((short)132, UserAuthenticateResponseMessage::new); - factory.register((short)133, ClusterMetricsUpdateMessage::new); - factory.register((short)134, ContinuousRoutineStartResultMessage::new); - factory.register((short)135, LatchAckMessage::new); - factory.register((short)136, MvccTxSnapshotRequest::new); - factory.register((short)137, MvccAckRequestTx::new); - factory.register((short)138, MvccFutureResponse::new); - factory.register((short)139, MvccQuerySnapshotRequest::new); - factory.register((short)140, MvccAckRequestQueryCntr::new); - factory.register((short)141, MvccSnapshotResponse::new); - factory.register((short)143, GridCacheMvccEntryInfo::new); - factory.register((short)144, GridDhtTxQueryEnlistResponse::new); - factory.register((short)145, MvccAckRequestQueryId::new); - factory.register((short)146, MvccAckRequestTxAndQueryCntr::new); - factory.register((short)147, MvccAckRequestTxAndQueryId::new); - factory.register((short)148, MvccVersionImpl::new); - factory.register((short)149, MvccActiveQueriesMessage::new); - factory.register((short)150, MvccSnapshotWithoutTxs::new); - factory.register((short)151, GridNearTxQueryEnlistRequest::new); - factory.register((short)152, GridNearTxQueryEnlistResponse::new); - factory.register((short)153, GridNearTxQueryResultsEnlistRequest::new); - factory.register((short)154, GridNearTxQueryResultsEnlistResponse::new); - factory.register((short)155, GridDhtTxQueryEnlistRequest::new); - factory.register((short)156, GridDhtTxQueryFirstEnlistRequest::new); - factory.register((short)157, PartitionUpdateCountersMessage::new); - factory.register((short)158, GridDhtPartitionSupplyMessageV2::new); - factory.register((short)159, GridNearTxEnlistRequest::new); - factory.register((short)160, GridNearTxEnlistResponse::new); - factory.register((short)161, GridInvokeValue::new); - factory.register((short)162, GenerateEncryptionKeyRequest::new); - factory.register((short)163, GenerateEncryptionKeyResponse::new); - factory.register((short)164, MvccRecoveryFinishedMessage::new); - factory.register((short)165, PartitionCountersNeighborcastRequest::new); - factory.register((short)166, PartitionCountersNeighborcastResponse::new); - factory.register((short)167, ServiceDeploymentProcessId::new); - factory.register((short)168, ServiceSingleNodeDeploymentResultBatch::new); - factory.register((short)169, ServiceSingleNodeDeploymentResult::new); - factory.register((short)170, DeadlockProbe::new); - factory.register((short)171, ProbedTx::new); - factory.register(GridQueryKillRequest.TYPE_CODE, GridQueryKillRequest::new); - factory.register(GridQueryKillResponse.TYPE_CODE, GridQueryKillResponse::new); - factory.register(GridIoSecurityAwareMessage.TYPE_CODE, GridIoSecurityAwareMessage::new); - factory.register(SessionChannelMessage.TYPE_CODE, SessionChannelMessage::new); - factory.register(SingleNodeMessage.TYPE_CODE, SingleNodeMessage::new); - factory.register((short)177, TcpInverseConnectionResponseMessage::new); - - // [-3..119] [124..129] [-23..-28] [-36..-55] - this - // [120..123] - DR - // [-4..-22, -30..-35] - SQL - // [2048..2053] - Snapshots - // [-42..-37] - former hadoop. - // [64..71] - former IGFS. +public class GridIoMessageFactory implements MessageFactory { + /** Custom messages registry. Used for test purposes. */ + private static final Map<Short, IgniteOutClosure<Message>> CUSTOM = new ConcurrentHashMap<>(); + + /** Extensions. */ + private final MessageFactory[] ext; + + /** + * @param ext Extensions. + */ + public GridIoMessageFactory(MessageFactory[] ext) { + this.ext = ext; } /** {@inheritDoc} */ @Override public Message create(short type) { - throw new UnsupportedOperationException(); + Message msg = null; + + switch (type) { + // -54 is reserved for SQL. + // -46 ... -51 - snapshot messages. + case -61: + msg = new IgniteDiagnosticMessage(); + + break; + + case -53: + msg = new SchemaOperationStatusMessage(); + + break; + + case -52: + msg = new GridIntList(); + + break; + + case -51: + msg = new NearCacheUpdates(); + + break; + + case -50: + msg = new GridNearAtomicCheckUpdateRequest(); + + break; + + case -49: + msg = new UpdateErrors(); + + break; + + case -48: + msg = new GridDhtAtomicNearResponse(); + + break; + + case -45: + msg = new GridChangeGlobalStateMessageResponse(); + + break; + + case -44: + msg = new HandshakeMessage2(); + + break; + + case -43: + msg = new IgniteIoTestMessage(); + + break; + + case -36: + msg = new GridDhtAtomicSingleUpdateRequest(); + + break; + + case -27: + msg = new GridDhtTxOnePhaseCommitAckRequest(); + + break; + + case -26: + msg = new TxLockList(); + + break; + + case -25: + msg = new TxLock(); + + break; + + case -24: + msg = new TxLocksRequest(); + + break; + + case -23: + msg = new TxLocksResponse(); + + break; + + case TcpCommunicationSpi.NODE_ID_MSG_TYPE: + msg = new NodeIdMessage(); + + break; + + case TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE: + msg = new RecoveryLastReceivedMessage(); + + break; + + case TcpCommunicationSpi.HANDSHAKE_MSG_TYPE: + msg = new HandshakeMessage(); + + break; + + case TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE: + msg = new HandshakeWaitMessage(); + + break; + + case 0: + msg = new GridJobCancelRequest(); + + break; + + case 1: + msg = new GridJobExecuteRequest(); + + break; + + case 2: + msg = new GridJobExecuteResponse(); + + break; + + case 3: + msg = new GridJobSiblingsRequest(); + + break; + + case 4: + msg = new GridJobSiblingsResponse(); + + break; + + case 5: + msg = new GridTaskCancelRequest(); + + break; + + case 6: + msg = new GridTaskSessionRequest(); + + break; + + case 7: + msg = new GridCheckpointRequest(); + + break; + + case 8: + msg = new GridIoMessage(); + + break; + + case 9: + msg = new GridIoUserMessage(); + + break; + + case 10: + msg = new GridDeploymentInfoBean(); + + break; + + case 11: + msg = new GridDeploymentRequest(); + + break; + + case 12: + msg = new GridDeploymentResponse(); + + break; + + case 13: + msg = new GridEventStorageMessage(); + + break; + + case 16: + msg = new GridCacheTxRecoveryRequest(); + + break; + + case 17: + msg = new GridCacheTxRecoveryResponse(); + + break; + + case 20: + msg = new GridCacheTtlUpdateRequest(); + + break; + + case 21: + msg = new GridDistributedLockRequest(); + + break; + + case 22: + msg = new GridDistributedLockResponse(); + + break; + + case 23: + msg = new GridDistributedTxFinishRequest(); + + break; + + case 24: + msg = new GridDistributedTxFinishResponse(); + + break; + + case 25: + msg = new GridDistributedTxPrepareRequest(); + + break; + + case 26: + msg = new GridDistributedTxPrepareResponse(); + + break; + + case 27: + msg = new GridDistributedUnlockRequest(); + + break; + + case 28: + msg = new GridDhtAffinityAssignmentRequest(); + + break; + + case 29: + msg = new GridDhtAffinityAssignmentResponse(); + + break; + + case 30: + msg = new GridDhtLockRequest(); + + break; + + case 31: + msg = new GridDhtLockResponse(); + + break; + + case 32: + msg = new GridDhtTxFinishRequest(); + + break; + + case 33: + msg = new GridDhtTxFinishResponse(); + + break; + + case 34: + msg = new GridDhtTxPrepareRequest(); + + break; + + case 35: + msg = new GridDhtTxPrepareResponse(); + + break; + + case 36: + msg = new GridDhtUnlockRequest(); + + break; + + case 37: + msg = new GridDhtAtomicDeferredUpdateResponse(); + + break; + + case 38: + msg = new GridDhtAtomicUpdateRequest(); + + break; + + case 39: + msg = new GridDhtAtomicUpdateResponse(); + + break; + + case 40: + msg = new GridNearAtomicFullUpdateRequest(); + + break; + + case 41: + msg = new GridNearAtomicUpdateResponse(); + + break; + + case 42: + msg = new GridDhtForceKeysRequest(); + + break; + + case 43: + msg = new GridDhtForceKeysResponse(); + + break; + + case 44: + msg = new GridDhtPartitionDemandLegacyMessage(); + + break; + + case 45: + msg = new GridDhtPartitionDemandMessage(); + + break; + + case 46: + msg = new GridDhtPartitionsFullMessage(); + + break; + + case 47: + msg = new GridDhtPartitionsSingleMessage(); + + break; + + case 48: + msg = new GridDhtPartitionsSingleRequest(); + + break; + + case 49: + msg = new GridNearGetRequest(); + + break; + + case 50: + msg = new GridNearGetResponse(); + + break; + + case 51: + msg = new GridNearLockRequest(); + + break; + + case 52: + msg = new GridNearLockResponse(); + + break; + + case 53: + msg = new GridNearTxFinishRequest(); + + break; + + case 54: + msg = new GridNearTxFinishResponse(); + + break; + + case 55: + msg = new GridNearTxPrepareRequest(); + + break; + + case 56: + msg = new GridNearTxPrepareResponse(); + + break; + + case 57: + msg = new GridNearUnlockRequest(); + + break; + + case 58: + msg = new GridCacheQueryRequest(); + + break; + + case 59: + msg = new GridCacheQueryResponse(); + + break; + + case 61: + msg = new GridContinuousMessage(); + + break; + + case 62: + msg = new DataStreamerRequest(); + + break; + + case 63: + msg = new DataStreamerResponse(); + + break; + + case 76: + msg = new GridTaskResultRequest(); + + break; + + case 77: + msg = new GridTaskResultResponse(); + + break; + + case 78: + msg = new MissingMappingRequestMessage(); + + break; + + case 79: + msg = new MissingMappingResponseMessage(); + + break; + + case 80: + msg = new MetadataRequestMessage(); + + break; + + case 81: + msg = new MetadataResponseMessage(); + + break; + + case 82: + msg = new JobStealingRequest(); + + break; + + case 84: + msg = new GridByteArrayList(); + + break; + + case 85: + msg = new GridLongList(); + + break; + + case 86: + msg = new GridCacheVersion(); + + break; + + case 87: + msg = new GridDhtPartitionExchangeId(); + + break; + + case 88: + msg = new GridCacheReturn(); + + break; + + case 89: + msg = new CacheObjectImpl(); + + break; + + case 90: + msg = new KeyCacheObjectImpl(); + + break; + + case 91: + msg = new GridCacheEntryInfo(); + + break; + + case 92: + msg = new CacheEntryInfoCollection(); + + break; + + case 93: + msg = new CacheInvokeDirectResult(); + + break; + + case 94: + msg = new IgniteTxKey(); + + break; + + case 95: + msg = new DataStreamerEntry(); + + break; + + case 96: + msg = new CacheContinuousQueryEntry(); + + break; + + case 97: + msg = new CacheEvictionEntry(); + + break; + + case 98: + msg = new CacheEntryPredicateContainsValue(); + + break; + + case 99: + msg = new CacheEntrySerializablePredicate(); + + break; + + case 100: + msg = new IgniteTxEntry(); + + break; + + case 101: + msg = new TxEntryValueHolder(); + + break; + + case 102: + msg = new CacheVersionedValue(); + + break; + + case 103: + msg = new GridCacheRawVersionedEntry<>(); + + break; + + case 104: + msg = new GridCacheVersionEx(); + + break; + + case 105: + msg = new CacheObjectByteArrayImpl(); + + break; + + case 106: + msg = new GridQueryCancelRequest(); + + break; + + case 107: + msg = new GridQueryFailResponse(); + + break; + + case 108: + msg = new GridQueryNextPageRequest(); + + break; + + case 109: + msg = new GridQueryNextPageResponse(); + + break; + + case 110: + // EMPTY type + // GridQueryRequest was removed + break; + + case 111: + msg = new AffinityTopologyVersion(); + + break; + + case 112: + msg = new GridCacheSqlQuery(); + + break; + + case 113: + msg = new BinaryObjectImpl(); + + break; + + case 114: + msg = new GridDhtPartitionSupplyMessage(); + + break; + + case 115: + msg = new UUIDCollectionMessage(); + + break; + + case 116: + msg = new GridNearSingleGetRequest(); + + break; + + case 117: + msg = new GridNearSingleGetResponse(); + + break; + + case 118: + msg = new CacheContinuousQueryBatchAck(); + + break; + + case 119: + msg = new BinaryEnumObjectImpl(); + + break; + + // [120..123] - DR + case 124: + msg = new GridMessageCollection<>(); + + break; + + case 125: + msg = new GridNearAtomicSingleUpdateRequest(); + + break; + + case 126: + msg = new GridNearAtomicSingleUpdateInvokeRequest(); + + break; + + case 127: + msg = new GridNearAtomicSingleUpdateFilterRequest(); + + break; + + case 128: + msg = new CacheGroupAffinityMessage(); + + break; + + case 129: + msg = new WalStateAckMessage(); + + break; + + case 130: + msg = new UserManagementOperationFinishedMessage(); + + break; + + case 131: + msg = new UserAuthenticateRequestMessage(); + + break; + + case 132: + msg = new UserAuthenticateResponseMessage(); + + break; + + case 133: + msg = new ClusterMetricsUpdateMessage(); + + break; + + case 134: + msg = new ContinuousRoutineStartResultMessage(); + + break; + + case 135: + msg = new LatchAckMessage(); + + break; + + case 136: + msg = new MvccTxSnapshotRequest(); + + break; + + case 137: + msg = new MvccAckRequestTx(); + + break; + + case 138: + msg = new MvccFutureResponse(); + + break; + + case 139: + msg = new MvccQuerySnapshotRequest(); + + break; + + case 140: + msg = new MvccAckRequestQueryCntr(); + + break; + + case 141: + msg = new MvccSnapshotResponse(); + + break; + + case 143: + msg = new GridCacheMvccEntryInfo(); + + break; + + case 144: + msg = new GridDhtTxQueryEnlistResponse(); + + break; + + case 145: + msg = new MvccAckRequestQueryId(); + + break; + + case 146: + msg = new MvccAckRequestTxAndQueryCntr(); + + break; + + case 147: + msg = new MvccAckRequestTxAndQueryId(); + + break; + + case 148: + msg = new MvccVersionImpl(); + + break; + + case 149: + msg = new MvccActiveQueriesMessage(); + + break; + + case 150: + msg = new MvccSnapshotWithoutTxs(); + + break; + + case 151: + msg = new GridNearTxQueryEnlistRequest(); + + break; + + case 152: + msg = new GridNearTxQueryEnlistResponse(); + + break; + + case 153: + msg = new GridNearTxQueryResultsEnlistRequest(); + + break; + + case 154: + msg = new GridNearTxQueryResultsEnlistResponse(); + + break; + + case 155: + msg = new GridDhtTxQueryEnlistRequest(); + + break; + + case 156: + msg = new GridDhtTxQueryFirstEnlistRequest(); + + break; + + case 157: + msg = new PartitionUpdateCountersMessage(); + + break; + + case 158: + msg = new GridDhtPartitionSupplyMessageV2(); + + break; + + case 159: + msg = new GridNearTxEnlistRequest(); + + break; + + case 160: + msg = new GridNearTxEnlistResponse(); + + break; + + case 161: + msg = new GridInvokeValue(); + + break; + + case 162: + msg = new GenerateEncryptionKeyRequest(); + + break; + + case 163: + msg = new GenerateEncryptionKeyResponse(); + + break; + + case 164: + msg = new MvccRecoveryFinishedMessage(); + + break; + + case 165: + msg = new PartitionCountersNeighborcastRequest(); + + break; + + case 166: + msg = new PartitionCountersNeighborcastResponse(); + + break; + + case 167: + msg = new ServiceDeploymentProcessId(); + + break; + + case 168: + msg = new ServiceSingleNodeDeploymentResultBatch(); + + break; + + case 169: + msg = new ServiceSingleNodeDeploymentResult(); + + break; + + case 170: + msg = new DeadlockProbe(); + + break; + + case 171: + msg = new ProbedTx(); + + break; + + case GridQueryKillRequest.TYPE_CODE: + msg = new GridQueryKillRequest(); + + break; + + case GridQueryKillResponse.TYPE_CODE: + msg = new GridQueryKillResponse(); + + break; + + case GridIoSecurityAwareMessage.TYPE_CODE: + msg = new GridIoSecurityAwareMessage(); + + break; + + case SessionChannelMessage.TYPE_CODE: + msg = new SessionChannelMessage(); + + break; + + case SingleNodeMessage.TYPE_CODE: + msg = new SingleNodeMessage<>(); + + break; + + case 177: + msg = new TcpInverseConnectionResponseMessage(); + + break; + + // [-3..119] [124..129] [-23..-28] [-36..-55] - this + // [120..123] - DR + // [-4..-22, -30..-35] - SQL + // [2048..2053] - Snapshots + default: + if (ext != null) { + for (MessageFactory factory : ext) { + msg = factory.create(type); + + if (msg != null) + break; + } + } + + if (msg == null) { + IgniteOutClosure<Message> c = CUSTOM.get(type); + + if (c != null) + msg = c.apply(); + } + } + + if (msg == null) + throw new IgniteException("Invalid message type: " + type); + + return msg; + } + + /** + * Registers factory for custom message. Used for test purposes. + * + * @param type Message type. + * @param c Message producer. + */ + public static void registerCustom(short type, IgniteOutClosure<Message> c) { + assert c != null; + + CUSTOM.put(type, c); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java deleted file mode 100644 index eb89043..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.managers.communication; - -import java.lang.reflect.Array; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Supplier; - -import org.apache.ignite.IgniteException; -import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageFactory; -import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; -import org.jetbrains.annotations.Nullable; -import org.jetbrains.annotations.TestOnly; - -/** - * Message factory implementation which is responsible for instantiation of all communication messages. - */ -public class IgniteMessageFactoryImpl implements IgniteMessageFactory { - /** Offset. */ - private static final int OFF = -Short.MIN_VALUE; - - /** Array size. */ - private static final int ARR_SIZE = 1 << Short.SIZE; - - /** Custom messages registry. Used for test purposes. */ - private static final Map<Short, Supplier<Message>> CUSTOM = new ConcurrentHashMap<>(); - - /** Message suppliers. */ - private final Supplier<Message>[] msgSuppliers = (Supplier<Message>[]) Array.newInstance(Supplier.class, ARR_SIZE); - - /** Initialized flag. If {@code true} then new message type couldn't be registered. */ - private boolean initialized; - - /** - * Contructor. - * - * @param factories Concrete message factories or message factory providers. Cfn't be empty or {@code null}. - */ - public IgniteMessageFactoryImpl(MessageFactory[] factories) { - if (factories == null || factories.length == 0) - throw new IllegalArgumentException("Message factory couldn't be initialized. Factories aren't provided."); - - List<MessageFactory> old = new ArrayList<>(factories.length); - - for (MessageFactory factory : factories) { - if (factory instanceof MessageFactoryProvider) { - MessageFactoryProvider p = (MessageFactoryProvider)factory; - - p.registerAll(this); - } - else - old.add(factory); - } - - if (!old.isEmpty()) { - for (int i = 0; i < ARR_SIZE; i++) { - Supplier<Message> curr = msgSuppliers[i]; - - if (curr == null) { - short directType = indexToDirectType(i); - - for (MessageFactory factory : old) { - Message msg = factory.create(directType); - - if (msg != null) - register(directType, () -> factory.create(directType)); - } - } - } - } - - initialized = true; - } - - /** {@inheritDoc} */ - @Override public void register(short directType, Supplier<Message> supplier) throws IgniteException { - if (initialized) { - throw new IllegalStateException("Message factory is already initialized. " + - "Registration of new message types is forbidden."); - } - - int idx = directTypeToIndex(directType); - - Supplier<Message> curr = msgSuppliers[idx]; - - if (curr == null) - msgSuppliers[idx] = supplier; - else - throw new IgniteException("Message factory is already registered for direct type: " + directType); - } - - /** - * Creates new message instance of provided direct type. - * <p> - * - * @param directType Message direct type. - * @return Message instance. - * @throws IgniteException If there are no any message factory for given {@code directType}. - */ - @Override public @Nullable Message create(short directType) { - Supplier<Message> supplier = msgSuppliers[directTypeToIndex(directType)]; - - if (supplier == null) - supplier = CUSTOM.get(directType); - - if (supplier == null) - throw new IgniteException("Invalid message type: " + directType); - - return supplier.get(); - } - - /** - * @param directType Direct type. - */ - private static int directTypeToIndex(short directType) { - return directType + OFF; - } - - /** - * @param idx Index. - */ - private static short indexToDirectType(int idx) { - int res = idx - OFF; - - assert res >= Short.MIN_VALUE && res <= Short.MAX_VALUE; - - return (short)res; - } - - /** - * Registers factory for custom message. Used for test purposes. - * - * @param type Message type. - * @param c Message producer. - * - * @deprecated Should be removed. Please don't use this method anymore. - * Consider using of plugin with own message types. - */ - @TestOnly - @Deprecated - public static void registerCustom(short type, Supplier<Message> c) { - assert c != null; - - CUSTOM.put(type, c); - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IgniteMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IgniteMessageFactory.java deleted file mode 100644 index ae159b3..0000000 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IgniteMessageFactory.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.plugin.extensions.communication; - -import java.util.function.Supplier; -import org.apache.ignite.IgniteException; - -/** - * Message factory for all communication messages registered using {@link #register(short, Supplier)} method call. - */ -public interface IgniteMessageFactory extends MessageFactory { - /** - * Register message factory with given direct type. All messages must be registered during construction - * of class which implements this interface. Any invocation of this method after initialization is done must - * throw {@link IllegalStateException} exception. - * - * @param directType Direct type. - * @param supplier Message factory. - * @throws IgniteException In case of attempt to register message with direct type which is already registered. - * @throws IllegalStateException On any invocation of this method when class which implements this interface - * is alredy constructed. - */ - public void register(short directType, Supplier<Message> supplier) throws IgniteException; -} diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java index 07a3d5b..1ea88fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java @@ -26,10 +26,7 @@ import org.jetbrains.annotations.Nullable; * A plugin can provide his own message factory as an extension * if it uses any custom messages (all message must extend * {@link Message} class). - * - * @deprecated Use {@link MessageFactoryProvider} instead. */ -@Deprecated public interface MessageFactory extends Extension { /** * Creates new message instance of provided type. diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactoryProvider.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactoryProvider.java deleted file mode 100644 index 910c68c..0000000 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactoryProvider.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.plugin.extensions.communication; - -import org.jetbrains.annotations.Nullable; - -/** - * Provider of communication message factories. - * <p> - * Implementation of this interface is responsible for registration of all message factories in - * {@link #registerAll} method. - * <p> - * {@link #registerAll} method's call is responsibility of {@link IgniteMessageFactory} implementation. - */ -public interface MessageFactoryProvider extends MessageFactory { - /** - * Registers all messages factories. See {@link IgniteMessageFactory#register}. - * - * @param factory {@link IgniteMessageFactory} implementation. - */ - public void registerAll(IgniteMessageFactory factory); - - /** - * Always throws {@link UnsupportedOperationException}. - * @param type Message direct type. - * @throws UnsupportedOperationException On any invocation. - */ - @Override @Nullable public default Message create(short type) { - throw new UnsupportedOperationException(); - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java index afece1f..361409b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java @@ -109,7 +109,7 @@ class TcpCommunicationMetricsListener { private final Object msgTypMapMux = new Object(); /** Message type map. */ - private volatile Map<Short, String> msgTypeMap; + private volatile Map<Short, String> msgTypMap; /** */ public TcpCommunicationMetricsListener(GridMetricManager mmgr, Ignite ignite) { @@ -285,7 +285,7 @@ class TcpCommunicationMetricsListener { if (metric.name().startsWith(prefix)) { short directType = Short.parseShort(metric.name().substring(prefix.length())); - Map<Short, String> msgTypMap0 = msgTypeMap; + Map<Short, String> msgTypMap0 = msgTypMap; if (msgTypMap0 != null) { String typeName = msgTypMap0.get(directType); @@ -374,24 +374,24 @@ class TcpCommunicationMetricsListener { private void updateMessageTypeMap(Message msg) { short typeId = msg.directType(); - Map<Short, String> msgTypMap0 = msgTypeMap; + Map<Short, String> msgTypMap0 = msgTypMap; if (msgTypMap0 == null || !msgTypMap0.containsKey(typeId)) { synchronized (msgTypMapMux) { - if (msgTypeMap == null) { + if (msgTypMap == null) { msgTypMap0 = new HashMap<>(); msgTypMap0.put(typeId, msg.getClass().getName()); - msgTypeMap = msgTypMap0; + msgTypMap = msgTypMap0; } else { - if (!msgTypeMap.containsKey(typeId)) { - msgTypMap0 = new HashMap<>(msgTypeMap); + if (!msgTypMap.containsKey(typeId)) { + msgTypMap0 = new HashMap<>(msgTypMap); msgTypMap0.put(typeId, msg.getClass().getName()); - msgTypeMap = msgTypMap0; + msgTypMap = msgTypMap0; } } } diff --git a/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider b/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider index 7704c0b..6bca88f 100644 --- a/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider +++ b/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider @@ -2,4 +2,3 @@ org.apache.ignite.internal.processors.cache.persistence.standbycluster.IgniteSta org.apache.ignite.internal.processors.cache.persistence.wal.memtracker.PageMemoryTrackerPluginProvider org.apache.ignite.internal.processors.configuration.distributed.TestDistibutedConfigurationPlugin org.apache.ignite.plugin.NodeValidationPluginProvider - diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java index 6dd103e..108e4672 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java @@ -22,9 +22,11 @@ import java.util.concurrent.CountDownLatch; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; import org.apache.ignite.internal.managers.communication.GridIoUserMessage; -import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl; +import org.apache.ignite.internal.util.typedef.CO; import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpi; import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.spi.IgniteSpiContext; @@ -46,7 +48,11 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT private static final short DIRECT_TYPE = 210; static { - IgniteMessageFactoryImpl.registerCustom(DIRECT_TYPE, GridIoUserMessage::new); + GridIoMessageFactory.registerCustom(DIRECT_TYPE, new CO<Message>() { + @Override public Message apply() { + return new GridIoUserMessage(); + } + }); } /** {@inheritDoc} */ @@ -174,7 +180,7 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT private IgniteSpiContext spiCtx; /** Test message topic. **/ - private static final String TEST_TOPIC = "test_topic"; + private String TEST_TOPIC = "test_topic"; /** {@inheritDoc} */ @Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException { @@ -186,7 +192,6 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT // No-op. } - /** {@inheritDoc} */ @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { this.spiCtx = spiCtx; @@ -198,7 +203,6 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT } - /** {@inheritDoc} */ @Override public void onContextDestroyed0() { spiCtx.removeLocalMessageListener(TEST_TOPIC, new IgniteBiPredicate<UUID, Object>() { @Override public boolean apply(UUID uuid, Object o) { diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java index 0a75cf5..8a27a46 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.UUID; import java.util.concurrent.CountDownLatch; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.CO; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -45,10 +46,20 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest /** */ private static final short DIRECT_TYPE_OVER_BYTE = 1000; - static { - IgniteMessageFactoryImpl.registerCustom(DIRECT_TYPE, TestMessage::new); + /** */ + private int bufSize; - IgniteMessageFactoryImpl.registerCustom(DIRECT_TYPE_OVER_BYTE, TestOverByteIdMessage::new); + static { + GridIoMessageFactory.registerCustom(DIRECT_TYPE, new CO<Message>() { + @Override public Message apply() { + return new TestMessage(); + } + }); + GridIoMessageFactory.registerCustom(DIRECT_TYPE_OVER_BYTE, new CO<Message>() { + @Override public Message apply() { + return new TestOverByteIdMessage(); + } + }); } /** {@inheritDoc} */ @@ -97,6 +108,8 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest */ @Test public void testSendMessageWithBuffer() throws Exception { + bufSize = 8192; + try { startGridsMultiThreaded(2); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java deleted file mode 100644 index c55cc0d..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.managers.communication; - -import java.nio.ByteBuffer; - -import org.apache.ignite.IgniteException; -import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageFactory; -import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; -import org.jetbrains.annotations.Nullable; -import org.junit.Test; - -import static org.junit.Assert.assertTrue; - -/** - * Tests for default implementation of {@link IgniteMessageFactory} interface. - */ -public class IgniteMessageFactoryImplTest { - /** Test message 1 type. */ - private static final short TEST_MSG_1_TYPE = 1; - - /** Test message 2 type. */ - private static final short TEST_MSG_2_TYPE = 2; - - /** Unknown message type. */ - private static final short UNKNOWN_MSG_TYPE = 0; - - /** - * Tests that impossible register new message after initialization. - */ - @Test(expected = IllegalStateException.class) - public void testReadOnly() { - MessageFactory[] factories = {new TestMessageFactoryPovider(), new TestMessageFactory()}; - - IgniteMessageFactory msgFactory = new IgniteMessageFactoryImpl(factories); - - msgFactory.register((short)0, () -> null); - } - - /** - * Tests that proper message type will be returned by message factory. - */ - @Test - public void testCreate() { - MessageFactory[] factories = {new TestMessageFactoryPovider(), new TestMessageFactory()}; - - IgniteMessageFactory msgFactory = new IgniteMessageFactoryImpl(factories); - - Message msg; - - msg = msgFactory.create(TEST_MSG_1_TYPE); - assertTrue(msg instanceof TestMessage1); - - msg = msgFactory.create(TEST_MSG_2_TYPE); - assertTrue(msg instanceof TestMessage2); - - msg = msgFactory.create(TEST_MSG_2_TYPE); - assertTrue(msg instanceof TestMessage2); - } - - /** - * Tests that exception will be thrown for unknown message direct type. - */ - @Test(expected = IgniteException.class) - public void testCreate_UnknownMessageType() { - MessageFactory[] factories = {new TestMessageFactoryPovider(), new TestMessageFactory()}; - - IgniteMessageFactory msgFactory = new IgniteMessageFactoryImpl(factories); - - msgFactory.create(UNKNOWN_MSG_TYPE); - } - - /** - * Tests attemption of registration message with already registered message type. - */ - @Test(expected = IgniteException.class) - @SuppressWarnings("ResultOfObjectAllocationIgnored") - public void testRegisterTheSameType() { - MessageFactory[] factories = { - new TestMessageFactoryPovider(), - new TestMessageFactory(), - new TestMessageFactoryPoviderWithTheSameDirectType() - }; - - new IgniteMessageFactoryImpl(factories); - } - - /** - * {@link MessageFactoryProvider} implementation. - */ - private static class TestMessageFactoryPovider implements MessageFactoryProvider { - /** {@inheritDoc} */ - @Override public void registerAll(IgniteMessageFactory factory) { - factory.register(TEST_MSG_1_TYPE, TestMessage1::new); - } - } - - /** - * {@link MessageFactoryProvider} implementation with message direct type which is already registered. - */ - private static class TestMessageFactoryPoviderWithTheSameDirectType implements MessageFactoryProvider { - /** {@inheritDoc} */ - @Override public void registerAll(IgniteMessageFactory factory) { - factory.register(TEST_MSG_1_TYPE, TestMessage1::new); - } - } - - /** - * {@link MessageFactory} implementation whish still uses creation with switch-case. - */ - private static class TestMessageFactory implements MessageFactory { - /** {@inheritDoc} */ - @Override public @Nullable Message create(short type) { - switch (type) { - case TEST_MSG_2_TYPE: - return new TestMessage2(); - - default: - return null; - } - } - } - - /** Test message. */ - private static class TestMessage1 implements Message { - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - return false; - } - - /** {@inheritDoc} */ - @Override public short directType() { - return 1; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 0; - } - - /** {@inheritDoc} */ - @Override public void onAckReceived() { - // No-op. - } - } - - /** Test message. */ - private static class TestMessage2 implements Message { - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - return false; - } - - /** {@inheritDoc} */ - @Override public short directType() { - return 2; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 0; - } - - /** {@inheritDoc} */ - @Override public void onAckReceived() { - // No-op. - } - } -} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java deleted file mode 100644 index 6046f4a..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.managers.communication; - -import java.io.Serializable; -import java.nio.ByteBuffer; -import java.util.concurrent.Callable; -import java.util.UUID; - -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.plugin.CachePluginContext; -import org.apache.ignite.plugin.CachePluginProvider; -import org.apache.ignite.plugin.ExtensionRegistry; -import org.apache.ignite.plugin.IgnitePlugin; -import org.apache.ignite.plugin.PluginConfiguration; -import org.apache.ignite.plugin.PluginContext; -import org.apache.ignite.plugin.PluginProvider; -import org.apache.ignite.plugin.PluginValidationException; -import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageFactory; -import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.jetbrains.annotations.Nullable; -import org.junit.Test; - -import static org.apache.ignite.testframework.GridTestUtils.assertThrows; - -/** - * Tests that node will not start if some component tries to register message factory with direct type - * for which message factory is already registered. - */ -public class MessageDirectTypeIdConflictTest extends GridCommonAbstractTest { - /** Test plugin name. */ - private static final String TEST_PLUGIN_NAME = "TEST_PLUGIN"; - - /** Message direct type. Message with this direct type will be registered by {@link GridIoMessageFactory} first. */ - private static final short MSG_DIRECT_TYPE = -44; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - - cfg.setPluginProviders(new TestPluginProvider()); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - super.beforeTest(); - - stopAllGrids(); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - super.afterTest(); - - stopAllGrids(); - } - - /** - * Tests that node will not start if some component tries to register message factory with direct type - * for which message factory is already registered. - */ - @Test - public void testRegisterMessageFactoryWithConflictDirectTypeId() throws Exception { - assertThrows(log, (Callable<Object>)this::startGrid, IgniteCheckedException.class, - "Message factory is already registered for direct type: " + MSG_DIRECT_TYPE); - } - - /** Plugin with own message factory. */ - private static class TestPlugin implements IgnitePlugin { - } - - /** */ - public static class TestPluginProvider implements PluginProvider<TestPluginConfiguration> { - /** {@inheritDoc} */ - @Override public String name() { - return TEST_PLUGIN_NAME; - } - - /** {@inheritDoc} */ - @Override public <T extends IgnitePlugin> T plugin() { - return (T)new TestPlugin(); - } - - /** {@inheritDoc} */ - @Override public <T> @Nullable T createComponent(PluginContext ctx, Class<T> cls) { - return null; - } - - /** {@inheritDoc} */ - @Override public String version() { - return null; - } - - /** {@inheritDoc} */ - @Override public String copyright() { - return null; - } - - /** {@inheritDoc} */ - @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) throws IgniteCheckedException { - registry.registerExtension(MessageFactory.class, new MessageFactoryProvider() { - @Override public void registerAll(IgniteMessageFactory factory) { - factory.register(MSG_DIRECT_TYPE, TestMessage::new); - } - }); - } - - /** {@inheritDoc} */ - @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) { - return null; - } - - /** {@inheritDoc} */ - @Override public void start(PluginContext ctx) throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void stop(boolean cancel) throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void onIgniteStart() throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void onIgniteStop(boolean cancel) { - // no-op. - } - - /** {@inheritDoc} */ - @Override public @Nullable Serializable provideDiscoveryData(UUID nodeId) { - return null; - } - - /** {@inheritDoc} */ - @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void validateNewNode(ClusterNode node) throws PluginValidationException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void validateNewNode(ClusterNode node, Serializable data) { - // No-op. - } - } - - /** */ - private static class TestPluginConfiguration implements PluginConfiguration { - } - - /** Test message with already registered direct type. */ - private static class TestMessage implements Message { - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - return false; - } - - /** {@inheritDoc} */ - @Override public short directType() { - return MSG_DIRECT_TYPE; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 0; - } - - /** {@inheritDoc} */ - @Override public void onAckReceived() { - // No-op. - } - } - -} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java index aaa42b1..12490d4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java @@ -22,8 +22,10 @@ import org.apache.ignite.Ignition; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.binary.BinaryMarshaller; -import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.typedef.CO; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -40,7 +42,11 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe * */ static { - IgniteMessageFactoryImpl.registerCustom(TestMessage.DIRECT_TYPE, TestMessage::new); + GridIoMessageFactory.registerCustom(TestMessage.DIRECT_TYPE, new CO<Message>() { + @Override public Message apply() { + return new TestMessage(); + } + }); } /** {@inheritDoc} */ @@ -56,8 +62,8 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe * @return Cache configuration. * @throws Exception In case of error. */ - protected CacheConfiguration<?, ?> cacheConfiguration() throws Exception { - CacheConfiguration<?, ?> cfg = defaultCacheConfiguration(); + protected CacheConfiguration cacheConfiguration() throws Exception { + CacheConfiguration cfg = defaultCacheConfiguration(); cfg.setCacheMode(PARTITIONED); cfg.setWriteSynchronizationMode(FULL_SYNC); @@ -107,7 +113,7 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe */ @Test public void testAddedDeploymentInfo() throws Exception { - GridCacheContext<?, ?> ctx = cacheContext(); + GridCacheContext ctx = cacheContext(); if (grid(0).configuration().getMarshaller() instanceof BinaryMarshaller) assertFalse(ctx.deploymentEnabled()); @@ -131,7 +137,7 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe */ @Test public void testAddedDeploymentInfo2() throws Exception { - GridCacheContext<?, ?> ctx = cacheContext(); + GridCacheContext ctx = cacheContext(); if (grid(0).configuration().getMarshaller() instanceof BinaryMarshaller) assertFalse(ctx.deploymentEnabled()); @@ -155,8 +161,8 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe /** * @return Cache context. */ - protected GridCacheContext<?, ?> cacheContext() { - return ((IgniteCacheProxy<?, ?>)grid(0).cache(DEFAULT_CACHE_NAME)).context(); + protected GridCacheContext cacheContext() { + return ((IgniteCacheProxy)grid(0).cache(DEFAULT_CACHE_NAME)).context(); } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java index 49d588f..aeaabda 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java @@ -33,11 +33,9 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.direct.DirectMessageReader; import org.apache.ignite.internal.direct.DirectMessageWriter; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; -import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObjectImpl; import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; -import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -62,7 +60,7 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); ccfg.setCacheMode(PARTITIONED); ccfg.setAtomicityMode(atomicityMode()); ccfg.setWriteSynchronizationMode(FULL_SYNC); @@ -155,9 +153,7 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst e0.writeTo(buf, writer); CacheContinuousQueryEntry e1 = new CacheContinuousQueryEntry(); - IgniteMessageFactoryImpl msgFactory = - new IgniteMessageFactoryImpl(new MessageFactory[]{new GridIoMessageFactory()}); - e1.readFrom(ByteBuffer.wrap(buf.array()), new DirectMessageReader(msgFactory, (byte)1)); + e1.readFrom(ByteBuffer.wrap(buf.array()), new DirectMessageReader(new GridIoMessageFactory(null), (byte)1)); assertEquals(e0.cacheId(), e1.cacheId()); assertEquals(e0.eventType(), e1.eventType()); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java index 326b716..8034093 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java @@ -29,8 +29,9 @@ import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; +import org.apache.ignite.internal.util.typedef.CO; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteRunnable; @@ -51,7 +52,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS; * Super class for all communication self tests. * @param <T> Type of communication SPI. */ -public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationSpi<Message>> extends GridSpiAbstractTest<T> { +public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationSpi> extends GridSpiAbstractTest<T> { /** */ private static long msgId = 1; @@ -74,13 +75,17 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS private static GridTimeoutProcessor timeoutProcessor; /** */ - protected boolean useSsl; + protected boolean useSsl = false; /** * */ static { - IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new); + GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() { + @Override public Message apply() { + return new GridTestMessage(); + } + }); } /** */ @@ -157,7 +162,7 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS for (ClusterNode node : nodes) { synchronized (mux) { if (!msgDestMap.containsKey(entry.getKey())) - msgDestMap.put(entry.getKey(), new HashSet<>()); + msgDestMap.put(entry.getKey(), new HashSet<UUID>()); msgDestMap.get(entry.getKey()).add(node.id()); } @@ -203,7 +208,7 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS for (ClusterNode node : nodes) { synchronized (mux) { if (!msgDestMap.containsKey(sndId)) - msgDestMap.put(sndId, new HashSet<>()); + msgDestMap.put(sndId, new HashSet<UUID>()); msgDestMap.get(sndId).add(node.id()); } diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java index 21a846d..797328e 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java @@ -31,13 +31,14 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.failure.AbstractFailureHandler; import org.apache.ignite.failure.FailureContext; import org.apache.ignite.internal.GridDirectCollection; -import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.communication.GridIoManager; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; -import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.util.typedef.CI2; +import org.apache.ignite.internal.util.typedef.CO; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; @@ -65,15 +66,35 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest { * */ static { - IgniteMessageFactoryImpl.registerCustom(TestMessage.DIRECT_TYPE, TestMessage::new); + GridIoMessageFactory.registerCustom(TestMessage.DIRECT_TYPE, new CO<Message>() { + @Override public Message apply() { + return new TestMessage(); + } + }); - IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new); + GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() { + @Override public Message apply() { + return new GridTestMessage(); + } + }); - IgniteMessageFactoryImpl.registerCustom(TestMessage1.DIRECT_TYPE, TestMessage1::new); + GridIoMessageFactory.registerCustom(TestMessage1.DIRECT_TYPE, new CO<Message>() { + @Override public Message apply() { + return new TestMessage1(); + } + }); - IgniteMessageFactoryImpl.registerCustom(TestMessage2.DIRECT_TYPE, TestMessage2::new); + GridIoMessageFactory.registerCustom(TestMessage2.DIRECT_TYPE, new CO<Message>() { + @Override public Message apply() { + return new TestMessage2(); + } + }); - IgniteMessageFactoryImpl.registerCustom(TestBadMessage.DIRECT_TYPE, TestBadMessage::new); + GridIoMessageFactory.registerCustom(TestBadMessage.DIRECT_TYPE, new CO<Message>() { + @Override public Message apply() { + return new TestBadMessage(); + } + }); } /** {@inheritDoc} */ @@ -84,7 +105,7 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest { cfg.setFailureHandler(new TestFailureHandler()); - CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); ccfg.setCacheMode(CacheMode.PARTITIONED); ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC); @@ -125,25 +146,25 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest { try { startGrids(2); - IgniteEx ignite0 = grid(0); - IgniteEx ignite1 = grid(1); + Ignite ignite0 = grid(0); + Ignite ignite1 = grid(1); - ignite0.context().cache().context().io().addCacheHandler( + ((IgniteKernal)ignite0).context().cache().context().io().addCacheHandler( 0, TestBadMessage.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { throw new RuntimeException("Test bad message exception"); } }); - ignite1.context().cache().context().io().addCacheHandler( + ((IgniteKernal)ignite1).context().cache().context().io().addCacheHandler( 0, TestBadMessage.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { throw new RuntimeException("Test bad message exception"); } }); - ignite0.context().cache().context().io().send( - ignite1.localNode().id(), new TestBadMessage(), (byte)2); + ((IgniteKernal)ignite0).context().cache().context().io().send( + ((IgniteKernal)ignite1).localNode().id(), new TestBadMessage(), (byte)2); boolean res = failureLatch.await(5, TimeUnit.SECONDS); @@ -158,8 +179,8 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ private void doSend() throws Exception { - GridIoManager mgr0 = grid(0).context().io(); - GridIoManager mgr1 = grid(1).context().io(); + GridIoManager mgr0 = ((IgniteKernal)grid(0)).context().io(); + GridIoManager mgr1 = ((IgniteKernal)grid(1)).context().io(); String topic = "test-topic"; @@ -174,14 +195,14 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest { assertEquals(10, messages.size()); - int cnt = 0; + int count = 0; for (TestMessage1 msg1 : messages) { assertTrue(msg1.body().contains(TEST_BODY)); int i = Integer.parseInt(msg1.body().substring(TEST_BODY.length() + 1)); - assertEquals(cnt, i); + assertEquals(count, i); TestMessage2 msg2 = (TestMessage2) msg1.message(); @@ -193,11 +214,11 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest { GridTestMessage msg3 = (GridTestMessage) msg2.message(); - assertEquals(cnt, msg3.getMsgId()); + assertEquals(count, msg3.getMsgId()); assertEquals(grid(1).localNode().id(), msg3.getSourceNodeId()); - cnt++; + count++; } } catch (Exception e) { diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java index 54e5386..7940a63 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java @@ -41,7 +41,7 @@ import org.junit.Test; /** * Test for {@link TcpCommunicationSpi} */ -abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunicationSelfTest<CommunicationSpi<Message>> { +abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunicationSelfTest<CommunicationSpi> { /** */ private static final int SPI_COUNT = 3; @@ -59,7 +59,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica } /** {@inheritDoc} */ - @Override protected CommunicationSpi<Message> getSpi(int idx) { + @Override protected CommunicationSpi getSpi(int idx) { TcpCommunicationSpi spi = new TcpCommunicationSpi(); if (!useShmem) @@ -88,7 +88,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica super.testSendToManyNodes(); // Test idle clients remove. - for (CommunicationSpi<Message> spi : spis.values()) { + for (CommunicationSpi spi : spis.values()) { ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients"); assertEquals(getSpiCount() - 1, clients.size()); @@ -129,13 +129,13 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica final CyclicBarrier b = new CyclicBarrier(THREADS); - List<IgniteInternalFuture<?>> futs = new ArrayList<>(); + List<IgniteInternalFuture> futs = new ArrayList<>(); for (Map.Entry<UUID, CommunicationSpi<Message>> entry : spis.entrySet()) { final TcpCommunicationSpi spi = (TcpCommunicationSpi)entry.getValue(); - futs.add(GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { + futs.add(GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { List<ClusterNode> checkNodes = new ArrayList<>(nodes); assert checkNodes.size() > 1; @@ -156,7 +156,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica })); } - for (IgniteInternalFuture<?> f : futs) + for (IgniteInternalFuture f : futs) f.get(); } @@ -164,7 +164,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica @Override protected void afterTest() throws Exception { super.afterTest(); - for (CommunicationSpi<Message> spi : spis.values()) { + for (CommunicationSpi spi : spis.values()) { ConcurrentMap<UUID, GridCommunicationClient[]> clients = U.field(spi, "clients"); for (int i = 0; i < 20; i++) { diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java index 69e4bef..7ab0d6f 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java @@ -37,12 +37,13 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteNodeAttributes; -import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.nio.GridCommunicationClient; import org.apache.ignite.internal.util.nio.GridNioServer; +import org.apache.ignite.internal.util.typedef.CO; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.plugin.extensions.communication.Message; @@ -64,7 +65,7 @@ import org.junit.Test; * */ @GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI") -public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends CommunicationSpi<Message>> +public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends CommunicationSpi> extends GridSpiAbstractTest<T> { /** */ private static final int SPI_CNT = 2; @@ -100,7 +101,11 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic * */ static { - IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new); + GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() { + @Override public Message apply() { + return new GridTestMessage(); + } + }); } /** @@ -351,24 +356,24 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic assertTrue(latch.await(10, TimeUnit.SECONDS)); - for (CommunicationSpi<?> spi : spis) { + for (CommunicationSpi spi : spis) { ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients"); assertEquals(1, clients.size()); - final GridNioServer<?> srv = U.field(spi, "nioSrvr"); + final GridNioServer srv = U.field(spi, "nioSrvr"); final int conns = pairedConnections ? 2 : 1; GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { - Collection<?> sessions = U.field(srv, "sessions"); + Collection sessions = U.field(srv, "sessions"); return sessions.size() == conns * connectionsPerNode; } }, 5000); - Collection<?> sessions = U.field(srv, "sessions"); + Collection sessions = U.field(srv, "sessions"); assertEquals(conns * connectionsPerNode, sessions.size()); } @@ -391,7 +396,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic /** * @return SPI. */ - private CommunicationSpi<Message> createSpi() { + private CommunicationSpi createSpi() { TcpCommunicationSpi spi = new TcpCommunicationSpi(); spi.setLocalAddress("127.0.0.1"); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java index f99df2b..a53b43b 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java @@ -38,13 +38,14 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.nio.GridCommunicationClient; import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor; import org.apache.ignite.internal.util.nio.GridNioServer; import org.apache.ignite.internal.util.nio.GridNioSession; +import org.apache.ignite.internal.util.typedef.CO; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.internal.U; @@ -99,7 +100,11 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac private static boolean reject; static { - IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new); + GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() { + @Override public Message apply() { + return new GridTestMessage(); + } + }); } /** diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java index d99f48f..408eb10 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java @@ -27,13 +27,14 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor; import org.apache.ignite.internal.util.nio.GridNioServer; import org.apache.ignite.internal.util.nio.GridNioSession; +import org.apache.ignite.internal.util.typedef.CO; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.plugin.extensions.communication.Message; @@ -55,7 +56,7 @@ import org.junit.Test; * */ @GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI") -public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationSpi<Message>> extends GridSpiAbstractTest<T> { +public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationSpi> extends GridSpiAbstractTest<T> { /** */ private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>(); @@ -75,7 +76,11 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS * */ static { - IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new); + GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() { + @Override public Message apply() { + return new GridTestMessage(); + } + }); } /** @@ -168,7 +173,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS final long totAcked0 = totAcked; for (TcpCommunicationSpi spi : spis) { - GridNioServer<?> srv = U.field(spi, "nioSrvr"); + GridNioServer srv = U.field(spi, "nioSrvr"); final Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions"); @@ -276,7 +281,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS ClusterNode node0 = nodes.get(0); ClusterNode node1 = nodes.get(1); - final GridNioServer<?> srv1 = U.field(spi1, "nioSrvr"); + final GridNioServer srv1 = U.field(spi1, "nioSrvr"); int msgId = 0; @@ -336,7 +341,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS * @throws Exception If failed. */ private GridNioSession communicationSession(TcpCommunicationSpi spi) throws Exception { - final GridNioServer<?> srv = U.field(spi, "nioSrvr"); + final GridNioServer srv = U.field(spi, "nioSrvr"); GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java index 1d03590..5ec734a 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java @@ -32,12 +32,13 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.nio.GridNioServer; import org.apache.ignite.internal.util.nio.GridNioSession; +import org.apache.ignite.internal.util.typedef.CO; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.plugin.extensions.communication.Message; @@ -60,7 +61,7 @@ import org.junit.Test; * */ @GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI") -public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi<Message>> extends GridSpiAbstractTest<T> { +public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> extends GridSpiAbstractTest<T> { /** */ private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>(); @@ -89,7 +90,11 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi< * */ static { - IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new); + GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() { + @Override public Message apply() { + return new GridTestMessage(); + } + }); } /** @@ -669,7 +674,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi< * @throws Exception If failed. */ private GridNioSession communicationSession(TcpCommunicationSpi spi, boolean in) throws Exception { - final GridNioServer<?> srv = U.field(spi, "nioSrvr"); + final GridNioServer srv = U.field(spi, "nioSrvr"); GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java index ef9b413..d937bb0 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java @@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.lang.GridAbsPredicate; @@ -36,6 +36,7 @@ import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor; import org.apache.ignite.internal.util.nio.GridNioServer; import org.apache.ignite.internal.util.nio.GridNioSession; import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.CO; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteRunnable; @@ -58,7 +59,7 @@ import org.junit.Test; * */ @GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI") -public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends CommunicationSpi<Message>> +public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends CommunicationSpi> extends GridSpiAbstractTest<T> { /** */ private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>(); @@ -79,7 +80,11 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic * */ static { - IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new); + GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() { + @Override public Message apply() { + return new GridTestMessage(); + } + }); } /** @@ -90,7 +95,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic } /** */ - private static class TestListener implements CommunicationListener<Message> { + private class TestListener implements CommunicationListener<Message> { /** */ private GridConcurrentHashSet<Long> msgIds = new GridConcurrentHashSet<>(); @@ -189,7 +194,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic final long totAcked0 = totAcked; for (TcpCommunicationSpi spi : spis) { - GridNioServer<?> srv = U.field(spi, "nioSrvr"); + GridNioServer srv = U.field(spi, "nioSrvr"); Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions"); @@ -301,7 +306,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic // Check that session will not be closed by idle timeout because expected close by queue overflow. assertTrue(spi0.getIdleConnectionTimeout() > awaitTime); - final GridNioServer<?> srv1 = U.field(spi1, "nioSrvr"); + final GridNioServer srv1 = U.field(spi1, "nioSrvr"); // For prevent session close by write timeout. srv1.writeTimeout(60_000); @@ -387,7 +392,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic * @throws Exception If failed. */ private GridNioSession communicationSession(TcpCommunicationSpi spi) throws Exception { - final GridNioServer<?> srv = U.field(spi, "nioSrvr"); + final GridNioServer srv = U.field(spi, "nioSrvr"); GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java index 26fb56b..99840c8 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java @@ -17,21 +17,28 @@ package org.apache.ignite.spi.communication.tcp; +import java.lang.management.ManagementFactory; import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import javax.management.MBeanServer; +import javax.management.MBeanServerInvocationHandler; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; import org.apache.ignite.internal.managers.communication.GridIoPolicy; -import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl; import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric; import org.apache.ignite.internal.processors.metric.impl.MetricUtils; +import org.apache.ignite.internal.util.typedef.CO; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteRunnable; @@ -56,7 +63,11 @@ public class TcpCommunicationStatisticsTest extends GridCommonAbstractTest { private final CountDownLatch latch = new CountDownLatch(1); static { - IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new); + GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() { + @Override public Message apply() { + return new GridTestMessage(); + } + }); } /** @@ -106,9 +117,19 @@ public class TcpCommunicationStatisticsTest extends GridCommonAbstractTest { * @param nodeIdx Node index. * @return MBean instance. */ - private TcpCommunicationSpiMBean mbean(int nodeIdx) { - return getMxBean(getTestIgniteInstanceName(nodeIdx), "SPIs", - SynchronizedCommunicationSpi.class, TcpCommunicationSpiMBean.class); + private TcpCommunicationSpiMBean mbean(int nodeIdx) throws MalformedObjectNameException { + ObjectName mbeanName = U.makeMBeanName(getTestIgniteInstanceName(nodeIdx), "SPIs", + SynchronizedCommunicationSpi.class.getSimpleName()); + + MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); + + if (mbeanServer.isRegistered(mbeanName)) + return MBeanServerInvocationHandler.newProxyInstance(mbeanServer, mbeanName, TcpCommunicationSpiMBean.class, + true); + else + fail("MBean is not registered: " + mbeanName.getCanonicalName()); + + return null; } /** @@ -138,10 +159,10 @@ public class TcpCommunicationStatisticsTest extends GridCommonAbstractTest { latch.await(10, TimeUnit.SECONDS); - ClusterGroup clusterGrpNode1 = grid(0).cluster().forNodeId(grid(1).localNode().id()); + ClusterGroup clusterGroupNode1 = grid(0).cluster().forNodeId(grid(1).localNode().id()); // Send job from node0 to node1. - grid(0).compute(clusterGrpNode1).call(new IgniteCallable<Boolean>() { + grid(0).compute(clusterGroupNode1).call(new IgniteCallable<Boolean>() { @Override public Boolean call() throws Exception { return Boolean.TRUE; } diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java index 6424bfc..da43ded 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java @@ -44,7 +44,6 @@ import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridIoUserMessage; import org.apache.ignite.internal.managers.communication.GridMessageListener; -import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.timeout.GridSpiTimeoutObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; @@ -568,7 +567,7 @@ public class GridSpiTestContext implements IgniteSpiContext { /** {@inheritDoc} */ @Override public MessageFactory messageFactory() { if (factory == null) - factory = new IgniteMessageFactoryImpl(new MessageFactory[]{new GridIoMessageFactory()}); + factory = new GridIoMessageFactory(null); return factory; } diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index 5876bee..3374928 100755 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -47,9 +47,7 @@ import org.apache.ignite.internal.managers.communication.IgniteCommunicationBala import org.apache.ignite.internal.managers.communication.IgniteCommunicationBalanceTest; import org.apache.ignite.internal.managers.communication.IgniteCommunicationSslBalanceTest; import org.apache.ignite.internal.managers.communication.IgniteIoTestMessagesTest; -import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImplTest; import org.apache.ignite.internal.managers.communication.IgniteVariousConnectionNumberTest; -import org.apache.ignite.internal.managers.communication.MessageDirectTypeIdConflictTest; import org.apache.ignite.internal.processors.cache.BinaryMetadataRegistrationInsideEntryProcessorTest; import org.apache.ignite.internal.processors.cache.CacheAffinityCallSelfTest; import org.apache.ignite.internal.processors.cache.CacheAffinityKeyConfigurationMismatchTest; @@ -353,9 +351,6 @@ public class IgniteCacheTestSuite { GridTestUtils.addTestIfNeeded(suite, IgniteCommunicationBalanceMultipleConnectionsTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgniteCommunicationSslBalanceTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgniteIoTestMessagesTest.class, ignoredTests); - GridTestUtils.addTestIfNeeded(suite, IgniteIoTestMessagesTest.class, ignoredTests); - GridTestUtils.addTestIfNeeded(suite, IgniteMessageFactoryImplTest.class, ignoredTests); - GridTestUtils.addTestIfNeeded(suite, MessageDirectTypeIdConflictTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgniteIncompleteCacheObjectSelfTest.class, ignoredTests); diff --git a/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java b/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java index 0720b4a..ce04839 100644 --- a/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java @@ -21,9 +21,7 @@ import java.nio.ByteBuffer; import org.apache.ignite.internal.direct.DirectMessageReader; import org.apache.ignite.internal.direct.DirectMessageWriter; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; -import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl; import org.apache.ignite.internal.util.UUIDCollectionMessage; -import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageReader; @@ -124,8 +122,7 @@ public class GridMessageCollectionTest { assertEquals(m.directType(), type); - IgniteMessageFactory msgFactory = - new IgniteMessageFactoryImpl(new MessageFactory[]{new GridIoMessageFactory()}); + GridIoMessageFactory msgFactory = new GridIoMessageFactory(null); Message mx = msgFactory.create(type); diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java index e4aae7d..0ff53f7 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java @@ -23,52 +23,107 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.query.QueryTable; import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject; -import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; +import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.h2.value.Value; import org.jetbrains.annotations.Nullable; /** * H2 Value message factory. */ -public class GridH2ValueMessageFactory implements MessageFactoryProvider { +public class GridH2ValueMessageFactory implements MessageFactory { /** {@inheritDoc} */ - @Override public void registerAll(IgniteMessageFactory factory) { - factory.register((short)-4, () -> GridH2Null.INSTANCE); - factory.register((short)-5, GridH2Boolean::new); - factory.register((short)-6, GridH2Byte::new); - factory.register((short)-7, GridH2Short::new); - factory.register((short)-8, GridH2Integer::new); - factory.register((short)-9, GridH2Long::new); - factory.register((short)-10, GridH2Decimal::new); - factory.register((short)-11, GridH2Double::new); - factory.register((short)-12, GridH2Float::new); - factory.register((short)-13, GridH2Time::new); - factory.register((short)-14, GridH2Date::new); - factory.register((short)-15, GridH2Timestamp::new); - factory.register((short)-16, GridH2Bytes::new); - factory.register((short)-17, GridH2String::new); - factory.register((short)-18, GridH2Array::new); - factory.register((short)-19, GridH2JavaObject::new); - factory.register((short)-20, GridH2Uuid::new); - factory.register((short)-21, GridH2Geometry::new); - factory.register((short)-22, GridH2CacheObject::new); - factory.register((short)-30, GridH2IndexRangeRequest::new); - factory.register((short)-31, GridH2IndexRangeResponse::new); - factory.register((short)-32, GridH2RowMessage::new); - factory.register((short)-33, GridH2QueryRequest::new); - factory.register((short)-34, GridH2RowRange::new); - factory.register((short)-35, GridH2RowRangeBounds::new); - factory.register((short)-54, QueryTable::new); - factory.register((short)-55, GridH2DmlRequest::new); - factory.register((short)-56, GridH2DmlResponse::new); - factory.register((short)-57, GridH2SelectForUpdateTxDetails::new); - } + @Nullable @Override public Message create(short type) { + switch (type) { + case -4: + return GridH2Null.INSTANCE; - /** {@inheritDoc} */ - @Override @Nullable public Message create(short type) { - throw new UnsupportedOperationException(); + case -5: + return new GridH2Boolean(); + + case -6: + return new GridH2Byte(); + + case -7: + return new GridH2Short(); + + case -8: + return new GridH2Integer(); + + case -9: + return new GridH2Long(); + + case -10: + return new GridH2Decimal(); + + case -11: + return new GridH2Double(); + + case -12: + return new GridH2Float(); + + case -13: + return new GridH2Time(); + + case -14: + return new GridH2Date(); + + case -15: + return new GridH2Timestamp(); + + case -16: + return new GridH2Bytes(); + + case -17: + return new GridH2String(); + + case -18: + return new GridH2Array(); + + case -19: + return new GridH2JavaObject(); + + case -20: + return new GridH2Uuid(); + + case -21: + return new GridH2Geometry(); + + case -22: + return new GridH2CacheObject(); + + case -30: + return new GridH2IndexRangeRequest(); + + case -31: + return new GridH2IndexRangeResponse(); + + case -32: + return new GridH2RowMessage(); + + case -33: + return new GridH2QueryRequest(); + + case -34: + return new GridH2RowRange(); + + case -35: + return new GridH2RowRangeBounds(); + + case -54: + return new QueryTable(); + + case -55: + return new GridH2DmlRequest(); + + case -56: + return new GridH2DmlResponse(); + + case -57: + return new GridH2SelectForUpdateTxDetails(); + } + + return null; } /**