Hi,
I hope this helps.
This is the flow. It is very simple.
Although, the code in the ComputeJob (executor.compute(request, algType,
correlationId);) is relatively application complex.
I hope this code makes sense.
I had to take the actual code and expunge all of the actual Domain bits from
it…
But as far as Ignite is concerned, it is mostly boilerplate.
Thanks,
-- Chris
=====================================
Invoke:
private List<AResponse> executeTaskOnGrid(AComputeTask<ARequest,
AResponse> computeTask, List<UUID> uuids) {
return
managedIgnite.getCompute().withTimeout(timeout).execute(computeTask, uuids);
}
=======================================
ComputeTask:
public class AComputeTask ComputeTask<TRequest extends ARequest , TResponse>
extends ComputeTaskAdapter<Collection<UUID>, List<TResponse>> {
private final AExecutorType type;
private final TRequest rootARequest;
private final AlgorithmType algType;
private final String correlationId;
private IgniteCacheName cacheName;
@IgniteInstanceResource
private Ignite ignite;
public AComputeTask(AExecutorType type, TRequest request, AlgorithmType
algType, String correlationId) {
this.cacheName = IgniteCacheName.ACache;
this.type = type;
this.rootARequest = request;
this.algType = algType;
this.correlationId = correlationId;
}
@Nullable
@Override
public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode>
subgrid, @Nullable Collection<UUID> cacheKeys)
throws IgniteException {
Map<ClusterNode, Collection<UUID>> nodeToKeysMap =
ignite.<UUID>affinity(cacheName.name()).mapKeysToNodes(cacheKeys);
Map<ComputeJob, ClusterNode> jobMap = new HashMap<>();
for (Map.Entry<ClusterNode, Collection<UUID>> mapping :
nodeToKeysMap.entrySet()) {
ClusterNode node = mapping.getKey();
final Collection<UUID> mappedKeys = mapping.getValue();
if (node != null) {
ComputeBatchContext context = new
ComputeBatchContext(node.id(), node.consistentId(), correlationId);
Map<AlgorithmType, UUID[]> nodeRequestUUIDMap =
Collections.singletonMap(algType, convertToArray(mapping.getValue()));
ARequest nodeARequest = new ARequest(rootARequest,
nodeRequestUUIDMap);
AComputeJob job = new AComputeJob(type, nodeARequest,
algType, context);
jobMap.put(job, node);
}
}
return jobMap;
}
private UUID[] convertToArray(Collection<UUID> cacheKeys) {
return cacheKeys.toArray(new UUID[cacheKeys.size()]);
}
@Nullable
@Override
public List<TResponse> reduce(List<ComputeJobResult> results) throws
IgniteException {
List<TResponse> responses = new ArrayList<>();
for (ComputeJobResult res : results) {
if (res.getException() != null) {
ARequest request = ((AComputeJob)
res.getJob()).getARequest();
// The entire result failed. So return all as errors
AExecutor<TRequest, TResponse> executor =
AExecutorFactory.getAExecutor(type);
List<UUID> unitUuids =
Lists.newArrayList(request.getMappedUUIDs().get(algType));
List<TResponse> errorResponses =
executor.createErrorResponses(unitUuids.stream(),
ErrorCode.UnhandledException);
responses.addAll(errorResponses);
} else {
List<TResponse> perNode = res.getData();
responses.addAll(perNode);
}
}
return response;
}
}
==================================
ComputeJob
public class AComputeJob<TRequest extends ARequest, TResponse> extends
ComputeJobAdapter {
@Getter
private final ExecutorType executorType;
@Getter
private final TRequest request;
@Getter
private final AlgorithmType algType;
@Getter
private final String correlationId;
@Getter
private final ComputeBatchContext context;
@IgniteInstanceResource
private Ignite ignite;
@JobContextResource
private ComputeJobContext jobContext;
public AComputeJob(ExecutorType executorType, TRequest request,
AlgorithmType algType, ComputeBatchContext context) {
this.executorType = executorType;
this.request = request;
this.algType = algType;
this.correlationId = context.getCorrelationId();
this.context = context;
}
@Override
public Object execute() throws IgniteException {
Executor<TRequest, TResponse> executor =
ExecutorFactory.getExecutor(executorType);
return executor.compute(request, algType, correlationId);
}
@Override
public void cancel() {
//Indicates that the cluster wants us to cooperatively cancel the
job
//Since we expect these to run quickly, not going to actually do
anything with this right now
}
}
--
View this message in context:
http://apache-ignite-users.70518.x6.nabble.com/10X-decrease-in-performance-with-Ignite-2-0-0-tp12637p12664.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.