Hi devs,

I have a dataset that needs to be processed iteratively and that is larger
than the total memory available in the cluster, so I am using Ignite native
persistence and the compute broadcast() method to distribute the processing
logic.
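
(The cloudlab-ignite.xml file isn't attached; for context, persistence is
enabled roughly as in the Java sketch below. The region name "Default_Region"
matches the cache configurations in my code; the max size is illustrative,
not my actual setting.)

import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;

public class PersistenceConfigSketch {
    static IgniteConfiguration persistentConfig() {
        // Persistence-enabled data region; pages beyond maxSize spill to disk.
        DataRegionConfiguration region = new DataRegionConfiguration()
                .setName("Default_Region")
                .setPersistenceEnabled(true)
                .setMaxSize(8L * 1024 * 1024 * 1024); // 8 GB cap, illustrative only

        DataStorageConfiguration storage = new DataStorageConfiguration();
        storage.setDataRegionConfigurations(region);

        return new IgniteConfiguration().setDataStorageConfiguration(storage);
    }
}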

This works really well on small datasets (ones that fit in memory) but is
significantly slower on larger ones. When I checked the read throughput with
the "iotop" command, it showed Ignite reading data at a very slow rate
(<2 MB/s).

I have attached a stripped-down version of my code below. I am using a
ScanQuery to scan through all the data. Is there a better alternative for
this?
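
One alternative I can think of is iterating the node-local primary entries
directly via localEntries() instead of per-partition scan queries — a
minimal, untested sketch of what I mean (I don't know whether it reads from
disk any faster):

import javax.cache.Cache;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CachePeekMode;

public class LocalEntriesSketch {
    static void processLocalPrimaries() {
        // Intended to run inside a compute job on a data node:
        // iterate only entries whose primary copy resides on this node.
        IgniteCache<Float, Float[]> s = Ignition.localIgnite().cache("S");
        for (Cache.Entry<Float, Float[]> entry : s.localEntries(CachePeekMode.PRIMARY)) {
            // ... process entry.getKey() / entry.getValue() ...
        }
    }
}

(In the code below, the ScanQuery page size is left at its default;
setPageSize(...) is another knob I could try.)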

Thank you
-Supun

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.configuration.CacheConfiguration;

import javax.cache.Cache;

public class SlowDurableMemoryScan {
    public static void main(String[] args) {
        Ignition.setClientMode(true);

        System.out.println("Starting test...");
        try (Ignite ignite = Ignition.start("../../config/cloudlab-ignite.xml")) {

            // With native persistence the cluster starts inactive, so activate it first.
            ignite.active(true);

            CacheConfiguration<Float, Float[]> cacheConfig = new CacheConfiguration<>("S");
            cacheConfig.setCacheMode(CacheMode.PARTITIONED);
            cacheConfig.setDataRegionName("Default_Region");
            cacheConfig.setStoreKeepBinary(true);
            cacheConfig.setBackups(0);
            cacheConfig.setAffinity(new RendezvousAffinityFunction().setExcludeNeighbors(true).setPartitions(1024));

            IgniteCache<Float, Float[]> S = ignite.getOrCreateCache(cacheConfig);

            cacheConfig = new CacheConfiguration<>("T");
            cacheConfig.setCacheMode(CacheMode.PARTITIONED);
            cacheConfig.setDataRegionName("Default_Region");
            cacheConfig.setStoreKeepBinary(true);
            cacheConfig.setBackups(0);
            cacheConfig.setAffinity(new
RendezvousAffinityFunction().setExcludeNeighbors(true).setPartitions(1024));

            IgniteCache<Float, Float[]> T =
ignite.getOrCreateCache(cacheConfig);

            System.out.println("Starting initial loading...");
            long prevTime = System.currentTimeMillis();
            ignite.compute(ignite.cluster().forDataNodes("S")).broadcast(() -> {
                Ignite localIgnite = Ignition.localIgnite();
                int[] partitions = localIgnite.affinity("S").primaryPartitions(localIgnite.cluster().localNode());

                // Close the streamer so buffered entries are flushed before the job returns.
                try (IgniteDataStreamer<Float, Float[]> streamerS = localIgnite.dataStreamer("S")) {
                    streamerS.perNodeBufferSize(100);

                    for (final int partition : partitions) {
                        System.out.println("Partition: " + partition);
                        // 16 entries per primary partition, each value a ~1M-element array.
                        for (int i = 0; i < 16; i++) {
                            streamerS.addData((float) (partition + 1.0 / (i + 1)), new Float[1024 * 1024]);
                        }
                    }
                }
            });
            System.out.println("Data loading time(s): " + ((System.currentTimeMillis() - prevTime) / 1000));


            // Scan the data partition-by-partition on each node and stream it into T.
            prevTime = System.currentTimeMillis();
            ignite.compute(ignite.cluster().forDataNodes("S")).broadcast(() -> {
                Ignite localIgnite = Ignition.localIgnite();
                IgniteCache<Float, Float[]> localS = localIgnite.cache("S");
                int[] partitions = localIgnite.affinity("S").primaryPartitions(localIgnite.cluster().localNode());

                try (IgniteDataStreamer<Float, Float[]> streamerT = localIgnite.dataStreamer("T")) {
                    streamerT.perNodeBufferSize(100);

                    for (final int partition : partitions) {
                        // Local scan restricted to a single primary partition.
                        ScanQuery<Float, Float[]> query = new ScanQuery<>();
                        query.setLocal(true);
                        query.setPartition(partition);

                        // Close the cursor to release its resources after the scan.
                        try (QueryCursor<Cache.Entry<Float, Float[]>> cursor = localS.query(query)) {
                            int count = 0;
                            for (Cache.Entry<Float, Float[]> entry : cursor) {
                                streamerT.addData(entry.getKey(), entry.getValue());
                                count++;
                            }
                            // Print once per partition; per-entry printing floods stdout.
                            System.out.println("Partition " + partition + " count: " + count);
                        }
                    }
                }
            });
            System.out.println("Data update time(s): " + ((System.currentTimeMillis() - prevTime) / 1000));

        }
    }
}
