You might be better off taking the results of the patch on the jira
issue and subclassing them.
However my classes are attached.
There is an EsbMessageService object (not attached) that creates and
stores the database record.
The create method on this object also pulls out all the easy stuff from
the message (the source for the standard ones shows how you can do that).
The rest of it is getting the data to be passed to setBody.
It works with MTOM, but I have an odd error with FastInfoset that may be
a bug in FastInfoset (I can't see how it's my fault that all the binary
elements in the message get replaced with the last binary element!).
Another difference between mine and the trunk ones is that my code for
only running the inInterceptor once is different (I run once per
exchange) - I tried using that from the trunk and it didn't work, so I'm
not sure why that was.
Jim
On 14/06/2012 19:39, Marco Pas wrote:
Could you share the code of these interceptors as an example of how I
could implement them myself? It would help me a lot.
I have read your suggestion, and I also have to log the XML messages to
an external system - in my case an AMQP provider.
/Marco
2012/6/14 Jim Talbut <[email protected]>:
On 14/06/2012 18:33, Marco Pas wrote:
I need to have the plain XML in the interceptor so that I can use it
for specific logging tasks. The standard LogIn & LogOut interceptors
are not up to the task. Is anyone willing to share an example of how I
could implement a simple incoming interceptor that is able to get the
incoming SOAP XML, and an outgoing interceptor that gets the outgoing
SOAP XML?
I wrote my own interceptors based heavily on the standard interceptors.
But it was a lot of work and is fragile in that changes to the standard
interceptors won't get picked up by yours.
So I proposed a change to the standard interceptors to make them more
extensible: https://issues.apache.org/jira/browse/CXF-4368
I only did this a couple of days ago, it hasn't been picked up yet.
Jim
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package com.groupgti.esb.cxf.interceptors;
import com.groupgti.esb.cxf.model.EsbMessage;
import com.groupgti.esb.cxf.model.services.EsbMessageService;
import java.io.*;
import org.apache.cxf.common.util.StringUtils;
import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.interceptor.LoggingMessage;
import org.apache.cxf.io.CacheAndWriteOutputStream;
import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.io.CachedOutputStreamCallback;
import org.apache.cxf.message.Message;
import org.apache.cxf.phase.AbstractPhaseInterceptor;
import org.apache.cxf.phase.Phase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* @author jtalbut
*/
public class MessageLoggerOutbound extends AbstractPhaseInterceptor<Message>
{
public static final String SINGLE_KEY =
MessageLoggerOutbound.class.getName() + ".Processed";
private static final Logger log = LoggerFactory.getLogger(
MessageLoggerOutbound.class );
private static final int limit = 10 * 1024 * 1024;
private EsbMessageService messageService;
public MessageLoggerOutbound( ) {
super( Phase.PRE_STREAM );
}
public void setMessageService( EsbMessageService messageService ) {
this.messageService = messageService;
}
@Override
public void handleFault( Message message ) {
log.debug( "handleFault" );
try {
internalHandleMessage(message);
} catch( Throwable ex ) {
log.error( "Exception thrown by internalHandleMessage: ", ex );
} finally {
log.debug( "handleFault - end" );
}
}
private boolean onceOnly( Message message ) {
if (message.getExchange().containsKey(SINGLE_KEY)) {
return true;
} else {
message.getExchange().put(SINGLE_KEY, Boolean.TRUE);
return false;
}
}
@Override
public void handleMessage(Message message) throws Fault {
log.debug( "handleMessage" );
try {
// Write the output while caching it for the log message
if( onceOnly( message ) ) {
log.debug( "handled message previously" );
return ;
}
log.debug( "handling message" );
internalHandleMessage(message);
} finally {
log.debug( "handleMessage - end" );
}
}
private void internalHandleMessage( Message message ) {
final OutputStream os = message.getContent(OutputStream.class);
final Writer iowriter = message.getContent(Writer.class);
if (os == null && iowriter == null) {
return;
}
if( os == null ) {
log.error( "\nmessage.getContent( InputStream.class ) returned
null" );
EsbMessage record = messageService.create( message );
log.trace( "Outbound message (without body): {}", record );
messageService.save( record );
message.setContent(Writer.class, iowriter);
} else {
final CacheAndWriteOutputStream newOut = new
CacheAndWriteOutputStream( os );
message.setContent( OutputStream.class, newOut );
newOut.registerCallback( new LogCallback( log, message, os,
messageService ) );
}
}
protected void writePayload(StringBuilder builder, CachedOutputStream cos,
String encoding, String contentType) throws
IOException {
if (StringUtils.isEmpty(encoding)) {
cos.writeCacheTo(builder, limit);
} else {
cos.writeCacheTo(builder, encoding, limit);
}
}
protected void writePayload(StringBuilder builder,
StringWriter stringWriter,
String contentType)
throws Exception {
// Just transform the XML message when the cos has content
StringBuffer buffer = stringWriter.getBuffer();
if (buffer.length() > limit) {
builder.append(buffer.subSequence(0, limit));
} else {
builder.append(buffer);
}
}
class LogCallback implements CachedOutputStreamCallback
{
private Logger log;
private EsbMessageService messageService;
private final Message message;
private final OutputStream origStream;
public LogCallback( Logger log, final Message msg, final OutputStream
os, EsbMessageService messageService ) {
this.log = log;
this.message = msg;
this.origStream = os;
this.messageService = messageService;
}
@Override
public void onFlush( CachedOutputStream cos ) {
}
@Override
public void onClose( CachedOutputStream cos ) {
StringBuilder requestBuilder = new StringBuilder();
String encoding = (String)message.get(Message.ENCODING);
String ct = (String)message.get(Message.CONTENT_TYPE);
try {
writePayload(requestBuilder, cos, encoding, ct);
} catch( java.io.IOException ex ) {
log.error( "Unable to write output stream to StringBuilder:\n"
+ ex.toString() );
}
EsbMessage record = messageService.create( message );
record.setBody( requestBuilder.toString() );
log.trace( "Outbound message: {}", record );
messageService.save( record );
try {
//empty out the cache
cos.lockOutputStream();
cos.resetOut(null, false);
} catch (Exception ex) {
//ignore
}
message.setContent( OutputStream.class, origStream );
}
}
private class LogWriter extends FilterWriter {
StringWriter out2;
int count;
Logger logger;
Message message;
public LogWriter( Logger log, final Message msg, final Writer writer,
EsbMessageService messageService ) {
super(writer);
this.logger = log;
this.message = msg;
if (!(writer instanceof StringWriter)) {
out2 = new StringWriter();
}
}
@Override
public void write(int c) throws IOException {
super.write(c);
if (out2 != null && count < limit) {
out2.write(c);
}
count++;
}
@Override
public void write(char[] cbuf, int off, int len) throws IOException {
super.write(cbuf, off, len);
if (out2 != null && count < limit) {
out2.write(cbuf, off, len);
}
count += len;
}
@Override
public void write(String str, int off, int len) throws IOException {
super.write(str, off, len);
if (out2 != null && count < limit) {
out2.write(str, off, len);
}
count += len;
}
@Override
public void close() throws IOException {
EsbMessage record = messageService.create( message );
StringBuilder buffer = new StringBuilder();
if (count >= limit) {
buffer.append("(message truncated to " + limit + " bytes)\n");
}
StringWriter w2 = out2;
if (w2 == null) {
w2 = (StringWriter)out;
}
String ct = (String)message.get(Message.CONTENT_TYPE);
try {
writePayload(buffer, w2, ct);
} catch (Exception ex) {
//ignore
}
record.setBody(buffer.toString());
messageService.save(record);
message.setContent(Writer.class, out);
super.close();
}
}
}
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package com.groupgti.esb.cxf.interceptors;
import com.groupgti.esb.cxf.model.EsbMessage;
import com.groupgti.esb.cxf.model.services.EsbMessageService;
import java.io.*;
import org.apache.cxf.common.util.StringUtils;
import org.apache.cxf.helpers.IOUtils;
import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.message.Message;
import org.apache.cxf.phase.AbstractPhaseInterceptor;
import org.apache.cxf.phase.Phase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* @author jtalbut
*/
public class MessageLoggerInbound extends AbstractPhaseInterceptor<Message>
{
private static final Logger log = LoggerFactory.getLogger(
MessageLoggerInbound.class );
public static final String SINGLE_KEY =
MessageLoggerInbound.class.getName() + ".Processed";
private static final int limit = 10 * 1024 * 1024;
private EsbMessageService messageService;
public MessageLoggerInbound()
{
super( Phase.RECEIVE );
}
public void setMessageService( EsbMessageService messageService ) {
this.messageService = messageService;
}
protected void writePayload(StringBuilder builder, CachedOutputStream cos,
String encoding, String contentType) throws
IOException {
if (StringUtils.isEmpty(encoding)) {
cos.writeCacheTo(builder, limit);
} else {
cos.writeCacheTo(builder, encoding, limit);
}
}
private boolean onceOnly( Message message ) {
if (message.getExchange().containsKey(SINGLE_KEY)) {
return true;
} else {
message.getExchange().put(SINGLE_KEY, Boolean.TRUE);
return false;
}
}
@Override
public void handleMessage( Message message ) throws Fault
{
log.debug( "handleMessage" );
if( onceOnly( message ) ) {
log.debug( "handled message previously" );
return ;
}
log.debug( "handling message" );
try {
EsbMessage record = messageService.create( message );
try {
InputStream is = message.getContent(InputStream.class);
StringBuilder buffer = new StringBuilder();
String encoding = (String)message.get(Message.ENCODING);
String ct = (String)message.get(Message.CONTENT_TYPE);
if (is != null) {
CachedOutputStream bos = new CachedOutputStream();
try {
IOUtils.copy(is, bos);
bos.flush();
is.close();
message.setContent(InputStream.class,
bos.getInputStream());
if (bos.getTempFile() != null) {
//large thing on disk...
buffer.append("\nMessage (saved to tmp file):\n");
buffer.append("Filename: " +
bos.getTempFile().getAbsolutePath() + "\n");
}
if (bos.size() > limit) {
buffer.append("(message truncated to " + limit + "
bytes)\n");
}
writePayload(buffer, bos, encoding, ct);
bos.close();
} catch (Exception e) {
throw new Fault(e);
}
} else {
Reader reader = message.getContent(Reader.class);
if (reader != null) {
try {
BufferedReader r = new BufferedReader(reader,
limit);
r.mark(limit);
char b[] = new char[limit];
int i = r.read(b);
buffer.append(b, 0, i);
r.reset();
message.setContent(Reader.class, r);
} catch (Exception e) {
throw new Fault(e);
}
}
}
record.setBody( buffer.toString() );
} finally {
messageService.save( record );
}
} finally {
log.debug( "handleMessage - end" );
}
}
}