rpuch commented on code in PR #3300:
URL: https://github.com/apache/ignite-3/pull/3300#discussion_r1507060827
##########
modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java:
##########
@@ -337,82 +337,167 @@ private List<ClassDescriptorMessage> prepareMarshal(NetworkMessage msg) throws E
/**
* Sends a message to the current node.
*
- * @param msg Message.
+ * @param message Message.
* @param correlationId Correlation id.
*/
- private void sendToSelf(NetworkMessage msg, @Nullable Long correlationId) {
- for (NetworkMessageHandler networkMessageHandler :
getMessageHandlers(msg.groupType())) {
- networkMessageHandler.onReceived(msg,
topologyService.localMember().name(), correlationId);
+ private void sendToSelf(NetworkMessage message, @Nullable Long
correlationId) {
+ for (HandlerContext context : getHandlerContexts(message.groupType()))
{
+ // Invoking on the same thread, ignoring the executor chooser registered with the handler.
+ context.handler().onReceived(message,
topologyService.localMember().name(), correlationId);
}
}
/**
- * Handles an incoming message.
+ * Handles a message coming from the network (not from the same node).
*
- * @param obj Incoming message wrapper.
+ * @param inNetworkObject Incoming message wrapper.
*/
- private void onMessage(InNetworkObject obj) {
- assert isInNetworkThread();
+ private void handleMessageFromNetwork(InNetworkObject inNetworkObject) {
+ assert isInNetworkThread() : Thread.currentThread().getName();
+
+ if (senderIdIsStale(inNetworkObject)) {
+ logMessageSkipDueToSenderLeft(inNetworkObject);
+ return;
+ }
+
+ if (inNetworkObject.message() instanceof InvokeResponse) {
+ Executor executor = chooseExecutorInInboundPool(inNetworkObject);
+ executor.execute(() -> handleInvokeResponse(inNetworkObject));
+ return;
+ }
- inboundExecutors.execute(obj.connectionIndex(), () -> {
+ NetworkMessage payload;
+ Long correlationId = null;
+ if (inNetworkObject.message() instanceof InvokeRequest) {
+ InvokeRequest invokeRequest = (InvokeRequest)
inNetworkObject.message();
+ payload = invokeRequest.message();
+ correlationId = invokeRequest.correlationId();
+ } else {
+ payload = inNetworkObject.message();
+ }
+
+ Iterator<HandlerContext> handlerContexts =
getHandlerContexts(payload.groupType()).iterator();
+ if (!handlerContexts.hasNext()) {
+ // No need to handle this.
+ return;
+ }
+
+ HandlerContext firstHandlerContext = handlerContexts.next();
+ Executor firstHandlerExecutor = chooseExecutorFor(payload,
inNetworkObject, firstHandlerContext.executorChooser());
+
+ Long finalCorrelationId = correlationId;
+ firstHandlerExecutor.execute(() -> {
long startedNanos = System.nanoTime();
try {
- handleIncomingMessage(obj);
+ handleStartingWithFirstHandler(payload, finalCorrelationId,
inNetworkObject, firstHandlerContext, handlerContexts);
} catch (Throwable e) {
- logAndRethrowIfError(obj, e);
+ logAndRethrowIfError(inNetworkObject, e);
} finally {
long tookMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedNanos);
if (tookMillis > 100) {
- LOG.warn("Processing of {} from {} took {} ms",
obj.message(), obj.consistentId(), tookMillis);
+ LOG.warn("Processing of {} from {} took {} ms",
inNetworkObject.message(), inNetworkObject.consistentId(), tookMillis);
}
}
});
}
- private void handleIncomingMessage(InNetworkObject obj) {
- if (senderIdIsStale(obj)) {
- LOG.info("Sender ID {} ({}) is stale, so skipping message
handling: {}", obj.launchId(), obj.consistentId(), obj.message());
- return;
- }
+ private static void logMessageSkipDueToSenderLeft(InNetworkObject
inNetworkObject) {
+ LOG.info("Sender ID {} ({}) is stale, so skipping message handling: {}",
+ inNetworkObject.launchId(), inNetworkObject.consistentId(), inNetworkObject.message()
+ );
+ }
- NetworkMessage msg = obj.message();
- DescriptorRegistry registry = obj.registry();
+ private boolean senderIdIsStale(InNetworkObject obj) {
+ return staleIdDetector.isIdStale(obj.launchId());
+ }
+
+ private void handleInvokeResponse(InNetworkObject inNetworkObject) {
+ unmarshalMessage(inNetworkObject);
+
+ InvokeResponse response = (InvokeResponse) inNetworkObject.message();
+ onInvokeResponse(response.message(), response.correlationId());
+ }
+
+ private void unmarshalMessage(InNetworkObject obj) {
try {
- msg.unmarshal(marshaller, registry);
+ obj.message().unmarshal(marshaller, obj.registry());
} catch (Exception e) {
throw new IgniteException("Failed to unmarshal message: " +
e.getMessage(), e);
}
- if (msg instanceof InvokeResponse) {
- InvokeResponse response = (InvokeResponse) msg;
- onInvokeResponse(response.message(), response.correlationId());
- return;
+ }
+
+ private Executor chooseExecutorFor(NetworkMessage payload, InNetworkObject
obj, ExecutorChooser<NetworkMessage> chooser) {
+ if (wantsInboundPool(chooser)) {
+ return chooseExecutorInInboundPool(obj);
+ } else {
+ return chooser.choose(payload);
}
+ }
- Long correlationId = null;
- NetworkMessage message = msg;
+ private Executor chooseExecutorInInboundPool(InNetworkObject obj) {
+ return inboundExecutors.stripeFor(obj.connectionIndex());
+ }
- if (msg instanceof InvokeRequest) {
- // Unwrap invocation request
- InvokeRequest messageWithCorrelation = (InvokeRequest) msg;
- correlationId = messageWithCorrelation.correlationId();
- message = messageWithCorrelation.message();
+ /**
+ * Finishes unmarshalling the message and handles it on current thread on first handler. Also handles it with other
+ * handlers (second and so on) on executors chosen by their choosers.
+ */
+ private void handleStartingWithFirstHandler(
+ NetworkMessage payload,
+ @Nullable Long correlationId,
+ InNetworkObject obj,
+ HandlerContext firstHandlerContext,
+ Iterator<HandlerContext> remainingContexts
+ ) {
+ if (senderIdIsStale(obj)) {
+ logMessageSkipDueToSenderLeft(obj);
+ return;
}
+ unmarshalMessage(obj);
+
String senderConsistentId = obj.consistentId();
// Unfortunately, since the Messaging Service is used by ScaleCube itself, some messages can be sent
// before the node is added to the topology. ScaleCubeMessage handler guarantees to handle null sender consistent ID
// without throwing an exception.
- assert message instanceof ScaleCubeMessage || senderConsistentId !=
null;
+ assert payload instanceof ScaleCubeMessage || senderConsistentId !=
null;
+
+ // If first handler chose an inbound pool, AND some other handlers also want to be executed on an inbound pool, this means that
+ // they will be guaranteed to be executed on the same thread (as the inbound pool is striped, the stripe key is sender+channelId).
+ // Hence these handlers will be executed on the current thread, so, to avoid additional latency, we'll collect them to a list
+ // and execute here directly after executing the first handler, instead of submitting them to the inbound pool.
+ List<NetworkMessageHandler> handlersWantingSameThreadAsFirst =
List.of();
- for (NetworkMessageHandler networkMessageHandler :
getMessageHandlers(message.groupType())) {
- networkMessageHandler.onReceived(message, senderConsistentId,
correlationId);
+ while (remainingContexts.hasNext()) {
+ HandlerContext handlerContext = remainingContexts.next();
+
+ if (wantSamePool(firstHandlerContext, handlerContext)) {
+ if (handlersWantingSameThreadAsFirst.isEmpty()) {
+ handlersWantingSameThreadAsFirst = new ArrayList<>();
+ }
+ handlersWantingSameThreadAsFirst.add(handlerContext.handler());
+ } else {
+ Executor executor = chooseExecutorFor(payload, obj,
handlerContext.executorChooser());
+ executor.execute(() ->
handlerContext.handler().onReceived(payload, senderConsistentId,
correlationId));
+ }
+ }
+
+ firstHandlerContext.handler().onReceived(payload, senderConsistentId,
correlationId);
+
+ // Now execute those handlers that are guaranteed to be executed on the same thread as the first one.
+ for (NetworkMessageHandler handler : handlersWantingSameThreadAsFirst)
{
+ handler.onReceived(payload, senderConsistentId, correlationId);
}
}
- private boolean senderIdIsStale(InNetworkObject obj) {
- return staleIdDetector.isIdStale(obj.launchId());
+ private static boolean wantSamePool(HandlerContext handlerContext1,
HandlerContext handlerContext2) {
+ return wantsInboundPool(handlerContext1) &&
wantsInboundPool(handlerContext2);
Review Comment:
Fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]