>
> Currently Ignite doesn't support automatic serialization of collections,
> there's an open task for that:
> https://issues.apache.org/jira/browse/IGNITE-23836
> The idea was that you should be able to return values obtained from the
> Tables API directly, like Tuple, and unpack them on the client side.
> In your case the JobResult object could use a custom marshaller; the
> org.apache.ignite.marshalling.ByteArrayMarshaller will fit just fine here.
> You need to add resultMarshaller(ByteArrayMarshaller.create()) call to
> the JobDescriptor builder chain and override resultMarshaller method in
> the job class so that it also returns ByteArrayMarshaller.create().
>
>
Thanks Vadim! Can't believe I missed that. Everything is working as
expected now, and it also fixed the sporadic ConcurrentModificationException
I was getting with my broken implementation.
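
For anyone finding this thread later, here is a small runnable sketch of the
round trip. The marshal/unmarshal pair mimics what I understand
org.apache.ignite.marshalling.ByteArrayMarshaller to do (plain Java
serialization; that's my assumption, check the Javadoc), so it runs with the
JDK alone, and the actual Ignite wiring from Vadim's advice is shown in
comments. Class names are the ones from this thread.

```java
import java.io.*;
import java.util.ArrayList;
import java.util.List;

public class MarshallerSketch {
    // The result class must be Serializable (assumption: ByteArrayMarshaller
    // uses standard Java serialization under the hood).
    static class JobResult implements Serializable {
        final ArrayList<Long> ids;
        JobResult(ArrayList<Long> ids) { this.ids = ids; }
        List<Long> getIds() { return new ArrayList<>(ids); }
    }

    // Roughly what the marshaller does on the job (server) side...
    static byte[] marshal(Object obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.toByteArray();
    }

    // ...and on the client side when the result comes back.
    static Object unmarshal(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ArrayList<Long> ids = new ArrayList<>(List.of(1L, 2L, 3L));
        JobResult restored = (JobResult) unmarshal(marshal(new JobResult(ids)));
        System.out.println(restored.getIds()); // prints [1, 2, 3]
        // In Ignite 3 itself, per Vadim's advice above, you would instead add
        //     .resultMarshaller(ByteArrayMarshaller.create())
        // to the JobDescriptor builder chain on the client, and override
        // resultMarshaller() in the job class to return
        // ByteArrayMarshaller.create() as well.
    }
}
```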

> BTW, is this intentional that the result generic type of your JobDescriptor
> is MatchDiscoveryResult but the job result type is JobResult?


I was trying to beautify / sanitize my example for this post, my mistake.


> For the second question it seems that you want colocated execution? You can
> check the docs here
> https://ignite.apache.org/docs/ignite3/latest/developers-guide/compute/compute#colocated-execution
> Basically this will execute the job on a node that has the data for the
> specified key, and then if you need a partition number for something you can
> get it from the JobExecutionContext.
>

Yes, I'm aware of colocated execution.

> In Apache Ignite, you can execute colocated computations by specifying a
> job target that directs the task to run on the node holding the required
> data.
>

But a single node holds multiple partitions of a given table, so if you run
a ComputeJob on the targeted node for a given table, it iterates over all
the partitions it holds during an IgniteTables.query(), correct?


Or can you query directly into the desired partition of the table, like with
Ignite 2.x's ScanQuery<>(partitionID)?



e.g. with @Table(colocateBy = @ColumnRef("affinity_colocated_column"))


Does IgniteTables.query(null, and(columnValue("affinity_colocated_column",
equalTo("value-that-maps-to-partitionID")))) jump straight to the correct
partition with some magic, or is it solely a filter unless properly indexed?
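
To make the multiple-partitions point above concrete, here is a toy
illustration. This is NOT Ignite's real affinity function (Ignite uses
rendezvous hashing); the modulo mapping is purely illustrative, but the shape
of the outcome is the same: each node owns several partitions.

```java
import java.util.*;

public class AffinityToy {
    public static void main(String[] args) {
        int partitions = 25;  // table partition count (arbitrary for the demo)
        int nodes = 3;        // cluster size (arbitrary for the demo)

        // Toy mapping: partition -> node. With more partitions than nodes,
        // every node necessarily ends up owning several partitions.
        Map<Integer, List<Integer>> nodeToParts = new HashMap<>();
        for (int part = 0; part < partitions; part++) {
            nodeToParts.computeIfAbsent(part % nodes, n -> new ArrayList<>()).add(part);
        }

        // So a job targeted at the node owning one key's partition still sees
        // rows from that node's other partitions, unless the query itself is
        // restricted to a single partition.
        nodeToParts.forEach((node, parts) ->
                System.out.println("node " + node + " holds partitions " + parts));
    }
}
```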


Hope you understand my question, thanks again,

Gilles





On Tue, 24 Mar 2026 at 09:53, Vadim Pakhnushev <[email protected]>
wrote:

> Hi Gilles!
>
> Currently Ignite doesn't support automatic serialization of collections,
> there's an open task for that:
> https://issues.apache.org/jira/browse/IGNITE-23836
> The idea was that you should be able to return values obtained from the
> Tables API directly, like Tuple, and unpack them on the client side.
> In your case the JobResult object could use a custom marshaller; the
> org.apache.ignite.marshalling.ByteArrayMarshaller will fit just fine here.
> You need to add resultMarshaller(ByteArrayMarshaller.create()) call to
> the JobDescriptor builder chain and override resultMarshaller method in
> the job class so that it also returns ByteArrayMarshaller.create().
>
> BTW, is this intentional that the result generic type of your
> JobDescriptor is MatchDiscoveryResult but the job result type is JobResult
> ?
>
> For the second question it seems that you want colocated execution? You
> can check the docs here
> https://ignite.apache.org/docs/ignite3/latest/developers-guide/compute/compute#colocated-execution
> Basically this will execute the job on a node that has the data for the
> specified key, and then if you need a partition number for something you can
> get it from the JobExecutionContext.
>
> Hope this helps,
> Vadim
>
> On Tue, Mar 24, 2026 at 2:17 AM Gilles <[email protected]>
> wrote:
>
>> Hi again, I'm stuck on returning a result from node's compute job to the
>> client.
>>
>> public class MatchDiscoveryJob implements
>>         ComputeJob<MatchDiscoveryQueryArgs, JobResult> {
>>
>>     @Override
>>     public CompletableFuture<JobResult> executeAsync(
>>             JobExecutionContext context, MatchDiscoveryQueryArgs args) {
>>         KeyValueView<MatchDiscoveryCacheKey, MatchDiscoveryEntry> view =
>>                 context.ignite().tables()
>>                         .table(MatchDiscoveryEntry.TABLE_NAME)
>>                         .keyValueView(MatchDiscoveryCacheKey.class,
>>                                 MatchDiscoveryEntry.class);
>>
>>         ArrayList<Long> reservoir = new ArrayList<>();
>>
>>         // Dummy data, iterating over the view with criteria works well
>>         reservoir.add(1L);
>>         reservoir.add(2L);
>>         reservoir.add(3L);
>>
>>         return CompletableFuture.completedFuture(new JobResult(reservoir));
>>     }
>> }
>>
>>
>> public class JobResult {
>>     ArrayList<Long> ids;
>>
>>     public JobResult() {
>>     }
>>
>>     public JobResult(ArrayList<Long> ids) {
>>         this.ids = ids;
>>     }
>>
>>     public List<Long> getIds() {
>>         return new ArrayList<>(ids);
>>     }
>> }
>>
>>
>>
>> public CompletableFuture<List<Long>> callJob(String affinityKey,
>>         MatchDiscoveryQueryArgs args) {
>>     Tuple key = Tuple.create()
>>             .set("open_location", affinityKey)
>>             .set("user_id", args.userID);
>>     JobTarget targetNode =
>>             JobTarget.colocated(MatchDiscoveryEntry.TABLE_NAME, key);
>>
>>     JobDescriptor<MatchDiscoveryQueryArgs, MatchDiscoveryResult> descriptor =
>>             JobDescriptor.builder(MatchDiscoveryJob.class)
>>                     .resultClass(JobResult.class)
>>                     .units(new DeploymentUnit(DISCOVERY_UNIT_NAME,
>>                             DISCOVERY_UNIT_VERSION))
>>                     .build();
>>
>>     CompletableFuture<List<Long>> futureResult = ignite.compute()
>>             .executeAsync(targetNode, descriptor,
>>                     args.withAffinityLocation(affinityKey))
>>             .thenApply(MatchDiscoveryResult::getIds);
>>
>>     List<Long> result = futureResult.join(); // Size of 3, elements all null
>>     return futureResult;
>> }
>>
>>
>>
>> The result on the client always has the correct size of 3, but all
>> elements are always null.
>>
>> Steps;
>>
>> 1. compile jar containing all classes
>>
>> 2. start nodes (docker compose up)
>>
>> 3. cli deploy the unit to the cluster
>>
>> 4. Call the callJob(...) function from a client (non-embedded)
>>
>>
>>
>> 1. Am I doing anything wrong here? The examples I found are limited to
>> primitive boxed types. I've also tried implementing the Serializable
>> interface on MatchDiscoveryResult, and both async / sync calls.
>>
>> 2. Additionally, is there a way to query directly in the correct partition
>> of the node (like in Ignite 2.x)? Or is
>> Criteria.and(Criteria.columnValue("open_location",
>> Criteria.equalTo(args.affinityLocation))) enough to stick it to the
>> partition's affinity?
>>
>> Thanks in advance,
>>
>> Gilles
