Thanks, for the response. I now understand it and there were couple things 
going on.
I reverse engineered it from the simple logger sink and was able to receive 
events from the channel. Although occasionally some events seem to get dropped. 
I will work on it.

From: Hari Shreedharan 
<[email protected]<mailto:[email protected]>>
Reply-To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Date: Sunday, March 10, 2013 11:06 PM
To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Subject: Re: Custom Sink

HI Vivek,

I cannot be sure of why that is happening. Channel.take() gets called even if 
there are no events in the channel. If the take() method returns null, then 
there are no events in the channel. You can use the Status.BACKOFF return value 
to tell the sink poller to not retry immediately. But eventually, the 
SinkRunner will poll the sink again. This is because the SinkRunner does not 
know the state of the channel, so by calling the process method, the sink can 
take events if they arrive. Generally, the sinks call Channel.take() and if an 
entire batch was non-empty it will return Status.READY, else (that is the batch 
is null), then return Status.BACKOFF. See the code from AvroSink as an example 
(I have taken out some error-handling and counter-handling stuff and added some 
comments):

      transaction.begin();
      for (int i = 0; i < client.getBatchSize(); i++) {
        Event event = channel.take(); //Take an event from the channel

        if (event == null) { //Channel returned null, did not have any more 
events.
          break;
        }
        batch.add(event);
      }

      int size = batch.size();
      int batchSize = client.getBatchSize();

      if (size == 0) {           //The batch was empty, so backoff and try 
again later.
        status = Status.BACKOFF;
      } else {                   //Batch was not empty, don't backoff, try 
immediately after
        client.appendBatch(batch);
      }
      transaction.commit();
      transaction.close();

      return status;

I hope this helps. Another thing you could do is to take something like 
AvroSink/AbstractRpcSink and rip out all of the Avro/Rpc stuff and insert your 
logic into it without changing much of the channel/transaction stuff.

Hope this helps.


Hari


On Sun, Mar 10, 2013 at 8:52 PM, Vikram Kulkarni 
<[email protected]<mailto:[email protected]>> wrote:

I am trying to write a custom sink for flume-ng. I looked at the existing sinks 
and documentation and coded it up. However, the 'process()' method that's 
supposed to receive the events always ends up with a null event. I am doing 
Event event = channel.take(); but the event is null. I see in the logs that 
this method is called repeatedly as the event is still in the channel so I 
think it is reaching the sink but unable to take it out of the channel.

Can someone point me in the right direction?

Thanks,

Vikram


Reply via email to