Hi,

Here is my use case:

Adding topics dynamically for different types of consumer is easy, but it
requires creating a producer for each one.

I can't pass the topic to the producer on each send, e.g.
producer.send(message, "fruit:orange")
then
producer.send(message, "fruit:apple")

I don't want to have to re-code the application to add another producer
when "banana" comes along.

So, I'd like to use a Pulsar Function to do the routing from a single
producer to a dynamically chosen second topic, so new consumers only need
to process their own type.


//

producer = pulsarClient.newProducer(AvroSchema.of(Fruit.class))
        .producerName("producer:fruit-1")
        .topic("fruit")
        .create();

producer.newMessage()
        .property("topic2", "fruit:" + myFruit.type)
        .value(myFruit)
        .send();

Then in my function, I can take the message property "topic2" and re-send
the message on the new topic.
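
To make the routing idea concrete, here is a rough sketch in plain Java
with no Pulsar types at all. ByteSink, PropertyRouter and RecordingSink are
hypothetical names made up for illustration, and the sink is only an
in-memory stand-in for "publish these bytes on that topic". The point is
just that the payload is forwarded as the same byte[] and only the
"topic2" property is inspected.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for "publish these bytes on that topic"; not a Pulsar type.
interface ByteSink {
    void publish(String topic, byte[] payload);
}

// Routes a message to the topic named in its "topic2" property and forwards
// the payload bytes untouched (no deserialization, no schema involved).
final class PropertyRouter {
    static final String ROUTING_PROPERTY = "topic2";

    private final ByteSink sink;

    PropertyRouter(ByteSink sink) {
        this.sink = sink;
    }

    // Returns the topic the payload was sent to, or null if the property was missing.
    String route(Map<String, String> properties, byte[] payload) {
        String target = properties.get(ROUTING_PROPERTY);
        if (target == null || target.isEmpty()) {
            return null; // nothing to route on; the caller decides what to do
        }
        sink.publish(target, payload);
        return target;
    }
}

// In-memory sink that records what was published per topic; illustration only.
final class RecordingSink implements ByteSink {
    final Map<String, List<byte[]>> published = new HashMap<>();

    @Override
    public void publish(String topic, byte[] payload) {
        published.computeIfAbsent(topic, t -> new ArrayList<>()).add(payload);
    }
}
```

With this shape, a message carrying the property "topic2" = "fruit:banana"
would be handed to the sink under "fruit:banana" with no code change, which
is the behaviour I'm hoping to get from the function.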

However, is there a way I can avoid having to get involved in SerDe or
Schemas in the function?
The message bytes won't change, and the consumers all have access to an
AvroSchema definition.

I notice that in the Java Function API I have an (Object input), which I
suspect is an org.Json structure, but context.newOutputMessage() wants a
schema.

I don't really want to have to re-construct a "Fruit" object from org.Json
types, just to then encode it back to the same bytes on the wire.

Is there a shortcut so I can just publish a byte[] taken from the original
message input byte[]?

Thanks

Rob





Rob Shepherd BEng PhD
