After various tests I finally got everything working nicely, so for future users I'll post here how.
First, the RabbitMQ configuration, which was the only one that worked:

    rabbitmqadmin declare exchange name=apex type=fanout durable=false
    rabbitmqadmin declare queue name=test durable=true
    rabbitmqadmin declare binding source="apex" destination_type="queue" destination="test" routing_key="rktest"

It is important to note that Apex only accepts a non-durable exchange. This means you have to recreate the exchange every time you restart your RabbitMQ service.

The "Mkdirs failed to create" error: this just means that the DFS service is down or, in my case, that safe mode is on. You can check the state and then leave safe mode with:

    hdfs dfsadmin -safemode get
    hdfs dfsadmin -safemode leave

My example uses the following application. I moved the operator values into a corresponding *.xml file and only list them here for better understanding:

    import org.apache.hadoop.conf.Configuration;

    import com.datatorrent.api.DAG;
    import com.datatorrent.api.StreamingApplication;
    import com.datatorrent.api.annotation.ApplicationAnnotation;
    import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;

    @ApplicationAnnotation(name="RabbitMQ2HDFS")
    public class RabbitMQApplication implements StreamingApplication
    {
      @Override
      public void populateDAG(DAG dag, Configuration conf)
      {
        // Consume from the "test" queue, which is bound to the fanout exchange "apex"
        RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
        in.setHost("192.168.33.63");
        in.setExchange("apex");
        in.setExchangeType("fanout");
        in.setQueueName("test");

        // Write every message as one line; maxLength and rotationWindows control file rolling
        LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
        out.setFilePath("/hdfs/rabbitMQ");
        out.setBaseName("rabbitOut");
        out.setMaxLength(1024);
        out.setRotationWindows(4);

        // outputPort emits byte[], so the sink's input port must accept byte[]
        dag.addStream("data", in.outputPort, out.input);
      }
    }

And here is the corresponding output operator.
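Why the output operator's type parameter matters: the compiler error quoted further down shows two things. DAG.addStream is declared as <T> addStream(String, OutputPort<? extends T>, InputPort<? super T>...), and RabbitMQInputOperator.outputPort is a DefaultOutputPort<byte[]>. A stream therefore only compiles if some T is both a supertype of byte[] and a subtype of the sink's element type. That is why a String sink, like the one in the jmsActiveMQ LineOutputOperator, is rejected. The toy model below is a sketch that reproduces only those generic bounds. OutputPort, InputPort, Stream and addStream here are hypothetical stand-ins, not the real com.datatorrent.api classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Toy model of Apex-style port typing. OutputPort, InputPort, Stream and
// addStream are hypothetical stand-ins, NOT the com.datatorrent.api types;
// only the generic bounds of addStream copy the signature from the error.
public class PortTypeDemo {

  /** Hypothetical output port: buffers the tuples an operator emits. */
  static class OutputPort<T> {
    final List<T> pending = new ArrayList<>();

    void emit(T tuple) {
      pending.add(tuple);
    }
  }

  /** Hypothetical input port: records the tuples it receives. */
  static class InputPort<T> {
    final List<T> received = new ArrayList<>();

    void process(T tuple) {
      received.add(tuple);
    }
  }

  /** A connection from one output port to one input port. */
  static class Stream<T> {
    final String name;
    final OutputPort<? extends T> source;
    final InputPort<? super T> sink;

    Stream(String name, OutputPort<? extends T> source, InputPort<? super T> sink) {
      this.name = name;
      this.source = source;
      this.sink = sink;
    }

    /** Moves every buffered tuple from the source port to the sink port. */
    void flush() {
      for (T tuple : source.pending) {
        sink.process(tuple);
      }
      source.pending.clear();
    }
  }

  /** Same bounds as DAG.addStream: the sink must accept a supertype of what the source emits. */
  static <T> Stream<T> addStream(String name, OutputPort<? extends T> source, InputPort<? super T> sink) {
    return new Stream<>(name, source, sink);
  }

  public static void main(String[] args) {
    // Like RabbitMQInputOperator.outputPort, this port emits raw message bodies as byte[].
    OutputPort<byte[]> rabbitOut = new OutputPort<>();

    // A byte[] sink compiles, like a LineOutputOperator extending AbstractFileOutputOperator<byte[]>.
    InputPort<byte[]> fileIn = new InputPort<>();
    Stream<byte[]> data = addStream("data", rabbitOut, fileIn);

    // A String sink has no valid T (byte[] <: T <: String), so this would not compile:
    // InputPort<String> stringIn = new InputPort<>();
    // addStream("data", rabbitOut, stringIn);

    rabbitOut.emit("hello".getBytes(StandardCharsets.UTF_8));
    data.flush();
    System.out.println(new String(fileIn.received.get(0), StandardCharsets.UTF_8)); // hello
  }
}
```

Uncommenting the two String lines in main should make javac reject the call, much like the addStream error quoted in the thread. A sink typed as InputPort<Object> would still be accepted, because Object is a supertype of byte[].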
The only important thing here was that it extends AbstractFileOutputOperator<byte[]>:

    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;
    import javax.validation.constraints.NotNull;

    import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;

    public class LineOutputOperator extends AbstractFileOutputOperator<byte[]>
    {
      private static final String NL = System.lineSeparator();
      private static final Charset CS = StandardCharsets.UTF_8;

      @NotNull
      private String baseName;

      // Decode the raw RabbitMQ payload as UTF-8 and terminate it with a line separator
      @Override
      public byte[] getBytesForTuple(byte[] t)
      {
        String result = new String(t, CS) + NL;
        return result.getBytes(CS);
      }

      // Every tuple goes to the same base file name
      @Override
      protected String getFileName(byte[] tuple)
      {
        return baseName;
      }

      public String getBaseName() { return baseName; }
      public void setBaseName(String v) { baseName = v; }
    }

The most pressing issue was that the application wouldn't run on the YARN cluster, only in local mode. I still have no idea why it didn't run, but my best guess is that it was a bad idea from the beginning to try the Apex app on a Raspberry Pi 3 cluster. I switched to a standard Arch Linux server with 8 GB RAM and, without changing a thing in the application, it worked perfectly.

Thanks for all the help!

On 22.06.2017 at 11:33, a...@x5h.eu wrote:
>
> I drilled the error down to this message:
>
> Mkdirs failed to create file:/home/pi/datatorrent/apps/application_1498123667708_0001/checkpoints/2
>
> I guess I have something buggy in my configuration. Does any of you know how to solve this error? Should I start the application with the same user I'm starting YARN with?
>
> Cheers
>
> Manfred.
>
>
>
> On 10.06.2017 at 14:50, a...@x5h.eu wrote:
>>
>> Hello,
>>
>> you were completely right. It seems that there are problems with my test scenario regarding the Hadoop/YARN installation, and the application never starts.
>> I found these entries in the log:
>>
>> 2017-06-10 14:33:02,623 INFO org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService: Registering app attempt : appattempt_1495629011552_0011_000001
>> 2017-06-10 14:33:02,623 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1495629011552_0011_000001 State change from NEW to SUBMITTED
>> 2017-06-10 14:33:02,624 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue: not starting application as amIfStarted exceeds amLimit
>> 2017-06-10 14:33:02,624 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue: Application added - appId: application_1495629011552_0011 user: org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue$User@11536dc, leaf-queue: default #user-pending-applications: 1 #user-active-applications: 1 #queue-pending-applications: 1 #queue-active-applications: 1
>>
>> Therefore the application never leaves the state "undefined". Since the local tests were running fine and the launch of the application didn't raise an error, I missed the problem with the Hadoop installation. Thanks for the correct hint to look at the Hadoop cluster.
>>
>> Cheers
>> Manfred.
>>
>>
>> On 09.06.2017 at 15:23, vikram patil wrote:
>>> 1) Are you doing it in your local environment?
>>> 2) If you are doing it locally, I would suggest the following options:
>>>    1) If you don't want to create the queue in RabbitMQ yourself, set the queue name on the operator:
>>>           in.setQueueName("YOUR_QUEUE_NAME");
>>>       The operator will then do the following steps:
>>>       * Create a durable queue in RabbitMQ.
>>>       * Since you have specified exchange and exchangeType, it will create an exchange using this information and bind the created queue to the exchange with the default routing key, which is "".
>>>       Right now it must be creating an auto-generated, uniquely named queue for you.
>>>
>>>    2) You can create your own exchange and durable queue using rabbitmqadmin. You will have to install the RabbitMQ management plugin for that. You can use it to publish some test data as well.
>>>
>>> Using apex-cli you can check the status of your application. If it is failing, check the logs under userlogs in the Hadoop logs directory.
>>>
>>> Thanks & Regards,
>>> Vikram
>>>
>>>
>>> On Fri, Jun 9, 2017 at 6:42 PM, <a...@x5h.eu> wrote:
>>>
>>> Finally got rid of all errors, but now I have the problem that the Apex application does not seem to register at the RabbitMQ exchange.
>>>
>>> This is my code:
>>>
>>>     @ApplicationAnnotation(name="RabbitMQ2HDFS")
>>>     public class RabbitMQApplication implements StreamingApplication
>>>     {
>>>       @Override
>>>       public void populateDAG(DAG dag, Configuration conf)
>>>       {
>>>         RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
>>>         in.setHost("localhost");
>>>         //in.setPort(5672);
>>>         in.setExchange("apex");
>>>         in.setExchangeType("fanout");
>>>         ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
>>>         dag.addStream("rand_console", in.outputPort, console.input);
>>>       }
>>>     }
>>>
>>> If I launch the application, everything executes without an error, but if I list the bindings on the exchange, there are none.
>>>
>>> Does anyone have an idea how I can start to debug this?
>>>
>>> Cheers
>>> Manfred.
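On the empty binding list in the 09.06.2017 message above, it helps to recall how a fanout exchange routes. The exchange copies each message into every queue bound to it and ignores the routing key. If no queue is bound, the message reaches nobody. The same rule explains why routing_key="rktest" in the final configuration has no effect on delivery. The sketch below is a toy, in-memory model of that rule only. FanoutExchange is a hypothetical class, not part of the RabbitMQ Java client.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of fanout routing. FanoutExchange is hypothetical and is
// not part of the RabbitMQ client library; it only mimics the routing rule.
public class FanoutDemo {

  static class FanoutExchange {
    // queue name -> queue; a "binding" is simply an entry in this map
    private final Map<String, Queue<String>> bindings = new LinkedHashMap<>();

    /** Binds a queue. The routing key is accepted but ignored, as a fanout exchange does. */
    void bind(String queueName, Queue<String> queue, String routingKey) {
      bindings.put(queueName, queue);
    }

    /** Copies the message into every bound queue and returns how many queues received it. */
    int publish(String routingKey, String message) {
      for (Queue<String> queue : bindings.values()) {
        queue.add(message);
      }
      return bindings.size();
    }

    int bindingCount() {
      return bindings.size();
    }
  }

  public static void main(String[] args) {
    FanoutExchange apex = new FanoutExchange();

    // No binding yet: the message reaches no queue and is effectively dropped.
    System.out.println(apex.publish("", "lost"));            // 0

    // Bind "test" with routing key "rktest"; the key plays no role in fanout routing.
    Queue<String> test = new ArrayDeque<>();
    apex.bind("test", test, "rktest");
    System.out.println(apex.publish("other-key", "hello"));  // 1
    System.out.println(test.poll());                         // hello
  }
}
```

In the real broker, a consumer sees messages from a fanout exchange only after a queue has been declared and bound to it. The thread covers two ways to get that binding: setting a queue name on the operator, or declaring the binding with rabbitmqadmin.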
>>>
>>>
>>>
>>> On 08.06.2017 at 18:04, a...@x5h.eu wrote:
>>>>
>>>> Okay, I found the error. I copied the LineOutputOperator.java <https://github.com/DataTorrent/examples/blob/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ/LineOutputOperator.java> from the jmsActiveMQ example and found there:
>>>>     public class LineOutputOperator extends AbstractFileOutputOperator<String>
>>>>
>>>> Instead I took the LineOutputOperator.java <https://github.com/DataTorrent/examples/blob/master/tutorials/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java> from the Kafka 0.9 example, where the class is defined correctly for the RabbitMQInputOperator.
>>>>
>>>> So far so good: now it compiles without errors.
>>>>
>>>> Cheers
>>>>
>>>> Manfred
>>>>
>>>> On 08.06.2017 at 17:38, a...@x5h.eu wrote:
>>>>>
>>>>> I still don't get it completely (the rest of the code is in the email before; this is only the necessary sample):
>>>>>
>>>>> 1. dag.addStream("test", rabbitInput.output, out.input);
>>>>> results in the following error:
>>>>> [ERROR] symbol: variable output
>>>>> [ERROR] location: variable rabbitInput of type com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>>>>
>>>>> 2. dag.addStream("test", rabbitInput.outputPort, out.input);
>>>>> results in the following error:
>>>>> [ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,8] no suitable method found for addStream(java.lang.String,com.datatorrent.api.DefaultOutputPort<byte[]>,com.datatorrent.api.DefaultInputPort<java.lang.String>)
>>>>> [ERROR] method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>...)
>>>>>     is not applicable
>>>>> [ERROR] (inferred type does not conform to upper bound(s)
>>>>> [ERROR] inferred: byte[]
>>>>> [ERROR] upper bound(s): java.lang.String,java.lang.Object)
>>>>> [ERROR] method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>) is not applicable
>>>>> [ERROR] (inferred type does not conform to upper bound(s)
>>>>> [ERROR] inferred: byte[]
>>>>> [ERROR] upper bound(s): java.lang.String,java.lang.Object)
>>>>> [ERROR] method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>,com.datatorrent.api.Operator.InputPort<? super T>) is not applicable
>>>>> [ERROR] (cannot infer type-variable(s) T
>>>>> [ERROR] (actual and formal argument lists differ in length))
>>>>>
>>>>> It seems that, on the one hand, the RabbitMQInputOperator class does not have an output method and, on the other hand, the addStream method only accepts outputPort combined with inputPort, or output and input of the corresponding classes. Does that mean I can only use a class that implements an inputPort for this example?
>>>>>
>>>>> Cheers
>>>>>
>>>>> Manfred.
>>>>>
>>>>>
>>>>>
>>>>> On 08.06.2017 at 10:05, a...@x5h.eu wrote:
>>>>>>
>>>>>> Sorry, the two snippets below were from different iterations.
>>>>>> The error I get is the following:
>>>>>>
>>>>>> [ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38] cannot find symbol
>>>>>> [ERROR] symbol: variable output
>>>>>> [ERROR] location: variable rabbitInput of type com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>>>>>
>>>>>> My code is as follows:
>>>>>>
>>>>>>     package com.example.rabbitMQ;
>>>>>>
>>>>>>     import org.apache.hadoop.conf.Configuration;
>>>>>>
>>>>>>     import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
>>>>>>     import com.datatorrent.api.annotation.ApplicationAnnotation;
>>>>>>     import com.datatorrent.api.StreamingApplication;
>>>>>>     import com.datatorrent.api.DAG;
>>>>>>     import com.datatorrent.lib.io.jms.JMSStringInputOperator;
>>>>>>
>>>>>>     @ApplicationAnnotation(name="RabbitMQ2HDFS")
>>>>>>     public class RabbitMQApplication implements StreamingApplication
>>>>>>     {
>>>>>>       @Override
>>>>>>       public void populateDAG(DAG dag, Configuration conf)
>>>>>>       {
>>>>>>         RabbitMQInputOperator rabbitInput = dag.addOperator("Consumer", RabbitMQInputOperator.class);
>>>>>>         rabbitInput.setHost("localhost");
>>>>>>         rabbitInput.setPort(5672);
>>>>>>         rabbitInput.setExchange("");
>>>>>>         rabbitInput.setQueueName("hello");
>>>>>>         LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
>>>>>>
>>>>>>         dag.addStream("data", rabbitInput.output, out.input);
>>>>>>       }
>>>>>>     }
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> Manfred.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 08.06.2017 at 04:34, vikram patil wrote:
>>>>>>> Hi,
>>>>>>> dag.addStream() is used to create a stream from one operator's output port to another operator's input port.
>>>>>>>     RabbitMQInputOperator consumer = dag.addOperator("Consumer", RabbitMQInputOperator.class);
>>>>>>>     dag.addStream("data", *rabbitInput*.output, out.input);
>>>>>>> Looks like your operator name is incorrect?
>>>>>>> I see in your code snippet above that the name of the RabbitMQInputOperator is *"Consumer"*.
>>>>>>>
>>>>>>> In the property name, you need to provide the operator name you specified in the addOperator(*"NAME OF THE OPERATOR"*, RabbitMQInputOperator.class) API call.
>>>>>>>
>>>>>>> dt.operator.*rabbitMQIn*.prop.tuple_blast (the syntax is correct, provided your operator name is correct).
>>>>>>>
>>>>>>> (Based on your code snippet, it should be dt.operator.*Consumer*.prop.tuple_blast.)
>>>>>>>
>>>>>>> I think the tests provided in Apache Malhar are very detailed. They run in local mode as unit tests, so we have mocked the actual RabbitMQ with a custom message publisher.
>>>>>>>
>>>>>>> For the time being, set only the queue name and host name:
>>>>>>>
>>>>>>>     // set your RabbitMQ host
>>>>>>>     consumer.setHost("localhost");
>>>>>>>     // set your RabbitMQ port
>>>>>>>     consumer.setPort(5672);
>>>>>>>     // This depends on your RabbitMQ producer configuration, but by default
>>>>>>>     // it is the default exchange "" (no name is provided).
>>>>>>>     consumer.setExchange("");
>>>>>>>     // set your queue name
>>>>>>>     consumer.setQueueName("YOUR_QUEUE_NAME");
>>>>>>>
>>>>>>> If it's okay, could you please share the code from your Application.java and properties.xml here?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Vikram
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jun 8, 2017 at 12:32 AM, <a...@x5h.eu> wrote:
>>>>>>>
>>>>>>> Thanks very much for the help. The only problem left is that I don't quite understand dag.addStream().
>>>>>>>
>>>>>>> I tried this:
>>>>>>>
>>>>>>>     RabbitMQInputOperator consumer = dag.addOperator("Consumer", RabbitMQInputOperator.class);
>>>>>>>     dag.addStream("data", rabbitInput.output, out.input);
>>>>>>>
>>>>>>> but obviously this doesn't work. What I don't get is the difference between the ActiveMQ example and the RabbitMQ example.
>>>>>>> I looked over the test examples for RabbitMQ but don't seem to understand the logic behind them.
>>>>>>>
>>>>>>> Is this the correct way to specify properties?
>>>>>>>
>>>>>>>     <property>
>>>>>>>       <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
>>>>>>>       <value>500</value>
>>>>>>>     </property>
>>>>>>>
>>>>>>> Cheers
>>>>>>> Manfred.
>>>>>>>
>>>>>>>
>>>>>>> On 07.06.2017 at 12:03, Vikram Patil wrote:
>>>>>>>> Yes, you would need an Application.java, which is the way to define a DAG for an Apex application.
>>>>>>>>
>>>>>>>> Please have a look at the code in the following example to see how to write a JMS ActiveMQ-based application:
>>>>>>>>
>>>>>>>> https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ
>>>>>>>>
>>>>>>>> This is how you can instantiate the RabbitMQInputOperator and add it to a DAG:
>>>>>>>>     RabbitMQInputOperator consumer = dag.addOperator("Consumer", RabbitMQInputOperator.class);
>>>>>>>>
>>>>>>>> https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag
>>>>>>>>
>>>>>>>> The following properties need to be specified in properties.xml:
>>>>>>>>
>>>>>>>>  * tuple_blast: number of tuples emitted in each burst
>>>>>>>>  * bufferSize: size of the holding buffer
>>>>>>>>  * host: the address for the consumer to connect to the RabbitMQ producer
>>>>>>>>  * exchange: the exchange for the consumer to connect to the RabbitMQ producer
>>>>>>>>  * exchangeType: the exchangeType for the consumer to connect to the RabbitMQ producer
>>>>>>>>  * routingKey: the routingKey for the consumer to connect to the RabbitMQ producer
>>>>>>>>  * queueName: the queueName for the consumer to connect to the RabbitMQ producer
>>>>>>>>
>>>>>>>> Reference:
>>>>>>>> https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Vikram
>>>>>>>>
>>>>>>>> On Wed, Jun 7, 2017 at 3:19 PM, <a...@x5h.eu> wrote:
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> I compiled the whole thing, but now I don't know exactly how to get it running in Apex. Do I need an Application.java like in the tutorial? I do have a simple RabbitMQ queue up and running on the server.
>>>>>>>>> How do I consume the messages with Apex and write them to HDFS?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>>
>>>>>>>>> Manfred
>>>>>>>>>
>>>>>>>>> The following steps were necessary to get the RabbitMQ test to compile:
>>>>>>>>>
>>>>>>>>> @TimeoutException
>>>>>>>>>     import java.util.concurrent.TimeoutException;
>>>>>>>>>     public void setup() throws IOException, TimeoutException
>>>>>>>>>     public void teardown() throws IOException, TimeoutException
>>>>>>>>>     protected void runTest(final int testNum) throws IOException
>>>>>>>>>
>>>>>>>>> @Build jars
>>>>>>>>>     cd apex-malhar/contrib/
>>>>>>>>>     mvn clean package -DskipTests
>>>>>>>>>
>>>>>>>>>     cd apex-malhar/library/
>>>>>>>>>     mvn clean package -DskipTests
>>>>>>>>> Copy the packages to the project directory.
>>>>>>>>>
>>>>>>>>> @Link them to the project
>>>>>>>>> Add the following lines to the pom.xml:
>>>>>>>>>     <dependency>
>>>>>>>>>       <groupId>contrib</groupId>
>>>>>>>>>       <artifactId>com.datatorrent.contrib.helper</artifactId>
>>>>>>>>>       <version>1.0</version>
>>>>>>>>>       <scope>system</scope>
>>>>>>>>>       <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>>>>>>>     </dependency>
>>>>>>>>>     <dependency>
>>>>>>>>>       <groupId>lib</groupId>
>>>>>>>>>       <artifactId>com.datatorrent.lib.helper</artifactId>
>>>>>>>>>       <version>1.0</version>
>>>>>>>>>       <scope>system</scope>
>>>>>>>>>       <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>>>>>>>     </dependency>
>>>>>>>>>     <dependency>
>>>>>>>>>       <groupId>contrib</groupId>
>>>>>>>>>       <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
>>>>>>>>>       <version>1.0</version>
>>>>>>>>>       <scope>system</scope>
>>>>>>>>>       <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
>>>>>>>>>     </dependency>
>>>>>>>>>     <dependency>
>>>>>>>>>       <groupId>Attribute</groupId>
>>>>>>>>>       <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
>>>>>>>>>       <version>1.0</version>
>>>>>>>>>       <scope>system</scope>
>>>>>>>>>       <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
>>>>>>>>>     </dependency>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 31.05.2017 at 18:57, Sanjay Pujare wrote:
>>>>>>>>>
>>>>>>>>> Both com.datatorrent.contrib.helper and com.datatorrent.lib.helper are under the test directory of malhar-contrib and malhar-library respectively. You may need to build these jars yourself with test scope to include these packages.
>>>>>>>>>
>>>>>>>>> On Wed, May 31, 2017 at 9:39 AM, <a...@x5h.eu> wrote:
>>>>>>>>>> Hello (mea culpa for messing up the subject line the first time),
>>>>>>>>>>
>>>>>>>>>> I'm currently trying to get the apex-malhar RabbitMQ operator running, but I'm at a complete loss. While the examples run fine, I can't even get RabbitMQInputOperatorTest.java to run.
>>>>>>>>>>
>>>>>>>>>> First it couldn't find the RabbitMQ client, which was solvable by adding the dependency:
>>>>>>>>>>
>>>>>>>>>>     <dependency>
>>>>>>>>>>       <groupId>com.rabbitmq</groupId>
>>>>>>>>>>       <artifactId>amqp-client</artifactId>
>>>>>>>>>>       <version>4.1.0</version>
>>>>>>>>>>     </dependency>
>>>>>>>>>>
>>>>>>>>>> But now it doesn't find the packages com.datatorrent.contrib.helper and com.datatorrent.lib.helper, and it can't find several symbols.
>>>>>>>>>>
>>>>>>>>>> Needless to say, I'm a beginner regarding Apex. Does anyone know what exactly I'm doing wrong here?
>>>>>>>>>>
>>>>>>>>>> Cheers
>>>>>>>>>>
>>>>>>>>>> Manfred.
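This note is for readers who hit the same "not starting application as amIfStarted exceeds amLimit" line quoted in the 10.06.2017 message. The CapacityScheduler caps the share of a queue's resources that ApplicationMasters may hold, via yarn.scheduler.capacity.maximum-am-resource-percent (default 0.1). A new application stays pending while starting its AM would push AM usage past that cap. The sketch below is a simplified, memory-only model of that check. The queue and AM sizes are hypothetical. The rule that the very first application is always let through is an assumption based on Hadoop 2.x LeafQueue behaviour, not something shown in the thread. Under this model, the log's "#queue-active-applications: 1" would fit another application already holding the queue's AM share, but that reading is a guess.

```java
// Simplified, memory-only model of the CapacityScheduler check that logs
// "not starting application as amIfStarted exceeds amLimit". The real
// scheduler also applies per-user limits and rounds to allocation sizes.
public class AmLimitDemo {

  /**
   * Decides whether a pending application's ApplicationMaster may start.
   *
   * @param queueMemoryMb memory available to the leaf queue (hypothetical figure)
   * @param maxAmPercent  yarn.scheduler.capacity.maximum-am-resource-percent, e.g. 0.1
   * @param amUsedMb      memory already held by running ApplicationMasters
   * @param newAmMb       memory requested by the new ApplicationMaster
   * @param activeApps    applications already active in the queue
   */
  static boolean canActivate(long queueMemoryMb, double maxAmPercent,
                             long amUsedMb, long newAmMb, int activeApps) {
    long amLimitMb = (long) (queueMemoryMb * maxAmPercent);
    long amIfStartedMb = amUsedMb + newAmMb;
    if (amIfStartedMb <= amLimitMb) {
      return true;
    }
    // Assumption: the limit is waived while nothing is active yet,
    // so at least one application can always start.
    return activeApps == 0;
  }

  public static void main(String[] args) {
    // Hypothetical small queue: 3072 MB at 10% leaves 307 MB for AMs.
    System.out.println(canActivate(3072, 0.1, 0, 1024, 0));    // true: first app is let through
    System.out.println(canActivate(3072, 0.1, 1024, 1024, 1)); // false: second app stays pending
    // Hypothetical larger queue and AM share: 8192 MB at 50% leaves 4096 MB for AMs.
    System.out.println(canActivate(8192, 0.5, 1024, 1024, 1)); // true
  }
}
```

In this model, raising maximum-am-resource-percent, giving the queue more memory, or killing stale applications that still hold an AM slot would all let the pending application start. Check the CapacityScheduler documentation for your Hadoop version before relying on the exact rule.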