Can someone please have a look at the query below? Thanks.
On Dec 29, 2016 2:13 PM, "Ankit Singhai" <[email protected]> wrote:
> Hi,
> In my test scenario I am pumping data into a cache via a data streamer.
> The cache has a Created expiry policy of 60 seconds, to give a sliding
> window of 60 seconds. After the initial burst I make my thread sleep and
> then pump data again, but for the second burst I am not getting any
> events (local or remote). The only data I receive is what comes back
> from the initial query. Can somebody point out what I am doing wrong
> here?
>
> Ignite Configuration
> <bean id="ignite.cfg"
>       class="org.apache.ignite.configuration.IgniteConfiguration">
>     <property name="clientMode" value="true"/>
>     <property name="gridName" value="TestGrid"/>
>     <property name="peerClassLoadingEnabled" value="true"/>
>
>     <property name="discoverySpi">
>         <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
>             <property name="ipFinder">
>                 <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
>                     <property name="addresses">
>                         <list>
>                             <value>127.0.0.1:47500..47509</value>
>                         </list>
>                     </property>
>                 </bean>
>             </property>
>             <property name="localAddress" value="127.0.0.1"/>
>             <property name="heartbeatFrequency" value="2000"/>
>         </bean>
>     </property>
>
>     <property name="lifecycleBeans">
>         <list>
>             <bean class="com.gvc.LifeCycleBeanImpl" id="lifeCycleBeanImpl"/>
>         </list>
>     </property>
>
>     <property name="communicationSpi">
>         <bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
>             <property name="slowClientQueueLimit" value="1000"/>
>             <property name="localPort" value="4321"/>
>         </bean>
>     </property>
> </bean>
>
> Cache Configuration
> <bean id="ipCache"
>       class="org.apache.ignite.configuration.CacheConfiguration"
>       scope="singleton">
>     <property name="name" value="ipCache"/>
>     <property name="cacheMode" value="REPLICATED"/>
>     <property name="memoryMode" value="ONHEAP_TIERED"/>
>     <property name="offHeapMaxMemory" value="#{256 * 1024L * 1024L}"/>
>     <property name="backups" value="0"/>
>     <property name="writeSynchronizationMode" value="PRIMARY_SYNC"/>
>     <property name="startSize" value="#{1 * 1024 * 1024}"/>
>     <property name="swapEnabled" value="false"/>
>     <property name="evictionPolicy">
>         <bean class="org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy">
>             <property name="maxSize" value="1000"/>
>         </bean>
>     </property>
>     <property name="rebalanceMode" value="SYNC"/>
>     <property name="rebalanceBatchSize" value="#{1024 * 1024}"/>
>     <property name="rebalanceThrottle" value="0"/>
>     <property name="rebalanceThreadPoolSize" value="4"/>
> </bean>
>
> Ignite Data Streamer Code via StreamTransformer
>
> CacheConfiguration<String,Integer> ipCacheConfiguration =
>     (CacheConfiguration<String,Integer>)applicationContext.getBean("ipCache");
> ipCacheConfiguration.setIndexedTypes(String.class, Integer.class);
>
> ipCacheConfiguration.setExpiryPolicyFactory(FactoryBuilder.factoryOf(
>     new CreatedExpiryPolicy(new Duration(SECONDS, 60))));
>
> IgniteCache<String,Integer> ipCache =
>     ignite.getOrCreateCache(ipCacheConfiguration);
>
> Random RAND = new Random();
>
> try (IgniteDataStreamer<String,Integer> igniteDataStreamer =
>          ignite.dataStreamer(ipCache.getName())) {
>     igniteDataStreamer.allowOverwrite(true);
>
>     igniteDataStreamer.receiver(new StreamTransformer<String,Integer>() {
>         @Override
>         public Object process(MutableEntry<String,Integer> mutableEntry,
>                               Object... objects) throws EntryProcessorException {
>             Integer val = mutableEntry.getValue();
>
>             // Increment count by 1.
>             mutableEntry.setValue(val == null ? (int) 1L : val + 1);
>
>             return null;
>         }
>     });
>
>     int range = 1000;
>     for (int i = 1; i <= 100000; i++) {
>         igniteDataStreamer.addData("" + RAND.nextInt(range), 1);
>     }
>
>     try {
>         Thread.sleep(70000);
>     } catch (InterruptedException e) {
>         e.printStackTrace();
>     }
>
>     System.out.println("After Sleeping");
>     for (int i = 1; i <= 100000; i++) {
>         igniteDataStreamer.addData("" + RAND.nextInt(range), 1);
>     }
> }
>
> Continuous Query Code (running on a different JVM)
>
> CacheConfiguration<String,Integer> ipCacheConfiguration =
>     (CacheConfiguration<String,Integer>)applicationContext.getBean("ipCache");
> ipCacheConfiguration.setIndexedTypes(String.class, Integer.class);
>
> ipCacheConfiguration.setExpiryPolicyFactory(FactoryBuilder.factoryOf(
>     new CreatedExpiryPolicy(new Duration(SECONDS, 60))));
>
> IgniteCache<String,Integer> ipCache =
>     ignite.getOrCreateCache(ipCacheConfiguration);
>
> ContinuousQuery<String,Integer> continuousQuery = new ContinuousQuery<>();
>
> continuousQuery.setInitialQuery(new ScanQuery<String, Integer>(
>     new IgniteBiPredicate<String, Integer>() {
>         @Override
>         public boolean apply(String integer, Integer integer2) {
>             return integer2 > 100;
>         }
>     }));
>
> continuousQuery.setLocalListener(new CacheEntryUpdatedListener<String, Integer>() {
>     @Override
>     public void onUpdated(Iterable<CacheEntryEvent<? extends String, ? extends Integer>> iterable)
>             throws CacheEntryListenerException {
>         for (CacheEntryEvent events : iterable) {
>             System.out.println(" Inside local listener :: " + events);
>         }
>     }
> });
>
> continuousQuery.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<String, Integer>>() {
>     @Override
>     public CacheEntryEventFilter<String, Integer> create() {
>         return new CacheEntryEventFilter<String, Integer>() {
>             @Override
>             public boolean evaluate(CacheEntryEvent<? extends String, ? extends Integer> cacheEntryEvent)
>                     throws CacheEntryListenerException {
>                 System.out.println("Remote Listener");
>                 return cacheEntryEvent.getValue() > 100;
>             }
>         };
>     }
> });
>
> try (QueryCursor<Cache.Entry<String, Integer>> cur =
>          ipCache.query(continuousQuery)) {
>     for (Cache.Entry<String,Integer> c : cur) {
>         System.out.println(c);
>     }
> }
>
> Thanks
>
> --
> View this message in context:
> http://apache-ignite-users.70518.x6.nabble.com/IgniteDataStreamer-with-Continuous-Query-tp9779p9795.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
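
For anyone reading the thread, here is a rough plain-Java model of the counting behaviour the quoted message describes: a per-key counter (null becomes 1, otherwise val + 1) whose entries expire a fixed time after they were created. It is not Ignite code and does not reproduce or diagnose the missing events. The class name CreatedExpiryWindow and its methods are made up for illustration, and the clock is passed in as a parameter so the behaviour can be checked without sleeping. It assumes "created" expiry means that later updates do not extend an entry's lifetime.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Plain-Java model (hypothetical, not Ignite's implementation) of a per-key
 * counter whose entries expire a fixed time after CREATION, no matter how
 * often they are updated afterwards.
 */
final class CreatedExpiryWindow {
    /** Counter value plus the creation timestamp that fixes its expiry. */
    private static final class Slot {
        final long createdAtMillis;
        int count;

        Slot(long createdAtMillis) {
            this.createdAtMillis = createdAtMillis;
        }
    }

    private final long ttlMillis;
    private final Map<String, Slot> slots = new HashMap<>();

    CreatedExpiryWindow(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Same shape as the transformer in the post: missing -> 1, else val + 1. */
    int increment(String key, long nowMillis) {
        Slot slot = liveSlot(key, nowMillis);
        if (slot == null) {
            slot = new Slot(nowMillis); // creation time is recorded only here
            slots.put(key, slot);
        }
        return ++slot.count;
    }

    /** Returns the current count, or null if the key is absent or expired. */
    Integer get(String key, long nowMillis) {
        Slot slot = liveSlot(key, nowMillis);
        return slot == null ? null : slot.count;
    }

    /** Drops the slot once ttlMillis have passed since it was created. */
    private Slot liveSlot(String key, long nowMillis) {
        Slot slot = slots.get(key);
        if (slot != null && nowMillis - slot.createdAtMillis >= ttlMillis) {
            slots.remove(key);
            return null;
        }
        return slot;
    }

    public static void main(String[] args) {
        CreatedExpiryWindow window = new CreatedExpiryWindow(60_000);
        System.out.println(window.increment("42", 0));      // first burst
        System.out.println(window.increment("42", 30_000)); // update, same entry
        System.out.println(window.get("42", 70_000));       // after the 70 s sleep
        System.out.println(window.increment("42", 70_000)); // second burst
    }
}
```

Under this model, a key first written at time 0 is gone by the end of a 70 second pause, so the second burst starts every counter from 1 again. The window is fixed per key from its first write rather than sliding with each update.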
