Hi,

I have used the operators in the way you suggested and the application launched successfully.

    CouchBasePOJOInputOperator inputOperator = dag.addOperator("inputOperator", CouchBasePOJOInputOperator.class);
    ConsoleOutputOperator cons = dag.addOperator("console", new ConsoleOutputOperator());
    ObjectToNumberConverter converter = dag.addOperator("converter", ObjectToNumberConverter.class);
    MedianOperator median = dag.addOperator("median", MedianOperator.class);
    dag.addStream("inputFormatter", inputOperator.outputPort, converter.in);
    dag.addStream("med", converter.out, median.data);
    dag.addStream("out", median.median, cons.input);

inputOperator is emitting tuples. These tuples get processed in the converter operator, but ObjectToNumberConverter is not emitting anything on its output port (i.e. converter.out). What could be the reason, given that no errors are shown in the code? The CouchBasePOJOInputOperator is emitting the following output:

    { "string": "{\"abv\":\"0.0\"}" }

From: Priyanka Gugale [mailto:priya...@datatorrent.com]
Sent: Tuesday, August 23, 2016 3:01 PM
To: users@apex.apache.org
Subject: Re: connecting two operators

Okay, as I understand it, your further processing piece is missing for now. You can use ConsoleOutputOperator for the time being. Once your processing operators are ready, you can remove the ConsoleOutputOperator and connect the output of the Median operator to your processing operators.

-Priyanka

On Tue, Aug 23, 2016 at 2:58 PM, Hitesh Goyal <hitesh.go...@nlpcaptcha.com> wrote:

I want to access data from Couchbase using CouchBasePOJOInputOperator and pass it to the median operator to calculate the median of that data. Then I want that median value as a variable or similar for further processing.

From: Priyanka Gugale [mailto:priya...@datatorrent.com]
Sent: Tuesday, August 23, 2016 2:49 PM
To: users@apex.apache.org
Subject: Re: connecting two operators

Hi Hitesh,

"Median" is not an output operator. At least one output port of every non-I/O operator must be connected to a downstream operator. In this example you can use ConsoleOutputOperator to log the median output to the console. Can you share what you want to achieve with this app? The output of median is emitted on the "median" port; if you don't connect that port, the calculated value won't be used anywhere.

-Priyanka

On Tue, Aug 23, 2016 at 2:28 PM, Hitesh Goyal <hitesh.go...@nlpcaptcha.com> wrote:

Hi. I am using the first method as you suggested.
But it is giving an error on the following line:

    ObjectToNumberConverter converter = dag.addOperator("converter", ObjectToNumberConverter.class);

It asks me to import the package of the class, but the class is in the same package. The error while launching the application is shown below:

An error occurred trying to launch the application. Server message:
2016-08-23 09:32:54.756 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:54.923 INFO com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could bootstrap through carrier publication.
2016-08-23 09:32:54.927 INFO com.couchbase.client.CouchbaseConnection: Added {QA sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, topWop=null, toWrite=0, interested=0} to connect queue
2016-08-23 09:32:54.928 INFO com.couchbase.client.CouchbaseClient: CouchbaseConnectionFactory{bucket='beer-sample', nodes=[http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, configCheck=10, reconnectInt=1100, failureMode=Redistribute, hashAlgo=NATIVE_HASH, authWaitTime=2500}
2016-08-23 09:32:54.958 INFO com.couchbase.client.CouchbaseClient: viewmode set to production mode
2016-08-23 09:32:55.504 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:55.539 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:55.664 INFO com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could bootstrap through carrier publication.
2016-08-23 09:32:55.665 INFO com.couchbase.client.CouchbaseConnection: Added {QA sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, topWop=null, toWrite=0, interested=0} to connect queue
2016-08-23 09:32:55.665 INFO com.couchbase.client.CouchbaseClient: CouchbaseConnectionFactory{bucket='beer-sample', nodes=[http://dev.nlpcaptcha.in:8091/pools, http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, configCheck=10, reconnectInt=1100, failureMode=Redistribute, hashAlgo=NATIVE_HASH, authWaitTime=2500}
2016-08-23 09:32:55.666 INFO com.couchbase.client.CouchbaseClient: viewmode set to production mode
2016-08-23 09:32:56.245 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:57.133 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:57.260 INFO com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could bootstrap through carrier publication.
2016-08-23 09:32:57.264 INFO com.couchbase.client.CouchbaseConnection: Added {QA sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, topWop=null, toWrite=0, interested=0} to connect queue
2016-08-23 09:32:57.266 INFO com.couchbase.client.CouchbaseClient: CouchbaseConnectionFactory{bucket='beer-sample', nodes=[http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, configCheck=10, reconnectInt=1100, failureMode=Redistribute, hashAlgo=NATIVE_HASH, authWaitTime=2500}
2016-08-23 09:32:57.285 INFO com.couchbase.client.CouchbaseClient: viewmode set to production mode
2016-08-23 09:32:57.842 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:57.863 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:57.986 INFO com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could bootstrap through carrier publication.
2016-08-23 09:32:57.987 INFO com.couchbase.client.CouchbaseConnection: Added {QA sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, topWop=null, toWrite=0, interested=0} to connect queue
2016-08-23 09:32:57.987 INFO com.couchbase.client.CouchbaseClient: CouchbaseConnectionFactory{bucket='beer-sample', nodes=[http://dev.nlpcaptcha.in:8091/pools, http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, configCheck=10, reconnectInt=1100, failureMode=Redistribute, hashAlgo=NATIVE_HASH, authWaitTime=2500}
2016-08-23 09:32:57.988 INFO com.couchbase.client.CouchbaseClient: viewmode set to production mode
javax.validation.ValidationException: At least one output port must be connected: median
    at com.datatorrent.stram.plan.logical.LogicalPlan.validate(LogicalPlan.java:1764)
    at com.datatorrent.stram.StramClient.<init>(StramClient.java:163)
    at com.datatorrent.stram.client.StramAppLauncher.launchApp(StramAppLauncher.java:596)
    at com.datatorrent.stram.cli.ApexCli$LaunchCommand.execute(ApexCli.java:2056)
    at com.datatorrent.stram.cli.ApexCli.launchAppPackage(ApexCli.java:3445)
    at com.datatorrent.stram.cli.ApexCli.access$7400(ApexCli.java:151)
    at com.datatorrent.stram.cli.ApexCli$LaunchCommand.execute(ApexCli.java:1900)
    at com.datatorrent.stram.cli.ApexCli$3.run(ApexCli.java:1462)

From: Priyanka Gugale [mailto:priya...@datatorrent.com]
Sent: Tuesday, August 23, 2016 11:54 AM
To: users@apex.apache.org
Subject: Re: connecting two operators

There are two ways you can do this. First, as Yogi suggested, add a new operator to convert Object to Number.
Below is a code example:

    CouchBasePOJOInputOperator inputOperator = dag.addOperator("inputOperator", CouchBasePOJOInputOperator.class);
    ObjectToNumberConverter converter = dag.addOperator("converter", ObjectToNumberConverter.class);
    MedianOperator median = dag.addOperator("median", MedianOperator.class);
    dag.addStream("inputFormatter", inputOperator.outputPort, converter.in).setLocality(Locality.THREAD_LOCAL);
    dag.addStream("med", converter.out, median.data);

    // ObjectToNumberConverter class code
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import com.datatorrent.api.DefaultInputPort;
    import com.datatorrent.api.DefaultOutputPort;
    import com.datatorrent.common.util.BaseOperator;

    public class ObjectToNumberConverter extends BaseOperator
    {
      private static final Logger LOG = LoggerFactory.getLogger(ObjectToNumberConverter.class);

      public final transient DefaultOutputPort<Number> out = new DefaultOutputPort<>();

      public final transient DefaultInputPort<Object> in = new DefaultInputPort<Object>()
      {
        @Override
        public void process(Object tuple)
        {
          // Only tuples that already are Number instances are forwarded;
          // anything else is logged and dropped.
          if (tuple instanceof Number) {
            Number number = (Number)tuple;
            out.emit(number);
          } else {
            LOG.info("Error converting object to number. " + tuple.toString());
          }
        }
      };
    }

The second way is to extend or create a new MedianOperator that accepts "Object" on its input port and handles the type conversion itself. This is how you can do it:

    package com.datatorrent.tutorial.csvparser;

    import java.util.ArrayList;
    import java.util.Collections;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import com.datatorrent.api.DefaultInputPort;
    import com.datatorrent.api.DefaultOutputPort;
    import com.datatorrent.common.util.BaseOperator;

    public class MedianOperator extends BaseOperator
    {
      private static final Logger LOG = LoggerFactory.getLogger(MedianOperator.class);
      private ArrayList<Double> values;

      /**
       * Input data port that accepts any Object; only Number tuples are used.
       */
      public final transient DefaultInputPort<Object> data = new DefaultInputPort<Object>()
      {
        /**
         * Collects the numeric value of each tuple for the current window.
         */
        @Override
        public void process(Object tuple)
        {
          if (tuple instanceof Number) {
            Number numTuple = (Number)tuple;
            values.add(numTuple.doubleValue());
          } else {
            LOG.info("Invalid input format of tuple: " + tuple.toString());
          }
        }
      };

      /**
       * Output port that emits median of incoming data.
       */
      public final transient DefaultOutputPort<Number> median = new DefaultOutputPort<Number>();

      @Override
      public void beginWindow(long arg0)
      {
        values = new ArrayList<Double>();
      }

      @Override
      public void endWindow()
      {
        if (values.size() == 0) {
          return;
        }
        if (values.size() == 1) {
          median.emit(values.get(0));
          return;
        }

        // median value
        Collections.sort(values);
        int medianIndex = values.size() / 2;
        if (values.size() % 2 == 0) {
          Double value = values.get(medianIndex - 1);
          value = (value + values.get(medianIndex)) / 2;
          median.emit(value);
        } else {
          median.emit(values.get(medianIndex));
        }
      }
    }

On Mon, Aug 22, 2016 at 5:50 PM, Yogi Devendra <devendra.vyavah...@gmail.com> wrote:

Define an ObjectToNumberConverter operator which accepts Object on its input port and emits Number on its output port. Use this operator in your DAG.

~ Yogi

On 22 August 2016 at 17:42, Hitesh Goyal <hitesh.go...@nlpcaptcha.com> wrote:

Hi team,

Refer to the following 3 lines of code.

    CouchBasePOJOInputOperator inputOperator = dag.addOperator("inputOperator", CouchBasePOJOInputOperator.class);
    MedianOperator median = dag.addOperator("median", MedianOperator.class);
    dag.addStream("med", inputOperator.outputPort.getClass(), median.data);

It gives an error on the last line saying that the method is not applicable for these arguments. How should I convert Object to Number so that I can feed the data to the MedianOperator class?

Regards,
Hitesh Goyal
Simpli5d Technologies
Cont No.: 9599803307
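
On the Object-to-Number question: the converter quoted in this thread only forwards tuples that are already Number instances and logs everything else, so a tuple that arrives as a String (or as a POJO wrapping one) would never reach converter.out. Below is a minimal plain-Java sketch of a more lenient conversion, together with the same even/odd median rule used in endWindow(). The class TupleNumbers and its methods toNumber and median are hypothetical names made up for this illustration; they are not part of Apex or Malhar, and the sketch assumes that numeric strings such as "0.0" should count as numbers. It does not parse JSON, so a value like {"abv":"0.0"} would still need its field extracted first.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration only; not an Apex or Malhar class.
final class TupleNumbers {

  // Returns the tuple as a Number, or null when it cannot be read as one.
  // Assumption: plain numeric strings such as "0.0" are acceptable input.
  static Number toNumber(Object tuple) {
    if (tuple instanceof Number) {
      return (Number) tuple;
    }
    if (tuple instanceof String) {
      try {
        return Double.valueOf(((String) tuple).trim());
      } catch (NumberFormatException e) {
        return null; // e.g. a JSON document rather than a bare number
      }
    }
    return null;
  }

  // Median of one window of values, using the same even/odd rule as the
  // endWindow() method quoted in the thread. Returns null for an empty window.
  static Double median(List<Double> window) {
    if (window.isEmpty()) {
      return null;
    }
    List<Double> sorted = new ArrayList<>(window);
    Collections.sort(sorted);
    int mid = sorted.size() / 2;
    if (sorted.size() % 2 == 0) {
      return (sorted.get(mid - 1) + sorted.get(mid)) / 2;
    }
    return sorted.get(mid);
  }

  public static void main(String[] args) {
    Object[] tuples = {5, "0.0", 4.5, "{\"abv\":\"0.0\"}"};
    List<Double> window = new ArrayList<>();
    for (Object tuple : tuples) {
      Number n = toNumber(tuple);
      if (n != null) {
        window.add(n.doubleValue());
      } else {
        System.out.println("Skipping non-numeric tuple: " + tuple);
      }
    }
    System.out.println("median = " + median(window));
  }
}
```

Inside an operator, the body of toNumber could replace the instanceof check in process(), with the null case going to the existing log statement. Logging the tuple's class name there is a quick way to see what the input operator is really emitting.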