Hi,

I hope this helps.

This is the flow. It is very simple.
Although, the code in the ComputeJob (executor.compute(request, algType,
correlationId);) is relatively application complex.

I hope this code makes sense. 
I had to take the actual code and expunge all of the actual Domain bits from
it…

But as far as Ignite is concerned, it is mostly boilerplate.

Thanks,
-- Chris 

=====================================
Invoke: 

    private List<AResponse> executeTaskOnGrid(AComputeTask<ARequest,
AResponse> computeTask,  List<UUID> uuids) {
             return
managedIgnite.getCompute().withTimeout(timeout).execute(computeTask, uuids);
    }

=======================================
ComputeTask:

public class AComputeTask ComputeTask<TRequest extends ARequest , TResponse>
        extends ComputeTaskAdapter<Collection&lt;UUID>, List<TResponse>> {

    private final AExecutorType type;
    private final TRequest rootARequest;
    private final AlgorithmType algType;
    private final String correlationId;
    private IgniteCacheName cacheName;

    @IgniteInstanceResource
    private Ignite ignite;

    public AComputeTask(AExecutorType type, TRequest request,  AlgorithmType
algType,  String correlationId) {
        this.cacheName = IgniteCacheName.ACache;
        this.type = type;
        this.rootARequest = request;
        this.algType = algType;
        this.correlationId = correlationId;
    }

    @Nullable
    @Override
    public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode>
subgrid, @Nullable Collection<UUID> cacheKeys)
            throws IgniteException {
        Map<ClusterNode, Collection&lt;UUID>> nodeToKeysMap =
ignite.<UUID>affinity(cacheName.name()).mapKeysToNodes(cacheKeys);
        Map<ComputeJob, ClusterNode> jobMap = new HashMap<>();
        for (Map.Entry<ClusterNode, Collection&lt;UUID>> mapping :
nodeToKeysMap.entrySet()) {
            ClusterNode node = mapping.getKey();
            final Collection<UUID> mappedKeys = mapping.getValue();

            if (node != null) {
                ComputeBatchContext context = new
ComputeBatchContext(node.id(), node.consistentId(), correlationId);
                Map<AlgorithmType, UUID[]> nodeRequestUUIDMap =
Collections.singletonMap(algType, convertToArray(mapping.getValue()));
                ARequest nodeARequest = new ARequest(rootARequest,
nodeRequestUUIDMap);
                AComputeJob job = new AComputeJob(type, nodeARequest,
algType, context);
                jobMap.put(job, node);
            }
        }
        return jobMap;
    }

    private UUID[] convertToArray(Collection<UUID> cacheKeys) {
        return cacheKeys.toArray(new UUID[cacheKeys.size()]);
    }

    @Nullable
    @Override
    public List<TResponse> reduce(List<ComputeJobResult> results) throws
IgniteException {
        List<TResponse> responses = new ArrayList<>();
        for (ComputeJobResult res : results) {
            if (res.getException() != null) {
                ARequest  request = ((AComputeJob)
res.getJob()).getARequest();

                // The entire result failed. So return all as errors
                AExecutor<TRequest, TResponse> executor =
AExecutorFactory.getAExecutor(type);
                List<UUID> unitUuids =
Lists.newArrayList(request.getMappedUUIDs().get(algType));
                List<TResponse> errorResponses =
executor.createErrorResponses(unitUuids.stream(),
ErrorCode.UnhandledException);
                responses.addAll(errorResponses);
            } else {
                List<TResponse> perNode = res.getData();
                responses.addAll(perNode);
            }
        }
        return response;
    }
}

==================================
ComputeJob

public class AComputeJob<TRequest extends ARequest, TResponse> extends
ComputeJobAdapter {
    @Getter
    private final ExecutorType executorType;
    @Getter
    private final TRequest request;
    @Getter
    private final AlgorithmType algType;
    @Getter
    private final String correlationId;
    @Getter
    private final ComputeBatchContext context;

    @IgniteInstanceResource
    private Ignite ignite;
    @JobContextResource
    private ComputeJobContext jobContext;

    public AComputeJob(ExecutorType executorType, TRequest request,
AlgorithmType algType, ComputeBatchContext context) {
        this.executorType = executorType;
        this.request = request;
        this.algType = algType;
        this.correlationId = context.getCorrelationId();
        this.context = context;
    }
    
    @Override
    public Object execute() throws IgniteException {
        Executor<TRequest, TResponse> executor =
ExecutorFactory.getExecutor(executorType);
        return executor.compute(request, algType, correlationId);
    }

    @Override
    public void cancel() {
        //Indicates that the cluster wants us to cooperatively cancel the
job
        //Since we expect these to run quickly, not going to actually do
anything with this right now
    }
}












--
View this message in context: 
http://apache-ignite-users.70518.x6.nabble.com/10X-decrease-in-performance-with-Ignite-2-0-0-tp12637p12664.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Reply via email to