附件图片,想把listener出来的数据,传给ctx。
如何实现这个数据的传递。
public class RMQRichParallelSource extends RichParallelSourceFunction<String>
implements MessageOrderListener {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Properties properties = new Properties();
// 在订阅消息前,必须调用 start 方法来启动 Consumer,只需调用一次即可。
OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);
consumer.subscribe(
"PRODNOC_KB_SYNC_CUST_ORDER",
"*",
this);
consumer.start();
}
@Override
public void run(SourceContext<String> ctx) {
}
@Override
public OrderAction consume(Message message, ConsumeOrderContext
consumeOrderContext) {
try {
System.out.println(new String(message.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return OrderAction.Success;
}
@Override
public void cancel() {
}
@Override
public void close() throws Exception {
super.close();
}
}