Hi, I hope this helps.
This is the flow. It is very simple. Although, the code in the ComputeJob (executor.compute(request, algType, correlationId);) is relatively application complex. I hope this code makes sense. I had to take the actual code and expunge all of the actual Domain bits from it… But as far as Ignite is concerned, it is mostly boilerplate.

Thanks,
-- Chris

=====================================
Invoke:

/**
 * Runs the given task on the grid with the configured timeout and returns its reduced result.
 *
 * @param computeTask task to execute
 * @param uuids       cache keys handed to the task as its argument
 * @return the list produced by the task's reduce step
 */
private List<AResponse> executeTaskOnGrid(AComputeTask<ARequest, AResponse> computeTask, List<UUID> uuids) {
    return managedIgnite.getCompute().withTimeout(timeout).execute(computeTask, uuids);
}

=======================================
ComputeTask:

/**
 * Task that splits a collection of cache keys into one job per node, using the affinity
 * mapping of the configured cache, and merges the per-node responses into a single list.
 *
 * @param <TRequest>  request type carried by the task
 * @param <TResponse> response element type produced by the jobs
 */
public class AComputeTask<TRequest extends ARequest, TResponse>
        extends ComputeTaskAdapter<Collection<UUID>, List<TResponse>> {

    private final AExecutorType type;
    private final TRequest rootARequest;
    private final AlgorithmType algType;
    private final String correlationId;
    private final IgniteCacheName cacheName;

    @IgniteInstanceResource
    private Ignite ignite;

    public AComputeTask(AExecutorType type, TRequest request, AlgorithmType algType, String correlationId) {
        this.cacheName = IgniteCacheName.ACache;
        this.type = type;
        this.rootARequest = request;
        this.algType = algType;
        this.correlationId = correlationId;
    }

    /**
     * Groups the cache keys by the node the cache's affinity maps them to and creates one job
     * per node, each carrying only the keys mapped to that node.
     *
     * NOTE(review): cacheKeys is declared @Nullable but is passed straight to mapKeysToNodes;
     * confirm callers never pass null.
     *
     * @param subgrid   nodes available for this task (not consulted; affinity decides placement)
     * @param cacheKeys keys to distribute
     * @return map of job to the node it should run on
     */
    @Nullable
    @Override
    public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
                                                      @Nullable Collection<UUID> cacheKeys) throws IgniteException {
        Map<ClusterNode, Collection<UUID>> nodeToKeysMap =
                ignite.<UUID>affinity(cacheName.name()).mapKeysToNodes(cacheKeys);

        Map<ComputeJob, ClusterNode> jobMap = new HashMap<>();
        for (Map.Entry<ClusterNode, Collection<UUID>> mapping : nodeToKeysMap.entrySet()) {
            ClusterNode node = mapping.getKey();
            final Collection<UUID> mappedKeys = mapping.getValue();

            // Entries without a node are skipped.
            if (node != null) {
                ComputeBatchContext context =
                        new ComputeBatchContext(node.id(), node.consistentId(), correlationId);
                Map<AlgorithmType, UUID[]> nodeRequestUUIDMap =
                        Collections.singletonMap(algType, convertToArray(mappedKeys));
                ARequest nodeARequest = new ARequest(rootARequest, nodeRequestUUIDMap);
                AComputeJob<ARequest, TResponse> job =
                        new AComputeJob<>(type, nodeARequest, algType, context);
                jobMap.put(job, node);
            }
        }
        return jobMap;
    }

    /** Copies the given keys into a new array of the same size. */
    private UUID[] convertToArray(Collection<UUID> cacheKeys) {
        return cacheKeys.toArray(new UUID[cacheKeys.size()]);
    }

    /**
     * Concatenates the per-node responses. For a job result that carries an exception, the
     * executor is asked to create error responses (ErrorCode.UnhandledException) for the
     * UUIDs that were assigned to that job, and those are added instead.
     *
     * @param results results of all jobs created by map
     * @return all responses, in the order the results are iterated
     */
    @Nullable
    @Override
    public List<TResponse> reduce(List<ComputeJobResult> results) throws IgniteException {
        List<TResponse> responses = new ArrayList<>();
        for (ComputeJobResult res : results) {
            if (res.getException() != null) {
                AComputeJob<?, ?> failedJob = res.getJob();
                ARequest request = failedJob.getRequest();

                // The entire result failed. So return all as errors
                AExecutor<TRequest, TResponse> executor = AExecutorFactory.getAExecutor(type);
                List<UUID> unitUuids = Lists.newArrayList(request.getMappedUUIDs().get(algType));
                List<TResponse> errorResponses =
                        executor.createErrorResponses(unitUuids.stream(), ErrorCode.UnhandledException);
                responses.addAll(errorResponses);
            } else {
                List<TResponse> perNode = res.getData();
                responses.addAll(perNode);
            }
        }
        return responses;
    }
}

==================================
ComputeJob

/**
 * Job that looks up the executor for its executor type and delegates the computation of
 * its request to it.
 *
 * @param <TRequest>  request type handed to the executor
 * @param <TResponse> response type the executor is parameterized with
 */
public class AComputeJob<TRequest extends ARequest, TResponse> extends ComputeJobAdapter {

    @Getter
    private final AExecutorType executorType;
    @Getter
    private final TRequest request;
    @Getter
    private final AlgorithmType algType;
    @Getter
    private final String correlationId;
    @Getter
    private final ComputeBatchContext context;

    @IgniteInstanceResource
    private Ignite ignite;

    @JobContextResource
    private ComputeJobContext jobContext;

    public AComputeJob(AExecutorType executorType, TRequest request, AlgorithmType algType,
                       ComputeBatchContext context) {
        this.executorType = executorType;
        this.request = request;
        this.algType = algType;
        // The correlation id is taken from the batch context rather than passed separately.
        this.correlationId = context.getCorrelationId();
        this.context = context;
    }

    /**
     * Obtains the executor for this job's executor type and returns whatever its compute
     * call produces for the request, algorithm type and correlation id.
     */
    @Override
    public Object execute() throws IgniteException {
        AExecutor<TRequest, TResponse> executor = AExecutorFactory.getAExecutor(executorType);
        return executor.compute(request, algType, correlationId);
    }

    @Override
    public void cancel() {
        //Indicates that the cluster wants us to cooperatively cancel the job
        //Since we expect these to run quickly, not going to actually do anything with this right now
    }
}

--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/10X-decrease-in-performance-with-Ignite-2-0-0-tp12637p12664.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.