After various tests I finally got everything working, and for future
users I'll post here how.
First, the RabbitMQ configuration, which was the only one that worked:
rabbitmqadmin declare exchange name=apex type=fanout durable=false
rabbitmqadmin declare queue name=test durable=true
rabbitmqadmin declare binding source="apex" destination_type="queue" destination="test" routing_key="rktest"
It is important to know that Apex only accepts a non-durable exchange. This
means you have to recreate it every time you restart your RabbitMQ service.
The "Mkdirs failed to create" error:
This just means that the DFS service is down or, in my case, that safemode
is on. Check the state and, if it is on, leave safemode:
hdfs dfsadmin -safemode get
hdfs dfsadmin -safemode leave
My example uses the following code (I moved the operator values into a
corresponding *.xml file; I list them here only for better understanding):
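import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;

@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // Source: consume from queue "test" on the fanout exchange "apex"
    RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
    in.setHost("192.168.33.63");
    in.setExchange("apex");
    in.setExchangeType("fanout");
    in.setQueueName("test");

    // Sink: write each message as one line to files named "rabbitOut"
    LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
    out.setFilePath("/hdfs/rabbitMQ");
    out.setBaseName("rabbitOut");
    out.setMaxLength(1024);
    out.setRotationWindows(4);

    // Connect the byte[] output port of the source to the input port of the sink
    dag.addStream("data", in.outputPort, out.input);
  }
}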
And the corresponding output operator. The only important thing here was
that it extends AbstractFileOutputOperator<byte[]>, so that its input type
matches the byte[] tuples emitted by the RabbitMQInputOperator:
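import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import javax.validation.constraints.NotNull;

import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;

public class LineOutputOperator extends AbstractFileOutputOperator<byte[]>
{
  private static final String NL = System.lineSeparator();
  private static final Charset CS = StandardCharsets.UTF_8;

  // Name of the output file; must be set before launch
  @NotNull
  private String baseName;

  // Decode the message, append a line separator and encode it again
  @Override
  public byte[] getBytesForTuple(byte[] t) {
    String result = new String(t, CS) + NL;
    return result.getBytes(CS);
  }

  // All tuples go to the same file
  @Override
  protected String getFileName(byte[] tuple) {
    return baseName;
  }

  public String getBaseName() { return baseName; }
  public void setBaseName(String v) { baseName = v; }
}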
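To illustrate why the tuple type matters, here is a minimal, self-contained
sketch that runs without Apex. OutPort, InPort and this addStream are
hypothetical stand-ins made up for illustration, not the real Apex classes;
only the generic bounds are copied from the addStream signature shown in the
compiler error quoted further down in this thread. The toLine helper is also
hypothetical and just repeats the conversion from getBytesForTuple.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class PortTypeSketch {
    private static final Charset CS = StandardCharsets.UTF_8;

    /** Hypothetical stand-in for an output port that emits tuples of type T. */
    public static final class OutPort<T> { }

    /** Hypothetical stand-in for an input port that accepts tuples of type T. */
    public static final class InPort<T> { }

    /**
     * Hypothetical stand-in for DAG.addStream. It uses the same bounds as the
     * signature in the quoted compiler error: OutputPort<? extends T> and
     * InputPort<? super T>.
     */
    public static <T> String addStream(String id, OutPort<? extends T> source, InPort<? super T> sink) {
        return id; // a real DAG would record the connection; here only the types matter
    }

    /** Same conversion as getBytesForTuple: decode, append a line separator, encode. */
    public static byte[] toLine(byte[] tuple) {
        String line = new String(tuple, CS) + System.lineSeparator();
        return line.getBytes(CS);
    }

    public static void main(String[] args) {
        OutPort<byte[]> rabbitOut = new OutPort<>(); // the RabbitMQ side emits byte[]
        InPort<byte[]> fileIn = new InPort<>();      // a byte[]-based file writer

        // Compiles: T can be inferred as byte[] for both ports.
        System.out.println(addStream("data", rabbitOut, fileIn));

        // Does not compile if uncommented: no T fits both byte[] and String.
        // InPort<String> stringIn = new InPort<>();
        // addStream("data", rabbitOut, stringIn);

        // The message comes back followed by a line separator.
        byte[] line = toLine("hello".getBytes(CS));
        System.out.print(new String(line, CS));
    }
}
```

This is why the String-based LineOutputOperator from the jmsActiveMQ example
did not compile against the RabbitMQ operator, while a byte[]-based one does.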
The most pressing issue was that the application wouldn't run on the YARN
cluster, only in local mode. I still have no idea why it didn't run, but my
best guess is that it was a bad idea from the beginning to try the Apex app
on a Raspberry Pi 3 cluster. I switched to a standard Arch Linux server with
8GB RAM, and without changing a thing in the application it worked
perfectly.
Thanks for all the help!
On 22.06.2017 at 11:33, [email protected] wrote:
>
> I drilled the error down to this message:
>
> Mkdirs failed to create
> file:/home/pi/datatorrent/apps/application_1498123667708_0001/checkpoints/2
>
> I guess I have something buggy in my configuration. Does any of you
> know how to solve this error? Should I start the application with the
> same user I'm starting YARN with?
>
> Cheers
>
> Manfred.
>
>
>
> On 10.06.2017 at 14:50, [email protected] wrote:
>>
>> Hello,
>>
>> you were completely right: it seems that there are problems with my
>> test scenario regarding the Hadoop/YARN installation, and the
>> application never starts. I found these entries in the log:
>>
>> 2017-06-10 14:33:02,623 INFO
>> org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService:
>> Registering app attempt : appattempt_1495629011552_0011_000001
>> 2017-06-10 14:33:02,623 INFO
>> org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl:
>> appattempt_1495629011552_0011_000001 State change from NEW to SUBMITTED
>> 2017-06-10 14:33:02,624 INFO
>> org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue:
>> not starting application as amIfStarted exceeds amLimit
>> 2017-06-10 14:33:02,624 INFO
>> org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue:
>> Application added - appId: application_1495629011552_0011 user:
>> org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue$User@11536dc,
>> leaf-queue: default #user-pending-applications: 1
>> #user-active-applications: 1 #queue-pending-applications: 1
>> #queue-active-applications: 1
>>
>> Therefore the application never leaves the state "undefined". Since
>> the local tests were running fine and the launch of the application
>> didn't raise an error I missed the problem with the hadoop
>> installation. Thanks for the correct hint to look at the hadoop cluster.
>>
>> Cheers
>> Manfred.
>>
>>
>> On 09.06.2017 at 15:23, vikram patil wrote:
>>> 1) Are you doing it on your local environment?
>>> 2) If you are doing it locally, I would suggest the following options:
>>>    1) If you don't want to create the queue on RabbitMQ yourself,
>>>       set the queue name on the operator:
>>>       in.setQueueName("YOUR_QUEUE_NAME");
>>>       The operator will then do the following steps:
>>>       * Create a durable queue in RabbitMQ.
>>>       * You have specified exchange and exchangeType, so it will
>>>         create an exchange using this information and bind the
>>>         created queue to the exchange with the default routing key,
>>>         which is "".
>>>       Right now it must be creating an auto-generated, uniquely named
>>>       queue for you.
>>>
>>>    2) You can create your own exchange and durable queue using
>>>       rabbitmqadmin. You will have to install the RabbitMQ plugins
>>>       for that. You can use it to publish some test data as well.
>>>
>>> Using apex-cli you can check the status of your application; if it's
>>> failing, you should check the logs under userlogs in the Hadoop logs
>>> directory.
>>>
>>> Thanks & Regards,
>>> Vikram
>>>
>>>
>>>
>>>
>>> On Fri, Jun 9, 2017 at 6:42 PM, <[email protected]> wrote:
>>>
>>> Finally got rid of all errors but now I have the problem that
>>> the apex application does not seem to register at the RabbitMQ
>>> exchange.
>>>
>>> This is my code:
>>>
>>> @ApplicationAnnotation(name="RabbitMQ2HDFS")
>>> public class RabbitMQApplication implements StreamingApplication
>>> {
>>>
>>> @Override
>>> public void populateDAG(DAG dag, Configuration conf)
>>> {
>>> RabbitMQInputOperator in = dag.addOperator("rabbitInput",new
>>> RabbitMQInputOperator());
>>> in.setHost("localhost");
>>> //in.setPort(5672);
>>> in.setExchange("apex");
>>> in.setExchangeType("fanout");
>>> ConsoleOutputOperator console = dag.addOperator("console",
>>> new ConsoleOutputOperator());
>>> dag.addStream("rand_console",in.outputPort, console.input);
>>>
>>> }
>>>
>>> }
>>>
>>> If I launch the application, everything executes without an error,
>>> but if I list the bindings on the exchange, there are none.
>>>
>>> Does anyone have an idea how I can start to debug this?
>>>
>>> Cheers
>>> Manfred.
>>>
>>>
>>>
>>> On 08.06.2017 at 18:04, [email protected] wrote:
>>>>
>>>> Okay, I found the error. I had copied the LineOutputOperator.java
>>>>
>>>> <https://github.com/DataTorrent/examples/blob/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ/LineOutputOperator.java>
>>>> from the jmsActiveMQ example, where the class is declared as
>>>> public class LineOutputOperator extends
>>>> AbstractFileOutputOperator<String>
>>>>
>>>> Instead I took the LineOutputOperator.java
>>>>
>>>> <https://github.com/DataTorrent/examples/blob/master/tutorials/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java>
>>>> from the Kafka 0.9 example; there the class is correctly defined for
>>>> the RabbitMQInputOperator.
>>>>
>>>> So far so good, now it compiles without errors.
>>>>
>>>> Cheers
>>>>
>>>> Manfred
>>>>
>>>> On 08.06.2017 at 17:38, [email protected] wrote:
>>>>>
>>>>> I still don't get it completely (the rest of the code is in
>>>>> the previous email; this is only the relevant sample):
>>>>>
>>>>> 1. dag.addStream("test", rabbitInput.output, out.input);
>>>>> Results in the following error:
>>>>> [ERROR] symbol: variable output
>>>>> [ERROR] location: variable rabbitInput of type
>>>>> com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>>>>
>>>>> 2. dag.addStream("test", rabbitInput.outputPort, out.input);
>>>>> Results in the following error:
>>>>> [ERROR]
>>>>>
>>>>> /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,8]
>>>>> no suitable method found for
>>>>>
>>>>> addStream(java.lang.String,com.datatorrent.api.DefaultOutputPort<byte[]>,com.datatorrent.api.DefaultInputPort<java.lang.String>)
>>>>> [ERROR] method
>>>>>
>>>>> com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>>>>> extends T>,com.datatorrent.api.Operator.InputPort<? super
>>>>> T>...) is not applicable
>>>>> [ERROR] (inferred type does not conform to upper
>>>>> bound(s)
>>>>> [ERROR] inferred: byte[]
>>>>> [ERROR] upper bound(s):
>>>>> java.lang.String,java.lang.Object)
>>>>> [ERROR] method
>>>>>
>>>>> com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>>>>> extends T>,com.datatorrent.api.Operator.InputPort<? super
>>>>> T>) is not applicable
>>>>> [ERROR] (inferred type does not conform to upper
>>>>> bound(s)
>>>>> [ERROR] inferred: byte[]
>>>>> [ERROR] upper bound(s):
>>>>> java.lang.String,java.lang.Object)
>>>>> [ERROR] method
>>>>>
>>>>> com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>>>>> extends T>,com.datatorrent.api.Operator.InputPort<? super
>>>>> T>,com.datatorrent.api.Operator.InputPort<? super T>) is
>>>>> not applicable
>>>>> [ERROR] (cannot infer type-variable(s) T
>>>>> [ERROR] (actual and formal argument lists differ
>>>>> in length))
>>>>>
>>>>>
>>>>>
>>>>> It seems that on the one hand the RabbitMQInputOperator class
>>>>> does not have an output member, and on the other hand the
>>>>> addStream method only accepts outputPort combined with
>>>>> inputPort, or output combined with input, of the
>>>>> corresponding classes. Does that mean I can only use a class
>>>>> that provides an inputPort member for this example?
>>>>>
>>>>> Cheers
>>>>>
>>>>> Manfred.
>>>>>
>>>>>
>>>>>
>>>>> On 08.06.2017 at 10:05, [email protected] wrote:
>>>>>>
>>>>>> Sorry, the two snippets below were from different iterations.
>>>>>> The error I get is the following:
>>>>>>
>>>>>> [ERROR]
>>>>>>
>>>>>> /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38]
>>>>>> cannot find symbol
>>>>>> [ERROR] symbol: variable output
>>>>>> [ERROR] location: variable rabbitInput of type
>>>>>> com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>>>>>
>>>>>> My Code is as follows:
>>>>>>
>>>>>>
>>>>>> package com.example.rabbitMQ;
>>>>>>
>>>>>> import org.apache.hadoop.conf.Configuration;
>>>>>>
>>>>>> import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
>>>>>> import com.datatorrent.api.annotation.ApplicationAnnotation;
>>>>>> import com.datatorrent.api.StreamingApplication;
>>>>>> import com.datatorrent.api.DAG;
>>>>>> import com.datatorrent.lib.io.jms.JMSStringInputOperator;
>>>>>>
>>>>>> @ApplicationAnnotation(name="RabbitMQ2HDFS")
>>>>>> public class RabbitMQApplication implements StreamingApplication
>>>>>> {
>>>>>>
>>>>>> @Override
>>>>>> public void populateDAG(DAG dag, Configuration conf)
>>>>>> {
>>>>>>
>>>>>> RabbitMQInputOperator rabbitInput =
>>>>>> dag.addOperator("Consumer",RabbitMQInputOperator.class);
>>>>>> rabbitInput.setHost("localhost");
>>>>>> rabbitInput.setPort(5672);
>>>>>> rabbitInput.setExchange("");
>>>>>> rabbitInput.setQueueName("hello");
>>>>>> LineOutputOperator out = dag.addOperator("fileOut", new
>>>>>> LineOutputOperator());
>>>>>>
>>>>>> dag.addStream("data", rabbitInput.output, out.input);
>>>>>> }
>>>>>> }
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> Manfred.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 08.06.2017 at 04:34, vikram patil wrote:
>>>>>>> Hi,
>>>>>>> dag.addStream() is used to create a stream from one
>>>>>>> operator's output port to another operator's input port.
>>>>>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>>>>> RabbitMQInputOperator.class);
>>>>>>> dag.addStream("data", *rabbitInput*.output, out.input);
>>>>>>> Looks like your operator name is incorrect? I see in your
>>>>>>> code snippet above that the name of the RabbitMQInputOperator
>>>>>>> is *"Consumer".*
>>>>>>>
>>>>>>> In the property name, you need to provide the operator name you
>>>>>>> have specified in the addOperator(*"NAME OF THE OPERATOR"*,
>>>>>>> RabbitMQInputOperator.class) API call.
>>>>>>>
>>>>>>> dt.operator.*rabbitMQIn*.prop.tuple_blast (the syntax is
>>>>>>> correct, given that your operator name is correct).
>>>>>>>
>>>>>>> (It should be dt.operator.*Consumer*.prop.tuple_blast based
>>>>>>> on your code snippet.)
>>>>>>>
>>>>>>> I think tests which are provided in the Apache Malhar are
>>>>>>> very detailed, they run in local mode as unit tests so we
>>>>>>> have mocked actual rabbitmq by custom message publisher.
>>>>>>>
>>>>>>> For the time being, set only the queue name and host name, as follows:
>>>>>>>
>>>>>>> // set your rabbitmq host
>>>>>>> consumer.setHost("localhost");
>>>>>>> // set your rabbitmq port
>>>>>>> consumer.setPort(5672);
>>>>>>> // It depends on your rabbitmq producer configuration, but by
>>>>>>> // default it is the default exchange with "" (no name is provided).
>>>>>>> consumer.setExchange("");
>>>>>>> // set your queue name
>>>>>>> consumer.setQueueName("YOUR_QUEUE_NAME");
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> If it's okay, could you please share the code from your
>>>>>>> application.java and properties.xml here?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Vikram
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jun 8, 2017 at 12:32 AM, <[email protected]> wrote:
>>>>>>>
>>>>>>> Thanks very much for the help. The only problem left is
>>>>>>> that I don't quite understand dag.addStream().
>>>>>>>
>>>>>>> I tried this
>>>>>>>
>>>>>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>>>>> RabbitMQInputOperator.class);
>>>>>>> dag.addStream("data", rabbitInput.output, out.input);
>>>>>>>
>>>>>>> but obviously this doesn't work. What I don't get is the
>>>>>>> difference between the ActiveMQ example and the RabbitMQ
>>>>>>> example. I looked over the test examples for RabbitMQ
>>>>>>> but don't seem to understand the logic behind it.
>>>>>>>
>>>>>>> Is this the correct way to specify properties:
>>>>>>> <property>
>>>>>>> <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
>>>>>>> <value>500</value>
>>>>>>> </property>
>>>>>>>
>>>>>>> Cheers
>>>>>>> Manfred.
>>>>>>>
>>>>>>>
>>>>>>> On 07.06.2017 at 12:03, Vikram Patil wrote:
>>>>>>>> Yes, you would need an Application.java, which is the way to
>>>>>>>> define a DAG for an Apex application.
>>>>>>>>
>>>>>>>> Please have a look at the code from the following example to find
>>>>>>>> out how to write a JMS ActiveMQ based example:
>>>>>>>>
>>>>>>>>
>>>>>>>> https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ
>>>>>>>>
>>>>>>>> This is how you can instantiate the RabbitMQInputOperator and add
>>>>>>>> it to a DAG:
>>>>>>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>>>>>> RabbitMQInputOperator.class);
>>>>>>>>
>>>>>>>>
>>>>>>>> https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag
>>>>>>>>
>>>>>>>> The following properties need to be specified in properties.xml:
>>>>>>>>
>>>>>>>> * tuple_blast: number of tuples emitted in each burst
>>>>>>>> * bufferSize: size of the holding buffer
>>>>>>>> * host: the address for the consumer to connect to the
>>>>>>>>   RabbitMQ producer
>>>>>>>> * exchange: the exchange for the consumer to connect to the
>>>>>>>>   RabbitMQ producer
>>>>>>>> * exchangeType: the exchangeType for the consumer to connect to
>>>>>>>>   the RabbitMQ producer
>>>>>>>> * routingKey: the routingKey for the consumer to connect to the
>>>>>>>>   RabbitMQ producer
>>>>>>>> * queueName: the queueName for the consumer to connect to the
>>>>>>>>   RabbitMQ producer
>>>>>>>>
>>>>>>>> Reference:
>>>>>>>> https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Vikram
>>>>>>>>
>>>>>>>> On Wed, Jun 7, 2017 at 3:19 PM, <[email protected]> wrote:
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> I compiled the whole thing but now I don't know exactly how
>>>>>>>>> to get it
>>>>>>>>> running in Apex. Do I need an application.java like in the
>>>>>>>>> tutorial? I do
>>>>>>>>> have a simple RabbitMQ queue up and running on the server.
>>>>>>>>> How do I consume
>>>>>>>>> the messages with Apex and write them to hdfs?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>>
>>>>>>>>> Manfred
>>>>>>>>>
>>>>>>>>> Following steps were necessary to get the RabbitMq test to
>>>>>>>>> compile
>>>>>>>>>
>>>>>>>>> @TimeoutException
>>>>>>>>> import java.util.concurrent.TimeoutException;
>>>>>>>>> public void setup() throws IOException,TimeoutException
>>>>>>>>> public void teardown() throws IOException,TimeoutException
>>>>>>>>> protected void runTest(final int testNum) throws IOException
>>>>>>>>>
>>>>>>>>> @Build jars
>>>>>>>>> cd apex-malhar/contrib/
>>>>>>>>> mvn clean package -DskipTests
>>>>>>>>>
>>>>>>>>> cd apex-malhar/library/
>>>>>>>>> mvn clean package -DskipTests
>>>>>>>>> copy packages to project directory
>>>>>>>>>
>>>>>>>>> @Link them to the project
>>>>>>>>> Add following lines to the pom.xml
>>>>>>>>> <dependency>
>>>>>>>>> <groupId>contrib</groupId>
>>>>>>>>> <artifactId>com.datatorrent.contrib.helper</artifactId>
>>>>>>>>> <version>1.0</version>
>>>>>>>>> <scope>system</scope>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>>>>>>> </dependency>
>>>>>>>>> <dependency>
>>>>>>>>> <groupId>lib</groupId>
>>>>>>>>> <artifactId>com.datatorrent.lib.helper</artifactId>
>>>>>>>>> <version>1.0</version>
>>>>>>>>> <scope>system</scope>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>>>>>>> </dependency>
>>>>>>>>> <dependency>
>>>>>>>>> <groupId>contrib</groupId>
>>>>>>>>> <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
>>>>>>>>> <version>1.0</version>
>>>>>>>>> <scope>system</scope>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
>>>>>>>>> </dependency>
>>>>>>>>> <dependency>
>>>>>>>>> <groupId>Attribute</groupId>
>>>>>>>>>
>>>>>>>>> <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
>>>>>>>>> <version>1.0</version>
>>>>>>>>> <scope>system</scope>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
>>>>>>>>> </dependency>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 31.05.2017 at 18:57, Sanjay Pujare wrote:
>>>>>>>>>
>>>>>>>>> Both com.datatorrent.contrib.helper and
>>>>>>>>> com.datatorrent.lib.helper are
>>>>>>>>> under the test directory under malhar-contrib and
>>>>>>>>> malhar-library
>>>>>>>>> respectively. You may need to build these jars yourself with
>>>>>>>>> test scope to
>>>>>>>>> include these packages.
>>>>>>>>>
>>>>>>>>> On Wed, May 31, 2017 at 9:39 AM, <[email protected]> wrote:
>>>>>>>>>> Hello, (mea culpa for messing up the headline the first time)
>>>>>>>>>>
>>>>>>>>>> I'm currently trying to get the apex-malhar rabbitmq
>>>>>>>>>> running. But I'm at a
>>>>>>>>>> complete loss, while the examples are running fine I don't
>>>>>>>>>> even get the
>>>>>>>>>> RabbitMQInputOperatorTest.java to run.
>>>>>>>>>>
>>>>>>>>>> First it couldn't find the rabbitmq-client, which was
>>>>>>>>>> solvable by adding
>>>>>>>>>> the dependency:
>>>>>>>>>>
>>>>>>>>>> <dependency>
>>>>>>>>>> <groupId>com.rabbitmq</groupId>
>>>>>>>>>> <artifactId>amqp-client</artifactId>
>>>>>>>>>> <version>4.1.0</version>
>>>>>>>>>> </dependency>
>>>>>>>>>>
>>>>>>>>>> But now it doesn't find the packages
>>>>>>>>>> com.datatorrent.contrib.helper and
>>>>>>>>>> com.datatorrent.lib.helper and can't find several symbols.
>>>>>>>>>>
>>>>>>>>>> Needless to say that I'm a beginner regarding Apex so does
>>>>>>>>>> anyone know
>>>>>>>>>> what exactly I'm doing wrong here?
>>>>>>>>>>
>>>>>>>>>> Cheers
>>>>>>>>>>
>>>>>>>>>> Manfred.
>>>>>>>>>>
>>>>>>>>>>