Hi again, I'm stuck on returning a result from a node's compute job to the
client.

public class MatchDiscoveryJob implements ComputeJob<MatchDiscoveryQueryArgs, JobResult> {

    @Override
    public CompletableFuture<JobResult> executeAsync(JobExecutionContext context, MatchDiscoveryQueryArgs args) {
        KeyValueView<MatchDiscoveryCacheKey, MatchDiscoveryEntry> view = context.ignite().tables()
                .table(MatchDiscoveryEntry.TABLE_NAME)
                .keyValueView(MatchDiscoveryCacheKey.class, MatchDiscoveryEntry.class);

        ArrayList<Long> reservoir = new ArrayList<>();

        // Dummy data, iterating over the view with criteria works well
        reservoir.add(1L);
        reservoir.add(2L);
        reservoir.add(3L);

        return CompletableFuture.completedFuture(new JobResult(reservoir));
    }
}


public class JobResult {
    ArrayList<Long> ids;

    public JobResult() {
    }

    public JobResult(ArrayList<Long> ids) {
        this.ids = ids;
    }

    public List<Long> getIds() {
        return new ArrayList<>(ids);
    }
}



public CompletableFuture<List<Long>> callJob(String affinityKey, MatchDiscoveryQueryArgs args) {
    Tuple key = Tuple.create().set("open_location", affinityKey).set("user_id", args.userID);
    JobTarget targetNode = JobTarget.colocated(MatchDiscoveryEntry.TABLE_NAME, key);

    JobDescriptor<MatchDiscoveryQueryArgs, JobResult> descriptor =
            JobDescriptor.builder(MatchDiscoveryJob.class)
                    .resultClass(JobResult.class)
                    .units(new DeploymentUnit(DISCOVERY_UNIT_NAME, DISCOVERY_UNIT_VERSION))
                    .build();

    CompletableFuture<List<Long>> futureResult = ignite.compute()
            .executeAsync(targetNode, descriptor, args.withAffinityLocation(affinityKey))
            .thenApply(JobResult::getIds);

    List<Long> result = futureResult.join(); // Size of 3, elements all null
    return futureResult;
}



The result on the client always has the correct size of 3, but all elements
are always null.
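
As a sanity check outside Ignite, the round trip that would be expected can be
sketched with plain Java serialization. This is only an illustration: plain
Java serialization is an assumption here and not necessarily how Ignite
marshals job results. RoundTripCheck and roundTrip are hypothetical names, and
the nested JobResult is a Serializable stand-in for the class above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class RoundTripCheck {

    // Stand-in for the result class in this post, made Serializable for the check.
    static class JobResult implements Serializable {
        private static final long serialVersionUID = 1L;
        private final ArrayList<Long> ids;

        JobResult(List<Long> ids) {
            this.ids = new ArrayList<>(ids);
        }

        List<Long> getIds() {
            return new ArrayList<>(ids);
        }
    }

    // Serialize the result to bytes, as if it were leaving the node.
    static byte[] toBytes(JobResult result) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(result);
        }
        return buffer.toByteArray();
    }

    // Rebuild the result from bytes, as if it were arriving on the client.
    static JobResult fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (JobResult) in.readObject();
        }
    }

    // Convenience wrapper: send a list through the round trip and return what comes back.
    static List<Long> roundTrip(List<Long> ids) {
        try {
            return fromBytes(toBytes(new JobResult(ids))).getIds();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }

    public static void main(String[] args) {
        // Same dummy data as in the job above.
        System.out.println(roundTrip(List.of(1L, 2L, 3L)));
    }
}
```

If the values survive this round trip but not the compute call, the elements
are presumably being lost somewhere in how the result is marshalled between
the node and the client, rather than in the result class itself.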

Steps:

1. Compile a jar containing all classes
2. Start the nodes (docker compose up)
3. Deploy the unit to the cluster via the CLI
4. Call the callJob(...) function from a (non-embedded) client



1. Am I doing anything wrong here? The examples are limited to boxed primitive
types. I've also tried implementing the Serializable interface on the result
class, and both async and sync calls.

2. Additionally, is there a way to query directly in the correct partition
of the node (like in Ignite 2.x)? Or is
Criteria.and(Criteria.columnValue("open_location",
Criteria.equalTo(args.affinityLocation))) enough to pin the query to the
partition's affinity?

Thanks in advance,

Gilles
