Hi,
I am using the first method as you suggested, but it gives an error on the
following line:
ObjectToNumberConverter converter = dag.addOperator("converter", 
ObjectToNumberConverter.class);

It asks me to import the package of the class, even though the class is in the
same package.

The error shown while launching the application is below:
An error occurred trying to launch the application. Server message: 2016-08-23 
09:32:54.756 INFO net.spy.memcached.auth.AuthThread: Authenticated to 
dev.nlpcaptcha.in/172.86.180.74:11210 2016-08-23 09:32:54.923 INFO 
com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could 
bootstrap through carrier publication. 2016-08-23 09:32:54.927 INFO 
com.couchbase.client.CouchbaseConnection: Added {QA 
sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, 
topWop=null, toWrite=0, interested=0} to connect queue 2016-08-23 09:32:54.928 
INFO com.couchbase.client.CouchbaseClient: 
CouchbaseConnectionFactory{bucket='beer-sample', 
nodes=[http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, 
opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, 
obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, 
configCheck=10, reconnectInt=1100, failureMode=Redistribute, 
hashAlgo=NATIVE_HASH, authWaitTime=2500} 2016-08-23 09:32:54.958 INFO 
com.couchbase.client.CouchbaseClient: viewmode set to production mode 
2016-08-23 09:32:55.504 INFO net.spy.memcached.auth.AuthThread: Authenticated 
to dev.nlpcaptcha.in/172.86.180.74:11210 2016-08-23 09:32:55.539 INFO 
net.spy.memcached.auth.AuthThread: Authenticated to 
dev.nlpcaptcha.in/172.86.180.74:11210 2016-08-23 09:32:55.664 INFO 
com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could 
bootstrap through carrier publication. 2016-08-23 09:32:55.665 INFO 
com.couchbase.client.CouchbaseConnection: Added {QA 
sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, 
topWop=null, toWrite=0, interested=0} to connect queue 2016-08-23 09:32:55.665 
INFO com.couchbase.client.CouchbaseClient: 
CouchbaseConnectionFactory{bucket='beer-sample', 
nodes=[http://dev.nlpcaptcha.in:8091/pools, 
http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, 
opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, 
obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, 
configCheck=10, reconnectInt=1100, failureMode=Redistribute, 
hashAlgo=NATIVE_HASH, authWaitTime=2500} 2016-08-23 09:32:55.666 INFO 
com.couchbase.client.CouchbaseClient: viewmode set to production mode 
2016-08-23 09:32:56.245 INFO net.spy.memcached.auth.AuthThread: Authenticated 
to dev.nlpcaptcha.in/172.86.180.74:11210 2016-08-23 09:32:57.133 INFO 
net.spy.memcached.auth.AuthThread: Authenticated to 
dev.nlpcaptcha.in/172.86.180.74:11210 2016-08-23 09:32:57.260 INFO 
com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could 
bootstrap through carrier publication. 2016-08-23 09:32:57.264 INFO 
com.couchbase.client.CouchbaseConnection: Added {QA 
sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, 
topWop=null, toWrite=0, interested=0} to connect queue 2016-08-23 09:32:57.266 
INFO com.couchbase.client.CouchbaseClient: 
CouchbaseConnectionFactory{bucket='beer-sample', 
nodes=[http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, 
opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, 
obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, 
configCheck=10, reconnectInt=1100, failureMode=Redistribute, 
hashAlgo=NATIVE_HASH, authWaitTime=2500} 2016-08-23 09:32:57.285 INFO 
com.couchbase.client.CouchbaseClient: viewmode set to production mode 
2016-08-23 09:32:57.842 INFO net.spy.memcached.auth.AuthThread: Authenticated 
to dev.nlpcaptcha.in/172.86.180.74:11210 2016-08-23 09:32:57.863 INFO 
net.spy.memcached.auth.AuthThread: Authenticated to 
dev.nlpcaptcha.in/172.86.180.74:11210 2016-08-23 09:32:57.986 INFO 
com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could 
bootstrap through carrier publication. 2016-08-23 09:32:57.987 INFO 
com.couchbase.client.CouchbaseConnection: Added {QA 
sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, 
topWop=null, toWrite=0, interested=0} to connect queue 2016-08-23 09:32:57.987 
INFO com.couchbase.client.CouchbaseClient: 
CouchbaseConnectionFactory{bucket='beer-sample', 
nodes=[http://dev.nlpcaptcha.in:8091/pools, 
http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, 
opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, 
obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, 
configCheck=10, reconnectInt=1100, failureMode=Redistribute, 
hashAlgo=NATIVE_HASH, authWaitTime=2500} 2016-08-23 09:32:57.988 INFO 
com.couchbase.client.CouchbaseClient: viewmode set to production mode 
javax.validation.ValidationException: At least one output port must be connected: median
    at com.datatorrent.stram.plan.logical.LogicalPlan.validate(LogicalPlan.java:1764)
    at com.datatorrent.stram.StramClient.<init>(StramClient.java:163)
    at com.datatorrent.stram.client.StramAppLauncher.launchApp(StramAppLauncher.java:596)
    at com.datatorrent.stram.cli.ApexCli$LaunchCommand.execute(ApexCli.java:2056)
    at com.datatorrent.stram.cli.ApexCli.launchAppPackage(ApexCli.java:3445)
    at com.datatorrent.stram.cli.ApexCli.access$7400(ApexCli.java:151)
    at com.datatorrent.stram.cli.ApexCli$LaunchCommand.execute(ApexCli.java:1900)
    at com.datatorrent.stram.cli.ApexCli$3.run(ApexCli.java:1462)


From: Priyanka Gugale [mailto:priya...@datatorrent.com]
Sent: Tuesday, August 23, 2016 11:54 AM
To: users@apex.apache.org
Subject: Re: connecting two operators

There are two ways you can do this. The first, as Yogi suggested, is to add a
new operator that converts Object to Number. Below is a code example:

CouchBasePOJOInputOperator inputOperator = dag.addOperator("inputOperator", CouchBasePOJOInputOperator.class);
ObjectToNumberConverter converter = dag.addOperator("converter", ObjectToNumberConverter.class);
MedianOperator median = dag.addOperator("median", MedianOperator.class);

dag.addStream("inputFormatter", inputOperator.outputPort, converter.in).setLocality(Locality.THREAD_LOCAL);
dag.addStream("med", converter.out, median.data);

//ObjectToNumberConverter Class code

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class ObjectToNumberConverter extends BaseOperator
{
  private static final Logger LOG = LoggerFactory.getLogger(ObjectToNumberConverter.class);
  public final transient DefaultOutputPort<Number> out = new DefaultOutputPort<>();
  public final transient DefaultInputPort<Object> in = new DefaultInputPort<Object>()
  {
    @Override
    public void process(Object tuple)
    {
      if (tuple instanceof Number) {
        Number number = (Number)tuple;
        out.emit(number);
      } else {
        // Parameterized logging avoids a NullPointerException when tuple is null.
        LOG.info("Error converting object to number: {}", tuple);
      }
    }
  };

}


The second way is to extend MedianOperator, or create a new one, so that it
accepts "Object" on its input port and handles the type conversion itself.
This is how you can do it:

package com.datatorrent.tutorial.csvparser;

import java.util.ArrayList;
import java.util.Collections;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class MedianOperator extends BaseOperator
{
  private static final Logger LOG = LoggerFactory.getLogger(MedianOperator.class);
  private ArrayList<Double> values;

  /**
   * Input data port that accepts any Object; only Number tuples are used.
   */
  public final transient DefaultInputPort<Object> data = new DefaultInputPort<Object>()
  {
    /**
     * Collects the double value of each Number tuple for the current window.
     */
    @Override
    public void process(Object tuple)
    {
      if (tuple instanceof Number) {
        Number numTuple = (Number)tuple;
        values.add(numTuple.doubleValue());
      } else {
        // Parameterized logging avoids a NullPointerException when tuple is null.
        LOG.info("Invalid input format of tuple: {}", tuple);
      }

    }
  };

  /**
   * Output port that emits median of incoming data.
   */
  public final transient DefaultOutputPort<Number> median = new DefaultOutputPort<Number>();

  @Override
  public void beginWindow(long arg0)
  {
    values = new ArrayList<Double>();
  }

  @Override
  public void endWindow()
  {
    if (values.size() == 0) {
      return;
    }
    if (values.size() == 1) {
      median.emit(values.get(0));
      return;
    }

    // median value
    Collections.sort(values);
    int medianIndex = values.size() / 2;
    if (values.size() % 2 == 0) {
      Double value = values.get(medianIndex - 1);
      value = (value + values.get(medianIndex)) / 2;
      median.emit(value);
    } else {
      median.emit(values.get(medianIndex));
    }
  }

}
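
The per-window median logic in endWindow() above can be tried outside Apex with a plain-Java sketch. This is only an illustration under stated assumptions: the class name MedianSketch and the method medianOf are hypothetical and not part of Apex or of the operator. A whole window is passed in as a list, and null is returned where the operator would emit nothing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-alone helper; not part of Apex or of the operator above.
final class MedianSketch
{
  private MedianSketch()
  {
  }

  /**
   * Returns the median of the Number instances in tuples,
   * or null when the list contains no Number at all.
   * Non-Number tuples are skipped, as in the operator's process() method.
   */
  static Double medianOf(List<Object> tuples)
  {
    List<Double> values = new ArrayList<>();
    for (Object tuple : tuples) {
      if (tuple instanceof Number) {
        values.add(((Number)tuple).doubleValue());
      }
    }
    if (values.isEmpty()) {
      return null;
    }

    Collections.sort(values);
    int medianIndex = values.size() / 2;
    if (values.size() % 2 == 0) {
      // Even count: average the two middle values.
      return (values.get(medianIndex - 1) + values.get(medianIndex)) / 2;
    }
    // Odd count: the middle value is the median.
    return values.get(medianIndex);
  }

  public static void main(String[] args)
  {
    // Mixed tuple types; the String is ignored.
    List<Object> window = Arrays.<Object>asList(4, 1.5, "not a number", 3L, 2);
    System.out.println(medianOf(window)); // prints 2.5
  }
}
```

Here the four numeric tuples sort to 1.5, 2, 3, 4, so the even-count branch averages 2 and 3.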





On Mon, Aug 22, 2016 at 5:50 PM, Yogi Devendra <devendra.vyavah...@gmail.com> wrote:
Define an ObjectToNumberConverter operator that accepts Object on its input
port and emits Number on its output port.

Use this operator in your DAG.

~ Yogi

On 22 August 2016 at 17:42, Hitesh Goyal <hitesh.go...@nlpcaptcha.com> wrote:
Hi team,
          Please refer to the following 3 lines of code.
CouchBasePOJOInputOperator inputOperator = dag.addOperator("inputOperator", 
CouchBasePOJOInputOperator.class);
MedianOperator median = dag.addOperator("median", MedianOperator.class);
dag.addStream("med", inputOperator.outputPort.getClass(), median.data);

    The last line gives an error saying that the method is not applicable for
these arguments.
    How should I convert Object to Number so that I can pass the data to the
MedianOperator class?


Regards,
Hitesh Goyal
Simpli5d Technologies
Cont No.: 9599803307


