Here is a rough structure of a cache. IgniteBaseCache is a wrapper on top of 
IgniteCache. It initializes the cache and a streamer for the cache.  

public class NormalizedDataCache extends IgniteBaseCache<RawPoint, RawPoint> {

        public NormalizedDataCache() {
                super("cache_name");
        }

        @Override
        protected CacheConfiguration<RawPoint, RawPoint> getCacheConfiguration() {
                CacheConfiguration<RawPoint, RawPoint> normalizedPointsCfg =
                        new CacheConfiguration<RawPoint, RawPoint>();
                normalizedPointsCfg.setOnheapCacheEnabled(true);
                return normalizedPointsCfg;
        }

        @Override
        protected void setStreamerProperties() {
                fStreamer.autoFlushFrequency(1000);
                fStreamer.perNodeParallelOperations(8);
                fStreamer.perNodeBufferSize(102400);
        }

        public void addData(RawPoint point) {
                // Identifier is the partition key; it has been decorated with
                // @AffinityKeyMapped in the class.
                point.setIdentifier(fNormalizerUtil.getIdentifier(point));
                addToStream(point, point);
        }

        @Override
        protected StreamReceiver<RawPoint, RawPoint> getDataStreamerReceiver() {
                // Normalize the raw data via the DataStreamer's transform
                // functionality.
                return StreamTransformer.from((e, arg) -> {
                        new NormalizerAdapter().process((RawPoint) arg[0]);
                        // Transformers are supposed to update the data and
                        // then write it to the cache. But we are using this
                        // cache only to distribute data, so we do not write
                        // the data to the cache.
                        return null;
                });
        }
}

NormalizerAdapter is another wrapper, around an internal class, and is the
first stage of the processing. The internal class uses other distributed caches
and creates a different object, which is added to yet another cache, “B”, via a
streamer. That is the second stage of the processing. Cache “B” is similar to
this one and has a similar receiver function, which updates the object and
writes it to the application’s internal structure. These two caches are used
only to distribute the data based on the affinity key; we are not storing the
data in them.
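
To make the "distribute but do not store" idea concrete, here is a minimal
plain-Java sketch with no Ignite dependency. Everything in it is an assumption
for illustration: the class AffinityRoutingSketch, the modulo-hash routing, and
the partition count are hypothetical, and Ignite's real affinity function is
more involved than a simple hash modulo. The per-partition list exists only so
the routing can be observed; it is not meant to represent cache storage.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical stand-in, not Ignite code: routes values to a partition by
 * their affinity key and hands them to a receiver-like step on that partition.
 */
public class AffinityRoutingSketch {
    private final int partitions;

    // One observation log per partition, kept only to make routing visible.
    private final List<List<String>> processed = new ArrayList<>();

    public AffinityRoutingSketch(int partitions) {
        this.partitions = partitions;
        for (int i = 0; i < partitions; i++) {
            processed.add(new ArrayList<>());
        }
    }

    /** Simplified routing: the same key always maps to the same partition. */
    public int partitionFor(String affinityKey) {
        return Math.floorMod(affinityKey.hashCode(), partitions);
    }

    /** Plays the role of the receiver: process the value where its key lives. */
    public void stream(String affinityKey, String value) {
        processed.get(partitionFor(affinityKey)).add(value);
    }

    /** Values that were processed on the given partition, in arrival order. */
    public List<String> processedOn(int partition) {
        return processed.get(partition);
    }

    public static void main(String[] args) {
        AffinityRoutingSketch sketch = new AffinityRoutingSketch(4);
        sketch.stream("sensor-1", "raw point A");
        sketch.stream("sensor-1", "raw point B");
        int p = sketch.partitionFor("sensor-1");
        System.out.println("partition " + p + " processed " + sketch.processedOn(p));
    }
}
```

Both points with the key "sensor-1" end up on the same partition, which is the
property the two caches rely on to keep related data on one node.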

After your suggestion yesterday, I updated the addData method in this snippet.
Previously I was creating a key from some properties of RawPoint. Now I have
added the affinity key to RawPoint itself, reducing the number of objects I
create.

Thanks,
Biren

On 8/24/17, 2:16 PM, "vkulichenko" <[email protected]> wrote:

    Biren,
    
    Can you show the code of the receiver?
    
    -Val
    
    
    
    --
    View this message in context: 
https://urldefense.proofpoint.com/v2/url?u=http-3A__apache-2Dignite-2Dusers.70518.x6.nabble.com_Cluster-2Dsegmentation-2Dtp16314p16411.html&d=DwICAg&c=Zok6nrOF6Fe0JtVEqKh3FEeUbToa1PtNBZf6G01cvEQ&r=rbkF1xy5tYmkV8VMdTRVaIVhaXCNGxmyTB5plfGtWuY&m=uTFJ0dsOfKebPVHeYtxynWyF05QZ1L_VwKl88GOCfhs&s=qpsio0YIs_DqiTGNkLSMR-z76AFBwbNv-LvhPjwQOy8&e=
    Sent from the Apache Ignite Users mailing list archive at Nabble.com.
    
