Hi. I am using the first method as u suggested. But it is giving an error that in the following line:- ObjectToNumberConverter converter = dag.addOperator("converter", ObjectToNumberConverter.class);
It is asking for importing the package of the class but it is in the same package.. Error while launching the application is defined below:- An error occurred trying to launch the application. Server message: 2016-08-23 09:32:54.756 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210 2016-08-23 09:32:54.923 INFO com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could bootstrap through carrier publication. 2016-08-23 09:32:54.927 INFO com.couchbase.client.CouchbaseConnection: Added {QA sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, topWop=null, toWrite=0, interested=0} to connect queue 2016-08-23 09:32:54.928 INFO com.couchbase.client.CouchbaseClient: CouchbaseConnectionFactory{bucket='beer-sample', nodes=[http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, configCheck=10, reconnectInt=1100, failureMode=Redistribute, hashAlgo=NATIVE_HASH, authWaitTime=2500} 2016-08-23 09:32:54.958 INFO com.couchbase.client.CouchbaseClient: viewmode set to production mode 2016-08-23 09:32:55.504 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210 2016-08-23 09:32:55.539 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210 2016-08-23 09:32:55.664 INFO com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could bootstrap through carrier publication. 2016-08-23 09:32:55.665 INFO com.couchbase.client.CouchbaseConnection: Added {QA sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, topWop=null, toWrite=0, interested=0} to connect queue 2016-08-23 09:32:55.665 INFO com.couchbase.client.CouchbaseClient: CouchbaseConnectionFactory{bucket='beer-sample', nodes=[http://dev.nlpcaptcha.in:8091/pools, http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, configCheck=10, reconnectInt=1100, failureMode=Redistribute, hashAlgo=NATIVE_HASH, authWaitTime=2500} 2016-08-23 09:32:55.666 INFO com.couchbase.client.CouchbaseClient: viewmode set to production mode 2016-08-23 09:32:56.245 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210 2016-08-23 09:32:57.133 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210 2016-08-23 09:32:57.260 INFO com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could bootstrap through carrier publication. 2016-08-23 09:32:57.264 INFO com.couchbase.client.CouchbaseConnection: Added {QA sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, topWop=null, toWrite=0, interested=0} to connect queue 2016-08-23 09:32:57.266 INFO com.couchbase.client.CouchbaseClient: CouchbaseConnectionFactory{bucket='beer-sample', nodes=[http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, configCheck=10, reconnectInt=1100, failureMode=Redistribute, hashAlgo=NATIVE_HASH, authWaitTime=2500} 2016-08-23 09:32:57.285 INFO com.couchbase.client.CouchbaseClient: viewmode set to production mode 2016-08-23 09:32:57.842 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210 2016-08-23 09:32:57.863 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210 2016-08-23 09:32:57.986 INFO com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could bootstrap through carrier publication. 2016-08-23 09:32:57.987 INFO com.couchbase.client.CouchbaseConnection: Added {QA sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, topWop=null, toWrite=0, interested=0} to connect queue 2016-08-23 09:32:57.987 INFO com.couchbase.client.CouchbaseClient: CouchbaseConnectionFactory{bucket='beer-sample', nodes=[http://dev.nlpcaptcha.in:8091/pools, http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, configCheck=10, reconnectInt=1100, failureMode=Redistribute, hashAlgo=NATIVE_HASH, authWaitTime=2500} 2016-08-23 09:32:57.988 INFO com.couchbase.client.CouchbaseClient: viewmode set to production mode javax.validation.ValidationException: At least one output port must be connected: median at com.datatorrent.stram.plan.logical.LogicalPlan.validate(LogicalPlan.java:1764) at com.datatorrent.stram.StramClient.<init>(StramClient.java:163) at com.datatorrent.stram.client.StramAppLauncher.launchApp(StramAppLauncher.java:596) at com.datatorrent.stram.cli.ApexCli$LaunchCommand.execute(ApexCli.java:2056) at com.datatorrent.stram.cli.ApexCli.launchAppPackage(ApexCli.java:3445) at com.datatorrent.stram.cli.ApexCli.access$7400(ApexCli.java:151) at com.datatorrent.stram.cli.ApexCli$LaunchCommand.execute(ApexCli.java:1900) at com.datatorrent.stram.cli.ApexCli$3.run(ApexCli.java:1462) From: Priyanka Gugale [mailto:priya...@datatorrent.com] Sent: Tuesday, August 23, 2016 11:54 AM To: users@apex.apache.org Subject: Re: connecting two operators There are two ways you can do this, first as Yogi suggested add a new operator to convert object to Number. Below is code example: CouchBasePOJOInputOperator inputOperator = dag.addOperator("inputOperator", CouchBasePOJOInputOperator.class); ObjectToNumberConverter converter = dag.addOperator("converter", ObjectToNumberConverter.class); MedianOperator median = dag.addOperator("median", MedianOperator.class); dag.addStream("inputFormatter", inputOperator.outputPort, converter.in<http://converter.in>).setLocality(Locality.THREAD_LOCAL); dag.addStream("med", converter.out, median.data); //ObjectToNumberConverter Class code import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.common.util.BaseOperator; public class ObjectToNumberConverter extends BaseOperator { private static final Logger LOG = LoggerFactory.getLogger(ObjectToNumberConverter.class); public final transient DefaultOutputPort<Number> out = new DefaultOutputPort<>(); public final transient DefaultInputPort<Object> in = new DefaultInputPort<Object>() { @Override public void process(Object tuple) { if (tuple instanceof Number) { Number number = (Number)tuple; out.emit(number); } else { LOG.info("Error converting object to number. " + tuple.toString()); } } }; } Second way is you can extend/create new MedianOperator which accepts "Object" on it's input port and handles type conversion itself. This is how you can do it: package com.datatorrent.tutorial.csvparser; import java.util.ArrayList; import java.util.Collections; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.common.util.BaseOperator; public class MedianOperator extends BaseOperator { private static final Logger LOG = LoggerFactory.getLogger(MedianOperator.class); private ArrayList<Double> values; /** * Input data port that takes a number. */ public final transient DefaultInputPort<Object> data = new DefaultInputPort<Object>() { /** * Computes sum and count with each tuple */ @Override public void process(Object tuple) { if (tuple instanceof Number) { Number numTuple = (Number)tuple; values.add(numTuple.doubleValue()); } else { LOG.info("Invalid input format of tuple: " + tuple.toString()); } } }; /** * Output port that emits median of incoming data. */ public final transient DefaultOutputPort<Number> median = new DefaultOutputPort<Number>(); @Override public void beginWindow(long arg0) { values = new ArrayList<Double>(); } @Override public void endWindow() { if (values.size() == 0) { return; } if (values.size() == 1) { median.emit(values.get(0)); return; } // median value Collections.sort(values); int medianIndex = values.size() / 2; if (values.size() % 2 == 0) { Double value = values.get(medianIndex - 1); value = (value + values.get(medianIndex)) / 2; median.emit(value); } else { median.emit(values.get(medianIndex)); } } } On Mon, Aug 22, 2016 at 5:50 PM, Yogi Devendra <devendra.vyavah...@gmail.com<mailto:devendra.vyavah...@gmail.com>> wrote: Define ObjectToNumberConverter operator which accepts Object on the input port and emits Number on its output port. Use this operator in your DAG. ~ Yogi On 22 August 2016 at 17:42, Hitesh Goyal <hitesh.go...@nlpcaptcha.com<mailto:hitesh.go...@nlpcaptcha.com>> wrote: Hi team, Refer to following 3 lines of code. CouchBasePOJOInputOperator inputOperator = dag.addOperator("inputOperator", CouchBasePOJOInputOperator.class); MedianOperator median = dag.addOperator("median", MedianOperator.class); dag.addStream("med", inputOperator.outputPort.getClass(), median.data); It is giving error in the last line that the method is not applicable as these arguments. How should I convert Object to Number so that I can input the data to Median Operator Class Regards, Hitesh Goyal Simpli5d Technologies Cont No.: 9599803307