How many QueueConsumers do you have? In the pseudo code below, you only show 1 consumer and it will wait until receiver.processFile() completes before it takes any more messages.
-Jordan

> On Nov 16, 2016, at 4:18 PM, Vadim <[email protected]> wrote:
>
> Dear Jordan,
>
> Briefly, the technical details are:
>
> The application is built with Spring Boot and ZooKeeper 3.4.9.
> Curator is used to elect a leader (LeaderLatch) and to provide a
> distributed queue that is lock-aware. Here is the queue initialization code:
>
>     private void distributedQueueInit() throws Exception {
>         FileConsumer consumer = new FileConsumer(receiver);
>         FileReferenceSerializer serializer = new FileReferenceSerializer();
>         QueueBuilder<FileReference> builder = QueueBuilder.builder(client,
>                 consumer, serializer, queuePath);
>         builder.lockPath(lockPath);
>         fileRefs = builder.buildQueue();
>         fileRefs.start();
>     }
>
> FileConsumer implements QueueConsumer and its consumeMessage() is very short:
>
>     @Override
>     public void consumeMessage(FileReference reference) throws Exception {
>         log.info("Processing file: " + reference.getReference() + "|" +
>                 reference.getRefType());
>         receiver.processFile(reference);
>     }
>
> The problem is in the receiver.processFile(reference) function. It makes an
> HTTP call to an external resource that retrieves the file by reference and
> saves it to disk. When the problem arises (it happened once in 3 days), I can
> see in my logs that "Processing file:" is written. The next log message
> normally comes after the file is saved to disk. When the error happened, it
> did not appear. At the same time I can see the lock and the message in the
> queue, and they are not processed. I think there are only 2 reasons for the
> consumer to die:
>
> 1. The HTTP call takes too much time and the consumer dies.
> 2. The HTTP connection dies without an exception, so the consumer does not
>    receive any error and hangs.
>
> If you need more details I can provide information about the HTTP call, but
> I think it is not relevant for this kind of problem. I am curious about what
> to do next, because there may be more than 2 reasons for such behavior.
> What do you think?
>
> Regards,
>
> Vadim
>
> On 2016-11-16 17:31, Jordan Zimmerman wrote:
>
>> Can you send a code snippet or test that shows the issue?
>>
>> -Jordan
>>
>> On Nov 16, 2016, at 4:35 AM, Vadim <[email protected]> wrote:
>>
>> Hello all,
>>
>> My name is Vadim and I am a new Curator user. I am very happy with
>> Curator, but I think I may be using it in a slightly wrong way. In
>> particular, I am using the Distributed Queue recipe, and the code runs
>> well until the consumer silently dies. Messages in the queue are small and
>> do not exceed 50 bytes. I use the queue to distribute tasks between workers.
>>
>> My QueueConsumer method consumeMessage() calls a part of the code that
>> may fail silently (without an exception). Since message delivery in my
>> scenario is "durable", I can see locks that are never freed. Curator does
>> not "cure" such a stale consumer either. Am I doing something wrong?
>>
>> I have an idea to call the code inside consumeMessage() on a separate
>> thread and block on it with a timeout. Then, when my code fails silently,
>> the consumer will get a timeout exception. But I think this is not an
>> elegant solution. What do you think?
>>
>> Thank you in advance,
>>
>> Vadim.
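
The timeout idea from the original message (run the blocking work on a separate thread and stop waiting after a deadline) could look roughly like the sketch below. It uses only java.util.concurrent; the class name TimeoutGuard and the method callWithTimeout are hypothetical names chosen for illustration and are not part of Curator or of the code posted in this thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (not a Curator class): bounds how long a blocking call may run.
final class TimeoutGuard {

    // Daemon threads, so a task that never returns cannot keep the JVM alive.
    private static final ExecutorService WORKERS = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "timeout-guard");
        t.setDaemon(true);
        return t;
    });

    private TimeoutGuard() {
    }

    // Runs the task on a worker thread and waits at most the given time for it.
    // Throws TimeoutException if the deadline passes; otherwise returns the
    // task's result or rethrows the exception the task itself raised.
    static <T> T callWithTimeout(Callable<T> task, long timeout, TimeUnit unit)
            throws Exception {
        Future<T> future = WORKERS.submit(task);
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            // Ask the worker to stop by interrupting it; the task may ignore this.
            future.cancel(true);
            throw e;
        } catch (ExecutionException e) {
            // Unwrap so the caller sees the original failure where possible.
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e;
        }
    }
}
```

Inside consumeMessage() the call might then become something like TimeoutGuard.callWithTimeout(() -> { receiver.processFile(reference); return null; }, 60, TimeUnit.SECONDS), where the 60-second limit is an arbitrary example value. Two caveats: the interrupt sent by cancel(true) does not necessarily unblock a thread stuck in a classic blocking socket read, so setting connect and read timeouts on the HTTP client itself is likely the more direct fix; and what the queue does with the message after consumeMessage() throws depends on how the queue is configured, which is worth checking in the Curator documentation.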
