How many QueueConsumers do you have? In the pseudo code below, you only show 1 
consumer and it will wait until receiver.processFile() completes before it 
takes any more messages.

-Jordan

> On Nov 16, 2016, at 4:18 PM, Vadim <[email protected]> wrote:
> 
> Dear Jordan,
> 
> Technical details are briefly:
> 
> Application is build with Spring Boot and Zookeeper 3.4.9
> Curator is used to elect leader (LeaderLatch) and to provide distributed 
> queue that is lock-aware. Here is queue initialization code:
> 
>     private void distributedQueueInit() throws Exception {
>         FileConsumer consumer = new FileConsumer(receiver);
>         FileReferenceSerializer serializer = new FileReferenceSerializer();
>         QueueBuilder<FileReference> builder = QueueBuilder.builder(client, 
> consumer, serializer, queuePath);
>         builder.lockPath(lockPath);
>         fileRefs = builder.buildQueue();
>         fileRefs.start();    
>     }
> FileConsumer implements QueueConsumer and its processMessage() is very short:
>     
>    @Override
>     public void consumeMessage(FileReference reference) throws Exception {
>         log.info("Processing file: " + reference.getReference() + "|" + 
> reference.getRefType());
>         receiver.processFile(reference);
>     }
> The problem is at receiver.processFile(reference) function.  It has HTTP call 
> to external resource that retrieves file by reference and save it on disk. 
> When problem arise (it happened once for 3 days) I can see at my logs that 
> "Processing file:" is fired. Next log message is after file is saved to disk. 
> When error happened - it did not appear. At the same time I can see lock and 
> message in the queue that are not processed. I thing there are only 2 reasons 
> for consumer to die :
> HTTP call takes too much time and consumer dies
> HTTP connection dies without exception and consumer does not receive any 
> error and hangs on
>    If you need more details I can provide information about HTTP call, but I 
> think for such kind of problem it is not relevant. I am curious about what to 
> do next because there are may be more than 2 reasons for such a behavior. 
> What do you think?
> 
> Regards,
> 
> Vadim
> 
>  
>  
> On 2016-11-16 17:31, Jordan Zimmerman wrote:
> 
>> Can you send a code snippet or test that shows the issue?
>>  
>> -Jordan
>> 
>> On Nov 16, 2016, at 4:35 AM, Vadim <[email protected] <mailto:[email protected]>> 
>> wrote:
>> 
>> Hello all,
>> 
>>         My name is Vadim and I am new curator user. I am very happy with 
>> curator, but think that may be using it in a wrong way a bit. Particularly  
>> I am using Distributed Queue receipt and code runs well until Consumer 
>> silently dies. Messages for queue are small and does not exceed 50 bytes. I 
>> use queue to distribute tasks between Workers.
>> 
>>         My QueueConsumer method consumeMessate() calls part of the code that 
>> may fail silently (without exception). Since message delivery at my scenario 
>> is "durable" I can see locks that are never freed. Curator does not "cure" 
>> such a stale consumer as well. Am I doing something wrong?
>> 
>>         I have an idea to call code inside "consumeMessage"  at separate 
>> blocking thread that has a timeout. Thus when my code fails silently -- 
>> consumer will get timeout exception. But this is not an elegant solution I 
>> think. What do you think?
>> 
>> Thank you in advance,
>> 
>> Vadim.
>> 
>>  

Reply via email to