The timeout is up to you. Your consumeMessage() call should have its own timeout.
-Jordan

> On Nov 18, 2016, at 1:50 AM, Vadim <[email protected]> wrote:
>
> I have a 3-node cluster and there is 1 application instance running on each
> node. Thus there are 3 consumers in total, but each virtual server has 1
> consumer. The requirement is to receive messages reliably. Thus if one
> host (consumer) fails -- another host (consumer) should pick up the task and
> execute it.
>
> The failed consumer "holds" the task as you described. You understood the
> code correctly. Is it possible to specify a timeout for this action?
>
> Vadim
>
> On 2016-11-17 18:50, Jordan Zimmerman wrote:
>
>> How many QueueConsumers do you have? In the pseudo code below, you only show
>> 1 consumer and it will wait until receiver.processFile() completes before it
>> takes any more messages.
>>
>> -Jordan
>>
>> On Nov 16, 2016, at 4:18 PM, Vadim <[email protected]> wrote:
>>
>> Dear Jordan,
>>
>> Technical details, briefly:
>>
>> The application is built with Spring Boot and Zookeeper 3.4.9.
>> Curator is used to elect a leader (LeaderLatch) and to provide a distributed
>> queue that is lock-aware. Here is the queue initialization code:
>>
>> private void distributedQueueInit() throws Exception {
>>     FileConsumer consumer = new FileConsumer(receiver);
>>     FileReferenceSerializer serializer = new FileReferenceSerializer();
>>     QueueBuilder<FileReference> builder =
>>         QueueBuilder.builder(client, consumer, serializer, queuePath);
>>     builder.lockPath(lockPath);
>>     fileRefs = builder.buildQueue();
>>     fileRefs.start();
>> }
>>
>> FileConsumer implements QueueConsumer and its consumeMessage() is very short:
>>
>> @Override
>> public void consumeMessage(FileReference reference) throws Exception {
>>     log.info("Processing file: " +
>>         reference.getReference() + "|" + reference.getRefType());
>>     receiver.processFile(reference);
>> }
>>
>> The problem is in the receiver.processFile(reference) function. It makes an
>> HTTP call to an external resource that retrieves a file by reference and
>> saves it on disk. When the problem arises (it happened once in 3 days) I can
>> see in my logs that "Processing file:" is fired. The next log message comes
>> after the file is saved to disk. When the error happened, it did not appear.
>> At the same time I can see a lock and a message in the queue that are not
>> processed. I think there are only 2 reasons for the consumer to die:
>>
>> 1. The HTTP call takes too much time and the consumer dies.
>> 2. The HTTP connection dies without an exception, and the consumer does not
>>    receive any error and hangs.
>>
>> If you need more details I can provide information about the HTTP call, but I
>> think for this kind of problem it is not relevant. I am curious about what
>> to do next because there may be more than 2 reasons for such behavior.
>> What do you think?
>>
>> Regards,
>>
>> Vadim
>>
>> On 2016-11-16 17:31, Jordan Zimmerman wrote:
>>
>> Can you send a code snippet or test that shows the issue?
>>
>> -Jordan
>>
>> On Nov 16, 2016, at 4:35 AM, Vadim <[email protected]> wrote:
>>
>> Hello all,
>>
>> My name is Vadim and I am a new Curator user. I am very happy with
>> Curator, but think I may be using it in a slightly wrong way. Specifically,
>> I am using the Distributed Queue recipe and the code runs well until a
>> Consumer silently dies. Messages for the queue are small and do not exceed
>> 50 bytes. I use the queue to distribute tasks between Workers.
>>
>> My QueueConsumer method consumeMessage() calls a part of the code that
>> may fail silently (without an exception). Since message delivery in my
>> scenario is "durable" I can see locks that are never freed. Curator does not
>> "cure" such a stale consumer either. Am I doing something wrong?
>>
>> I have an idea to call the code inside "consumeMessage" in a separate
>> thread and block on it with a timeout. Thus when my code fails silently --
>> the consumer will get a timeout exception. But this is not an elegant
>> solution, I think. What do you think?
>>
>> Thank you in advance,
>>
>> Vadim.
