Hi Hitesh,

I have a few questions about this:
1) Did you try running this pipeline locally with the Direct runner?
2) Can you confirm that the messages were actually written to the same queue 
that you read from afterwards?
3) Can you confirm that your “processElement()” is actually called while you 
read the messages?
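
If “processElement()” is called but the printed text looks unreadable, it may be worth checking how the message body is converted to a String. As far as I know, RabbitMqMessage.getBody() returns a byte[], and toString() on an array prints a type-and-hash identifier rather than the content. Here is a minimal plain-Java sketch of the difference; the class name BodyDecoding, the sample XML payload, and the UTF-8 assumption are mine and not taken from your code:

```java
import java.nio.charset.StandardCharsets;

final class BodyDecoding {
    // Decodes a raw message payload, assuming the producer wrote UTF-8 text.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical payload standing in for a message body.
        byte[] body = "<order id=\"1\"/>".getBytes(StandardCharsets.UTF_8);

        // toString() on an array does not decode the bytes.
        System.out.println("toString(): " + body.toString());

        // Explicit decoding recovers the original text.
        System.out.println("decoded:    " + decode(body));
    }
}
```

If the producer uses a different charset, pass that one instead of UTF-8.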

I’d recommend running this pipeline with a local RabbitMQ instance and the 
Direct runner. If the problem reproduces locally, it will be easier to debug. 
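
When pointing the pipeline at a local broker, a quick sanity check is to confirm that the connection string splits into the parts you expect. The sketch below uses only java.net.URI from the JDK. The class name AmqpUriCheck, the guest credentials, the port, and the virtual host name are placeholders of mine, and the RabbitMQ client may apply its own additional rules when it parses the URI:

```java
import java.net.URI;

final class AmqpUriCheck {
    // Summarizes the components that java.net.URI extracts from a connection string.
    static String describe(String uri) {
        URI parsed = URI.create(uri);
        return "userInfo=" + parsed.getUserInfo()
            + " host=" + parsed.getHost()
            + " port=" + parsed.getPort()
            + " path=" + parsed.getPath();
    }

    public static void main(String[] args) {
        // Placeholder values for a local broker; replace them with your own.
        System.out.println(describe("amqp://guest:guest@localhost:5672/my_vhost"));
        // prints: userInfo=guest:guest host=localhost port=5672 path=/my_vhost
    }
}
```

If any of the printed parts is not what you intended, the URI is the first thing to fix before debugging the pipeline itself.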

—
Alexey

> On 15 Oct 2022, at 19:53, chakranthi hitesh <[email protected]> wrote:
> 
> Hi Everyone,
> 
> I have been using Apache Beam recently, and my work now requires me to 
> implement a source and sink connector for RabbitMQ.
> Apache Beam version: 2.38, Java SDK (Java 1.8), Direct Runner
> 
> I'm able to connect to the RabbitMQ queue successfully and send some 
> messages to it using standalone producer code.
> 
> When I try to consume those messages with a Beam pipeline, I am not able to 
> print them out. In debug mode, the code runs continuously without printing 
> any messages to the console.
> 
> 
> When I try to 
> 
> Here is the code I'm using:
> 
> package org.rabbit;
> 
> import java.nio.charset.StandardCharsets;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.rabbitmq.RabbitMqIO;
> import org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.MapElements;
> import org.apache.beam.sdk.transforms.PTransform;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.SimpleFunction;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> 
> public class RabbitConsumer {
>     public static void main(String[] args) {
>         Pipeline p = Pipeline.create();
>         // Credentials are separated from the host by '@'.
>         String serverUri = "amqp://user:password@host:port/virtual_host";
>         // exchange_name, routing_key and queue_name are placeholders
>         // for the actual values.
>         p.apply("Read from rabbit", RabbitMqIO.read()
>                 .withUri(serverUri)
>                 .withExchange(exchange_name, routing_key)
>                 .withQueue(queue_name))
>          .apply(ParDo.of(new DoFn<RabbitMqMessage, String>() {
>              @ProcessElement
>              public void processElement(ProcessContext c) {
>                  // getBody() returns a byte[]; decode it instead of
>                  // calling toString() on the array.
>                  String data = new String(
>                      c.element().getBody(), StandardCharsets.UTF_8);
>                  System.out.println("Reading Message from Queue" + data);
>                  c.output(data);
>              }
>          }));
> 
>         p.run().waitUntilFinish();
>     }
> }
> 
> 
> The message I'm putting into the RabbitMQ queue is in XML format.
> I'm guessing this has to do with serialization. 
> 
> 
> If someone has worked on RabbitMqMessage serialization or encountered this 
> problem before, any help would be greatly appreciated.
> 
> Regards
> Hitesh