Okay. As I understand it, your downstream processing piece is not written yet. You can use ConsoleOutputOperator for the time being. Once your processing operators are ready, you can remove the ConsoleOutputOperator and connect the output of the Median operator to them.
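
Until those processing operators exist, you can also check the median arithmetic outside Apex. Below is a minimal plain-Java sketch, not an Apex operator: it mirrors the endWindow() logic of the MedianOperator quoted further down in this thread and skips non-Number tuples the way ObjectToNumberConverter does. The class name MedianSketch and its median() helper are hypothetical names made up for illustration; they are not part of the Apex libraries.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.OptionalDouble;

// Hypothetical stand-alone helper for trying the calculation; not an Apex operator.
public class MedianSketch
{
  /**
   * Returns the median of all Number tuples in the list,
   * or an empty OptionalDouble if the list holds no numbers.
   */
  public static OptionalDouble median(List<?> tuples)
  {
    List<Double> values = new ArrayList<>();
    for (Object tuple : tuples) {
      // Same check as in the converter: only Number tuples are used.
      if (tuple instanceof Number) {
        values.add(((Number)tuple).doubleValue());
      }
    }
    if (values.isEmpty()) {
      return OptionalDouble.empty();
    }
    Collections.sort(values);
    int medianIndex = values.size() / 2;
    if (values.size() % 2 == 0) {
      // Even count: average the two middle values.
      return OptionalDouble.of((values.get(medianIndex - 1) + values.get(medianIndex)) / 2);
    }
    // Odd count: take the middle value.
    return OptionalDouble.of(values.get(medianIndex));
  }

  public static void main(String[] args)
  {
    List<Object> window = new ArrayList<>();
    window.add(7);
    window.add(2.5);
    window.add("not a number");
    window.add(4L);
    System.out.println(median(window));
  }
}
```

With the sample window in main(), the string is skipped and the median is taken over 2.5, 4.0 and 7.0. In the real application the value still has to travel over the "median" port to a downstream operator; this helper is only for trying the calculation in isolation.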

-Priyanka

On Tue, Aug 23, 2016 at 2:58 PM, Hitesh Goyal <hitesh.go...@nlpcaptcha.com> wrote:

> I want to read data from Couchbase using CouchBasePOJOInputOperator and
> pass it to the median operator to calculate the median of that data. I
> then want that median value available as a variable (or similar) for
> further processing.
>
> *From:* Priyanka Gugale [mailto:priya...@datatorrent.com]
> *Sent:* Tuesday, August 23, 2016 2:49 PM
> *To:* users@apex.apache.org
> *Subject:* Re: connecting two operators
>
> Hi Hitesh,
>
> "Median" is *not* an output operator. At least one output port of every
> non-I/O operator must be connected to a downstream operator. In this
> example you can use ConsoleOutputOperator to log the median output to the
> console.
>
> Can you share what you want to achieve with this app? The output of
> median is emitted on the "median" port; if you don't connect that port,
> the calculated value won't be used anywhere.
>
> -Priyanka
>
> On Tue, Aug 23, 2016 at 2:28 PM, Hitesh Goyal <hitesh.go...@nlpcaptcha.com>
> wrote:
>
> Hi.
>
> I am using the first method as you suggested, but it gives an error on
> the following line:
>
> ObjectToNumberConverter converter = dag.addOperator("converter", ObjectToNumberConverter.class);
>
> It asks me to import the package of the class, but the class is in the
> same package.
>
> The error while launching the application is shown below:
>
> An error occurred trying to launch the application. Server message:
> 2016-08-23 09:32:54.756 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
> 2016-08-23 09:32:54.923 INFO com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could bootstrap through carrier publication.
> 2016-08-23 09:32:54.927 INFO com.couchbase.client.CouchbaseConnection: Added {QA sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, topWop=null, toWrite=0, interested=0} to connect queue
> 2016-08-23 09:32:54.928 INFO com.couchbase.client.CouchbaseClient: CouchbaseConnectionFactory{bucket='beer-sample', nodes=[http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, configCheck=10, reconnectInt=1100, failureMode=Redistribute, hashAlgo=NATIVE_HASH, authWaitTime=2500}
> 2016-08-23 09:32:54.958 INFO com.couchbase.client.CouchbaseClient: viewmode set to production mode
> 2016-08-23 09:32:55.504 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
> 2016-08-23 09:32:55.539 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
> 2016-08-23 09:32:55.664 INFO com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could bootstrap through carrier publication.
> 2016-08-23 09:32:55.665 INFO com.couchbase.client.CouchbaseConnection: Added {QA sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, topWop=null, toWrite=0, interested=0} to connect queue
> 2016-08-23 09:32:55.665 INFO com.couchbase.client.CouchbaseClient: CouchbaseConnectionFactory{bucket='beer-sample', nodes=[http://dev.nlpcaptcha.in:8091/pools, http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, configCheck=10, reconnectInt=1100, failureMode=Redistribute, hashAlgo=NATIVE_HASH, authWaitTime=2500}
> 2016-08-23 09:32:55.666 INFO com.couchbase.client.CouchbaseClient: viewmode set to production mode
> 2016-08-23 09:32:56.245 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
> 2016-08-23 09:32:57.133 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
> 2016-08-23 09:32:57.260 INFO com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could bootstrap through carrier publication.
> 2016-08-23 09:32:57.264 INFO com.couchbase.client.CouchbaseConnection: Added {QA sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, topWop=null, toWrite=0, interested=0} to connect queue
> 2016-08-23 09:32:57.266 INFO com.couchbase.client.CouchbaseClient: CouchbaseConnectionFactory{bucket='beer-sample', nodes=[http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, configCheck=10, reconnectInt=1100, failureMode=Redistribute, hashAlgo=NATIVE_HASH, authWaitTime=2500}
> 2016-08-23 09:32:57.285 INFO com.couchbase.client.CouchbaseClient: viewmode set to production mode
> 2016-08-23 09:32:57.842 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
> 2016-08-23 09:32:57.863 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
> 2016-08-23 09:32:57.986 INFO com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could bootstrap through carrier publication.
> 2016-08-23 09:32:57.987 INFO com.couchbase.client.CouchbaseConnection: Added {QA sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, topWop=null, toWrite=0, interested=0} to connect queue
> 2016-08-23 09:32:57.987 INFO com.couchbase.client.CouchbaseClient: CouchbaseConnectionFactory{bucket='beer-sample', nodes=[http://dev.nlpcaptcha.in:8091/pools, http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, configCheck=10, reconnectInt=1100, failureMode=Redistribute, hashAlgo=NATIVE_HASH, authWaitTime=2500}
> 2016-08-23 09:32:57.988 INFO com.couchbase.client.CouchbaseClient: viewmode set to production mode
> javax.validation.ValidationException: At least one output port must be connected: median
>     at com.datatorrent.stram.plan.logical.LogicalPlan.validate(LogicalPlan.java:1764)
>     at com.datatorrent.stram.StramClient.<init>(StramClient.java:163)
>     at com.datatorrent.stram.client.StramAppLauncher.launchApp(StramAppLauncher.java:596)
>     at com.datatorrent.stram.cli.ApexCli$LaunchCommand.execute(ApexCli.java:2056)
>     at com.datatorrent.stram.cli.ApexCli.launchAppPackage(ApexCli.java:3445)
>     at com.datatorrent.stram.cli.ApexCli.access$7400(ApexCli.java:151)
>     at com.datatorrent.stram.cli.ApexCli$LaunchCommand.execute(ApexCli.java:1900)
>     at com.datatorrent.stram.cli.ApexCli$3.run(ApexCli.java:1462)
>
> *From:* Priyanka Gugale [mailto:priya...@datatorrent.com]
> *Sent:* Tuesday, August 23, 2016 11:54 AM
> *To:* users@apex.apache.org
> *Subject:* Re: connecting two operators
>
> There are two ways you can do this. The first, as Yogi suggested, is to
> add a new operator that converts Object to Number.
> Below is a code example:
>
> CouchBasePOJOInputOperator inputOperator = dag.addOperator("inputOperator", CouchBasePOJOInputOperator.class);
> ObjectToNumberConverter converter = dag.addOperator("converter", ObjectToNumberConverter.class);
> MedianOperator median = dag.addOperator("median", MedianOperator.class);
>
> dag.addStream("inputFormatter", inputOperator.outputPort, converter.in).setLocality(Locality.THREAD_LOCAL);
> dag.addStream("med", converter.out, median.data);
>
> // ObjectToNumberConverter class code
>
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import com.datatorrent.api.DefaultInputPort;
> import com.datatorrent.api.DefaultOutputPort;
> import com.datatorrent.common.util.BaseOperator;
>
> public class ObjectToNumberConverter extends BaseOperator
> {
>   private static final Logger LOG = LoggerFactory.getLogger(ObjectToNumberConverter.class);
>
>   // Emits every incoming tuple that is a Number.
>   public final transient DefaultOutputPort<Number> out = new DefaultOutputPort<>();
>
>   // Accepts any Object; non-Number tuples are logged and dropped.
>   public final transient DefaultInputPort<Object> in = new DefaultInputPort<Object>()
>   {
>     @Override
>     public void process(Object tuple)
>     {
>       if (tuple instanceof Number) {
>         Number number = (Number)tuple;
>         out.emit(number);
>       } else {
>         // Parameterized logging also copes with a null tuple.
>         LOG.info("Error converting object to number. {}", tuple);
>       }
>     }
>   };
>
> }
>
> The second way is to extend or write a new MedianOperator that accepts
> "Object" on its input port and handles the type conversion itself.
> This is how you can do it:
>
> package com.datatorrent.tutorial.csvparser;
>
> import java.util.ArrayList;
> import java.util.Collections;
>
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import com.datatorrent.api.DefaultInputPort;
> import com.datatorrent.api.DefaultOutputPort;
> import com.datatorrent.common.util.BaseOperator;
>
> public class MedianOperator extends BaseOperator
> {
>   private static final Logger LOG = LoggerFactory.getLogger(MedianOperator.class);
>
>   // Values collected during the current window; reset in beginWindow().
>   private ArrayList<Double> values;
>
>   /**
>    * Input data port that takes an Object and keeps it only if it is a Number.
>    */
>   public final transient DefaultInputPort<Object> data = new DefaultInputPort<Object>()
>   {
>     /**
>      * Stores each numeric tuple as a double; logs and drops anything else.
>      */
>     @Override
>     public void process(Object tuple)
>     {
>       if (tuple instanceof Number) {
>         Number numTuple = (Number)tuple;
>         values.add(numTuple.doubleValue());
>       } else {
>         // Parameterized logging also copes with a null tuple.
>         LOG.info("Invalid input format of tuple: {}", tuple);
>       }
>     }
>   };
>
>   /**
>    * Output port that emits median of incoming data.
>    */
>   public final transient DefaultOutputPort<Number> median = new DefaultOutputPort<Number>();
>
>   @Override
>   public void beginWindow(long arg0)
>   {
>     values = new ArrayList<Double>();
>   }
>
>   @Override
>   public void endWindow()
>   {
>     // Nothing to emit for an empty window.
>     if (values.size() == 0) {
>       return;
>     }
>     if (values.size() == 1) {
>       median.emit(values.get(0));
>       return;
>     }
>
>     // median value
>     Collections.sort(values);
>     int medianIndex = values.size() / 2;
>     if (values.size() % 2 == 0) {
>       // Even count: average the two middle values.
>       Double value = values.get(medianIndex - 1);
>       value = (value + values.get(medianIndex)) / 2;
>       median.emit(value);
>     } else {
>       median.emit(values.get(medianIndex));
>     }
>   }
>
> }
>
> On Mon, Aug 22, 2016 at 5:50 PM, Yogi Devendra <devendra.vyavah...@gmail.com> wrote:
>
> Define an ObjectToNumberConverter operator that accepts Object on its
> input port and emits Number on its output port.
>
> Use this operator in your DAG.
>
> ~ Yogi
>
> On 22 August 2016 at 17:42, Hitesh Goyal <hitesh.go...@nlpcaptcha.com>
> wrote:
>
> Hi team,
>
> Refer to the following 3 lines of code.
>
> CouchBasePOJOInputOperator inputOperator = dag.addOperator("inputOperator", CouchBasePOJOInputOperator.class);
> MedianOperator median = dag.addOperator("median", MedianOperator.class);
> dag.addStream("med", inputOperator.outputPort.getClass(), median.data);
>
> It gives an error on the last line saying that the method is not
> applicable for these arguments.
>
> How should I convert Object to Number so that I can feed the data to the
> MedianOperator class?
>
> Regards,
> *Hitesh Goyal*
> Simpli5d Technologies
> Cont No.: 9599803307