Okay, I found the error. I had copied LineOutputOperator.java
<https://github.com/DataTorrent/examples/blob/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ/LineOutputOperator.java>
from the jmsActiveMQ example, where the class is declared as

public class LineOutputOperator extends AbstractFileOutputOperator<String>

so its input port expects String, while the compiler output quoted below
shows that RabbitMQInputOperator.outputPort emits byte[].

Instead I took the LineOutputOperator.java
<https://github.com/DataTorrent/examples/blob/master/tutorials/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java>
from the Kafka 0.9 example; there the class is defined with the tuple type
that the RabbitMQInputOperator needs.

So far so good: it now compiles without errors.
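
For anyone hitting the same compiler error, here is a minimal sketch of why the types have to agree. It does not use the real Apex classes: OutputPort, InputPort and addStream below are hypothetical stand-ins that only copy the generic shape of the addStream signature from the compiler output (OutputPort<? extends T>, InputPort<? super T>). That the Kafka example's operator accepts byte[] is my assumption, based on the byte[] in the error message.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class PortTypeSketch {
    // Hypothetical stand-ins for Apex's output and input ports (not the real API).
    public interface OutputPort<T> { T next(); }
    public interface InputPort<T> { void put(T tuple); }

    // Same generic shape as the addStream signature in the compiler output:
    // a single T must fit both what the source emits and what the sink accepts.
    public static <T> void addStream(String name,
                                     OutputPort<? extends T> source,
                                     InputPort<? super T> sink) {
        sink.put(source.next()); // move one tuple, just to exercise the types
    }

    public static void main(String[] args) {
        // Plays the role of RabbitMQInputOperator.outputPort: emits byte[].
        OutputPort<byte[]> rabbitOutput = () -> "hello".getBytes(StandardCharsets.UTF_8);

        // Plays the role of the jmsActiveMQ LineOutputOperator: accepts String.
        List<String> lines = new ArrayList<>();
        InputPort<String> stringInput = lines::add;

        // Plays the role of the Kafka example's LineOutputOperator (assumed byte[]).
        List<byte[]> raw = new ArrayList<>();
        InputPort<byte[]> bytesInput = raw::add;

        // addStream("data", rabbitOutput, stringInput); // rejected: byte[] is not a String
        addStream("data", rabbitOutput, bytesInput);      // accepted: both ports use byte[]

        System.out.println(new String(raw.get(0), StandardCharsets.UTF_8));
    }
}
```

Uncommenting the rejected line should reproduce an "inferred type does not conform to upper bound(s)" style error, because no single T is both a supertype of byte[] and a subtype of String.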

Cheers

Manfred

On 08.06.2017 at 17:38, a...@x5h.eu wrote:
>
> I still don't get it completely. (The rest of the code is in the
> previous email; this is only the relevant part.)
>
>  1. dag.addStream("test", rabbitInput.output, out.input);
>     Results in the following error:
>     [ERROR]   symbol:   variable output
>     [ERROR]   location: variable rabbitInput of type
>     com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>
>  2. dag.addStream("test", rabbitInput.outputPort, out.input);
>     Results in the following error:
>     [ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,8]
>     no suitable method found for
>     addStream(java.lang.String,com.datatorrent.api.DefaultOutputPort<byte[]>,com.datatorrent.api.DefaultInputPort<java.lang.String>)
>     [ERROR]     method
>     com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>...)
>     is not applicable
>     [ERROR]       (inferred type does not conform to upper bound(s)
>     [ERROR]         inferred: byte[]
>     [ERROR]         upper bound(s): java.lang.String,java.lang.Object)
>     [ERROR]     method
>     com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>)
>     is not applicable
>     [ERROR]       (inferred type does not conform to upper bound(s)
>     [ERROR]         inferred: byte[]
>     [ERROR]         upper bound(s): java.lang.String,java.lang.Object)
>     [ERROR]     method
>     com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>,com.datatorrent.api.Operator.InputPort<? super T>)
>     is not applicable
>     [ERROR]       (cannot infer type-variable(s) T
>     [ERROR]         (actual and formal argument lists differ in length))
>
>
>
> It seems that on the one hand RabbitMQInputOperator does not have a
> port field named output, and on the other hand addStream only accepts
> outputPort combined with inputPort, or output combined with input, of
> the corresponding classes. Does that mean I can only use a class that
> provides an inputPort for this example?
>
> Cheers
>
> Manfred.
>
>
>
> On 08.06.2017 at 10:05, a...@x5h.eu wrote:
>>
>> Sorry, the two snippets below were from different iterations. The
>> error I get is the following:
>>
>> [ERROR]
>> /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38]
>> cannot find symbol
>> [ERROR]   symbol:   variable output
>> [ERROR]   location: variable rabbitInput of type
>> com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>
>> My code is as follows:
>>
>>
>> package com.example.rabbitMQ;
>>
>> import org.apache.hadoop.conf.Configuration;
>>
>> import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
>> import com.datatorrent.api.annotation.ApplicationAnnotation;
>> import com.datatorrent.api.StreamingApplication;
>> import com.datatorrent.api.DAG;
>> import com.datatorrent.lib.io.jms.JMSStringInputOperator;
>>
>> @ApplicationAnnotation(name="RabbitMQ2HDFS")
>> public class RabbitMQApplication implements StreamingApplication
>> {
>>
>>   @Override
>>   public void populateDAG(DAG dag, Configuration conf)
>>   {
>>
>>     RabbitMQInputOperator rabbitInput = dag.addOperator("Consumer", RabbitMQInputOperator.class);
>>     rabbitInput.setHost("localhost");
>>     rabbitInput.setPort(5672);
>>     rabbitInput.setExchange("");
>>     rabbitInput.setQueueName("hello");
>>     LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
>>
>>     dag.addStream("data", rabbitInput.output, out.input);
>>   }
>> }
>>
>> Cheers
>>
>> Manfred.
>>
>>
>>
>> On 08.06.2017 at 04:34, vikram patil wrote:
>>> Hi,
>>> dag.addStream() is used to create a stream from one operator's
>>> output port to another operator's input port.
>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>> RabbitMQInputOperator.class);
>>> dag.addStream("data", *rabbitInput*.output, out.input); 
>>> It looks like your operator name is incorrect. In your code snippet
>>> above, the name of the RabbitMQInputOperator is *"Consumer".*
>>>
>>> In the property name, you need to use the operator name you
>>> specified in the addOperator(*"NAME OF THE OPERATOR"*,
>>> RabbitMQInputOperator.class) API call.
>>>
>>>      dt.operator.*rabbitMQIn*.prop.tuple_blast (the syntax is correct,
>>> provided the operator name is correct).
>>>
>>> (It should be dt.operator.*Consumer*.prop.tuple_blast based on your
>>> code snippet.)
>>>
>>> I think the tests provided in Apache Malhar are very detailed. They
>>> run in local mode as unit tests, so the actual RabbitMQ is mocked by
>>> a custom message publisher.
>>>
>>> For the time being, set only the host, port, exchange and queue name:
>>>
>>>     // set your rabbitmq host
>>>     consumer.setHost("localhost");
>>>     // set your rabbitmq port
>>>     consumer.setPort(5672);
>>>     // It depends on your rabbitmq producer configuration, but by
>>>     // default it is the default exchange with "" (no name is provided).
>>>     consumer.setExchange("");
>>>     // set your queue name
>>>     consumer.setQueueName("YOUR_QUEUE_NAME");
>>>
>>>     
>>>
>>>
>>> If it's okay, could you please share the code from your
>>> Application.java and properties.xml here?
>>>
>>> Thanks,
>>> Vikram
>>>
>>>
>>> On Thu, Jun 8, 2017 at 12:32 AM, <a...@x5h.eu> wrote:
>>>
>>>     Thanks very much for the help. The only problem left is that I
>>>     don't quite understand dag.addStream().
>>>
>>>     I tried this
>>>
>>>     RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>     RabbitMQInputOperator.class);
>>>     dag.addStream("data", rabbitInput.output, out.input); 
>>>
>>>     but obviously this doesn't work. What I don't get is the
>>>     difference between the ActiveMQ example and the RabbitMQ
>>>     example. I looked over the test examples for RabbitMQ but don't
>>>     seem to understand the logic behind it.
>>>
>>>     Is this the correct way to specify properties?
>>>       <property>
>>>         <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
>>>         <value>500</value>
>>>       </property>
>>>
>>>     Cheers
>>>     Manfred.
>>>
>>>
>>>     On 07.06.2017 at 12:03, Vikram Patil wrote:
>>>>     Yes, you would need an Application.java, which is the way to define
>>>>     a DAG for an Apex application.
>>>>
>>>>     Please have look at the code from following example to find out how to
>>>>     write JMS ActiveMQ based example:
>>>>
>>>>     
>>>> https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ
>>>>
>>>>     This is how you can instantiate RabbitMQInputOperator and add it to a DAG:
>>>>     RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>>     RabbitMQInputOperator.class);
>>>>
>>>>     
>>>> https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag
>>>>
>>>>     The following properties need to be specified in properties.xml:
>>>>
>>>>     * tuple_blast: number of tuples emitted in each burst
>>>>     * bufferSize: size of the holding buffer
>>>>     * host: the address for the consumer to connect to the RabbitMQ producer
>>>>     * exchange: the exchange for the consumer to connect to the RabbitMQ producer
>>>>     * exchangeType: the exchangeType for the consumer to connect to the RabbitMQ producer
>>>>     * routingKey: the routingKey for the consumer to connect to the RabbitMQ producer
>>>>     * queueName: the queueName for the consumer to connect to the RabbitMQ producer
>>>>
>>>>     Reference: 
>>>> https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
>>>>
>>>>     Thanks,
>>>>     Vikram
>>>>
>>>>     On Wed, Jun 7, 2017 at 3:19 PM, <a...@x5h.eu> wrote:
>>>>>     Hello,
>>>>>
>>>>>     I compiled the whole thing, but now I don't know exactly how to get it
>>>>>     running in Apex. Do I need an Application.java like in the tutorial? I do
>>>>>     have a simple RabbitMQ queue up and running on the server. How do I consume
>>>>>     the messages with Apex and write them to HDFS?
>>>>>
>>>>>     Cheers,
>>>>>
>>>>>     Manfred
>>>>>
>>>>>     The following steps were necessary to get the RabbitMQ test to compile:
>>>>>
>>>>>     @TimeoutException
>>>>>     import java.util.concurrent.TimeoutException;
>>>>>     public void setup() throws IOException,TimeoutException
>>>>>     public void teardown() throws IOException,TimeoutException
>>>>>     protected void runTest(final int testNum) throws IOException
>>>>>
>>>>>     @Build jars
>>>>>     cd apex-malhar/contrib/
>>>>>     mvn clean package -DskipTests
>>>>>
>>>>>     cd apex-malhar/library/
>>>>>     mvn clean package -DskipTests
>>>>>     copy packages to project directory
>>>>>
>>>>>     @Link them to the project
>>>>>     Add following lines to the pom.xml
>>>>>     <dependency>
>>>>>         <groupId>contrib</groupId>
>>>>>         <artifactId>com.datatorrent.contrib.helper</artifactId>
>>>>>         <version>1.0</version>
>>>>>         <scope>system</scope>
>>>>>         <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>>>     </dependency>
>>>>>     <dependency>
>>>>>         <groupId>lib</groupId>
>>>>>         <artifactId>com.datatorrent.lib.helper</artifactId>
>>>>>         <version>1.0</version>
>>>>>         <scope>system</scope>
>>>>>         <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>>>     </dependency>
>>>>>     <dependency>
>>>>>         <groupId>contrib</groupId>
>>>>>         <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
>>>>>         <version>1.0</version>
>>>>>         <scope>system</scope>
>>>>>         <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
>>>>>     </dependency>
>>>>>     <dependency>
>>>>>         <groupId>Attribute</groupId>
>>>>>         <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
>>>>>         <version>1.0</version>
>>>>>         <scope>system</scope>
>>>>>         <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
>>>>>     </dependency>
>>>>>
>>>>>
>>>>>     On 31.05.2017 at 18:57, Sanjay Pujare wrote:
>>>>>
>>>>>     Both com.datatorrent.contrib.helper and com.datatorrent.lib.helper are
>>>>>     under the test directory under malhar-contrib and malhar-library
>>>>>     respectively. You may need to build these jars yourself with test scope to
>>>>>     include these packages.
>>>>>
>>>>>     On Wed, May 31, 2017 at 9:39 AM, <a...@x5h.eu> wrote:
>>>>>>     Hello, (mea culpa for messing up the headline the first time)
>>>>>>
>>>>>>     I'm currently trying to get the apex-malhar RabbitMQ operator running, but
>>>>>>     I'm at a complete loss: while the examples run fine, I can't even get
>>>>>>     RabbitMQInputOperatorTest.java to run.
>>>>>>
>>>>>>     First it couldn't find the rabbitmq-client, which was solvable by adding
>>>>>>     the dependency:
>>>>>>
>>>>>>     <dependency>
>>>>>>         <groupId>com.rabbitmq</groupId>
>>>>>>         <artifactId>amqp-client</artifactId>
>>>>>>         <version>4.1.0</version>
>>>>>>       </dependency>
>>>>>>
>>>>>>     But now it doesn't find the packages com.datatorrent.contrib.helper and
>>>>>>     com.datatorrent.lib.helper and can't find several symbols.
>>>>>>
>>>>>>     Needless to say, I'm a beginner regarding Apex, so does anyone know
>>>>>>     what exactly I'm doing wrong here?
>>>>>>
>>>>>>     Cheers
>>>>>>
>>>>>>     Manfred.
>>>>>>
>>>>>>
>>>
>>>
>>
>
