I have a 3-node cluster with 1 application instance running on each node. So there are 3 consumers in total, one per virtual server. The requirement is to receive messages reliably: if one host (consumer) fails, another host (consumer) should pick up the task and execute it.
The failed consumer "holds" the task, as you described. You understood the code correctly. Is it possible to specify a timeout for this action?

Vadim

On 2016-11-17 18:50, Jordan Zimmerman wrote:

> How many QueueConsumers do you have? In the pseudo code below, you only show
> 1 consumer and it will wait until receiver.processFile() completes before it
> takes any more messages.
>
> -Jordan
>
> On Nov 16, 2016, at 4:18 PM, Vadim <[email protected]> wrote:
>
> Dear Jordan,
>
> Briefly, the technical details are:
>
> * The application is built with Spring Boot and ZooKeeper 3.4.9.
>
> * Curator is used to elect a leader (LeaderLatch) and to provide a
>   distributed queue that is lock-aware. Here is the queue initialization
>   code:
>
>     private void distributedQueueInit() throws Exception {
>         FileConsumer consumer = new FileConsumer(receiver);
>         FileReferenceSerializer serializer = new FileReferenceSerializer();
>         QueueBuilder<FileReference> builder =
>             QueueBuilder.builder(client, consumer, serializer, queuePath);
>         builder.lockPath(lockPath);
>         fileRefs = builder.buildQueue();
>         fileRefs.start();
>     }
>
> * FileConsumer implements QueueConsumer and its consumeMessage() is very
>   short:
>
>     @Override
>     public void consumeMessage(FileReference reference) throws Exception {
>         log.info("Processing file: " + reference.getReference() + "|"
>             + reference.getRefType());
>         receiver.processFile(reference);
>     }
>
> * The problem is in the receiver.processFile(reference) function. It makes
>   an HTTP call to an external resource that retrieves a file by reference
>   and saves it to disk. When the problem arises (it has happened once in 3
>   days), I can see in my logs that "Processing file:" is logged. The next
>   log message is written after the file is saved to disk. When the error
>   happened, that message did not appear. At the same time I can see a lock
>   and a message in the queue that are not processed.
>
> I think there are only 2 reasons for the consumer to die:
>
> * The HTTP call takes too much time and the consumer dies.
> * The HTTP connection dies without an exception, so the consumer does not
>   receive any error and hangs.
>
> If you need more details I can provide information about the HTTP call, but
> I think it is not relevant for this kind of problem. I am curious about what
> to do next, because there may be more than 2 reasons for such behavior.
> What do you think?
>
> Regards,
>
> Vadim
>
> On 2016-11-16 17:31, Jordan Zimmerman wrote:
>
> Can you send a code snippet or test that shows the issue?
>
> -Jordan
>
> On Nov 16, 2016, at 4:35 AM, Vadim <[email protected]> wrote:
>
> Hello all,
>
> My name is Vadim and I am a new Curator user. I am very happy with Curator,
> but I think I may be using it in a slightly wrong way. In particular, I am
> using the Distributed Queue recipe, and the code runs well until a consumer
> silently dies. Messages for the queue are small and do not exceed 50 bytes.
> I use the queue to distribute tasks between workers.
>
> My QueueConsumer method consumeMessage() calls a part of the code that may
> fail silently (without an exception). Since message delivery in my scenario
> is "durable", I can see locks that are never freed. Curator does not "cure"
> such a stale consumer either. Am I doing something wrong?
>
> I have an idea to call the code inside consumeMessage() in a separate
> thread and block on it with a timeout. Thus when my code fails silently,
> the consumer will get a timeout exception. But I do not think this is an
> elegant solution. What do you think?
>
> Thank you in advance,
>
> Vadim.
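
The timeout idea from the original message above (run the work in a separate thread and block on it with a timeout) can be sketched roughly as follows. This is only an illustration built on the standard java.util.concurrent classes, not something Curator provides: the class name TimeoutRunner and the method callWithTimeout are hypothetical. It also assumes that an exception thrown from consumeMessage() leaves the item in the lock-safe queue for another consumer, which should be checked against the Curator documentation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (not part of Curator): runs a task on a separate
// thread and gives up waiting after the given timeout.
final class TimeoutRunner {
    private TimeoutRunner() {}

    static <T> T callWithTimeout(Callable<T> task, long timeout, TimeUnit unit)
            throws Exception {
        // Daemon thread, so a call that never returns cannot keep the JVM alive.
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "timeout-runner");
            t.setDaemon(true);
            return t;
        });
        Future<T> future = executor.submit(task);
        try {
            // Block until the task finishes or the timeout expires.
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            // Interrupt the worker; blocking socket I/O may ignore the interrupt.
            future.cancel(true);
            throw e;
        } catch (ExecutionException e) {
            // Rethrow the task's own exception where possible.
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Inside consumeMessage() this could be used as TimeoutRunner.callWithTimeout(() -> { receiver.processFile(reference); return null; }, 60, TimeUnit.SECONDS), where the 60-second limit is an arbitrary placeholder. Because an interrupt does not necessarily unblock a stalled HTTP read, setting connect and read timeouts on the HTTP client itself is probably worth doing as well.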
