Hi All,
Finally I am able to send the logs in log4j format. I have modified
Log4jAppender.java to include the formatted message in the FlumeEvent's
body.
I verified this programmatically and it works fine. It also works fine with
log4j.xml, but with log4j.properties the format is not picked up. I am not
sure whether anything else needs to be done.
Modified Log4jAppender.java
=====================
package org.apache.flume.clients.log4jappender;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Layout;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;
/**
 * Appends Log4j events to an external Flume client which is described by
 * the Log4j configuration file. The appender takes two required parameters:
 * <p>
 * <strong>Hostname</strong> : This is the hostname of the first hop
 * at which Flume (through an AvroSource) is listening for events.
 * </p>
 * <p>
 * <strong>Port</strong> : This is the port on the above host where the Flume
 * Source is listening for events.
 * </p>
 * <p>
 * A layout is optional. When one is configured, the Flume event body is the
 * layout-formatted message (followed by the stack trace lines if the layout
 * ignores throwables); otherwise the body is the raw log message.
 * </p>
 * A sample log4j properties file which appends to a source would look like:
 * <pre>
 * log4j.appender.out2 = org.apache.flume.clients.log4jappender.Log4jAppender
 * log4j.appender.out2.Port = 25430
 * log4j.appender.out2.Hostname = foobarflumesource.com
 * log4j.appender.out2.layout = org.apache.log4j.PatternLayout
 * log4j.appender.out2.layout.ConversionPattern = %d [%c] (%t) %m
 * log4j.logger.org.apache.flume.clients.log4jappender = DEBUG,out2</pre>
 * <p><i>Note: Change the last line to the package of the class(es) that will
 * do the appending. For example, if classes from the package
 * com.bar.foo are appending, the last line would be:</i></p>
 * <pre>log4j.logger.com.bar.foo = DEBUG,out2</pre>
 */
public class Log4jAppender extends AppenderSkeleton {

  /** Charset used to encode the event body; looked up once instead of per event. */
  private static final Charset BODY_CHARSET = Charset.forName("UTF8");

  private String hostname;
  private int port;
  private RpcClient rpcClient = null;

  /**
   * If this constructor is used programmatically rather than from a log4j
   * conf, you must set the <tt>port</tt> and <tt>hostname</tt> and then call
   * <tt>activateOptions()</tt> before calling <tt>append()</tt>.
   */
  public Log4jAppender() {
    super();
  }

  /**
   * Sets the hostname and port. Even if these are passed the
   * <tt>activateOptions()</tt> function must be called before calling
   * <tt>append()</tt>, else <tt>append()</tt> will throw an Exception.
   * @param hostname The first hop where the client should connect to.
   * @param port The port to connect on the host.
   */
  public Log4jAppender(String hostname, int port) {
    this.hostname = hostname;
    this.port = port;
  }

  /**
   * Sets the hostname, port and layout. As with the other constructors,
   * <tt>activateOptions()</tt> must be called before <tt>append()</tt>.
   * @param hostname The first hop where the client should connect to.
   * @param port The port to connect on the host.
   * @param layout The layout used to format the event body; may be
   * <tt>null</tt>, in which case the raw message is sent.
   */
  public Log4jAppender(String hostname, int port, Layout layout) {
    this.hostname = hostname;
    this.port = port;
    this.layout = layout;
  }

  /**
   * Append the LoggingEvent, to send to the first Flume hop.
   * @param event The LoggingEvent to be appended to the flume.
   * @throws FlumeException if the appender was closed,
   * or the hostname and port were not setup, there was a timeout, or there
   * was a connection error.
   */
  @Override
  public synchronized void append(LoggingEvent event) throws FlumeException {
    // If rpcClient is null, it means either this appender object was never
    // setup by setting hostname and port and then calling activateOptions
    // or this appender object was closed by calling close(), so we throw an
    // exception to show the appender is no longer accessible.
    if (rpcClient == null) {
      throw new FlumeException("Cannot Append to Appender! "
          + "Appender either closed or not setup correctly!");
    }

    if (!rpcClient.isActive()) {
      reconnect();
    }

    Map<String, String> hdrs = new HashMap<String, String>();
    hdrs.put(Log4jAvroHeaders.LOGGER_NAME.toString(), event.getLoggerName());
    hdrs.put(Log4jAvroHeaders.TIMESTAMP.toString(),
        String.valueOf(event.getTimeStamp()));
    // To get the level back simply use
    // LoggerEvent.toLevel(hdrs.get(Integer.parseInt(
    // Log4jAvroHeaders.LOG_LEVEL.toString()))
    hdrs.put(Log4jAvroHeaders.LOG_LEVEL.toString(),
        String.valueOf(event.getLevel().toInt()));
    hdrs.put(Log4jAvroHeaders.MESSAGE_ENCODING.toString(), "UTF8");

    Event flumeEvent =
        EventBuilder.withBody(formatBody(event), BODY_CHARSET, hdrs);

    try {
      rpcClient.append(flumeEvent);
    } catch (EventDeliveryException e) {
      String msg = "Flume append() failed.";
      LogLog.error(msg);
      throw new FlumeException(msg + " Exception follows.", e);
    }
  }

  /**
   * Builds the text that becomes the Flume event body.
   * Without a layout this is the raw message; with a layout it is the
   * formatted message, followed by the throwable's string representation
   * when the layout itself does not render throwables.
   */
  private String formatBody(LoggingEvent event) {
    if (layout == null) {
      // String.valueOf tolerates a null message instead of throwing an NPE.
      return String.valueOf(event.getMessage());
    }

    StringBuilder body = new StringBuilder(layout.format(event));
    if (layout.ignoresThrowable()) {
      String[] throwableLines = event.getThrowableStrRep();
      // Guard against an empty array: trimming the trailing newline below
      // would otherwise cut the last character off the formatted message.
      if (throwableLines != null && throwableLines.length > 0) {
        for (String line : throwableLines) {
          body.append(line).append('\n');
        }
        // Drop the newline that follows the final stack trace line.
        body.setLength(body.length() - 1);
      }
    }
    return body.toString();
  }

  // This function should be synchronized to make sure one thread
  // does not close an appender another thread is using, and hence risking
  // a null pointer exception.
  /**
   * Closes underlying client.
   * If <tt>append()</tt> is called after this function is called,
   * it will throw an exception.
   * @throws FlumeException if errors occur during close
   */
  @Override
  public synchronized void close() throws FlumeException {
    // Any append calls after this will result in an Exception.
    if (rpcClient != null) {
      rpcClient.close();
      rpcClient = null;
    }
  }

  /**
   * Reports that this appender accepts a layout.
   * Per the log4j Appender contract, configurators skip layout configuration
   * when this returns <tt>false</tt>, so a layout declared in
   * log4j.properties would never be applied. The layout is still optional at
   * runtime: <tt>append()</tt> falls back to the raw message when it is null.
   * @return <tt>true</tt> always
   */
  @Override
  public boolean requiresLayout() {
    return true;
  }

  /**
   * Set the first flume hop hostname.
   * @param hostname The first hop where the client should connect to.
   */
  public void setHostname(String hostname) {
    this.hostname = hostname;
  }

  /**
   * Set the port on the hostname to connect to.
   * @param port The port to connect on the host.
   */
  public void setPort(int port) {
    this.port = port;
  }

  /**
   * Activate the options set using <tt>setPort()</tt>
   * and <tt>setHostname()</tt>
   * @throws FlumeException if the <tt>hostname</tt> and
   * <tt>port</tt> combination is invalid.
   */
  @Override
  public void activateOptions() throws FlumeException {
    try {
      rpcClient = RpcClientFactory.getDefaultInstance(hostname, port);
    } catch (FlumeException e) {
      String errormsg = "RPC client creation failed! " + e.getMessage();
      LogLog.error(errormsg);
      throw e;
    }
  }

  /**
   * Make it easy to reconnect on failure: closes the current client and
   * creates a new one from the configured hostname and port.
   * @throws FlumeException if closing or re-creating the client fails
   */
  private void reconnect() throws FlumeException {
    close();
    activateOptions();
  }
}
Program to send the log messages:
==========================
public void test(){
try {
Log4jAppender appender = new Log4jAppender();
appender.setHostname("flume_agent_host");
appender.setPort(41414);
appender.setLayout(new PatternLayout("%d [%c] (%t)
<%X{user} %X{field}> %m"));
// appender.setReconnectAttempts(100);
appender.activateOptions();
log.addAppender(appender);
MDC.put("user", "chris");
// while (true) {
MDC.put("field", UUID.randomUUID().toString());
log.info("=====> Hello World");
try {
throw new Exception("Testing");
} catch (Exception e) {
log.error("Gone wrong ===>", e);
}
//}
System.in.read();
System.in.read();
}
catch (Exception e) {
e.printStackTrace();
}
}
Thanks,
Khadar
On Fri, Jul 27, 2012 at 8:55 PM, Ralph Goers <[email protected]>wrote:
> Khadar,
>
> I am not sure if your reply was meant as a response to my comment or just
> as additional information. The Log4j 2 FlumeAppender is not part of Flume.
> See
> http://logging.apache.org/log4j/2.x/manual/appenders.html#FlumeAvroAppender.
> However, Log4j 2 is still waiting for its first release so you will have to
> build it yourself if you want to try it.
>
> Ralph
>
> On Jul 27, 2012, at 6:58 AM, khadar basha wrote:
>
> FlumeLog4jAvroAppender is available as part of 0.94. But its not available
> in 1.2.*Log4jAppender *is from flume-ng-log4jappender-1.2.0.jar
>
> I think it is replace as part of 1.2.
>
> On Fri, Jul 27, 2012 at 6:31 PM, Ralph Goers <[email protected]> wrote:
>
>> You might consider looking at Log4j 2. It has a Flume Appender that
>> records the whole formatted log message in the body. In addition, it will
>> record the MDC fields as well.
>>
>> Sent from my iPad
>>
>> On Jul 27, 2012, at 5:49 AM, khadar basha <[email protected]> wrote:
>>
>> Hi
>>
>> I am using Flume1.2. Using avro source and hdfs sink. Sending message
>> using the Logger channel from application server. For that i am using the
>> *org.apache.flume.clients.log4jappender.Log4jAppender *in MyApp.
>>
>> But I am getting only the body of the message (description), losing the
>> time, thread, and level information.
>>
>> flume-conf.properties file
>> ==================
>> agent2Test1.sources = seqGenSrc
>> agent2Test1.channels = memoryChannel
>> agent2Test1.sinks = loggerSink
>>
>> # For each one of the sources, the type is defined
>> agent2Test1.sources.seqGenSrc.type = avro
>> agent2Test1.sources.seqGenSrc.bind=localhost
>> agent2Test1.sources.seqGenSrc.port=41414
>>
>> # interceptors for host and date
>> agent2Test1.sources.seqGenSrc.interceptors = time hostInterceptor
>> agent2Test1.sources.seqGenSrc.interceptors.hostInterceptor.type = org.apache.flume.interceptor.HostInterceptor$Builder
>> agent2Test1.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader = host
>> agent2Test1.sources.seqGenSrc.interceptors.hostInterceptor.useIP = false
>> agent2Test1.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader.preserveExisting = false
>> agent2Test1.sources.seqGenSrc.interceptors.time.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
>>
>> # The channel can be defined as follows.
>> agent2Test1.sources.seqGenSrc.channels = memoryChannel
>>
>> # Each sink's type must be defined
>> agent2Test1.sinks.loggerSink.type = hdfs
>> agent2Test1.sinks.loggerSink.hdfs.path = hdfs://hadoopHost:8020/data/%Y/%m/%d/%{host}/Logs
>>
>> agent2Test1.sinks.loggerSink.hdfs.fileType = DataStream
>>
>> #Specify the channel the sink should use
>> agent2Test1.sinks.loggerSink.channel = memoryChannel
>>
>> # Each channel's type is defined.
>> agent2Test1.channels.memoryChannel.type = memory
>>
>> # Other config values specific to each type of channel(sink or source)
>> # can be defined as well
>> # In this case, it specifies the capacity of the memory channel
>> agent2Test1.channels.memoryChannel.capacity = 1000
>>
>>
>>
>> Sample java program to generate the log message:
>> =====================================
>>
>>
>> package com.test;
>>
>>
>> import org.apache.flume.clients.log4jappender.Log4jAppender;
>> import org.apache.log4j.Logger;
>> import org.apache.log4j.MDC;
>> import org.apache.log4j.PatternLayout;
>>
>> import java.util.UUID;
>>
>>
>> public class Main {
>> static Logger log = Logger.getLogger(Main.class);
>>
>> public static void main(String[] args) {
>> try {
>> Log4jAppender appender = new Log4jAppender();
>> appender.setHostname("localhost");
>> appender.setPort(41414);
>> appender.setLayout(new PatternLayout("%d [%c] (%t) <%X{user} %X{field}> %m"));
>> // appender.setReconnectAttempts(100);
>>
>> appender.activateOptions();
>>
>> log.addAppender(appender);
>>
>> MDC.put("user", "chris");
>> // while (true) {
>> MDC.put("field", UUID.randomUUID().toString());
>> log.info("=====> Hello World");
>> try {
>> throw new Exception("Testing");
>> } catch (Exception e) {
>> log.error("Gone wrong ===>", e);
>> }
>> //}
>> System.in.read();
>> System.in.read();
>> }
>> catch (Exception e) {
>> e.printStackTrace();
>> }
>> }
>> }
>>
>>
>>
>> I am missing any config here ?
>>
>>
>>
>> --
>> Thanks,
>> Khadar
>>
>>
>
>
> --
> Thanks,
> Khadar
>
>
>
--
Thanks,
Khadar