I wrote a custom TCP Sink recently and would be happy to share the code; it's based off the Logger sink (Brock Noland gave me a helpful link to the source: https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java). My code is quite simple, pretty bare-bones at this point. I'm planning to make it more full-featured and robust, and will eventually put it out on GitHub. Msg me in the interim.
Erik On Mon, Mar 11, 2013 at 2:06 AM, Hari Shreedharan <[email protected] > wrote: > HI Vivek, > > I cannot be sure of why that is happening. Channel.take() gets called even > if there are no events in the channel. If the take() method returns null, > then there are no events in the channel. You can use the Status.BACKOFF > return value to tell the sink poller to not retry immediately. But > eventually, the SinkRunner will poll the sink again. This is because the > SinkRunner does not know the state of the channel, so by calling the > process method, the sink can take events if they arrive. Generally, the > sinks call Channel.take() and if an entire batch was non-empty it will > return Status.READY, else (that is the batch is null), then return > Status.BACKOFF. See the code from AvroSink as an example (I have taken out > some error-handling and counter-handling stuff and added some comments): > > transaction.begin(); > for (int i = 0; i < client.getBatchSize(); i++) { > Event event = channel.take(); //Take an event from the channel > > if (event == null) { //Channel returned null, did not have any > more events. > break; > } > batch.add(event); > } > > int size = batch.size(); > int batchSize = client.getBatchSize(); > > if (size == 0) { //The batch was empty, so backoff and try > again later. > status = Status.BACKOFF; > } else { //Batch was not empty, don't backoff, try > immediately after > client.appendBatch(batch); > } > transaction.commit(); > transaction.close(); > > return status; > > I hope this helps. Another thing you could do is to take something like > AvroSink/AbstractRpcSink and rip out all of the Avro/Rpc stuff and insert > your logic into it without changing much of the channel/transaction stuff. > > Hope this helps. > > > Hari > > > On Sun, Mar 10, 2013 at 8:52 PM, Vikram Kulkarni > <[email protected]>wrote: > >> I am trying to write a custom sink for flume-ng. I looked at the existing >> sinks and documentation and coded it up. However, the 'process()' method >> that's supposed to receive the events always ends up with a null event. I >> am doing Event event = channel.take(); but the event is null. I see in the >> logs that this method is called repeatedly as the event is still in the >> channel so I think it is reaching the sink but unable to take it out of the >> channel. >> >> Can someone point me in the right direction? >> >> Thanks, >> >> Vikram >> >> >> >
