Hi,
In my test case scenario I am pumping data into cache via data streamer
which has Created expiration policy of 60 seconds to have sliding window of
60 seconds, after the initial bursts I am making my thread to sleep then
again pump data, but for the 2nd bursts I am not getting any events (local
or remote) only the data I receive is after initial search. Can somebody
point out what I am doing wrong here?
Ignite Configuration
<bean id="ignite.cfg"
class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="clientMode" value="true"/>
<property name="gridName" value="TestGrid"/>
<property name="peerClassLoadingEnabled" value="true"/>
<property name="discoverySpi">
<bean
class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<bean
class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="addresses">
<list>
<value>127.0.0.1:47500..47509</value>
</list>
</property>
</bean>
</property>
<property name="localAddress" value="127.0.0.1"/>
<property name="heartbeatFrequency" value="2000"/>
</bean>
</property>
<property name="lifecycleBeans">
<list>
<bean class="com.gvc.LifeCycleBeanImpl"
id="lifeCycleBeanImpl"/>
</list>
</property>
<property name="communicationSpi">
<bean
class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
<property name="slowClientQueueLimit" value="1000"/>
<property name="localPort" value="4321"/>
</bean>
</property>
</bean>
Cache Configuration
<bean id="ipCache"
class="org.apache.ignite.configuration.CacheConfiguration"
scope="singleton">
<property name="name" value="ipCache"/>
<property name="cacheMode" value="REPLICATED"/>
<property name="memoryMode" value="ONHEAP_TIERED"/>
<property name="offHeapMaxMemory" value="#{256 * 1024L * 1024L}"/>
<property name="backups" value="0"/>
<property name="writeSynchronizationMode" value="PRIMARY_SYNC"/>
<property name="startSize" value="#{1 * 1024 * 1024}"/>
<property name="swapEnabled" value="false"/>
<property name="evictionPolicy">
<bean
class="org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy">
<property name="maxSize" value="1000"/>
</bean>
</property>
<property name="rebalanceMode" value="SYNC"/>
<property name="rebalanceBatchSize" value="#{1024 * 1024}"/>
<property name="rebalanceThrottle" value="0"/>
<property name="rebalanceThreadPoolSize" value="4"/>
</bean>
Ignite Data Streamer Code via StreamTransformer
CacheConfiguration<String,Integer> ipCacheConfiguration =
(CacheConfiguration<String,Integer>)applicationContext.getBean("ipCache");
ipCacheConfiguration.setIndexedTypes(String.class, Integer.class);
ipCacheConfiguration.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new
CreatedExpiryPolicy(new Duration(SECONDS, 60))));
IgniteCache<String,Integer> ipCache =
ignite.getOrCreateCache(ipCacheConfiguration);
Random RAND = new Random();
try (IgniteDataStreamer<String,Integer> igniteDataStreamer =
ignite.dataStreamer(ipCache.getName())){
igniteDataStreamer.allowOverwrite(true);
igniteDataStreamer.receiver(new
StreamTransformer<String,Integer>() {
@Override
public Object process(MutableEntry<String,Integer>
mutableEntry, Object... objects) throws EntryProcessorException {
Integer val = mutableEntry.getValue();
// Increment count by 1.
mutableEntry.setValue(val == null ? (int) 1L : val + 1);
return null;
}
});
int range = 1000;
for(int i = 1 ; i <= 100000 ; i++) {
igniteDataStreamer.addData(""+RAND.nextInt(range), 1);
}
try {
Thread.sleep(70000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("After Sleeping");
for(int i = 1 ; i <= 100000 ; i++) {
igniteDataStreamer.addData(""+RAND.nextInt(range), 1);
}
}
Continuous Query Code (running on an different JVM)
CacheConfiguration<String,Integer> ipCacheConfiguration =
(CacheConfiguration<String,Integer>)applicationContext.getBean("ipCache");
ipCacheConfiguration.setIndexedTypes(String.class, Integer.class);
ipCacheConfiguration.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new
CreatedExpiryPolicy(new Duration(SECONDS, 60))));
IgniteCache<String,Integer> ipCache =
ignite.getOrCreateCache(ipCacheConfiguration);
ContinuousQuery<String,Integer> continuousQuery = new
ContinuousQuery<>();
continuousQuery.setInitialQuery(new ScanQuery<String, Integer>(new
IgniteBiPredicate<String, Integer>() {
@Override
public boolean apply(String integer, Integer integer2) {
return integer2 > 100;
}
}));
continuousQuery.setLocalListener(new
CacheEntryUpdatedListener<String, Integer>() {
@Override
public void onUpdated(Iterable<CacheEntryEvent<? extends
String, ? extends Integer>> iterable) throws CacheEntryListenerException {
for (CacheEntryEvent events : iterable) {
System.out.println(" Inside local listener :: " +
events);
}
}
});
continuousQuery.setRemoteFilterFactory(new
Factory<CacheEntryEventFilter<String, Integer>>() {
@Override
public CacheEntryEventFilter<String, Integer> create() {
return new CacheEntryEventFilter<String, Integer>() {
@Override
public boolean evaluate(CacheEntryEvent<? extends
String, ? extends Integer> cacheEntryEvent) throws
CacheEntryListenerException {
System.out.println("Remote Listener");
return cacheEntryEvent.getValue() > 100;
}
};
}
});
try (QueryCursor<Cache.Entry<String, Integer>> cur =
ipCache.query(continuousQuery)){
for(Cache.Entry<String,Integer> c : cur) {
System.out.println(c);
}
}
Thanks
--
View this message in context:
http://apache-ignite-users.70518.x6.nabble.com/IgniteDataStreamer-with-Continuous-Query-tp9779p9795.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.