Sorry the two Snippets below where from different iterations. The Error I get is the following:
[ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38] cannot find symbol [ERROR] symbol: variable output [ERROR] location: variable rabbitInput of type com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator My Code is as follows: package com.example.rabbitMQ; import org.apache.hadoop.conf.Configuration; import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator; import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.DAG; import com.datatorrent.lib.io.jms.JMSStringInputOperator; @ApplicationAnnotation(name="RabbitMQ2HDFS") public class RabbitMQApplication implements StreamingApplication { @Override public void populateDAG(DAG dag, Configuration conf) { RabbitMQInputOperator rabbitInput = dag.addOperator("Consumer",RabbitMQInputOperator.class); rabbitInput.setHost("localhost"); rabbitInput.setPort(5672); rabbitInput.setExchange(""); rabbitInput.setQueueName("hello"); LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator()); dag.addStream("data", rabbitInput.output, out.input); } } Cheers Manfred. Am 08.06.2017 um 04:34 schrieb vikram patil: > Hi, > dag.addStream() is actually used to create stream of from one Operator output > port to other operators input port. > RabbitMQInputOperator consumer = dag.addOperator("Consumer", > RabbitMQInputOperator.class); > dag.addStream("data", *rabbitInput*.output, out.input); > Looks like your operator name is incorrect? I see in your code snippet > above, name of of RabbiMQInputOperator is *"Consumer".* > > In property name, you need to provide operator name you have specified > in addOperator(*"NAME OF THE OPERATOR"*, RabbitMQInputOperator.class) > api call. > > dt.operator.*rabbitMQIn*.prop.tuple_blast ( Syntax is correct > correct given your operator name is correct ). > > ( It should be dt.operator.*Consumer*.prop.tuple_blast based on your > code snippet ). > > I think tests which are provided in the Apache Malhar are very > detailed, they run in local mode as unit tests so we have mocked > actual rabbitmq by custom message publisher. > > For timebeing you set only queuename and hostname as > > // set your rabbitmq host . > consumer.setHost("localhost"); // set your rabbitmq port > consumer.setPort(5672) // It depends on your rabbitmq producer > configuration but by default // its default exchange with "" ( No Name > is provided ). consumer.setExchange(""); // set your queue name > consumer.setQueueName("YOUR_QUEUE_NAME") > > > > > If its okay, could you please share code from your application.java > and properties.xml here? > > Thanks, > Vikram > > > On Thu, Jun 8, 2017 at 12:32 AM, <a...@x5h.eu <mailto:a...@x5h.eu>> wrote: > > Thanks very much for the help. The only problem left is that I > don't quite understand dag.addstream(). > > I tried this > > RabbitMQInputOperator consumer = dag.addOperator("Consumer", > RabbitMQInputOperator.class); > dag.addStream("data", rabbitInput.output, out.input); > > but obviously this doesn't work. What I don't get is the > difference between the ActiveMQ example and the RabbitMQ example. > I looked over the test examples for RabbitMQ but don't seem to > understand the logic behind it. > > Is this the correct wax to specify properties: > <property> > <name>dt.operator.rabbitMQIn.prop.tuple_blast</name> > <value>500</value> > </property> > > Cheers > Manfred. > > > Am 07.06.2017 um 12:03 schrieb Vikram Patil: >> Yes, you would need Application.java which will be way to define a DAG >> for Apex Application. >> >> Please have look at the code from following example to find out how to >> write JMS ActiveMQ based example: >> >> >> https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ >> >> <https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ> >> >> This is how you can instantiate RabbitMQINputOperator and to a dag. >> RabbitMQInputOperator consumer = dag.addOperator("Consumer", >> RabbitMQInputOperator.class); >> >> >> https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag >> >> <https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag> >> >> Following properties need to be specified in properties.xml >> >> * Properties:<br> >> * <b>tuple_blast</b>: Number of tuples emitted in each burst<br> >> * <b>bufferSize</b>: Size of holding buffer<br> >> * <b>host</b>:the address for the consumer to connect to rabbitMQ >> producer<br> >> * <b>exchange</b>:the exchange for the consumer to connect to rabbitMQ >> producer<br> >> * <b>exchangeType</b>:the exchangeType for the consumer to connect to >> rabbitMQ producer<br> >> * <b>routingKey</b>:the routingKey for the consumer to connect to >> rabbitMQ producer<br> >> * <b>queueName</b>:the queueName for the consumer to connect to >> rabbitMQ producer<br> >> * <br> >> >> Reference: >> https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java >> >> <https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java> >> >> Thanks, >> Vikram >> >> On Wed, Jun 7, 2017 at 3:19 PM, <a...@x5h.eu> <mailto:a...@x5h.eu> >> wrote: >>> Hello, >>> >>> I compiled the whole thing but now I don't know exactly how to get it >>> running in Apex. Do I need an application.java like in the tutorial? I >>> do >>> have a simple RabbitMQ queue up and running on the server. How do I >>> consume >>> the messages with Apex and write them to hdfs? >>> >>> Cheers, >>> >>> Manfred >>> >>> Following steps were necessary to get the RabbitMq test to compile >>> >>> @TimeoutException >>> import java.util.concurrent.TimeoutException; >>> public void setup() throws IOException,TimeoutException >>> public void teardown() throws IOException,TimeoutException >>> protected void runTest(final int testNum) throws IOException >>> >>> @Build jars >>> cd apex-malhar/contrib/ >>> mvn clean package -DskipTests >>> >>> cd apex-malhar/library/ >>> mvn clean package -DskipTests >>> copy packages to project directory >>> >>> @Link them to the project >>> Add following lines to the pom.xml >>> <dependency> >>> <groupId>contrib</groupId> >>> <artifactId>com.datatorrent.contrib.helper</artifactId> >>> <version>1.0</version> >>> <scope>system</scope> >>> >>> >>> <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath> >>> </dependency> >>> <dependency> >>> <groupId>lib</groupId> >>> <artifactId>com.datatorrent.lib.helper</artifactId> >>> <version>1.0</version> >>> <scope>system</scope> >>> >>> >>> <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath> >>> </dependency> >>> <dependency> >>> <groupId>contrib</groupId> >>> <artifactId>com.datatorrent.contrib.rabbitmq</artifactId> >>> <version>1.0</version> >>> <scope>system</scope> >>> >>> >>> <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath> >>> </dependency> >>> <dependency> >>> <groupId>Attribute</groupId> >>> <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId> >>> <version>1.0</version> >>> <scope>system</scope> >>> >>> >>> <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath> >>> </dependency> >>> >>> >>> Am 31.05.2017 um 18:57 schrieb Sanjay Pujare: >>> >>> Both com.datatorrent.contrib.helper and com.datatorrent.lib.helper are >>> under the test directory under malhar-contrib and malhar-library >>> respectively. You may need to build these jars yourself with test scope >>> to >>> include these packages. >>> >>> On Wed, May 31, 2017 at 9:39 AM, <a...@x5h.eu> <mailto:a...@x5h.eu> >>> wrote: >>>> Hello, (mea culpa for messing up the headline the first time) >>>> >>>> I'm currently trying to get the apex-malhar rabbitmq running. But I'm >>>> at a >>>> complete loss, while the examples are running fine I don't even get the >>>> RabbitMQInputOperatorTest.java to run. >>>> >>>> First it couldn't find the rabbitmq-client which was solveable by >>>> adding >>>> the dependency: >>>> >>>> <dependency> >>>> <groupId>com.rabbitmq</groupId> >>>> <artifactId>amqp-client</artifactId> >>>> <version>4.1.0</version> >>>> </dependency> >>>> >>>> But now it doesn't find the packages com.datatorrent.contrib.helper and >>>> com.datatorrent.lib.helper and can't find several symbols. >>>> >>>> Needless to say that I'm a beginner regarding Apex so does anyone know >>>> what exactly I'm doing wrong here? >>>> >>>> Cheers >>>> >>>> Manfred. >>>> >>>> > >