Jan,

I really appreciate the help, but unfortunately your test case does not reproduce the issue I am having. I think I may not have been clear in my original message.

The issue is in a custom DataFormat: if I set a *new* header, that header will be gone from the Exchange for the rest of the route, down-stream from the unmarshal (or marshal) call.

So the issue is just setting a header and checking whether it's still there. The best way to recreate the problem is to create the simplest possible DataFormat, because marshal/unmarshal is not the issue - the issue is setting a new header *inside* the custom marshal/unmarshal methods. So I created a test case whose custom DataFormat does the bare minimum - no file I/O, no serialization - just string manipulation.

Maybe you and/or others can have a look and explain why this happens.

Thanks,

Chris

(the code formatting will be wrecked by the mailing list line length limit)

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;


import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;


public class SetHeaderDemo extends CamelTestSupport {

    @Test
    public void setHeaderInDataFormatProblem() throws Exception {
        MockEndpoint mock = getMockEndpoint("mock:result");
        mock.expectedMessageCount(1);
        template.sendBodyAndHeader("direct:start", "one\ntwo\nthree\n", "DEMO_HEADER", "Hello...");

        // This passes
        assertExpression(mock.getReceivedExchanges().get(0),
                "simple", "${in.header.DEMO_HEADER}", "Hello...");

        // This FAILS... WHY??? *************
        assertExpression(mock.getReceivedExchanges().get(0),
                "simple", "${in.header.DISAPPEARING_DEMO_HEADER}", "Will it be set?");

        assertMockEndpointsSatisfied();
    }


    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {

        return new RouteBuilder() {
            @Override
            public void configure() {
                from("direct:start")
                .unmarshal("customFmt") // <== will try to add a new header to Exchange IN msg
                .to("log://dataformat.demo?showAll=true&multiline=true&level=INFO") // <== not there
                .to("mock:result");
            }
        };
    }

    @Override
    protected JndiRegistry createRegistry() throws Exception {
        JndiRegistry registry = super.createRegistry();

        CustomDataFormat fmt = new CustomDataFormat();
        registry.bind("customFmt", fmt);
        return registry;
    }
}

/**
 * Simplest, contrived DataFormat impl to demonstrate that it's impossible
 * to set a new header in the DataFormat's implementation methods and
 * see the newly added header down-stream from the marshal/unmarshal
 * call(s).
 *
 */
class CustomDataFormat implements DataFormat {

    /**
     * Expects the body to be a newline-delimited list of strings,
     * which will be unmarshalled to a string array, whose elements
     * are the "lines" in the "document".
     *
     * Obviously the marshal/unmarshal process is not important - the
     * issue is that if a new header is added in the DataFormat marshal or
     * unmarshal - it will be GONE after returning.
     */
    @Override
    public Object unmarshal(Exchange exchange, InputStream stream)
            throws Exception {
        List<String> result = new ArrayList<String>();

        BufferedReader in
            = new BufferedReader(new InputStreamReader(stream));

        String line = null;
        while((line = in.readLine()) != null) {
            result.add(line);
        }

        exchange.getIn().setHeader("DISAPPEARING_DEMO_HEADER", "Will it be set?");
        return result;
    }

    @Override
    public void marshal(Exchange exchange, Object graph, OutputStream stream)
            throws Exception {
        throw new UnsupportedOperationException("Not implemented.");
    }
}




On 9/19/2013 4:13 AM, Jan Matèrne (jhm) wrote:
I tried building my own DF and that works (for me)

Jan


package org.apache.camel.dataformat;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;

import javax.activation.DataHandler;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.ExchangeBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultMessage;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.camel.util.FileUtil;
import org.apache.commons.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class SaveHeaderTest extends CamelTestSupport {
        
        private static File dataFile = new File("output/message.ser");
        
        DataFormat customDataFormat = new CustomDataFormat();
        
        @After
        @Before
        public void cleanup() {
                FileUtil.deleteFile(dataFile);
        }
        
        @SuppressWarnings("unchecked")
        @Test
        public void save() throws FileNotFoundException, Exception {
                File writtenTo = new File("output/message.ser");
                assertFalse(writtenTo.exists());
                Exchange exchange = ExchangeBuilder.anExchange(context)
                                .withBody("Hello World")
                                .withHeader("from", "Apache Camel")
                                .withHeader("test", "save")
                                .withHeader(Exchange.FILE_NAME, "message.ser")
                                .build();
                template.send("direct:save", exchange);
                assertTrue(writtenTo.exists());

                // actual
                ObjectInputStream in = new ObjectInputStream(new FileInputStream(dataFile));
                Object body = in.readObject();
                Map<String,Object> headers = (Map<String, Object>) in.readObject();
                Map<String,DataHandler> attachments = (Map<String, DataHandler>) in.readObject();
                String messageId = (String) in.readObject();
                boolean fault = (boolean) in.readObject();
                
                assertEquals("Hello World", body);
                assertEquals("Apache Camel", headers.get("from"));
                assertEquals("save", headers.get("test"));
                assertTrue(attachments.isEmpty());
                assertEquals(exchange.getIn().getMessageId(), messageId);
                assertFalse(fault);
        }
        
        @Test
        public void load() throws Exception {
                // create test data
                ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(dataFile));
                out.writeObject("Hello World");
                
                Map<String,Object> headers = new HashMap<String, Object>();
                headers.put("from", "Apache Camel");
                headers.put("test", "load");
                out.writeObject(headers);
                
                Map<String,DataHandler> attachments = new HashMap<String,DataHandler>();
                out.writeObject(attachments);
                
                out.writeObject("message-id");
                out.writeObject(false);
                out.close();
                
                // load the saved 'exchange'
                Exchange loadCommand = ExchangeBuilder.anExchange(context)
                                .withBody(dataFile.getAbsolutePath())
                                .build();
                Exchange response = template.send("direct:load", loadCommand);
                assertEquals("Hello World", response.getOut().getBody(String.class));
                assertEquals("Apache Camel", response.getOut().getHeader("from"));
                assertEquals("load", response.getOut().getHeader("test"));
        }
        
        @Test
        public void turnaround() {
                Exchange exchange = ExchangeBuilder.anExchange(context)
                                .withBody("Hello World")
                                .withHeader("from", "Apache Camel")
                                .build();
                Exchange response = template.send("direct:turnaround", exchange);
                assertEquals("Hello World", response.getOut().getBody(String.class));
                assertEquals("Apache Camel", response.getOut().getHeader("from"));
        }
        
        
        @Override
        protected RouteBuilder createRouteBuilder() throws Exception {
                return new RouteBuilder() {
                        @Override
                        public void configure() throws Exception {
                                from("direct:save")
                                        .marshal(customDataFormat)
                                        .to("file:output");
                                from("direct:load")
                                        .process(openFile())
                                        .unmarshal(customDataFormat);
                                from("direct:turnaround")
                                        .marshal(customDataFormat)
                                        .unmarshal(customDataFormat);
                        }

                        private Processor openFile() {
                                return new Processor() {
                                        @Override
                                        public void process(Exchange exchange) throws Exception {
                                                String filename = exchange.getIn().getBody(String.class);
                                                FileInputStream stream = new FileInputStream(filename);
                                                exchange.getIn().setBody(stream);
                                        }
                                };
                        }
                };
        }
        
        class CustomDataFormat implements DataFormat {
                
                @Override
                public void marshal(Exchange exchange, Object graph, OutputStream stream)
                                throws Exception {
                        ObjectOutputStream out = null;
                        
                        try {
                                out = new ObjectOutputStream(stream);
                                // Save the body
                                out.writeObject(graph);
                                // Save the message
                                Message message = exchange.getIn();
                                if (message != null) {
                                        // Message is not serializable, so save the data individually
                                        out.writeObject(message.getHeaders());
                                        out.writeObject(message.getAttachments());
                                        out.writeObject(message.getMessageId());
                                        out.writeObject(message.isFault());
                                }
                        } finally {
                                IOUtils.closeQuietly(out);
                        }
                }

                @SuppressWarnings("unchecked")
                @Override
                public Object unmarshal(Exchange exchange, InputStream stream)
                                throws Exception {
                        
                        ObjectInputStream in = null;
                        try {
                                in = new ObjectInputStream(stream);
                                
                                // read the raw data
                                Object body = in.readObject();
                                Map<String,Object> headers = (Map<String, Object>) in.readObject();
                                Map<String,DataHandler> attachments = (Map<String, DataHandler>) in.readObject();
                                String messageId = (String) in.readObject();
                                boolean fault = (boolean) in.readObject();
                                
                                // build the message
                                Message msg = new DefaultMessage();
                                msg.setBody(body);
                                msg.setAttachments(attachments);
                                msg.setHeaders(headers);
                                msg.setMessageId(messageId);
                                msg.setFault(fault);
                                
                                return msg;
                        } finally {
                                IOUtils.closeQuietly(in);
                        }
                }
        }
        
}

-----Original Message-----
From: Jan Matèrne (jhm) [mailto:[email protected]]
Sent: Thursday, 19 September 2013 07:06
To: [email protected]
Subject: RE: How to set a header in custom DataFormat?

What I have found is that the exchange object passed to the
DataFormat is only used for getting the CamelContext.
I searched a little further and found the processor used for
unmarshalling:

org.apache.camel.processor.UnmarshalProcessor.process(Exchange, AsyncCallback)

And there is a note:

   Object result = dataFormat.unmarshal(exchange, stream);
   if (result instanceof Exchange) {
       if (result != exchange) {
           // it's not allowed to return another exchange other than the one provided to dataFormat
           throw new RuntimeCamelException("The returned exchange " + result
               + " is not the same as " + exchange + " provided to the DataFormat");


The logic of that processor is:
- get the object from the stream
- if it is an exchange other than the one passed in, throw that exception
- if it is a message, store it as 'out' on the exchange parameter
- if it is something else, store it as the body of the 'out' message


So have you tried setting the header on the 'out' message?
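
To make that suggestion concrete, here is a minimal plain-Java sketch of the logic listed above. It does not use Camel at all: Msg, Exch, Format and process() are hypothetical stand-ins invented for illustration. It also assumes something not shown in this thread, namely that the processor prepares 'out' as a copy of 'in' before it calls the DataFormat. Under that assumption a header put on 'in' inside unmarshal is lost, while one put on 'out' survives.

```java
import java.util.HashMap;
import java.util.Map;

public class HeaderFlowSketch {

    /** Hypothetical stand-in for a Camel Message: a body plus headers. */
    static class Msg {
        Object body;
        final Map<String, Object> headers = new HashMap<>();
    }

    /** Hypothetical stand-in for a Camel Exchange with 'in' and 'out' messages. */
    static class Exch {
        final Msg in = new Msg();
        Msg out;
    }

    /** Hypothetical stand-in for DataFormat.unmarshal. */
    interface Format {
        Object unmarshal(Exch exchange, String raw);
    }

    /** Models the processor logic described in the list above. */
    static Msg process(Exch exchange, Format format) {
        // ASSUMPTION (not stated in the thread): 'out' is prepared as a
        // copy of 'in' before the format is invoked.
        Msg out = new Msg();
        out.body = exchange.in.body;
        out.headers.putAll(exchange.in.headers);
        exchange.out = out;

        Object result = format.unmarshal(exchange, (String) exchange.in.body);
        if (result instanceof Exch) {
            if (result != exchange) {
                throw new IllegalStateException("a different exchange was returned");
            }
        } else if (result instanceof Msg) {
            exchange.out = (Msg) result;   // a returned message becomes 'out'
        } else {
            exchange.out.body = result;    // anything else becomes the body of 'out'
        }
        return exchange.out;               // this is what the rest of the route sees
    }

    /** Runs the format once, setting the new header on 'out' or on 'in'. */
    static Map<String, Object> run(boolean setOnOut) {
        Exch exchange = new Exch();
        exchange.in.body = "one\ntwo";
        exchange.in.headers.put("DEMO_HEADER", "Hello...");
        Msg downstream = process(exchange, (ex, raw) -> {
            Msg target = setOnOut ? ex.out : ex.in;
            target.headers.put("DISAPPEARING_DEMO_HEADER", "Will it be set?");
            return raw.split("\n");
        });
        return downstream.headers;
    }

    public static void main(String[] args) {
        // header set on 'in' inside unmarshal: prints false
        System.out.println(run(false).containsKey("DISAPPEARING_DEMO_HEADER"));
        // header set on 'out' inside unmarshal: prints true
        System.out.println(run(true).containsKey("DISAPPEARING_DEMO_HEADER"));
    }
}
```

If the real processor behaves like this model, calling exchange.getOut().setHeader(...) inside the DataFormat, or returning a Message that carries the header, should make the header visible down-stream. That needs checking against the Camel version in use.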


Jan



-----Original Message-----
From: Chris [mailto:[email protected]]
Sent: Wednesday, 18 September 2013 22:58
To: [email protected]
Subject: How to set a header in custom DataFormat?

I implemented a custom DataFormat and in the "unmarshal(Exchange
exchange, InputStream stream)" implementation I set a header, but
upon attempting to retrieve the header downstream from the "unmarshal"
call, the header is not there.  Since the pattern is inOnly, I added
the header to the in Message.  I've done similar with custom
Processors and in that case, it works.  What is different with
DataFormat?  Is there any way to set header(s) from DataFormat
marshal/unmarshal?

Thanks,

Chris

