附件图片,想把listener出来的数据,传给ctx。
如何实现这个数据的传递。
public class RMQRichParallelSource extends RichParallelSourceFunction<String> 
implements MessageOrderListener {

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Properties properties = new Properties();

// 在订阅消息前,必须调用 start 方法来启动 Consumer,只需调用一次即可。
OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);

consumer.subscribe(
"PRODNOC_KB_SYNC_CUST_ORDER",
"*",
                this);
consumer.start();
}

@Override
public void run(SourceContext<String> ctx) {


    }

@Override
public OrderAction consume(Message message, ConsumeOrderContext 
consumeOrderContext) {
try {
            System.out.println(new String(message.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
            e.printStackTrace();
}
return OrderAction.Success;
}

@Override
public void cancel() {
    }

@Override
public void close() throws Exception {
super.close();
}
}

回复