Okay, I found the error. I had copied LineOutputOperator.java <https://github.com/DataTorrent/examples/blob/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ/LineOutputOperator.java> from the jmsActiveMQ example, where the class is declared as

public class LineOutputOperator extends AbstractFileOutputOperator<String>

Instead, I took the LineOutputOperator.java <https://github.com/DataTorrent/examples/blob/master/tutorials/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java> from the Kafka 0.9 example; there the class is defined correctly for the RabbitMQInputOperator, whose output port emits byte[] rather than String (see the compiler error quoted below).

So far so good: it now compiles without errors.

Cheers
Manfred

On 08.06.2017 at 17:38, a...@x5h.eu wrote:
>
> I still don't get it completely. (The rest of the code is in the previous
> email; this is only the relevant sample.)
>
> 1. dag.addStream("test", rabbitInput.output, out.input);
> Results in the following error:
> [ERROR] symbol: variable output
> [ERROR] location: variable rabbitInput of type com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>
> 2. dag.addStream("test", rabbitInput.outputPort, out.input);
> Results in the following error:
> [ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,8] no suitable method found for addStream(java.lang.String,com.datatorrent.api.DefaultOutputPort<byte[]>,com.datatorrent.api.DefaultInputPort<java.lang.String>)
> [ERROR] method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>...) is not applicable
> [ERROR] (inferred type does not conform to upper bound(s)
> [ERROR] inferred: byte[]
> [ERROR] upper bound(s): java.lang.String,java.lang.Object)
> [ERROR] method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>) is not applicable
> [ERROR] (inferred type does not conform to upper bound(s)
> [ERROR] inferred: byte[]
> [ERROR] upper bound(s): java.lang.String,java.lang.Object)
> [ERROR] method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>,com.datatorrent.api.Operator.InputPort<? super T>) is not applicable
> [ERROR] (cannot infer type-variable(s) T
> [ERROR] (actual and formal argument lists differ in length))
>
> It seems that, on the one hand, the RabbitMQInputOperator class does not
> have an output method and, on the other hand, the addStream method only
> accepts outputPort combined with inputPort methods, or output and input
> methods of the corresponding classes. Does that mean I can only use a
> class that implements the inputPort method for this example?
>
> Cheers
>
> Manfred.
>
> On 08.06.2017 at 10:05, a...@x5h.eu wrote:
>>
>> Sorry, the two snippets below were from different iterations. The
>> error I get is the following:
>>
>> [ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38] cannot find symbol
>> [ERROR] symbol: variable output
>> [ERROR] location: variable rabbitInput of type com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>
>> My code is as follows:
>>
>> package com.example.rabbitMQ;
>>
>> import org.apache.hadoop.conf.Configuration;
>>
>> import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
>> import com.datatorrent.api.annotation.ApplicationAnnotation;
>> import com.datatorrent.api.StreamingApplication;
>> import com.datatorrent.api.DAG;
>> import com.datatorrent.lib.io.jms.JMSStringInputOperator;
>>
>> @ApplicationAnnotation(name="RabbitMQ2HDFS")
>> public class RabbitMQApplication implements StreamingApplication
>> {
>>
>>   @Override
>>   public void populateDAG(DAG dag, Configuration conf)
>>   {
>>
>>     RabbitMQInputOperator rabbitInput = dag.addOperator("Consumer", RabbitMQInputOperator.class);
>>     rabbitInput.setHost("localhost");
>>     rabbitInput.setPort(5672);
>>     rabbitInput.setExchange("");
>>     rabbitInput.setQueueName("hello");
>>     LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
>>
>>     dag.addStream("data", rabbitInput.output, out.input);
>>   }
>> }
>>
>> Cheers
>>
>> Manfred.
>>
>> On 08.06.2017 at 04:34, vikram patil wrote:
>>> Hi,
>>> dag.addStream() is used to create a stream from one operator's
>>> output port to another operator's input port.
>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer", RabbitMQInputOperator.class);
>>> dag.addStream("data", *rabbitInput*.output, out.input);
>>> Looks like your operator name is incorrect? I see in your code
>>> snippet above that the name of the RabbitMQInputOperator is *"Consumer"*.
>>>
>>> In the property name, you need to provide the operator name you
>>> specified in the addOperator(*"NAME OF THE OPERATOR"*,
>>> RabbitMQInputOperator.class) API call.
>>>
>>> dt.operator.*rabbitMQIn*.prop.tuple_blast (the syntax is correct,
>>> provided your operator name is correct).
>>>
>>> (It should be dt.operator.*Consumer*.prop.tuple_blast based on your
>>> code snippet.)
>>>
>>> I think the tests provided in Apache Malhar are very detailed; they
>>> run in local mode as unit tests, so we have mocked the actual
>>> RabbitMQ with a custom message publisher.
>>>
>>> For the time being, you can set only the queue name and hostname, as follows:
>>>
>>> // set your rabbitmq host
>>> consumer.setHost("localhost");
>>> // set your rabbitmq port
>>> consumer.setPort(5672);
>>> // It depends on your rabbitmq producer configuration, but by default
>>> // it is the default exchange with "" (no name is provided).
>>> consumer.setExchange("");
>>> // set your queue name
>>> consumer.setQueueName("YOUR_QUEUE_NAME");
>>>
>>> If it's okay, could you please share the code from your application.java
>>> and properties.xml here?
>>>
>>> Thanks,
>>> Vikram
>>>
>>> On Thu, Jun 8, 2017 at 12:32 AM, <a...@x5h.eu> wrote:
>>>
>>>     Thanks very much for the help. The only problem left is that I
>>>     don't quite understand dag.addStream().
>>>
>>>     I tried this
>>>
>>>     RabbitMQInputOperator consumer = dag.addOperator("Consumer", RabbitMQInputOperator.class);
>>>     dag.addStream("data", rabbitInput.output, out.input);
>>>
>>>     but obviously this doesn't work. What I don't get is the
>>>     difference between the ActiveMQ example and the RabbitMQ
>>>     example. I looked over the test examples for RabbitMQ but don't
>>>     seem to understand the logic behind them.
>>>
>>>     Is this the correct way to specify properties?
>>>     <property>
>>>       <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
>>>       <value>500</value>
>>>     </property>
>>>
>>>     Cheers
>>>     Manfred.
>>>
>>> On 07.06.2017 at 12:03, Vikram Patil wrote:
>>>> Yes, you would need an Application.java, which is the way to define a DAG
>>>> for an Apex application.
>>>>
>>>> Please have a look at the code from the following example to find out how to
>>>> write a JMS ActiveMQ based application:
>>>>
>>>> https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ
>>>>
>>>> This is how you can instantiate the RabbitMQInputOperator and add it to a DAG:
>>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer", RabbitMQInputOperator.class);
>>>>
>>>> https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag
>>>>
>>>> The following properties need to be specified in properties.xml:
>>>>
>>>> * tuple_blast: number of tuples emitted in each burst
>>>> * bufferSize: size of the holding buffer
>>>> * host: the address for the consumer to connect to the RabbitMQ producer
>>>> * exchange: the exchange for the consumer to connect to the RabbitMQ producer
>>>> * exchangeType: the exchangeType for the consumer to connect to the RabbitMQ producer
>>>> * routingKey: the routingKey for the consumer to connect to the RabbitMQ producer
>>>> * queueName: the queueName for the consumer to connect to the RabbitMQ producer
>>>>
>>>> Reference:
>>>> https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
>>>>
>>>> Thanks,
>>>> Vikram
>>>>
>>>> On Wed, Jun 7, 2017 at 3:19 PM, <a...@x5h.eu> wrote:
>>>>> Hello,
>>>>>
>>>>> I compiled the whole thing, but now I don't know exactly how to get it
>>>>> running in Apex. Do I need an application.java like in the tutorial?
>>>>> I do have a simple RabbitMQ queue up and running on the server. How do I
>>>>> consume the messages with Apex and write them to HDFS?
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Manfred
>>>>>
>>>>> The following steps were necessary to get the RabbitMQ test to compile:
>>>>>
>>>>> @TimeoutException
>>>>> import java.util.concurrent.TimeoutException;
>>>>> public void setup() throws IOException,TimeoutException
>>>>> public void teardown() throws IOException,TimeoutException
>>>>> protected void runTest(final int testNum) throws IOException
>>>>>
>>>>> @Build jars
>>>>> cd apex-malhar/contrib/
>>>>> mvn clean package -DskipTests
>>>>>
>>>>> cd apex-malhar/library/
>>>>> mvn clean package -DskipTests
>>>>> copy packages to project directory
>>>>>
>>>>> @Link them to the project
>>>>> Add the following lines to the pom.xml:
>>>>> <dependency>
>>>>>   <groupId>contrib</groupId>
>>>>>   <artifactId>com.datatorrent.contrib.helper</artifactId>
>>>>>   <version>1.0</version>
>>>>>   <scope>system</scope>
>>>>>   <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>>> </dependency>
>>>>> <dependency>
>>>>>   <groupId>lib</groupId>
>>>>>   <artifactId>com.datatorrent.lib.helper</artifactId>
>>>>>   <version>1.0</version>
>>>>>   <scope>system</scope>
>>>>>   <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>>> </dependency>
>>>>> <dependency>
>>>>>   <groupId>contrib</groupId>
>>>>>   <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
>>>>>   <version>1.0</version>
>>>>>   <scope>system</scope>
>>>>>   <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
>>>>> </dependency>
>>>>> <dependency>
>>>>>   <groupId>Attribute</groupId>
>>>>>   <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
>>>>>   <version>1.0</version>
>>>>>   <scope>system</scope>
>>>>>   <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
>>>>> </dependency>
>>>>>
>>>>> On 31.05.2017 at 18:57, Sanjay Pujare wrote:
>>>>>
>>>>> Both com.datatorrent.contrib.helper and com.datatorrent.lib.helper
>>>>> are under the test directory of malhar-contrib and malhar-library,
>>>>> respectively. You may need to build these jars yourself with test
>>>>> scope to include these packages.
>>>>>
>>>>> On Wed, May 31, 2017 at 9:39 AM, <a...@x5h.eu> wrote:
>>>>>> Hello, (mea culpa for messing up the headline the first time)
>>>>>>
>>>>>> I'm currently trying to get the apex-malhar rabbitmq running, but
>>>>>> I'm at a complete loss: while the examples run fine, I can't even
>>>>>> get RabbitMQInputOperatorTest.java to run.
>>>>>>
>>>>>> First it couldn't find the rabbitmq-client, which was solvable by
>>>>>> adding the dependency:
>>>>>>
>>>>>> <dependency>
>>>>>>   <groupId>com.rabbitmq</groupId>
>>>>>>   <artifactId>amqp-client</artifactId>
>>>>>>   <version>4.1.0</version>
>>>>>> </dependency>
>>>>>>
>>>>>> But now it doesn't find the packages com.datatorrent.contrib.helper
>>>>>> and com.datatorrent.lib.helper and can't find several symbols.
>>>>>>
>>>>>> Needless to say, I'm a beginner regarding Apex, so does anyone
>>>>>> know what exactly I'm doing wrong here?
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> Manfred.
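
For anyone reading this thread later: the byte[] versus String mismatch reported by the compiler in this thread can be reproduced without Apex. The following is only a minimal sketch, assuming hypothetical stand-in types (PortTypeSketch, OutputPort, InputPort, ByteLineSink) that mimic the shape of DAG.addStream. They are not the Apex API, and the addStream signature here is simplified to a single sink.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Stand-alone sketch. None of these types are Apex classes; they are
// hypothetical stand-ins that only mimic how addStream ties the tuple type
// of an output port to the tuple type of an input port.
public class PortTypeSketch {

    /** Hypothetical stand-in for an operator input port. */
    interface InputPort<T> {
        void put(T tuple);
    }

    /** Hypothetical stand-in for an operator output port. */
    static class OutputPort<T> {
        private final List<InputPort<? super T>> sinks = new ArrayList<>();

        void connect(InputPort<? super T> sink) {
            sinks.add(sink);
        }

        void emit(T tuple) {
            for (InputPort<? super T> sink : sinks) {
                sink.put(tuple);
            }
        }
    }

    /** Simplified: the sink must accept the tuple type T that the source emits. */
    static <T> void addStream(String name, OutputPort<T> source, InputPort<? super T> sink) {
        source.connect(sink);
    }

    /** A sink declared for byte[] tuples; it decodes each tuple as one UTF-8 line. */
    static class ByteLineSink implements InputPort<byte[]> {
        final List<String> lines = new ArrayList<>();

        @Override
        public void put(byte[] tuple) {
            lines.add(new String(tuple, StandardCharsets.UTF_8));
        }
    }

    /** Wires a byte[] source to a byte[] sink and returns the decoded lines. */
    public static List<String> run(String... messages) {
        OutputPort<byte[]> source = new OutputPort<>();
        ByteLineSink sink = new ByteLineSink();
        addStream("data", source, sink);

        // A sink declared for String would be rejected at compile time,
        // much like the error in the thread:
        // InputPort<String> stringSink = line -> { };
        // addStream("data", source, stringSink); // byte[] does not conform to String

        for (String message : messages) {
            source.emit(message.getBytes(StandardCharsets.UTF_8));
        }
        return sink.lines;
    }

    public static void main(String[] args) {
        System.out.println(run("hello", "world")); // prints [hello, world]
    }
}
```

The point of the sketch is that the fix belongs on the sink side: the input port has to be declared for the tuple type the source emits, which is why swapping in an output operator typed for byte[] made the application compile.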