Kevin Jin created IGNITE-7849:
---------------------------------

             Summary: Generating a CacheEntryEventFilter from a SqlQuery
                 Key: IGNITE-7849
                 URL: https://issues.apache.org/jira/browse/IGNITE-7849
             Project: Ignite
          Issue Type: Wish
          Components: cache, sql
    Affects Versions: 2.3
            Reporter: Kevin Jin
             Fix For: None


Currently, when we want to use the same predicate for both the continuous query
and its initial query, it's easy enough to write something like this, assuming
the performance of ScanQuery is acceptable:

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import javax.cache.Cache;
import javax.cache.event.CacheEntryEvent;

public class IgniteContinuousQueryExample {
    private static final String CLUSTER = "TESTGRID";
    private static final String TABLE = "TESTTABLE";

    private static boolean isInteresting(Integer key, String value) {
        return key > 10;
    }

    private static boolean isInteresting(CacheEntryEvent<? extends Integer, ? extends String> event) {
        return isInteresting(event.getKey(), event.getValue());
    }

    private static void handleEntry(Cache.Entry<? extends Integer, ? extends String> event) {
        System.out.println("Received value for " + event.getKey() + ": " + event.getValue());
    }

    private static void handleResultSet(Iterable<? extends Cache.Entry<? extends Integer, ? extends String>> events) {
        events.forEach(IgniteContinuousQueryExample::handleEntry);
    }

    public static void main(String[] args) throws InterruptedException {
        try (Ignite client = Ignition.ignite(CLUSTER); IgniteCache<Integer, String> cache = client.cache(TABLE)) {
            for (int i = 0; i < 20; i++)
                cache.put(i, Integer.toString(i));

            ContinuousQuery<Integer, String> query = new ContinuousQuery<>();
            query.setRemoteFilterFactory(() -> IgniteContinuousQueryExample::isInteresting);
            query.setInitialQuery(new ScanQuery<>(IgniteContinuousQueryExample::isInteresting));
            query.setLocalListener(IgniteContinuousQueryExample::handleResultSet);

            try (QueryCursor<Cache.Entry<Integer, String>> resultSet = cache.query(query)) {
                handleResultSet(resultSet);

                for (int i = 20; i < 30; i++)
                    cache.put(i, Integer.toString(i));

                Thread.sleep(60000);
            }
        }
    }
}

However, this becomes more inconvenient when we want to use SqlQuery in the 
initial query to take advantage of indexing. This is the best that I can do:

query.setRemoteFilterFactory(() -> entry -> entry.getKey() > 10);
query.setInitialQuery(new SqlQuery<>(String.class, "`_key` > 10"));

This is obviously not ideal because we have to specify the same predicate
twice, once as a Java lambda and once as a SQL string. A quick search revealed
that there are products out there that support this continuous-querying use
case more seamlessly. I understand that Ignite isn't built on top of SQL,
unlike the commercial RDBMSes I found, so maybe this feature is out of scope.
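
Until something like this exists in Ignite itself, the duplication can be
reduced on the application side. The following is a minimal sketch, not an
Ignite API: KeyPredicate is a hypothetical class introduced only for
illustration. It holds a single comparison on an integer key and exposes it
both as a parameterized SQL condition and as an in-memory test. It supports
just three operators and uses nothing beyond the JDK.

```java
import java.io.Serializable;

/**
 * Hypothetical helper (not part of Ignite): one comparison on the cache key,
 * usable both as SQL text and as a Java-side check.
 */
final class KeyPredicate implements Serializable {
    /** Supported comparison operators (a deliberately tiny subset). */
    enum Op { GT, LT, EQ }

    private final Op op;
    private final int bound;

    KeyPredicate(Op op, int bound) {
        this.op = op;
        this.bound = bound;
    }

    /** SQL condition with one positional parameter, e.g. "_key > ?". */
    String toSql() {
        switch (op) {
            case GT: return "_key > ?";
            case LT: return "_key < ?";
            default: return "_key = ?";
        }
    }

    /** Arguments matching the '?' placeholder in toSql(). */
    Object[] sqlArgs() {
        return new Object[] { bound };
    }

    /** Evaluates the same condition in memory against a key. */
    boolean test(int key) {
        switch (op) {
            case GT: return key > bound;
            case LT: return key < bound;
            default: return key == bound;
        }
    }
}
```

Assuming Ignite is on the classpath, one such object p could then feed both
sides: new SqlQuery<Integer, String>(String.class, p.toSql()).setArgs(p.sqlArgs())
for the initial query, and
query.setRemoteFilterFactory(() -> event -> p.test(event.getKey())) for the
remote filter. The class is Serializable so that a factory lambda capturing it
can itself be serialized.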



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
