Hi,
I have used the operators in the way you suggested, and the application 
launched successfully.
    CouchBasePOJOInputOperator inputOperator = dag.addOperator("inputOperator", CouchBasePOJOInputOperator.class);
    ConsoleOutputOperator cons = dag.addOperator("console", new ConsoleOutputOperator());
    ObjectToNumberConverter converter = dag.addOperator("converter", ObjectToNumberConverter.class);
    MedianOperator median = dag.addOperator("median", MedianOperator.class);
    dag.addStream("inputFormatter", inputOperator.outputPort, converter.in);
    dag.addStream("med", converter.out, median.data);
    dag.addStream("out", median.median, cons.input);


inputOperator is emitting tuples. These tuples are processed by the converter 
operator, but ObjectToNumberConverter is not emitting anything on its 
output port (i.e. converter.out). What could be the reason, given that no 
errors are shown in the code?


The CouchBasePOJOInputOperator is emitting the following output:


{
  "string": "{\"abv\":\"0.0\"}"
}
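
One possible explanation, judging only from the sample output above: the tuple is an object wrapping a JSON string ({"abv":"0.0"}), not a java.lang.Number, so the "instanceof Number" check in ObjectToNumberConverter would fail and the tuple would go to the LOG.info branch instead of being emitted. The sketch below is a plain-Java illustration of a more lenient conversion. The class name TupleNumbers, the method extract, and the idea of passing in the field name are all hypothetical and not part of Apex or Malhar; it also assumes the caller has already pulled the String payload out of the POJO.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not an Apex/Malhar class.
final class TupleNumbers {

  /**
   * Returns the tuple as a Number, or null if no number can be recovered.
   * Assumption: text payloads are either a bare number ("4.5") or contain
   * a pair such as "abv":"0.0", where the field name is given by the caller.
   */
  static Number extract(Object tuple, String field) {
    if (tuple instanceof Number) {
      return (Number) tuple;
    }
    if (!(tuple instanceof CharSequence)) {
      return null;
    }
    String text = tuple.toString().trim();
    // Matches "field": 1.5 as well as "field": "1.5"
    Pattern pair = Pattern.compile(
        "\"" + Pattern.quote(field) + "\"\\s*:\\s*\"?(-?\\d+(?:\\.\\d+)?)\"?");
    Matcher m = pair.matcher(text);
    String candidate = m.find() ? m.group(1) : text;
    try {
      return Double.valueOf(candidate);
    } catch (NumberFormatException e) {
      return null;
    }
  }
}
```

Inside the converter's process() method, a non-null result could be passed to out.emit(), while a null result would still be logged. A real JSON parser would be more robust than the regular expression used here.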


From: Priyanka Gugale [mailto:priya...@datatorrent.com]
Sent: Tuesday, August 23, 2016 3:01 PM
To: users@apex.apache.org
Subject: Re: connecting two operators

Okay, as I understand it, your further processing piece is missing for now. You 
can use ConsoleOutputOperator for the time being. Once your processing 
operators are ready, you can remove the ConsoleOutputOperator and connect the 
output of the Median operator to them.

-Priyanka

On Tue, Aug 23, 2016 at 2:58 PM, Hitesh Goyal <hitesh.go...@nlpcaptcha.com> wrote:
I want to access data from Couchbase using CouchBasePOJOInputOperator and pass 
it to the median operator to calculate the median of that data. I then want 
that median value available, for example as a variable, for further processing.

From: Priyanka Gugale [mailto:priya...@datatorrent.com]
Sent: Tuesday, August 23, 2016 2:49 PM
To: users@apex.apache.org
Subject: Re: connecting two operators

Hi Hitesh,

"Median" is not an output operator; at least one output port of every non-I/O 
operator must be connected to a downstream operator. In this example you can 
use ConsoleOutputOperator to log the median output to the console.

Can you share what you want to achieve using this app? The output of median 
will be emitted on the "median" port. If you don't connect that port, the 
calculated value won't be used anywhere.

-Priyanka

On Tue, Aug 23, 2016 at 2:28 PM, Hitesh Goyal <hitesh.go...@nlpcaptcha.com> wrote:
Hi.
I am using the first method as you suggested, but it gives an error on the 
following line:
ObjectToNumberConverter converter = dag.addOperator("converter", ObjectToNumberConverter.class);

It asks me to import the package of the class, even though it is in the same 
package.

The error shown while launching the application is below:
An error occurred trying to launch the application. Server message:
2016-08-23 09:32:54.756 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:54.923 INFO com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could bootstrap through carrier publication.
2016-08-23 09:32:54.927 INFO com.couchbase.client.CouchbaseConnection: Added {QA sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, topWop=null, toWrite=0, interested=0} to connect queue
2016-08-23 09:32:54.928 INFO com.couchbase.client.CouchbaseClient: CouchbaseConnectionFactory{bucket='beer-sample', nodes=[http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, configCheck=10, reconnectInt=1100, failureMode=Redistribute, hashAlgo=NATIVE_HASH, authWaitTime=2500}
2016-08-23 09:32:54.958 INFO com.couchbase.client.CouchbaseClient: viewmode set to production mode
2016-08-23 09:32:55.504 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:55.539 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:55.664 INFO com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could bootstrap through carrier publication.
2016-08-23 09:32:55.665 INFO com.couchbase.client.CouchbaseConnection: Added {QA sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, topWop=null, toWrite=0, interested=0} to connect queue
2016-08-23 09:32:55.665 INFO com.couchbase.client.CouchbaseClient: CouchbaseConnectionFactory{bucket='beer-sample', nodes=[http://dev.nlpcaptcha.in:8091/pools, http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, configCheck=10, reconnectInt=1100, failureMode=Redistribute, hashAlgo=NATIVE_HASH, authWaitTime=2500}
2016-08-23 09:32:55.666 INFO com.couchbase.client.CouchbaseClient: viewmode set to production mode
2016-08-23 09:32:56.245 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:57.133 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:57.260 INFO com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could bootstrap through carrier publication.
2016-08-23 09:32:57.264 INFO com.couchbase.client.CouchbaseConnection: Added {QA sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, topWop=null, toWrite=0, interested=0} to connect queue
2016-08-23 09:32:57.266 INFO com.couchbase.client.CouchbaseClient: CouchbaseConnectionFactory{bucket='beer-sample', nodes=[http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, configCheck=10, reconnectInt=1100, failureMode=Redistribute, hashAlgo=NATIVE_HASH, authWaitTime=2500}
2016-08-23 09:32:57.285 INFO com.couchbase.client.CouchbaseClient: viewmode set to production mode
2016-08-23 09:32:57.842 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:57.863 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:57.986 INFO com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could bootstrap through carrier publication.
2016-08-23 09:32:57.987 INFO com.couchbase.client.CouchbaseConnection: Added {QA sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, topWop=null, toWrite=0, interested=0} to connect queue
2016-08-23 09:32:57.987 INFO com.couchbase.client.CouchbaseClient: CouchbaseConnectionFactory{bucket='beer-sample', nodes=[http://dev.nlpcaptcha.in:8091/pools, http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, configCheck=10, reconnectInt=1100, failureMode=Redistribute, hashAlgo=NATIVE_HASH, authWaitTime=2500}
2016-08-23 09:32:57.988 INFO com.couchbase.client.CouchbaseClient: viewmode set to production mode
javax.validation.ValidationException: At least one output port must be connected: median
    at com.datatorrent.stram.plan.logical.LogicalPlan.validate(LogicalPlan.java:1764)
    at com.datatorrent.stram.StramClient.<init>(StramClient.java:163)
    at com.datatorrent.stram.client.StramAppLauncher.launchApp(StramAppLauncher.java:596)
    at com.datatorrent.stram.cli.ApexCli$LaunchCommand.execute(ApexCli.java:2056)
    at com.datatorrent.stram.cli.ApexCli.launchAppPackage(ApexCli.java:3445)
    at com.datatorrent.stram.cli.ApexCli.access$7400(ApexCli.java:151)
    at com.datatorrent.stram.cli.ApexCli$LaunchCommand.execute(ApexCli.java:1900)
    at com.datatorrent.stram.cli.ApexCli$3.run(ApexCli.java:1462)


From: Priyanka Gugale [mailto:priya...@datatorrent.com]
Sent: Tuesday, August 23, 2016 11:54 AM
To: users@apex.apache.org
Subject: Re: connecting two operators

There are two ways you can do this. First, as Yogi suggested, add a new 
operator to convert Object to Number. Below is a code example:

CouchBasePOJOInputOperator inputOperator = dag.addOperator("inputOperator", CouchBasePOJOInputOperator.class);
ObjectToNumberConverter converter = dag.addOperator("converter", ObjectToNumberConverter.class);
MedianOperator median = dag.addOperator("median", MedianOperator.class);

dag.addStream("inputFormatter", inputOperator.outputPort, converter.in).setLocality(Locality.THREAD_LOCAL);
dag.addStream("med", converter.out, median.data);

//ObjectToNumberConverter Class code

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class ObjectToNumberConverter extends BaseOperator
{
  private static final Logger LOG = LoggerFactory.getLogger(ObjectToNumberConverter.class);
  public final transient DefaultOutputPort<Number> out = new DefaultOutputPort<>();
  public final transient DefaultInputPort<Object> in = new DefaultInputPort<Object>()
  {
    @Override
    public void process(Object tuple)
    {
      if (tuple instanceof Number) {
        Number number = (Number)tuple;
        out.emit(number);
      } else {
        LOG.info("Error converting object to number. " + tuple.toString());
      }
    }
  };

}


The second way is to extend or create a new MedianOperator which accepts 
"Object" on its input port and handles type conversion itself.
This is how you can do it:

package com.datatorrent.tutorial.csvparser;

import java.util.ArrayList;
import java.util.Collections;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class MedianOperator extends BaseOperator
{
  private static final Logger LOG = LoggerFactory.getLogger(MedianOperator.class);
  private ArrayList<Double> values;

  /**
   * Input data port that takes any object; only Number tuples are used.
   */
  public final transient DefaultInputPort<Object> data = new DefaultInputPort<Object>()
  {
    /**
     * Collects the value of each numeric tuple for the current window.
     */
    @Override
    public void process(Object tuple)
    {
      if (tuple instanceof Number) {
        Number numTuple = (Number)tuple;
        values.add(numTuple.doubleValue());
      } else {
        LOG.info("Invalid input format of tuple: " + tuple.toString());
      }

    }
  };

  /**
   * Output port that emits median of incoming data.
   */
  public final transient DefaultOutputPort<Number> median = new DefaultOutputPort<Number>();

  @Override
  public void beginWindow(long arg0)
  {
    values = new ArrayList<Double>();
  }

  @Override
  public void endWindow()
  {
    if (values.size() == 0) {
      return;
    }
    if (values.size() == 1) {
      median.emit(values.get(0));
      return;
    }

    // median value
    Collections.sort(values);
    int medianIndex = values.size() / 2;
    if (values.size() % 2 == 0) {
      Double value = values.get(medianIndex - 1);
      value = (value + values.get(medianIndex)) / 2;
      median.emit(value);
    } else {
      median.emit(values.get(medianIndex));
    }
  }

}
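
The median logic in endWindow() above can be checked outside Apex with a small standalone sketch. The class name MedianSketch and the method median are hypothetical names used only for illustration. The method mirrors the same steps: sort the window's values, then take the middle element, or the mean of the two middle elements when the count is even.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical standalone helper mirroring the endWindow() logic.
final class MedianSketch {

  /** Returns the median of the window, or null when the window is empty. */
  static Double median(List<? extends Number> window) {
    if (window.isEmpty()) {
      return null; // the operator emits nothing for an empty window
    }
    List<Double> values = new ArrayList<>();
    for (Number n : window) {
      values.add(n.doubleValue());
    }
    Collections.sort(values);
    int mid = values.size() / 2;
    if (values.size() % 2 == 0) {
      // Even count: average the two middle values.
      return (values.get(mid - 1) + values.get(mid)) / 2;
    }
    return values.get(mid);
  }
}
```

For the window 3, 1, 2 this gives 2.0, and for 4.0, 1.0, 3.0, 2.0 it gives 2.5.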





On Mon, Aug 22, 2016 at 5:50 PM, Yogi Devendra <devendra.vyavah...@gmail.com> wrote:
Define an ObjectToNumberConverter operator which accepts Object on its input 
port and emits Number on its output port.

Use this operator in your DAG.

~ Yogi

On 22 August 2016 at 17:42, Hitesh Goyal <hitesh.go...@nlpcaptcha.com> wrote:
Hi team,
Please refer to the following 3 lines of code.
CouchBasePOJOInputOperator inputOperator = dag.addOperator("inputOperator", CouchBasePOJOInputOperator.class);
MedianOperator median = dag.addOperator("median", MedianOperator.class);
dag.addStream("med", inputOperator.outputPort.getClass(), median.data);

The last line gives an error saying that the method is not applicable for 
these arguments.
How should I convert Object to Number so that I can pass the data to the 
MedianOperator class?
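
As a rough illustration of why the compiler rejects that call, the sketch below models typed ports in plain Java. Out, connect, and PortTypingSketch are hypothetical stand-ins, not the Apex API; the only assumption carried over is that a stream connects a typed output port to sinks that accept the same tuple type. Passing getClass() hands over a Class object rather than a port, and an Object-typed output cannot feed a Number-typed input directly, so a converter stage sits in between.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of typed ports; these are not Apex classes.
final class PortTypingSketch {

  /** Output port stand-in: forwards each emitted tuple to its sinks. */
  static final class Out<T> {
    private final List<Consumer<? super T>> sinks = new ArrayList<>();

    void emit(T tuple) {
      for (Consumer<? super T> sink : sinks) {
        sink.accept(tuple);
      }
    }
  }

  /** The sink must accept whatever type the source emits. */
  static <T> void connect(Out<T> source, Consumer<? super T> sink) {
    source.sinks.add(sink);
  }

  /** Pushes tuples through input -> converter -> median input. */
  static List<Number> run(Object... tuples) {
    Out<Object> input = new Out<>();
    Out<Number> converted = new Out<>();
    List<Number> received = new ArrayList<>();
    Consumer<Number> medianData = received::add;

    // connect(input, medianData);            // rejected: Object is not a Number
    // connect(input.getClass(), medianData); // rejected: a Class is not a port
    Consumer<Object> converterIn = tuple -> {
      if (tuple instanceof Number) {
        converted.emit((Number) tuple);
      }
    };
    connect(input, converterIn);
    connect(converted, medianData);

    for (Object tuple : tuples) {
      input.emit(tuple);
    }
    return received;
  }
}
```

Calling PortTypingSketch.run(1, "x", 2.5) delivers only the two numeric tuples to the median input; the string is dropped at the converter stage.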


Regards,
Hitesh Goyal
Simpli5d Technologies
Cont No.: 9599803307




