Hello, Looks like the bug is related to server-side continuous queries functionality. I've tried to reproduce it using thick-client and got the same results, in rare cases events are lost.
чт, 25 мая 2023 г. в 06:48, Pavel Tupitsyn <ptupit...@apache.org>: > > Thank you for the bug report, I will have a look. > > On Thu, May 25, 2023 at 5:10 AM LonesomeRain <lonesomer...@163.com> wrote: >> >> Hi, Jeremy McMillan >> >> >> >> I have created a Jira about this bug. >> >> >> >> https://issues.apache.org/jira/browse/IGNITE-19561 >> >> >> >> By executing these two test codes in the description, it is easy to >> reproduce the problem. >> >> >> >> What should I do next? Continuously following this Jira? >> >> >> >> >> >> 发件人: Jeremy McMillan >> 发送时间: 2023年5月24日 23:24 >> 收件人: user@ignite.apache.org >> 主题: Re: Ignite thin client continuous query listener cannot listen to all >> events >> >> >> >> Thanks for bringing this up! >> >> >> >> https://ignite.apache.org/docs/latest/key-value-api/continuous-queries#events-delivery-guarantees >> >> >> >> This sounds like you may have found a bug, but the details you've provided >> are not sufficient to help others recreate and observe it for themselves, >> and this effort needs to be recorded in a ticket. Would you be able to sign >> up for a Jira account and detail steps to reproduce this behavior? >> >> You may also want to research this: >> https://issues.apache.org/jira/browse/IGNITE-8035 >> >> >> >> On Mon, May 22, 2023 at 6:52 AM lonesomerain <lonesomer...@163.com> wrote: >> >> Hi, >> >> I have a question while using ignite 2.15.0 >> >> >> >> Problem scenario: >> >> Start the Ignite server of one node, start one thin client and create a >> continuous query listener, and then use 50 threads to add 500 data to the >> cache concurrently. >> >> Problem phenomenon: >> >> Through the information printed on the listener, it was found that the >> number of events listened to each time varies, possibly 496, 499 or 500... >> >> Test Code: >> >> public class StartServer { >> >> public static void main(String[] args) { >> >> Ignite ignite = Ignition.start(); >> >> } >> >> } >> >> >> >> public class StartThinClient { >> >> public static void main(String[] args) throws InterruptedException { >> >> String addr = "127.0.0.1:10800"; >> >> >> >> int threadNmu = 50; >> >> >> >> ClientConfiguration clientConfiguration = new ClientConfiguration(); >> >> clientConfiguration.setAddresses(addr); >> >> >> >> IgniteClient client1 = Ignition.startClient(clientConfiguration); >> >> >> >> ClientCache<Object, Object> cache1 = >> client1.getOrCreateCache("test"); >> >> >> >> ContinuousQuery<Object, Object> query = new ContinuousQuery<>(); >> >> query.setLocalListener(new CacheEntryUpdatedListener<Object, >> Object>() { >> >> @Override >> >> public void onUpdated(Iterable<CacheEntryEvent<?, ?>> >> cacheEntryEvents) throws CacheEntryListenerException { >> >> Iterator<CacheEntryEvent<?, ?>> iterator = >> cacheEntryEvents.iterator(); >> >> while (iterator.hasNext()) { >> >> CacheEntryEvent<?, ?> next = iterator.next(); >> >> System.out.println("----" + next.getKey()); >> >> } >> >> } >> >> }); >> >> >> >> cache1.query(query); >> >> >> >> IgniteClient client2 = Ignition.startClient(clientConfiguration); >> >> ClientCache<Object, Object> cache2 = client2.cache("test"); >> >> >> >> Thread[] threads = new Thread[threadNmu]; >> >> for (int i = 0; i < threads.length; ++i) { >> >> threads[i] = new Thread(new OperationInsert(cache2, i, 500, >> threadNmu)); >> >> } >> >> for (int i = 0; i < threads.length; ++i) { >> >> threads[i].start(); >> >> } >> >> for (Thread thread : threads) { >> >> thread.join(); >> >> } >> >> >> >> Thread.sleep(60000); >> >> >> >> } >> >> >> >> static class OperationInsert implements Runnable { >> >> >> >> private ClientCache<Object, Object> cache; >> >> private int k; >> >> private Integer test_rows; >> >> private Integer thread_cnt; >> >> >> >> public OperationInsert(ClientCache<Object, Object> cache, int k, >> Integer test_rows, Integer thread_cnt) { >> >> this.cache = cache; >> >> this.k = k; >> >> this.test_rows = test_rows; >> >> this.thread_cnt = thread_cnt; >> >> } >> >> >> >> @Override >> >> public void run() { >> >> for (int i = 1000000 + (test_rows/thread_cnt) * k; i < 1000000 + >> (test_rows/thread_cnt) * (k + 1); i++) { >> >> cache.put("" + i, "aaa"); >> >> } >> >> } >> >> } >> >> >> >> } >> >> >> >> Version: >> >> The testing program uses Ignite version 2.15.0 >> >> I attempted to insert data using one thread and did not observe any event >> loss. In addition, I also attempted an Ignite cluster with two or three >> nodes, which can still listen to all 500 events even when inserting data >> using multiple threads. May I ask if this issue only occurs at a single >> node? Are there any good solutions? >> >>