Hello,

Looks like the bug is related to server-side continuous queries
functionality. I've tried to reproduce it using thick-client and got
the same results, in rare cases events are lost.

чт, 25 мая 2023 г. в 06:48, Pavel Tupitsyn <ptupit...@apache.org>:
>
> Thank you for the bug report, I will have a look.
>
> On Thu, May 25, 2023 at 5:10 AM LonesomeRain <lonesomer...@163.com> wrote:
>>
>> Hi, Jeremy McMillan
>>
>>
>>
>> I have created a Jira about this bug.
>>
>>
>>
>> https://issues.apache.org/jira/browse/IGNITE-19561
>>
>>
>>
>> By executing these two test codes in the description, it is easy to 
>> reproduce the problem.
>>
>>
>>
>> What should I do next? Continuously following this Jira?
>>
>>
>>
>>
>>
>> 发件人: Jeremy McMillan
>> 发送时间: 2023年5月24日 23:24
>> 收件人: user@ignite.apache.org
>> 主题: Re: Ignite thin client continuous query listener cannot listen to all 
>> events
>>
>>
>>
>> Thanks for bringing this up!
>>
>>
>>
>> https://ignite.apache.org/docs/latest/key-value-api/continuous-queries#events-delivery-guarantees
>>
>>
>>
>> This sounds like you may have found a bug, but the details you've provided 
>> are not sufficient to help others recreate and observe it for themselves, 
>> and this effort needs to be recorded in a ticket. Would you be able to sign 
>> up for a Jira account and detail steps to reproduce this behavior?
>>
>> You may also want to research this:
>>   https://issues.apache.org/jira/browse/IGNITE-8035
>>
>>
>>
>> On Mon, May 22, 2023 at 6:52 AM lonesomerain <lonesomer...@163.com> wrote:
>>
>> Hi,
>>
>> I have a question while using ignite 2.15.0
>>
>>
>>
>> Problem scenario:
>>
>> Start the Ignite server of one node, start one thin client and create a 
>> continuous query listener, and then use 50 threads to add 500 data to the 
>> cache concurrently.
>>
>> Problem phenomenon:
>>
>> Through the information printed on the listener, it was found that the 
>> number of events listened to each time varies, possibly 496, 499 or 500...
>>
>> Test Code:
>>
>> public class StartServer {
>>
>>     public static void main(String[] args) {
>>
>>         Ignite ignite = Ignition.start();
>>
>>     }
>>
>> }
>>
>>
>>
>> public class StartThinClient {
>>
>>     public static void main(String[] args) throws InterruptedException {
>>
>>         String addr = "127.0.0.1:10800";
>>
>>
>>
>>         int threadNmu = 50;
>>
>>
>>
>>         ClientConfiguration clientConfiguration = new ClientConfiguration();
>>
>>         clientConfiguration.setAddresses(addr);
>>
>>
>>
>>         IgniteClient client1 = Ignition.startClient(clientConfiguration);
>>
>>
>>
>>         ClientCache<Object, Object> cache1 = 
>> client1.getOrCreateCache("test");
>>
>>
>>
>>         ContinuousQuery<Object, Object> query = new ContinuousQuery<>();
>>
>>         query.setLocalListener(new CacheEntryUpdatedListener<Object, 
>> Object>() {
>>
>>             @Override
>>
>>             public void onUpdated(Iterable<CacheEntryEvent<?, ?>> 
>> cacheEntryEvents) throws CacheEntryListenerException {
>>
>>                 Iterator<CacheEntryEvent<?, ?>> iterator = 
>> cacheEntryEvents.iterator();
>>
>>                 while (iterator.hasNext()) {
>>
>>                     CacheEntryEvent<?, ?> next = iterator.next();
>>
>>                     System.out.println("----" + next.getKey());
>>
>>                 }
>>
>>             }
>>
>>         });
>>
>>
>>
>>         cache1.query(query);
>>
>>
>>
>>         IgniteClient client2 = Ignition.startClient(clientConfiguration);
>>
>>         ClientCache<Object, Object> cache2 = client2.cache("test");
>>
>>
>>
>>         Thread[] threads = new Thread[threadNmu];
>>
>>         for (int i = 0; i < threads.length; ++i) {
>>
>>             threads[i] = new Thread(new OperationInsert(cache2, i, 500, 
>> threadNmu));
>>
>>         }
>>
>>         for (int i = 0; i < threads.length; ++i) {
>>
>>             threads[i].start();
>>
>>         }
>>
>>         for (Thread thread : threads) {
>>
>>             thread.join();
>>
>>         }
>>
>>
>>
>>         Thread.sleep(60000);
>>
>>
>>
>>     }
>>
>>
>>
>>     static class OperationInsert implements Runnable {
>>
>>
>>
>>         private ClientCache<Object, Object> cache;
>>
>>         private int k;
>>
>>         private Integer test_rows;
>>
>>         private Integer thread_cnt;
>>
>>
>>
>>         public OperationInsert(ClientCache<Object, Object> cache, int k, 
>> Integer test_rows, Integer thread_cnt) {
>>
>>             this.cache = cache;
>>
>>             this.k = k;
>>
>>             this.test_rows = test_rows;
>>
>>             this.thread_cnt = thread_cnt;
>>
>>         }
>>
>>
>>
>>         @Override
>>
>>         public void run() {
>>
>>             for (int i = 1000000 + (test_rows/thread_cnt) * k; i < 1000000 + 
>> (test_rows/thread_cnt) * (k + 1); i++) {
>>
>>                 cache.put("" + i, "aaa");
>>
>>             }
>>
>>         }
>>
>>     }
>>
>>
>>
>> }
>>
>>
>>
>> Version:
>>
>> The testing program uses Ignite version 2.15.0
>>
>> I attempted to insert data using one thread and did not observe any event 
>> loss. In addition, I also attempted an Ignite cluster with two or three 
>> nodes, which can still listen to all 500 events even when inserting data 
>> using multiple threads. May I ask if this issue only occurs at a single 
>> node? Are there any good solutions?
>>
>>

Reply via email to