You might be better off taking the patched interceptors from the JIRA issue and subclassing them.
However, my classes are attached.

There is an EsbMessageService object (not attached) that creates and stores the database record. The create method on this object also pulls the simple metadata out of the message (the source for the standard interceptors shows how you can do that).
The rest of the work is getting the data to be passed to setBody.
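
Since EsbMessageService is not attached, here is a rough sketch of the kind of thing a create method could do. Everything in it is hypothetical: SketchRecord, SketchMessageService and the two key strings are stand-ins, not the real classes. A CXF Message is a Map<String, Object>, so a plain map is used to keep the sketch free of CXF dependencies; real code would read constants such as Message.ENCODING and Message.CONTENT_TYPE, as the attached interceptors do.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the (unattached) EsbMessage record.
class SketchRecord {
    String encoding;
    String contentType;
    String body;

    @Override
    public String toString() {
        return "SketchRecord[encoding=" + encoding + ", contentType=" + contentType
                + ", bodyLength=" + (body == null ? 0 : body.length()) + "]";
    }
}

// Hypothetical stand-in for EsbMessageService; only the create step is sketched.
class SketchMessageService {
    // Placeholder keys; real code would use Message.ENCODING and Message.CONTENT_TYPE.
    static final String ENCODING_KEY = "sketch.encoding";
    static final String CONTENT_TYPE_KEY = "sketch.contentType";

    // Copies the simple metadata out of the message map; the body is filled in later.
    SketchRecord create(Map<String, Object> message) {
        SketchRecord record = new SketchRecord();
        record.encoding = (String) message.get(ENCODING_KEY);
        record.contentType = (String) message.get(CONTENT_TYPE_KEY);
        return record;
    }
}

class SketchMessageServiceDemo {
    public static void main(String[] args) {
        Map<String, Object> message = new HashMap<>();
        message.put(SketchMessageService.ENCODING_KEY, "UTF-8");
        message.put(SketchMessageService.CONTENT_TYPE_KEY, "text/xml");

        SketchRecord record = new SketchMessageService().create(message);
        record.body = "<soap:Envelope/>"; // the interceptor supplies this via setBody
        System.out.println(record);
    }
}
```

The interceptor then only has to capture the payload and hand it to the record before saving.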

It works with MTOM, but I have an odd error with FastInfoset that may be a bug in FastInfoset (I can't see how it's my fault that all the binary elements in the message get replaced with the last binary element!).

Another difference between my classes and the trunk ones is the code that makes the in-interceptor run only once (mine runs once per exchange). I tried the approach from the trunk and it didn't work for me; I'm not sure why.
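
The once-per-exchange idea can be sketched without CXF, assuming the exchange behaves like a Map<String, Object> (a CXF Exchange is one). OncePerExchangeGuard and alreadyHandled are illustrative names, and putIfAbsent is used here in place of the containsKey/put pair in the attached classes.

```java
import java.util.HashMap;
import java.util.Map;

class OncePerExchangeGuard {
    // Marker key, named after the class in the same way as SINGLE_KEY in the attached code.
    static final String SINGLE_KEY = OncePerExchangeGuard.class.getName() + ".Processed";

    // Returns true if this exchange was already marked; otherwise marks it and returns false.
    static boolean alreadyHandled(Map<String, Object> exchange) {
        return exchange.putIfAbsent(SINGLE_KEY, Boolean.TRUE) != null;
    }

    public static void main(String[] args) {
        Map<String, Object> exchange = new HashMap<>();
        System.out.println(alreadyHandled(exchange)); // false: first time through
        System.out.println(alreadyHandled(exchange)); // true: skip the second pass
    }
}
```

Because the marker lives on the exchange rather than on the message, each interceptor needs its own key if both are expected to fire within the same exchange.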

Jim


On 14/06/2012 19:39, Marco Pas wrote:
Could you share the code of these interceptors as an example of how I
could implement them myself? It would help me a lot.
I have read your suggestion, and I also have to log the XML messages to
an external system, in my case an AMQP provider.

/Marco


2012/6/14 Jim Talbut:
On 14/06/2012 18:33, Marco Pas wrote:
I need to have the plain XML in the interceptor so that I can use it
for specific logging tasks. The standard LogIn & LogOut interceptors
are not up to the task. Is anyone willing to share an example of how I
could implement a simple incoming interceptor that is able to get the
incoming SOAP XML, and an outgoing interceptor to get the outgoing SOAP
XML?
I wrote my own interceptors based heavily on the standard interceptors.
But it was a lot of work and is fragile in that changes to the standard
interceptors won't get picked up by yours.

So I proposed a change to the standard interceptors to make them more
extensible: https://issues.apache.org/jira/browse/CXF-4368

I only did this a couple of days ago, so it hasn't been picked up yet.

Jim



/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package com.groupgti.esb.cxf.interceptors;

import com.groupgti.esb.cxf.model.EsbMessage;
import com.groupgti.esb.cxf.model.services.EsbMessageService;
import java.io.*;
import org.apache.cxf.common.util.StringUtils;
import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.interceptor.LoggingMessage;
import org.apache.cxf.io.CacheAndWriteOutputStream;
import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.io.CachedOutputStreamCallback;
import org.apache.cxf.message.Message;
import org.apache.cxf.phase.AbstractPhaseInterceptor;
import org.apache.cxf.phase.Phase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 *
 * @author jtalbut
 */
public class MessageLoggerOutbound extends AbstractPhaseInterceptor<Message>
{
    public static final String SINGLE_KEY = MessageLoggerOutbound.class.getName() + ".Processed";
    private static final Logger log = LoggerFactory.getLogger( MessageLoggerOutbound.class );
    private static final int limit = 10 * 1024 * 1024;
    private EsbMessageService messageService;

    public MessageLoggerOutbound( ) {
        super( Phase.PRE_STREAM );
    }

    public void setMessageService( EsbMessageService messageService ) {
        this.messageService = messageService;
    }

    @Override
    public void handleFault( Message message ) {
        log.debug( "handleFault" );
        try {
            internalHandleMessage(message);
        } catch( Throwable ex ) {
            log.error( "Exception thrown by internalHandleMessage: ", ex );
        } finally {
            log.debug( "handleFault - end" );
        }
    }

    private boolean onceOnly( Message message ) {
        if (message.getExchange().containsKey(SINGLE_KEY)) {
            return true;
        } else {
            message.getExchange().put(SINGLE_KEY, Boolean.TRUE);
            return false;
        }
    }
    
    @Override
    public void handleMessage(Message message) throws Fault {
        log.debug( "handleMessage" );
        try {
            // Write the output while caching it for the log message
            if( onceOnly( message ) ) {
                log.debug( "handled message previously" );
                return ;
            }
            log.debug( "handling message" );
            internalHandleMessage(message);
        } finally {
            log.debug( "handleMessage - end" );
        }
    }

    private void internalHandleMessage( Message message ) {
        final OutputStream os = message.getContent(OutputStream.class);
        final Writer iowriter = message.getContent(Writer.class);
        if (os == null && iowriter == null) {
            return;
        }
        
        if( os == null ) {
            log.error( "\nmessage.getContent( OutputStream.class ) returned null" );
            EsbMessage record = messageService.create( message );
            log.trace( "Outbound message (without body): {}", record );
            messageService.save( record );
            message.setContent(Writer.class, iowriter);
        } else {
            final CacheAndWriteOutputStream newOut = new CacheAndWriteOutputStream( os );
            message.setContent( OutputStream.class, newOut );
            newOut.registerCallback( new LogCallback( log, message, os, messageService ) );
        }
    }
    
    protected void writePayload(StringBuilder builder, CachedOutputStream cos,
                                String encoding, String contentType) throws IOException {
        if (StringUtils.isEmpty(encoding)) {
            cos.writeCacheTo(builder, limit);
        } else {
            cos.writeCacheTo(builder, encoding, limit);
        }
    }
    
    protected void writePayload(StringBuilder builder, 
                                StringWriter stringWriter,
                                String contentType) 
        throws Exception {
        // Append the writer's content, truncated to the limit
        StringBuffer buffer = stringWriter.getBuffer();
        if (buffer.length() > limit) {
            builder.append(buffer.subSequence(0, limit));
        } else {
            builder.append(buffer);
        }
    }

    class LogCallback implements CachedOutputStreamCallback
    {
        private Logger log;
        private EsbMessageService messageService;
        private final Message message;
        private final OutputStream origStream;

        public LogCallback( Logger log, final Message msg, final OutputStream os,
                            EsbMessageService messageService ) {
            this.log = log;
            this.message = msg;
            this.origStream = os;
            this.messageService = messageService;
        }

        @Override
        public void onFlush( CachedOutputStream cos ) {
        }

        @Override
        public void onClose( CachedOutputStream cos ) {
            StringBuilder requestBuilder = new StringBuilder();
            String encoding = (String)message.get(Message.ENCODING);
            String ct = (String)message.get(Message.CONTENT_TYPE);
            try {
                writePayload(requestBuilder, cos, encoding, ct); 
            } catch( java.io.IOException ex ) {
                log.error( "Unable to write output stream to StringBuilder", ex );
            }

            EsbMessage record = messageService.create( message );
            record.setBody( requestBuilder.toString() );
            log.trace( "Outbound message: {}", record );
            messageService.save( record );

            try {
                //empty out the cache
                cos.lockOutputStream();
                cos.resetOut(null, false);
            } catch (Exception ex) {
                //ignore
            }
            
            message.setContent( OutputStream.class, origStream );
        }
    }
    
    private class LogWriter extends FilterWriter {
        StringWriter out2;
        int count;
        Logger logger; 
        Message message;
        
        public LogWriter( Logger log, final Message msg, final Writer writer,
                          EsbMessageService messageService ) {
            super(writer);
            this.logger = log;
            this.message = msg;
            if (!(writer instanceof StringWriter)) {
                out2 = new StringWriter();
            }
        }
        @Override
        public void write(int c) throws IOException {
            super.write(c);
            if (out2 != null && count < limit) {
                out2.write(c);
            }
            count++;
        }
        @Override
        public void write(char[] cbuf, int off, int len) throws IOException {
            super.write(cbuf, off, len);
            if (out2 != null && count < limit) {
                out2.write(cbuf, off, len);
            }
            count += len;
        }
        @Override
        public void write(String str, int off, int len) throws IOException {
            super.write(str, off, len);
            if (out2 != null && count < limit) {
                out2.write(str, off, len);
            }
            count += len;
        }
        @Override
        public void close() throws IOException {
            EsbMessage record = messageService.create( message );
            StringBuilder buffer = new StringBuilder();
            if (count >= limit) {
                buffer.append("(message truncated to " + limit + " bytes)\n");
            }
            StringWriter w2 = out2;
            if (w2 == null) {
                w2 = (StringWriter)out;
            }
            String ct = (String)message.get(Message.CONTENT_TYPE);
            try {
                writePayload(buffer, w2, ct); 
            } catch (Exception ex) {
                //ignore
            }
            record.setBody(buffer.toString());
            messageService.save(record);
            message.setContent(Writer.class, out);
            super.close();
        }
    }

}
/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package com.groupgti.esb.cxf.interceptors;

import com.groupgti.esb.cxf.model.EsbMessage;
import com.groupgti.esb.cxf.model.services.EsbMessageService;
import java.io.*;
import org.apache.cxf.common.util.StringUtils;
import org.apache.cxf.helpers.IOUtils;
import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.message.Message;
import org.apache.cxf.phase.AbstractPhaseInterceptor;
import org.apache.cxf.phase.Phase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 *
 * @author jtalbut
 */
public class MessageLoggerInbound extends AbstractPhaseInterceptor<Message>
{
    private static final Logger log = LoggerFactory.getLogger( MessageLoggerInbound.class );
    public static final String SINGLE_KEY = MessageLoggerInbound.class.getName() + ".Processed";
    private static final int limit = 10 * 1024 * 1024;

    private EsbMessageService messageService;

    public MessageLoggerInbound()
    {
        super( Phase.RECEIVE );
    }

    public void setMessageService( EsbMessageService messageService ) {
        this.messageService = messageService;
    }
    
    protected void writePayload(StringBuilder builder, CachedOutputStream cos,
                                String encoding, String contentType) throws IOException {
        if (StringUtils.isEmpty(encoding)) {
            cos.writeCacheTo(builder, limit);
        } else {
            cos.writeCacheTo(builder, encoding, limit);
        }
    }
    
    private boolean onceOnly( Message message ) {
        if (message.getExchange().containsKey(SINGLE_KEY)) {
            return true;
        } else {
            message.getExchange().put(SINGLE_KEY, Boolean.TRUE);
            return false;
        }
    }

    @Override
    public void handleMessage( Message message ) throws Fault
    {
        log.debug( "handleMessage" );
        
        if( onceOnly( message ) ) {
            log.debug( "handled message previously" );
            return ;
        }
        
        log.debug( "handling message" );
        try {
            EsbMessage record = messageService.create( message );
            try {
                
                InputStream is = message.getContent(InputStream.class);
                StringBuilder buffer = new StringBuilder();
                String encoding = (String)message.get(Message.ENCODING);
                String ct = (String)message.get(Message.CONTENT_TYPE);
                
                if (is != null) {
                    CachedOutputStream bos = new CachedOutputStream();
                    try {
                        IOUtils.copy(is, bos);

                        bos.flush();
                        is.close();

                        message.setContent(InputStream.class, bos.getInputStream());
                        if (bos.getTempFile() != null) {
                            //large thing on disk...
                            buffer.append("\nMessage (saved to tmp file):\n");
                            buffer.append("Filename: " + bos.getTempFile().getAbsolutePath() + "\n");
                        }
                        if (bos.size() > limit) {
                            buffer.append("(message truncated to " + limit + " bytes)\n");
                        }
                        writePayload(buffer, bos, encoding, ct); 

                        bos.close();
                    } catch (Exception e) {
                        throw new Fault(e);
                    }
                } else {
                    Reader reader = message.getContent(Reader.class);
                    if (reader != null) {
                        try {
                            BufferedReader r = new BufferedReader(reader, limit);
                            r.mark(limit);
                            char b[] = new char[limit];
                            int i = r.read(b);
                            // read() returns -1 at end of stream; only append when data was read
                            if (i > 0) {
                                buffer.append(b, 0, i);
                            }
                            r.reset();
                            message.setContent(Reader.class, r);
                        } catch (Exception e) {
                            throw new Fault(e);
                        }

                    }
                }
                record.setBody( buffer.toString() );
            } finally {
                messageService.save( record );
            }
        } finally {
            log.debug( "handleMessage - end" );
        }
    }
}
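
The outbound interceptor above depends on wrapping the original stream so that every byte is both sent on and cached for logging, with the log record written when the stream closes. The sketch below shows that idea with only the JDK. TeeOutputStream and its Consumer callback are illustrative names, not CXF classes, and the capped in-memory cache is a simplification of what CacheAndWriteOutputStream does.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// Forwards every byte to the target stream and keeps a copy of up to 'limit' bytes.
class TeeOutputStream extends FilterOutputStream {
    private final ByteArrayOutputStream cache = new ByteArrayOutputStream();
    private final int limit;
    private final Consumer<byte[]> onClose;
    private boolean closed;

    TeeOutputStream(OutputStream target, int limit, Consumer<byte[]> onClose) {
        super(target);
        this.limit = limit;
        this.onClose = onClose;
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        if (cache.size() < limit) {
            cache.write(b);
        }
    }

    @Override
    public void write(byte[] buf, int off, int len) throws IOException {
        out.write(buf, off, len);
        // Cache only as much as still fits under the limit.
        int room = Math.min(len, limit - cache.size());
        if (room > 0) {
            cache.write(buf, off, room);
        }
    }

    @Override
    public void close() throws IOException {
        try {
            super.close();
        } finally {
            // Hand the cached copy to the callback exactly once.
            if (!closed) {
                closed = true;
                onClose.accept(cache.toByteArray());
            }
        }
    }
}

class TeeDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        StringBuilder logged = new StringBuilder();
        try (OutputStream os = new TeeOutputStream(wire, 1024,
                bytes -> logged.append(new String(bytes, StandardCharsets.UTF_8)))) {
            os.write("<soap:Envelope/>".getBytes(StandardCharsets.UTF_8));
        }
        System.out.println(logged); // the cached copy of what went to the wire
    }
}
```

In the interceptor the callback is where the record body would be set and saved, which is the role LogCallback.onClose plays in the attached class.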
