I finally got rid of all the errors, but now the Apex application does not seem to register at the RabbitMQ exchange.

This is my code:

@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
    in.setHost("localhost");
    //in.setPort(5672);
    in.setExchange("apex");
    in.setExchangeType("fanout");
    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
    dag.addStream("rand_console", in.outputPort, console.input);
  }
}

If I launch the application, everything executes without an error, but when I list the bindings on the exchange, there are none. Does anyone have an idea how I can start to debug this?

Cheers
Manfred.

On 08.06.2017 at 18:04, a...@x5h.eu wrote:
>
> Okay, I found the error. I had copied the LineOutputOperator.java
> <https://github.com/DataTorrent/examples/blob/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ/LineOutputOperator.java>
> from the jmsActiveMQ example, and there the class is declared as
>     public class LineOutputOperator extends AbstractFileOutputOperator<String>
>
> Instead I took the LineOutputOperator.java
> <https://github.com/DataTorrent/examples/blob/master/tutorials/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java>
> from the Kafka 0.9 example; there the class is correctly defined for the
> RabbitMQInputOperator.
>
> So far so good, now it compiles without errors.
>
> Cheers
>
> Manfred
>
> On 08.06.2017 at 17:38, a...@x5h.eu wrote:
>>
>> I still don't get it completely. (The rest of the code is in the
>> previous email; this is only the necessary sample.)
>>
>> 1. dag.addStream("test", rabbitInput.output, out.input);
>>    Results in the following error:
>>    [ERROR] symbol: variable output
>>    [ERROR] location: variable rabbitInput of type
>>    com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>
>> 2.
>>    dag.addStream("test", rabbitInput.outputPort, out.input);
>>    Results in the following error:
>>    [ERROR]
>>    /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,8]
>>    no suitable method found for
>>    addStream(java.lang.String,com.datatorrent.api.DefaultOutputPort<byte[]>,com.datatorrent.api.DefaultInputPort<java.lang.String>)
>>    [ERROR] method
>>    com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>...)
>>    is not applicable
>>    [ERROR] (inferred type does not conform to upper bound(s)
>>    [ERROR] inferred: byte[]
>>    [ERROR] upper bound(s): java.lang.String,java.lang.Object)
>>    [ERROR] method
>>    com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>)
>>    is not applicable
>>    [ERROR] (inferred type does not conform to upper bound(s)
>>    [ERROR] inferred: byte[]
>>    [ERROR] upper bound(s): java.lang.String,java.lang.Object)
>>    [ERROR] method
>>    com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>,com.datatorrent.api.Operator.InputPort<? super T>)
>>    is not applicable
>>    [ERROR] (cannot infer type-variable(s) T
>>    [ERROR] (actual and formal argument lists differ in length))
>>
>> It seems that, on the one hand, the RabbitMQInputOperator class does
>> not have an output member and, on the other hand, the addStream method
>> only accepts outputPort combined with inputPort, or output combined with
>> input, of the corresponding classes. Does that mean I can only
>> use a class that implements inputPort for this example?
>>
>> Cheers
>>
>> Manfred.
>>
>> On 08.06.2017 at 10:05, a...@x5h.eu wrote:
>>>
>>> Sorry, the two snippets below were from different iterations.
>>> The error I get is the following:
>>>
>>> [ERROR]
>>> /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38]
>>> cannot find symbol
>>> [ERROR] symbol: variable output
>>> [ERROR] location: variable rabbitInput of type
>>> com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>>
>>> My code is as follows:
>>>
>>> package com.example.rabbitMQ;
>>>
>>> import org.apache.hadoop.conf.Configuration;
>>>
>>> import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
>>> import com.datatorrent.api.annotation.ApplicationAnnotation;
>>> import com.datatorrent.api.StreamingApplication;
>>> import com.datatorrent.api.DAG;
>>> import com.datatorrent.lib.io.jms.JMSStringInputOperator;
>>>
>>> @ApplicationAnnotation(name="RabbitMQ2HDFS")
>>> public class RabbitMQApplication implements StreamingApplication
>>> {
>>>
>>>   @Override
>>>   public void populateDAG(DAG dag, Configuration conf)
>>>   {
>>>
>>>     RabbitMQInputOperator rabbitInput =
>>>         dag.addOperator("Consumer", RabbitMQInputOperator.class);
>>>     rabbitInput.setHost("localhost");
>>>     rabbitInput.setPort(5672);
>>>     rabbitInput.setExchange("");
>>>     rabbitInput.setQueueName("hello");
>>>     LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
>>>
>>>     dag.addStream("data", rabbitInput.output, out.input);
>>>   }
>>> }
>>>
>>> Cheers
>>>
>>> Manfred.
>>>
>>> On 08.06.2017 at 04:34, vikram patil wrote:
>>>> Hi,
>>>> dag.addStream() is actually used to create a stream from one operator's
>>>> output port to another operator's input port.
>>>>     RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>>         RabbitMQInputOperator.class);
>>>>     dag.addStream("data", *rabbitInput*.output, out.input);
>>>> Looks like your operator name is incorrect?
>>>> I see in your code snippet above that the name of the
>>>> RabbitMQInputOperator is *"Consumer".*
>>>>
>>>> In the property name, you need to provide the operator name you
>>>> specified in the addOperator(*"NAME OF THE OPERATOR"*,
>>>> RabbitMQInputOperator.class) API call.
>>>>
>>>> dt.operator.*rabbitMQIn*.prop.tuple_blast (The syntax is correct,
>>>> provided your operator name is correct.)
>>>>
>>>> (It should be dt.operator.*Consumer*.prop.tuple_blast based on
>>>> your code snippet.)
>>>>
>>>> I think the tests provided in Apache Malhar are very detailed.
>>>> They run in local mode as unit tests, so we have mocked the
>>>> actual RabbitMQ with a custom message publisher.
>>>>
>>>> For the time being, set only the queue name and host name, as follows:
>>>>
>>>>     // set your rabbitmq host
>>>>     consumer.setHost("localhost");
>>>>     // set your rabbitmq port
>>>>     consumer.setPort(5672);
>>>>     // It depends on your rabbitmq producer configuration, but by default
>>>>     // it is the default exchange with "" (no name is provided).
>>>>     consumer.setExchange("");
>>>>     // set your queue name
>>>>     consumer.setQueueName("YOUR_QUEUE_NAME");
>>>>
>>>> If it's okay, could you please share the code from your application.java
>>>> and properties.xml here?
>>>>
>>>> Thanks,
>>>> Vikram
>>>>
>>>>
>>>> On Thu, Jun 8, 2017 at 12:32 AM, <a...@x5h.eu> wrote:
>>>>
>>>>     Thanks very much for the help. The only problem left is that I
>>>>     don't quite understand dag.addStream().
>>>>
>>>>     I tried this
>>>>
>>>>     RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>>         RabbitMQInputOperator.class);
>>>>     dag.addStream("data", rabbitInput.output, out.input);
>>>>
>>>>     but obviously this doesn't work. What I don't get is the
>>>>     difference between the ActiveMQ example and the RabbitMQ
>>>>     example. I looked over the test examples for RabbitMQ but don't
>>>>     seem to understand the logic behind them.
>>>>
>>>>     Is this the correct way to specify properties:
>>>>     <property>
>>>>       <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
>>>>       <value>500</value>
>>>>     </property>
>>>>
>>>>     Cheers
>>>>     Manfred.
>>>>
>>>>
>>>>     On 07.06.2017 at 12:03, Vikram Patil wrote:
>>>>> Yes, you would need an Application.java, which is the way to define a DAG
>>>>> for an Apex application.
>>>>>
>>>>> Please have a look at the code from the following example to find out how to
>>>>> write a JMS ActiveMQ based example:
>>>>>
>>>>> https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ
>>>>>
>>>>> This is how you can instantiate RabbitMQInputOperator and add it to a DAG:
>>>>>     RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>>>         RabbitMQInputOperator.class);
>>>>>
>>>>> https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag
>>>>>
>>>>> The following properties need to be specified in properties.xml:
>>>>>
>>>>> * Properties:<br>
>>>>> * <b>tuple_blast</b>: Number of tuples emitted in each burst<br>
>>>>> * <b>bufferSize</b>: Size of holding buffer<br>
>>>>> * <b>host</b>:the address for the consumer to connect to rabbitMQ producer<br>
>>>>> * <b>exchange</b>:the exchange for the consumer to connect to rabbitMQ producer<br>
>>>>> * <b>exchangeType</b>:the exchangeType for the consumer to connect to rabbitMQ producer<br>
>>>>> * <b>routingKey</b>:the routingKey for the consumer to connect to rabbitMQ producer<br>
>>>>> * <b>queueName</b>:the queueName for the consumer to connect to rabbitMQ producer<br>
>>>>> * <br>
>>>>>
>>>>> Reference:
>>>>> https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
>>>>>
>>>>> Thanks,
>>>>> Vikram
>>>>>
>>>>> On Wed, Jun 7, 2017 at 3:19 PM, <a...@x5h.eu> wrote:
>>>>>> Hello,
>>>>>>
>>>>>> I compiled the whole thing but now I don't know exactly how to get it
>>>>>> running in Apex. Do I need an application.java like in the tutorial? I do
>>>>>> have a simple RabbitMQ queue up and running on the server. How do I consume
>>>>>> the messages with Apex and write them to hdfs?
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Manfred
>>>>>>
>>>>>> Following steps were necessary to get the RabbitMq test to compile
>>>>>>
>>>>>> @TimeoutException
>>>>>> import java.util.concurrent.TimeoutException;
>>>>>> public void setup() throws IOException,TimeoutException
>>>>>> public void teardown() throws IOException,TimeoutException
>>>>>> protected void runTest(final int testNum) throws IOException
>>>>>>
>>>>>> @Build jars
>>>>>> cd apex-malhar/contrib/
>>>>>> mvn clean package -DskipTests
>>>>>>
>>>>>> cd apex-malhar/library/
>>>>>> mvn clean package -DskipTests
>>>>>> copy packages to project directory
>>>>>>
>>>>>> @Link them to the project
>>>>>> Add following lines to the pom.xml
>>>>>> <dependency>
>>>>>>   <groupId>contrib</groupId>
>>>>>>   <artifactId>com.datatorrent.contrib.helper</artifactId>
>>>>>>   <version>1.0</version>
>>>>>>   <scope>system</scope>
>>>>>>   <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>>>> </dependency>
>>>>>> <dependency>
>>>>>>   <groupId>lib</groupId>
>>>>>>   <artifactId>com.datatorrent.lib.helper</artifactId>
>>>>>>   <version>1.0</version>
>>>>>>   <scope>system</scope>
>>>>>>   <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>>>> </dependency>
>>>>>> <dependency>
>>>>>>   <groupId>contrib</groupId>
>>>>>>   <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
>>>>>>   <version>1.0</version>
>>>>>>   <scope>system</scope>
>>>>>>   <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
>>>>>> </dependency>
>>>>>> <dependency>
>>>>>>   <groupId>Attribute</groupId>
>>>>>>   <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
>>>>>>   <version>1.0</version>
>>>>>>   <scope>system</scope>
>>>>>>   <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
>>>>>> </dependency>
>>>>>>
>>>>>>
>>>>>> On 31.05.2017 at 18:57, Sanjay Pujare wrote:
>>>>>>
>>>>>> Both com.datatorrent.contrib.helper and com.datatorrent.lib.helper are
>>>>>> under the test directory under malhar-contrib and malhar-library
>>>>>> respectively. You may need to build these jars yourself with test scope to
>>>>>> include these packages.
>>>>>>
>>>>>> On Wed, May 31, 2017 at 9:39 AM, <a...@x5h.eu> wrote:
>>>>>>> Hello, (mea culpa for messing up the headline the first time)
>>>>>>>
>>>>>>> I'm currently trying to get the apex-malhar RabbitMQ operator running. But
>>>>>>> I'm at a complete loss: while the examples are running fine, I can't even
>>>>>>> get the RabbitMQInputOperatorTest.java to run.
>>>>>>>
>>>>>>> First it couldn't find the rabbitmq-client, which was solvable by adding
>>>>>>> the dependency:
>>>>>>>
>>>>>>> <dependency>
>>>>>>>   <groupId>com.rabbitmq</groupId>
>>>>>>>   <artifactId>amqp-client</artifactId>
>>>>>>>   <version>4.1.0</version>
>>>>>>> </dependency>
>>>>>>>
>>>>>>> But now it doesn't find the packages com.datatorrent.contrib.helper and
>>>>>>> com.datatorrent.lib.helper and can't find several symbols.
>>>>>>>
>>>>>>> Needless to say that I'm a beginner regarding Apex so does anyone know
>>>>>>> what exactly I'm doing wrong here?
>>>>>>>
>>>>>>> Cheers
>>>>>>>
>>>>>>> Manfred.
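
For readers who hit the same addStream compile error quoted earlier in this thread (inferred: byte[], upper bound: java.lang.String), here is a minimal sketch of why the compiler rejects the stream, and of one possible way around it. It deliberately does not use the Apex API: InPort, OutPort, addStream, BytesToString and PortTypeSketch are all hypothetical stand-ins invented for illustration, and the wiring rule is a simplified version of the bounds shown in the error message. In the thread the problem was solved differently, by switching to a LineOutputOperator from the Kafka example; a converter stage like the one below is only an assumed alternative.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class PortTypeSketch {

    /** Hypothetical stand-in for an input port: it only consumes tuples of type T. */
    interface InPort<T> extends Consumer<T> {}

    /** Hypothetical stand-in for an output port: it forwards each emitted tuple to one sink. */
    static class OutPort<T> {
        private Consumer<? super T> sink = tuple -> {};

        void emit(T tuple) {
            sink.accept(tuple);
        }
    }

    /** Simplified wiring rule: the sink must accept the tuple type the source emits. */
    static <T> void addStream(String name, OutPort<T> out, InPort<? super T> in) {
        out.sink = in;
    }

    /** Hypothetical converter stage that decodes byte[] tuples as UTF-8 strings. */
    static class BytesToString {
        final OutPort<String> output = new OutPort<>();
        final InPort<byte[]> input =
            bytes -> output.emit(new String(bytes, StandardCharsets.UTF_8));
    }

    /** Wires source -> converter -> sink and returns what the sink received. */
    public static List<String> run(byte[]... messages) {
        OutPort<byte[]> source = new OutPort<>();  // plays the role of a byte[] output port
        List<String> written = new ArrayList<>();
        InPort<String> sink = written::add;        // plays the role of a String input port

        // addStream("test", source, sink);        // rejected by javac: byte[] is not a String
        BytesToString converter = new BytesToString();
        addStream("raw", source, converter.input);
        addStream("lines", converter.output, sink);

        for (byte[] message : messages) {
            source.emit(message);
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(run("hello".getBytes(StandardCharsets.UTF_8)));
    }
}
```

The commented-out addStream call is the analogue of connecting a byte[] output port directly to a String input port. Either the downstream operator has to accept byte[], as the replacement LineOutputOperator in this thread apparently does, or something in between has to do the conversion.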