Hi Puneet,
Are you running this job on the cluster or locally in your IDE?
Regards,
Timo
On 14.03.18 at 13:49, Puneet Kinra wrote:
Hi,
I used the Apache Bahir ActiveMQ connector; the code is below. The job
finishes right away and does not produce any output. Ideally it should
keep running.
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.activemq.AMQSource;
import org.apache.flink.streaming.connectors.activemq.AMQSourceConfig;
import org.apache.flink.streaming.connectors.activemq.DestinationType;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * @author puneet
 */
public class TestAMQ {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Read from the queue "test" on the local ActiveMQ broker and
        // deserialize each message as a String.
        AMQSourceConfig<String> sourceConfig =
                new AMQSourceConfig.AMQSourceConfigBuilder<String>()
                        .setConnectionFactory(
                                new ActiveMQConnectionFactory("tcp://localhost:61616"))
                        .setDestinationName("test")
                        .setDeserializationSchema(new SimpleStringSchema())
                        .setDestinationType(DestinationType.QUEUE)
                        .build();

        DataStream<String> messageStream =
                env.addSource(new AMQSource<String>(sourceConfig));

        // Print every received message to stdout.
        messageStream.print();

        env.execute();
    }
}
--
Cheers,
Puneet Kinra
Mobile: +918800167808 | Skype: [email protected]
e-mail: [email protected]