Thanks for the information. It would be great if you could submit this for 
review. We will review it, commit it, and get it into Flume itself. 

Thanks,
Hari

-- 
Hari Shreedharan


On Thursday, August 9, 2012 at 10:46 PM, khadar basha wrote:

> Hi All,
> 
> Finally, I am able to send the logs in log4j format. I have modified 
> Log4jAppender.java to include the formatted message in the FlumeEvent's body. 
> I verified it programmatically and it works fine. It also works fine with 
> log4j.xml, but when using log4j.properties it does not pick up the format. 
> I am not sure whether anything else needs to be done.
> 
> Modified Log4jAppender.java
> =====================
> package org.apache.flume.clients.log4jappender;
> 
> import java.nio.charset.Charset;
> import java.util.HashMap;
> import java.util.Map;
> 
> import org.apache.flume.Event;
> import org.apache.flume.EventDeliveryException;
> import org.apache.flume.FlumeException;
> import org.apache.flume.api.RpcClient;
> import org.apache.flume.api.RpcClientFactory;
> import org.apache.flume.event.EventBuilder;
> 
> import org.apache.log4j.AppenderSkeleton;
> import org.apache.log4j.Layout;
> import org.apache.log4j.helpers.LogLog;
> import org.apache.log4j.spi.LoggingEvent;
> 
> /**
>  *
>  * Appends Log4j Events to an external Flume client which is decribed by
>  * the Log4j configuration file. The appender takes two required parameters:
>  *<p>
>  *<strong>Hostname</strong> : This is the hostname of the first hop
>  *at which Flume (through an AvroSource) is listening for events.
>  *</p>
>  *<p>
>  *<strong>Port</strong> : This the port on the above host where the Flume
>  *Source is listening for events.
>  *</p>
>  *A sample log4j properties file which appends to a source would look like:
>  *<pre><p>
>  *log4j.appender.out2 = org.apache.flume.clients.log4jappender.Log4jAppender
>  *log4j.appender.out2.Port = 25430
>  *log4j.appender.out2.Hostname = foobarflumesource.com 
> (http://foobarflumesource.com)
>  *log4j.logger.org.apache.flume.clients.log4jappender = DEBUG,out2</p></pre>
>  *<p><i>Note: Change the last line to the package of the class(es), that will
>  *do the appending.For example if classes from the package
>  *com.bar.foo are appending, the last line would be:</i></p>
>  *<pre><p>log4j.logger.com.bar.foo = DEBUG,out2</p></pre>
>  *
>  *
>  */
> public class Log4jAppender extends AppenderSkeleton {
> 
>   private String hostname;
>   private int port;
>   private RpcClient rpcClient = null;
> 
> 
> 
>   /**
>    * If this constructor is used programmatically rather than from a log4j 
> conf
>    * you must set the <tt>port</tt> and <tt>hostname</tt> and then call
>    * <tt>activateOptions()</tt> before calling <tt>append()</tt>.
>    */
>   public Log4jAppender(){
>  super();
>  System.out.println("Inside Constructor"+layout);
>   }
> 
>   /**
>    * Sets the hostname and port. Even if these are passed the
>    * <tt>activateOptions()</tt> function must be called before calling
>    * <tt>append()</tt>, else <tt>append()</tt> will throw an Exception.
>    * @param hostname The first hop where the client should connect to.
>    * @param port The port to connect on the host.
>    *
>    */
>   public Log4jAppender(String hostname, int port){
>     this.hostname = hostname;
>     this.port = port;
>     System.out.println("Inside Constructor from "+layout);
>   }
>   
>   
>   public Log4jAppender(String hostname, int port, Layout layout){
>    this.hostname = hostname;
>    this.port = port;
>    this.layout = layout;
>  }
> 
>   /**
>    * Append the LoggingEvent, to send to the first Flume hop.
>    * @param event The LoggingEvent to be appended to the flume.
>    * @throws FlumeException if the appender was closed,
>    * or the hostname and port were not setup, there was a timeout, or there
>    * was a connection error.
>    */
>   @Override
>   public synchronized void append(LoggingEvent event) throws FlumeException{
>     //If rpcClient is null, it means either this appender object was never
>     //setup by setting hostname and port and then calling activateOptions
>     //or this appender object was closed by calling close(), so we throw an
>     //exception to show the appender is no longer accessible.
>     if(rpcClient == null){
>       throw new FlumeException("Cannot Append to Appender!" +
>           "Appender either closed or not setup correctly!");
>     }
> 
>     if(!rpcClient.isActive()){
>       reconnect();
>     }
> 
>     //Client created first time append is called.
>     Map<String, String> hdrs = new HashMap<String, String>();
>     hdrs.put(Log4jAvroHeaders.LOGGER_NAME.toString(), event.getLoggerName());
>     hdrs.put(Log4jAvroHeaders.TIMESTAMP.toString(),
>         String.valueOf(event.getTimeStamp()));
> 
>     //To get the level back simply use
>     //LoggerEvent.toLevel(hdrs.get(Integer.parseInt(
>     //Log4jAvroHeaders.LOG_LEVEL.toString()))
>     hdrs.put(Log4jAvroHeaders.LOG_LEVEL.toString(),
>         String.valueOf(event.getLevel().toInt()));
>     hdrs.put(Log4jAvroHeaders.MESSAGE_ENCODING.toString(), "UTF8");
> 
>     StringBuilder body = null;
>     if(layout != null){
>     
>     body = new StringBuilder(layout.format(event));
>         
>         if(layout.ignoresThrowable()){    
>           String[] s = event.getThrowableStrRep();
>     
>           if (s != null) {
>     
>             int len = s.length;
>     
>             for (int i = 0; i < len; i++) {
>               body.append(s[i]);
>               body.append('\n');
>             }
>             body.setLength(body.length()-1);
>           }
>           
>         }
>         
>     }
>     
>     if(layout == null) System.out.println("====== Layout is NULL ======");
>     
>     Event flumeEvent = EventBuilder.withBody((layout == null) ? 
> event.getMessage().toString(): body.toString(),
>         Charset.forName("UTF8"), hdrs);
> 
>     try {
>       rpcClient.append(flumeEvent);
>     } catch (EventDeliveryException e) {
>       String msg = "Flume append() failed.";
>       LogLog.error(msg);
>       throw new FlumeException(msg + " Exception follows.", e);
>     }
>   }
> 
>   //This function should be synchronized to make sure one thread
>   //does not close an appender another thread is using, and hence risking
>   //a null pointer exception.
>   /**
>    * Closes underlying client.
>    * If <tt>append()</tt> is called after this function is called,
>    * it will throw an exception.
>    * @throws FlumeException if errors occur during close
>    */
>   @Override
>   public synchronized void close() throws FlumeException{
>     //Any append calls after this will result in an Exception.
>     if (rpcClient != null) {
>       rpcClient.close();
>       rpcClient = null;
>     }
>   }
> 
>   @Override
>   public boolean requiresLayout() {
>     return false;
>   }
> 
>   /**
>    * Set the first flume hop hostname.
>    * @param hostname The first hop where the client should connect to.
>    */
>   public void setHostname(String hostname){
>     this.hostname = hostname;
>   }
> 
>   /**
>    * Set the port on the hostname to connect to.
>    * @param port The port to connect on the host.
>    */
>   public void setPort(int port){
>     this.port = port;
>   }
> 
>   /**
>    * Activate the options set using <tt>setPort()</tt>
>    * and <tt>setHostname()</tt>
>    * @throws FlumeException if the <tt>hostname</tt> and
>    *  <tt>port</tt> combination is invalid.
>    */
>   @Override
>   public void activateOptions() throws FlumeException{
>     try {
>       rpcClient = RpcClientFactory.getDefaultInstance(hostname, port);
>     } catch (FlumeException e) {
>       String errormsg = "RPC client creation failed! " +
>           e.getMessage();
>       LogLog.error(errormsg);
>       throw e;
>     }
>   }
> 
>   /**
>    * Make it easy to reconnect on failure
>    * @throws FlumeException
>    */
>   private void reconnect() throws FlumeException {
>     close();
>     activateOptions();
>   }
> 
> }
> 
> 
> Program to send the log messages: 
> ==========================
>  public void test(){
>     try {
>             Log4jAppender appender = new Log4jAppender();
>                appender.setHostname("flume_agent_host");
>                appender.setPort(41414);
>                appender.setLayout(new PatternLayout("%d [%c] (%t) <%X{user} 
> %X{field}> %m"));
>             //   appender.setReconnectAttempts(100);
>                
>               appender.activateOptions();
> 
>                log.addAppender(appender);
> 
>                MDC.put("user", "chris"); 
>              //  while (true) {
>                    MDC.put("field", UUID.randomUUID().toString());
>                    log.info (http://log.info)("=====> Hello World");
>                    try {
>                        throw new Exception("Testing");
>                    } catch (Exception e) {
>                        log.error("Gone wrong ===>", e);
>                    }
>                //}
>                    System.in.read();
>                    System.in.read();
>            }
>            catch (Exception e) {
>                e.printStackTrace();
>            }
>        }
> 
> 
> 
> 
> Thanks,
> Khadar
> 
> 
> 
> On Fri, Jul 27, 2012 at 8:55 PM, Ralph Goers <[email protected] 
> (mailto:[email protected])> wrote:
> > Khadar,
> > 
> > I am not sure if your reply was meant as a response to my comment or just 
> > as additional information. The Log4j 2 FlumeAppender is not part of Flume. 
> > See 
> > http://logging.apache.org/log4j/2.x/manual/appenders.html#FlumeAvroAppender.
> >  However, Log4j 2 is still waiting for its first release so you will have 
> > to build it yourself if you want to try it. 
> > 
> > Ralph
> > 
> > On Jul 27, 2012, at 6:58 AM, khadar basha wrote:
> > > FlumeLog4jAvroAppender is available as part of 0.94, but it is not 
> > > available in 1.2. Log4jAppender is from flume-ng-log4jappender-1.2.0.jar.
> > > 
> > > I think it was replaced as part of 1.2.
> > > 
> > > On Fri, Jul 27, 2012 at 6:31 PM, Ralph Goers <[email protected] 
> > > (mailto:[email protected])> wrote:
> > > > You might consider looking at Log4j 2. It has a Flume Appender that 
> > > > records the whole formatted log message in the body. In addition, it 
> > > > will record the MDC fields as well.
> > > > 
> > > > Sent from my iPad 
> > > > 
> > > > On Jul 27, 2012, at 5:49 AM, khadar basha <[email protected] 
> > > > (mailto:[email protected])> wrote:
> > > > 
> > > > > Hi 
> > > > > 
> > > > > > I am using Flume 1.2 with an Avro source and an HDFS sink. I am sending 
> > > > > > messages using the Logger channel from the application server. For that I 
> > > > > > am using org.apache.flume.clients.log4jappender.Log4jAppender in MyApp. 
> > > > > 
> > > > > > But I am getting only the body of the message (the description) and am 
> > > > > > losing the time, thread, and level information. 
> > > > > 
> > > > > flume-conf.properties file
> > > > > ==================
> > > > > agent2Test1.sources = seqGenSrc
> > > > > agent2Test1.channels = memoryChannel
> > > > > agent2Test1.sinks = loggerSink
> > > > > 
> > > > > # For each one of the sources, the type is defined
> > > > > agent2Test1.sources.seqGenSrc.type = avro
> > > > > agent2Test1.sources.seqGenSrc.bind=localhost
> > > > > agent2Test1.sources.seqGenSrc.port=41414
> > > > > 
> > > > > # interceptors for host and date
> > > > > agent2Test1.sources.seqGenSrc.interceptors = time hostInterceptor
> > > > > agent2Test1.sources.seqGenSrc.interceptors.hostInterceptor.type = 
> > > > > org.apache.flume.interceptor.HostInterceptor$Builder
> > > > > agent2Test1.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader 
> > > > > = host
> > > > > agent2Test1.sources.seqGenSrc.interceptors.hostInterceptor.useIP = 
> > > > > false
> > > > > agent2Test1.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader.preserveExisting
> > > > >  = false
> > > > > agent2Test1.sources.seqGenSrc.interceptors.time.type = 
> > > > > org.apache.flume.interceptor.TimestampInterceptor$Builder
> > > > > 
> > > > > # The channel can be defined as follows.
> > > > > agent2Test1.sources.seqGenSrc.channels = memoryChannel
> > > > > 
> > > > > # Each sink's type must be defined
> > > > > agent2Test1.sinks.loggerSink.type = hdfs
> > > > > agent2Test1.sinks.loggerSink.hdfs.path = 
> > > > > hdfs://hadoopHost:8020/data/%Y/%m/%d/%{host}/Logs
> > > > > 
> > > > > agent2Test1.sinks.loggerSink.hdfs.fileType = DataStream
> > > > > 
> > > > > #Specify the channel the sink should use 
> > > > > agent2Test1.sinks.loggerSink.channel = memoryChannel
> > > > > 
> > > > > # Each channel's type is defined.
> > > > > agent2Test1.channels.memoryChannel.type = memory
> > > > > 
> > > > > # Other config values specific to each type of channel(sink or 
> > > > > source) 
> > > > > # can be defined as well
> > > > > # In this case, it specifies the capacity of the memory channel
> > > > > agent2Test1.channels.memoryChannel.capacity = 1000
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > Sample java program to generate the log message:
> > > > > =====================================
> > > > > 
> > > > > 
> > > > > package com.test;
> > > > > 
> > > > > 
> > > > > import org.apache.flume.clients.log4jappender.Log4jAppender; 
> > > > > import org.apache.log4j.Logger;
> > > > > import org.apache.log4j.MDC;
> > > > > import org.apache.log4j.PatternLayout;
> > > > > 
> > > > > import java.util.UUID;
> > > > > 
> > > > > 
> > > > > public class Main { 
> > > > >     static Logger log = Logger.getLogger(Main.class);
> > > > > 
> > > > >     public static void main(String[] args) {
> > > > >         try {
> > > > >          Log4jAppender appender = new Log4jAppender();
> > > > >             appender.setHostname("localhost");
> > > > >             appender.setPort(41414);
> > > > >             appender.setLayout(new PatternLayout("%d [%c] (%t) 
> > > > > <%X{user} %X{field}> %m"));
> > > > >          //   appender.setReconnectAttempts(100);
> > > > >             
> > > > >            appender.activateOptions();
> > > > > 
> > > > >             log.addAppender(appender);
> > > > > 
> > > > >             MDC.put("user", "chris"); 
> > > > >           //  while (true) {
> > > > >                 MDC.put("field", UUID.randomUUID().toString());
> > > > >                 log.info (http://log.info/)("=====> Hello World");
> > > > >                 try {
> > > > >                     throw new Exception("Testing");
> > > > >                 } catch (Exception e) {
> > > > >                     log.error("Gone wrong ===>", e);
> > > > >                 }
> > > > >             //}
> > > > >                 System.in.read();
> > > > >                 System.in.read();
> > > > >         }
> > > > >         catch (Exception e) {
> > > > >             e.printStackTrace();
> > > > >         }
> > > > >     }
> > > > > }
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > > Am I missing any config here?
> > > > > 
> > > > > 
> > > > > 
> > > > > -- 
> > > > > Thanks,
> > > > > Khadar
> > > > > 
> > > 
> > > 
> > > 
> > > -- 
> > > Thanks,
> > > Khadar
> > > 
> > 
> 
> 
> 
> -- 
> Thanks,
> Khadar
> 

Reply via email to