Thanks for the information. It would be great if you could submit this for review. We will review it, commit it, and get it into Flume itself.
Thanks,
Hari

--
Hari Shreedharan

On Thursday, August 9, 2012 at 10:46 PM, khadar basha wrote:

> Hi All,
>
> Finally I am able to send the logs in log4j format. I have modified
> Log4jAppender.java to include the formatted message in the FlumeEvent's body.
> I verified it programmatically and it works fine. It also works fine with
> log4j.xml, but with log4j.properties it does not pick up the format. I am not
> sure whether anything else needs to be done.
>
> Modified Log4jAppender.java
> =====================
> package org.apache.flume.clients.log4jappender;
>
> import java.nio.charset.Charset;
> import java.util.HashMap;
> import java.util.Map;
>
> import org.apache.flume.Event;
> import org.apache.flume.EventDeliveryException;
> import org.apache.flume.FlumeException;
> import org.apache.flume.api.RpcClient;
> import org.apache.flume.api.RpcClientFactory;
> import org.apache.flume.event.EventBuilder;
>
> import org.apache.log4j.AppenderSkeleton;
> import org.apache.log4j.Layout;
> import org.apache.log4j.helpers.LogLog;
> import org.apache.log4j.spi.LoggingEvent;
>
> /**
>  *
>  * Appends Log4j Events to an external Flume client which is described by
>  * the Log4j configuration file. The appender takes two required parameters:
>  *<p>
>  *<strong>Hostname</strong> : This is the hostname of the first hop
>  *at which Flume (through an AvroSource) is listening for events.
>  *</p>
>  *<p>
>  *<strong>Port</strong> : This is the port on the above host where the Flume
>  *Source is listening for events.
>  *</p>
>  *A sample log4j properties file which appends to a source would look like:
>  *<pre><p>
>  *log4j.appender.out2 = org.apache.flume.clients.log4jappender.Log4jAppender
>  *log4j.appender.out2.Port = 25430
>  *log4j.appender.out2.Hostname = foobarflumesource.com
>  *log4j.logger.org.apache.flume.clients.log4jappender = DEBUG,out2</p></pre>
>  *<p><i>Note: Change the last line to the package of the class(es), that will
>  *do the appending. For example if classes from the package
>  *com.bar.foo are appending, the last line would be:</i></p>
>  *<pre><p>log4j.logger.com.bar.foo = DEBUG,out2</p></pre>
>  *
>  *
>  */
> public class Log4jAppender extends AppenderSkeleton {
>
>   private String hostname;
>   private int port;
>   private RpcClient rpcClient = null;
>
>   /**
>    * If this constructor is used programmatically rather than from a log4j conf
>    * you must set the <tt>port</tt> and <tt>hostname</tt> and then call
>    * <tt>activateOptions()</tt> before calling <tt>append()</tt>.
>    */
>   public Log4jAppender(){
>     super();
>     System.out.println("Inside Constructor"+layout);
>   }
>
>   /**
>    * Sets the hostname and port. Even if these are passed the
>    * <tt>activateOptions()</tt> function must be called before calling
>    * <tt>append()</tt>, else <tt>append()</tt> will throw an Exception.
>    * @param hostname The first hop where the client should connect to.
>    * @param port The port to connect on the host.
>    *
>    */
>   public Log4jAppender(String hostname, int port){
>     this.hostname = hostname;
>     this.port = port;
>     System.out.println("Inside Constructor from "+layout);
>   }
>
>   public Log4jAppender(String hostname, int port, Layout layout){
>     this.hostname = hostname;
>     this.port = port;
>     this.layout = layout;
>   }
>
>   /**
>    * Append the LoggingEvent, to send to the first Flume hop.
>    * @param event The LoggingEvent to be appended to the flume.
>    * @throws FlumeException if the appender was closed,
>    * or the hostname and port were not setup, there was a timeout, or there
>    * was a connection error.
>    */
>   @Override
>   public synchronized void append(LoggingEvent event) throws FlumeException{
>     //If rpcClient is null, it means either this appender object was never
>     //setup by setting hostname and port and then calling activateOptions
>     //or this appender object was closed by calling close(), so we throw an
>     //exception to show the appender is no longer accessible.
>     if(rpcClient == null){
>       throw new FlumeException("Cannot Append to Appender!" +
>           "Appender either closed or not setup correctly!");
>     }
>
>     if(!rpcClient.isActive()){
>       reconnect();
>     }
>
>     //Client created first time append is called.
>     Map<String, String> hdrs = new HashMap<String, String>();
>     hdrs.put(Log4jAvroHeaders.LOGGER_NAME.toString(), event.getLoggerName());
>     hdrs.put(Log4jAvroHeaders.TIMESTAMP.toString(),
>         String.valueOf(event.getTimeStamp()));
>
>     //To get the level back simply use
>     //LoggerEvent.toLevel(hdrs.get(Integer.parseInt(
>     //Log4jAvroHeaders.LOG_LEVEL.toString()))
>     hdrs.put(Log4jAvroHeaders.LOG_LEVEL.toString(),
>         String.valueOf(event.getLevel().toInt()));
>     hdrs.put(Log4jAvroHeaders.MESSAGE_ENCODING.toString(), "UTF8");
>
>     StringBuilder body = null;
>     if(layout != null){
>
>       body = new StringBuilder(layout.format(event));
>
>       if(layout.ignoresThrowable()){
>         String[] s = event.getThrowableStrRep();
>
>         if (s != null) {
>
>           int len = s.length;
>
>           for (int i = 0; i < len; i++) {
>             body.append(s[i]);
>             body.append('\n');
>           }
>           body.setLength(body.length()-1);
>         }
>
>       }
>
>     }
>
>     if(layout == null) System.out.println("====== Layout is NULL ======");
>
>     Event flumeEvent = EventBuilder.withBody((layout == null) ?
>         event.getMessage().toString(): body.toString(),
>         Charset.forName("UTF8"), hdrs);
>
>     try {
>       rpcClient.append(flumeEvent);
>     } catch (EventDeliveryException e) {
>       String msg = "Flume append() failed.";
>       LogLog.error(msg);
>       throw new FlumeException(msg + " Exception follows.", e);
>     }
>   }
>
>   //This function should be synchronized to make sure one thread
>   //does not close an appender another thread is using, and hence risking
>   //a null pointer exception.
>   /**
>    * Closes underlying client.
>    * If <tt>append()</tt> is called after this function is called,
>    * it will throw an exception.
>    * @throws FlumeException if errors occur during close
>    */
>   @Override
>   public synchronized void close() throws FlumeException{
>     //Any append calls after this will result in an Exception.
>     if (rpcClient != null) {
>       rpcClient.close();
>       rpcClient = null;
>     }
>   }
>
>   @Override
>   public boolean requiresLayout() {
>     return false;
>   }
>
>   /**
>    * Set the first flume hop hostname.
>    * @param hostname The first hop where the client should connect to.
>    */
>   public void setHostname(String hostname){
>     this.hostname = hostname;
>   }
>
>   /**
>    * Set the port on the hostname to connect to.
>    * @param port The port to connect on the host.
>    */
>   public void setPort(int port){
>     this.port = port;
>   }
>
>   /**
>    * Activate the options set using <tt>setPort()</tt>
>    * and <tt>setHostname()</tt>
>    * @throws FlumeException if the <tt>hostname</tt> and
>    * <tt>port</tt> combination is invalid.
>    */
>   @Override
>   public void activateOptions() throws FlumeException{
>     try {
>       rpcClient = RpcClientFactory.getDefaultInstance(hostname, port);
>     } catch (FlumeException e) {
>       String errormsg = "RPC client creation failed! " +
>           e.getMessage();
>       LogLog.error(errormsg);
>       throw e;
>     }
>   }
>
>   /**
>    * Make it easy to reconnect on failure
>    * @throws FlumeException
>    */
>   private void reconnect() throws FlumeException {
>     close();
>     activateOptions();
>   }
>
> }
>
>
> Program to send the log messages:
> ==========================
> public void test(){
>   try {
>     Log4jAppender appender = new Log4jAppender();
>     appender.setHostname("flume_agent_host");
>     appender.setPort(41414);
>     appender.setLayout(new PatternLayout("%d [%c] (%t) <%X{user} %X{field}> %m"));
>     // appender.setReconnectAttempts(100);
>
>     appender.activateOptions();
>
>     log.addAppender(appender);
>
>     MDC.put("user", "chris");
>     // while (true) {
>     MDC.put("field", UUID.randomUUID().toString());
>     log.info("=====> Hello World");
>     try {
>       throw new Exception("Testing");
>     } catch (Exception e) {
>       log.error("Gone wrong ===>", e);
>     }
>     //}
>     System.in.read();
>     System.in.read();
>   }
>   catch (Exception e) {
>     e.printStackTrace();
>   }
> }
>
>
> Thanks,
> Khadar
>
>
> On Fri, Jul 27, 2012 at 8:55 PM, Ralph Goers <[email protected]> wrote:
> > Khadar,
> >
> > I am not sure if your reply was meant as a response to my comment or just
> > as additional information. The Log4j 2 FlumeAppender is not part of Flume.
> > See
> > http://logging.apache.org/log4j/2.x/manual/appenders.html#FlumeAvroAppender.
> > However, Log4j 2 is still waiting for its first release so you will have
> > to build it yourself if you want to try it.
> >
> > Ralph
> >
> > On Jul 27, 2012, at 6:58 AM, khadar basha wrote:
> > > FlumeLog4jAvroAppender is available as part of 0.94, but it's not
> > > available in 1.2. Log4jAppender is from flume-ng-log4jappender-1.2.0.jar.
> > >
> > > I think it was replaced as part of 1.2.
> > >
> > > On Fri, Jul 27, 2012 at 6:31 PM, Ralph Goers <[email protected]> wrote:
> > > > You might consider looking at Log4j 2. It has a Flume Appender that
> > > > records the whole formatted log message in the body. In addition, it
> > > > will record the MDC fields as well.
> > > >
> > > > Sent from my iPad
> > > >
> > > > On Jul 27, 2012, at 5:49 AM, khadar basha <[email protected]> wrote:
> > > >
> > > > > Hi
> > > > >
> > > > > I am using Flume 1.2 with an Avro source and an HDFS sink. I am sending
> > > > > messages using the Logger channel from the application server. For that
> > > > > I am using the org.apache.flume.clients.log4jappender.Log4jAppender in
> > > > > MyApp.
> > > > >
> > > > > But I am getting only the body of the message (description), losing the
> > > > > time, thread, and level information.
> > > > >
> > > > > flume-conf.properties file
> > > > > ==================
> > > > > agent2Test1.sources = seqGenSrc
> > > > > agent2Test1.channels = memoryChannel
> > > > > agent2Test1.sinks = loggerSink
> > > > >
> > > > > # For each one of the sources, the type is defined
> > > > > agent2Test1.sources.seqGenSrc.type = avro
> > > > > agent2Test1.sources.seqGenSrc.bind=localhost
> > > > > agent2Test1.sources.seqGenSrc.port=41414
> > > > >
> > > > > # interceptors for host and date
> > > > > agent2Test1.sources.seqGenSrc.interceptors = time hostInterceptor
> > > > > agent2Test1.sources.seqGenSrc.interceptors.hostInterceptor.type = org.apache.flume.interceptor.HostInterceptor$Builder
> > > > > agent2Test1.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader = host
> > > > > agent2Test1.sources.seqGenSrc.interceptors.hostInterceptor.useIP = false
> > > > > agent2Test1.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader.preserveExisting = false
> > > > > agent2Test1.sources.seqGenSrc.interceptors.time.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
> > > > >
> > > > > # The channel can be defined as follows.
> > > > > agent2Test1.sources.seqGenSrc.channels = memoryChannel
> > > > >
> > > > > # Each sink's type must be defined
> > > > > agent2Test1.sinks.loggerSink.type = hdfs
> > > > > agent2Test1.sinks.loggerSink.hdfs.path = hdfs://hadoopHost:8020/data/%Y/%m/%d/%{host}/Logs
> > > > >
> > > > > agent2Test1.sinks.loggerSink.hdfs.fileType = DataStream
> > > > >
> > > > > #Specify the channel the sink should use
> > > > > agent2Test1.sinks.loggerSink.channel = memoryChannel
> > > > >
> > > > > # Each channel's type is defined.
> > > > > agent2Test1.channels.memoryChannel.type = memory
> > > > >
> > > > > # Other config values specific to each type of channel(sink or source)
> > > > > # can be defined as well
> > > > > # In this case, it specifies the capacity of the memory channel
> > > > > agent2Test1.channels.memoryChannel.capacity = 1000
> > > > >
> > > > >
> > > > > Sample java program to generate the log message:
> > > > > =====================================
> > > > >
> > > > > package com.test;
> > > > >
> > > > > import org.apache.flume.clients.log4jappender.Log4jAppender;
> > > > > import org.apache.log4j.Logger;
> > > > > import org.apache.log4j.MDC;
> > > > > import org.apache.log4j.PatternLayout;
> > > > >
> > > > > import java.util.UUID;
> > > > >
> > > > > public class Main {
> > > > >   static Logger log = Logger.getLogger(Main.class);
> > > > >
> > > > >   public static void main(String[] args) {
> > > > >     try {
> > > > >       Log4jAppender appender = new Log4jAppender();
> > > > >       appender.setHostname("localhost");
> > > > >       appender.setPort(41414);
> > > > >       appender.setLayout(new PatternLayout("%d [%c] (%t) <%X{user} %X{field}> %m"));
> > > > >       // appender.setReconnectAttempts(100);
> > > > >
> > > > >       appender.activateOptions();
> > > > >
> > > > >       log.addAppender(appender);
> > > > >
> > > > >       MDC.put("user", "chris");
> > > > >       // while (true) {
> > > > >       MDC.put("field", UUID.randomUUID().toString());
> > > > >       log.info("=====> Hello World");
> > > > >       try {
> > > > >         throw new Exception("Testing");
> > > > >       } catch (Exception e) {
> > > > >         log.error("Gone wrong ===>", e);
> > > > >       }
> > > > >       //}
> > > > >       System.in.read();
> > > > >       System.in.read();
> > > > >     }
> > > > >     catch (Exception e) {
> > > > >       e.printStackTrace();
> > > > >     }
> > > > >   }
> > > > > }
> > > > >
> > > > >
> > > > > Am I missing any config here?
> > > > >
> > > > >
> > > > > --
> > > > > Thanks,
> > > > > Khadar
> > > > >
> > >
> > >
> > > --
> > > Thanks,
> > > Khadar
> > >
>
>
> --
> Thanks,
> Khadar
>
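
On the log4j.properties question raised in the August 9 message, one possible explanation, offered as a sketch rather than a confirmed diagnosis: in log4j 1.2, PropertyConfigurator appears to instantiate an appender's layout only when the appender's requiresLayout() returns true, whereas DOMConfigurator applies a <layout> element regardless. The modified Log4jAppender returns false from requiresLayout(), which would match the reported behaviour (layout honoured from log4j.xml, ignored from log4j.properties). Assuming requiresLayout() is changed to return true, a properties file along these lines should carry the pattern through. The appender name out2, the host flume_agent_host, port 41414, and the com.test logger are reused from the thread for illustration; the layout and layout.ConversionPattern keys are the standard log4j 1.2 ones.

```properties
# Assumption: requiresLayout() in the modified Log4jAppender now returns true,
# so that PropertyConfigurator reads the layout keys below.
log4j.appender.out2 = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.out2.Hostname = flume_agent_host
log4j.appender.out2.Port = 41414
log4j.appender.out2.layout = org.apache.log4j.PatternLayout
log4j.appender.out2.layout.ConversionPattern = %d [%c] (%t) <%X{user} %X{field}> %m
# Replace com.test with the package of the classes doing the logging.
log4j.logger.com.test = DEBUG, out2
```

If the layout keys are still ignored after that change, the cause lies elsewhere and the above should be disregarded.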
