Hello, @2.) Does it have to be a durable queue or does an exchange suffice?
Cheers Manfred Am 09.06.2017 um 15:23 schrieb vikram patil: > 1) Are you doing it on your local environment? > 2) If you are doing it locally I would suggest following options > 1) If you dont want to create queue on rabbitmq by yourself . Set > queuename on operator > in.setQueueName("YOUR_QUEUE_NAME" ) > Operator will do following steps : > * Create Durable Queue in RabbitMQ > * You have specfied exchange and exchangeType . > So it will create an exchange using this information > and bind created queue with exchange with default routing key which > will be "". > Right now it must be creating auto generated unique named > queue for you. > > 2) You can create your own exchange and durable queue using > rabbitmq admin . You will have to install rabbitmq plugins for that. > You can use it to publish some test data as well. > > Using apex-cli you can check status of your application, if its > failing then you should check logs from userlogs in hadoop logs directory. > > Thanks & Regards, > Vikram > > > > > On Fri, Jun 9, 2017 at 6:42 PM, <a...@x5h.eu <mailto:a...@x5h.eu>> wrote: > > Finally got rid of all errors but now I have the problem that the > apex application does not seem to register at the RabbitMQ exchange. > > This is my code: > > @ApplicationAnnotation(name="RabbitMQ2HDFS") > public class RabbitMQApplication implements StreamingApplication > { > > @Override > public void populateDAG(DAG dag, Configuration conf) > { > RabbitMQInputOperator in = dag.addOperator("rabbitInput",new > RabbitMQInputOperator()); > in.setHost("localhost"); > //in.setPort(5672); > in.setExchange("apex"); > in.setExchangeType("fanout"); > ConsoleOutputOperator console = dag.addOperator("console", new > ConsoleOutputOperator()); > dag.addStream("rand_console",in.outputPort, console.input); > > } > > } > > If I launch the application everthing executes without an error > but if i list the bindings on the exchange, there is none. > > Anyone even an idea how i can start to debug this? > > Cheers > Manfred. > > > > Am 08.06.2017 um 18:04 schrieb a...@x5h.eu <mailto:a...@x5h.eu>: >> >> Okay i found the error, I copied the LineOutputOperator.java >> >> <https://github.com/DataTorrent/examples/blob/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ/LineOutputOperator.java> >> from the jmsActiveMQ example and found there >> public class LineOutputOperator extends >> AbstractFileOutputOperator<String> >> >> Instead i took the LineOutputOperator.java >> >> <https://github.com/DataTorrent/examples/blob/master/tutorials/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java>from >> the Kafka 0.9 example there the class is correctly defined for >> the RabbitMQInputOperator >> >> So far so good now it compiles without errors. >> >> Cheers >> >> Manfred >> >> Am 08.06.2017 um 17:38 schrieb a...@x5h.eu <mailto:a...@x5h.eu>: >>> >>> I still don't get it completely: (The rest of the code is in the >>> Email before, this is only the necessary sample) >>> >>> 1. dag.addStream("test", rabbitInput.output, out.input); >>> Results in the following error: >>> [ERROR] symbol: variable output >>> [ERROR] location: variable rabbitInput of type >>> com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator >>> >>> 2. dag.addStream("test", rabbitInput.outputPort, out.input); >>> Results in the following error: >>> [ERROR] >>> >>> /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,8] >>> no suitable method found for >>> >>> addStream(java.lang.String,com.datatorrent.api.DefaultOutputPort<byte[]>,com.datatorrent.api.DefaultInputPort<java.lang.String>) >>> [ERROR] method >>> >>> com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? >>> extends T>,com.datatorrent.api.Operator.InputPort<? super >>> T>...) is not applicable >>> [ERROR] (inferred type does not conform to upper bound(s) >>> [ERROR] inferred: byte[] >>> [ERROR] upper bound(s): >>> java.lang.String,java.lang.Object) >>> [ERROR] method >>> >>> com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? >>> extends T>,com.datatorrent.api.Operator.InputPort<? super >>> T>) is not applicable >>> [ERROR] (inferred type does not conform to upper bound(s) >>> [ERROR] inferred: byte[] >>> [ERROR] upper bound(s): >>> java.lang.String,java.lang.Object) >>> [ERROR] method >>> >>> com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? >>> extends T>,com.datatorrent.api.Operator.InputPort<? super >>> T>,com.datatorrent.api.Operator.InputPort<? super T>) is not >>> applicable >>> [ERROR] (cannot infer type-variable(s) T >>> [ERROR] (actual and formal argument lists differ in >>> length)) >>> >>> >>> >>> It seems that on the one hand the RabbitMQInputOperator.class >>> does not have an output method and on the other hand the >>> addStream method only accepts outputPort combined with inputPort >>> methods or output and input methods of the corresponding >>> classes. Does that mean I only can use a class that implements >>> inputPort method for this example? >>> >>> Cheers >>> >>> Manfred. >>> >>> >>> >>> Am 08.06.2017 um 10:05 schrieb a...@x5h.eu <mailto:a...@x5h.eu>: >>>> >>>> Sorry the two Snippets below where from different iterations. >>>> The Error I get is the following: >>>> >>>> [ERROR] >>>> >>>> /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38] >>>> cannot find symbol >>>> [ERROR] symbol: variable output >>>> [ERROR] location: variable rabbitInput of type >>>> com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator >>>> >>>> My Code is as follows: >>>> >>>> >>>> package com.example.rabbitMQ; >>>> >>>> import org.apache.hadoop.conf.Configuration; >>>> >>>> import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator; >>>> import com.datatorrent.api.annotation.ApplicationAnnotation; >>>> import com.datatorrent.api.StreamingApplication; >>>> import com.datatorrent.api.DAG; >>>> import com.datatorrent.lib.io.jms.JMSStringInputOperator; >>>> >>>> @ApplicationAnnotation(name="RabbitMQ2HDFS") >>>> public class RabbitMQApplication implements StreamingApplication >>>> { >>>> >>>> @Override >>>> public void populateDAG(DAG dag, Configuration conf) >>>> { >>>> >>>> RabbitMQInputOperator rabbitInput = >>>> dag.addOperator("Consumer",RabbitMQInputOperator.class); >>>> rabbitInput.setHost("localhost"); >>>> rabbitInput.setPort(5672); >>>> rabbitInput.setExchange(""); >>>> rabbitInput.setQueueName("hello"); >>>> LineOutputOperator out = dag.addOperator("fileOut", new >>>> LineOutputOperator()); >>>> >>>> dag.addStream("data", rabbitInput.output, out.input); >>>> } >>>> } >>>> >>>> Cheers >>>> >>>> Manfred. >>>> >>>> >>>> >>>> Am 08.06.2017 um 04:34 schrieb vikram patil: >>>>> Hi, >>>>> dag.addStream() is actually used to create stream of from one >>>>> Operator output port to other operators input port. >>>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer", >>>>> RabbitMQInputOperator.class); >>>>> dag.addStream("data", *rabbitInput*.output, out.input); >>>>> Looks like your operator name is incorrect? I see in your code >>>>> snippet above, name of of RabbiMQInputOperator is *"Consumer".* >>>>> >>>>> In property name, you need to provide operator name you have >>>>> specified in addOperator(*"NAME OF THE OPERATOR"*, >>>>> RabbitMQInputOperator.class) api call. >>>>> >>>>> dt.operator.*rabbitMQIn*.prop.tuple_blast ( Syntax is >>>>> correct correct given your operator name is correct ). >>>>> >>>>> ( It should be dt.operator.*Consumer*.prop.tuple_blast based >>>>> on your code snippet ). >>>>> >>>>> I think tests which are provided in the Apache Malhar are very >>>>> detailed, they run in local mode as unit tests so we have >>>>> mocked actual rabbitmq by custom message publisher. >>>>> >>>>> For timebeing you set only queuename and hostname as >>>>> >>>>> // set your rabbitmq host . >>>>> consumer.setHost("localhost"); // set your rabbitmq port >>>>> consumer.setPort(5672) // It depends on your rabbitmq producer >>>>> configuration but by default // its default exchange with "" ( >>>>> No Name is provided ). consumer.setExchange(""); // set your >>>>> queue name consumer.setQueueName("YOUR_QUEUE_NAME") >>>>> >>>>> >>>>> >>>>> >>>>> If its okay, could you please share code from your >>>>> application.java and properties.xml here? >>>>> >>>>> Thanks, >>>>> Vikram >>>>> >>>>> >>>>> On Thu, Jun 8, 2017 at 12:32 AM, <a...@x5h.eu >>>>> <mailto:a...@x5h.eu>> wrote: >>>>> >>>>> Thanks very much for the help. The only problem left is >>>>> that I don't quite understand dag.addstream(). >>>>> >>>>> I tried this >>>>> >>>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer", >>>>> RabbitMQInputOperator.class); >>>>> dag.addStream("data", rabbitInput.output, out.input); >>>>> >>>>> but obviously this doesn't work. What I don't get is the >>>>> difference between the ActiveMQ example and the RabbitMQ >>>>> example. I looked over the test examples for RabbitMQ but >>>>> don't seem to understand the logic behind it. >>>>> >>>>> Is this the correct wax to specify properties: >>>>> <property> >>>>> <name>dt.operator.rabbitMQIn.prop.tuple_blast</name> >>>>> <value>500</value> >>>>> </property> >>>>> >>>>> Cheers >>>>> Manfred. >>>>> >>>>> >>>>> Am 07.06.2017 um 12:03 schrieb Vikram Patil: >>>>>> Yes, you would need Application.java which will be way to define >>>>>> a DAG >>>>>> for Apex Application. >>>>>> >>>>>> Please have look at the code from following example to find out >>>>>> how to >>>>>> write JMS ActiveMQ based example: >>>>>> >>>>>> >>>>>> https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ >>>>>> >>>>>> <https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ> >>>>>> >>>>>> This is how you can instantiate RabbitMQINputOperator and to a >>>>>> dag. >>>>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer", >>>>>> RabbitMQInputOperator.class); >>>>>> >>>>>> >>>>>> https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag >>>>>> >>>>>> <https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag> >>>>>> >>>>>> Following properties need to be specified in properties.xml >>>>>> >>>>>> * Properties:<br> >>>>>> * <b>tuple_blast</b>: Number of tuples emitted in each burst<br> >>>>>> * <b>bufferSize</b>: Size of holding buffer<br> >>>>>> * <b>host</b>:the address for the consumer to connect to >>>>>> rabbitMQ producer<br> >>>>>> * <b>exchange</b>:the exchange for the consumer to connect to >>>>>> rabbitMQ >>>>>> producer<br> >>>>>> * <b>exchangeType</b>:the exchangeType for the consumer to >>>>>> connect to >>>>>> rabbitMQ producer<br> >>>>>> * <b>routingKey</b>:the routingKey for the consumer to connect to >>>>>> rabbitMQ producer<br> >>>>>> * <b>queueName</b>:the queueName for the consumer to connect to >>>>>> rabbitMQ producer<br> >>>>>> * <br> >>>>>> >>>>>> Reference: >>>>>> https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java >>>>>> >>>>>> <https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java> >>>>>> >>>>>> Thanks, >>>>>> Vikram >>>>>> >>>>>> On Wed, Jun 7, 2017 at 3:19 PM, <a...@x5h.eu> >>>>>> <mailto:a...@x5h.eu> wrote: >>>>>>> Hello, >>>>>>> >>>>>>> I compiled the whole thing but now I don't know exactly how to >>>>>>> get it >>>>>>> running in Apex. Do I need an application.java like in the >>>>>>> tutorial? I do >>>>>>> have a simple RabbitMQ queue up and running on the server. How >>>>>>> do I consume >>>>>>> the messages with Apex and write them to hdfs? >>>>>>> >>>>>>> Cheers, >>>>>>> >>>>>>> Manfred >>>>>>> >>>>>>> Following steps were necessary to get the RabbitMq test to >>>>>>> compile >>>>>>> >>>>>>> @TimeoutException >>>>>>> import java.util.concurrent.TimeoutException; >>>>>>> public void setup() throws IOException,TimeoutException >>>>>>> public void teardown() throws IOException,TimeoutException >>>>>>> protected void runTest(final int testNum) throws IOException >>>>>>> >>>>>>> @Build jars >>>>>>> cd apex-malhar/contrib/ >>>>>>> mvn clean package -DskipTests >>>>>>> >>>>>>> cd apex-malhar/library/ >>>>>>> mvn clean package -DskipTests >>>>>>> copy packages to project directory >>>>>>> >>>>>>> @Link them to the project >>>>>>> Add following lines to the pom.xml >>>>>>> <dependency> >>>>>>> <groupId>contrib</groupId> >>>>>>> <artifactId>com.datatorrent.co >>>>>>> <http://com.datatorrent.co>ntrib.helper</artifactId> >>>>>>> <version>1.0</version> >>>>>>> <scope>system</scope> >>>>>>> >>>>>>> >>>>>>> <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath> >>>>>>> </dependency> >>>>>>> <dependency> >>>>>>> <groupId>lib</groupId> >>>>>>> <artifactId>com.datatorrent.li >>>>>>> <http://com.datatorrent.li>b.helper</artifactId> >>>>>>> <version>1.0</version> >>>>>>> <scope>system</scope> >>>>>>> >>>>>>> >>>>>>> <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath> >>>>>>> </dependency> >>>>>>> <dependency> >>>>>>> <groupId>contrib</groupId> >>>>>>> <artifactId>com.datatorrent.co >>>>>>> <http://com.datatorrent.co>ntrib.rabbitmq</artifactId> >>>>>>> <version>1.0</version> >>>>>>> <scope>system</scope> >>>>>>> >>>>>>> >>>>>>> <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath> >>>>>>> </dependency> >>>>>>> <dependency> >>>>>>> <groupId>Attribute</groupId> >>>>>>> >>>>>>> <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId> >>>>>>> <version>1.0</version> >>>>>>> <scope>system</scope> >>>>>>> >>>>>>> >>>>>>> <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath> >>>>>>> </dependency> >>>>>>> >>>>>>> >>>>>>> Am 31.05.2017 um 18:57 schrieb Sanjay Pujare: >>>>>>> >>>>>>> Both com.datatorrent.contrib.helper and >>>>>>> com.datatorrent.lib.helper are >>>>>>> under the test directory under malhar-contrib and malhar-library >>>>>>> respectively. You may need to build these jars yourself with >>>>>>> test scope to >>>>>>> include these packages. >>>>>>> >>>>>>> On Wed, May 31, 2017 at 9:39 AM, <a...@x5h.eu> >>>>>>> <mailto:a...@x5h.eu> wrote: >>>>>>>> Hello, (mea culpa for messing up the headline the first time) >>>>>>>> >>>>>>>> I'm currently trying to get the apex-malhar rabbitmq running. >>>>>>>> But I'm at a >>>>>>>> complete loss, while the examples are running fine I don't >>>>>>>> even get the >>>>>>>> RabbitMQInputOperatorTest.java to run. >>>>>>>> >>>>>>>> First it couldn't find the rabbitmq-client which was solveable >>>>>>>> by adding >>>>>>>> the dependency: >>>>>>>> >>>>>>>> <dependency> >>>>>>>> <groupId>com.rabbitmq</groupId> >>>>>>>> <artifactId>amqp-client</artifactId> >>>>>>>> <version>4.1.0</version> >>>>>>>> </dependency> >>>>>>>> >>>>>>>> But now it doesn't find the packages >>>>>>>> com.datatorrent.contrib.helper and >>>>>>>> com.datatorrent.lib.helper and can't find several symbols. >>>>>>>> >>>>>>>> Needless to say that I'm a beginner regarding Apex so does >>>>>>>> anyone know >>>>>>>> what exactly I'm doing wrong here? >>>>>>>> >>>>>>>> Cheers >>>>>>>> >>>>>>>> Manfred. >>>>>>>> >>>>>>>> >>>>> >>>>> >>>> >>> >> > >