There are two ways to do this. The first, as Yogi suggested, is to add a new operator that converts Object to Number. Example code:

// Inside populateDAG(); Locality is com.datatorrent.api.DAG.Locality
CouchBasePOJOInputOperator inputOperator = dag.addOperator("inputOperator", CouchBasePOJOInputOperator.class);
ObjectToNumberConverter converter = dag.addOperator("converter", ObjectToNumberConverter.class);
MedianOperator median = dag.addOperator("median", MedianOperator.class);

dag.addStream("inputFormatter", inputOperator.outputPort, converter.in).setLocality(Locality.THREAD_LOCAL);
dag.addStream("med", converter.out, median.data);

// ObjectToNumberConverter class code
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class ObjectToNumberConverter extends BaseOperator
{
  private static final Logger LOG = LoggerFactory.getLogger(ObjectToNumberConverter.class);

  public final transient DefaultOutputPort<Number> out = new DefaultOutputPort<>();

  public final transient DefaultInputPort<Object> in = new DefaultInputPort<Object>()
  {
    @Override
    public void process(Object tuple)
    {
      if (tuple instanceof Number) {
        // Forward numeric tuples unchanged
        out.emit((Number)tuple);
      } else {
        // Non-numeric tuples are logged and dropped
        LOG.info("Error converting object to number: {}", tuple);
      }
    }
  };
}

The second way is to extend MedianOperator, or create a new one, so that it accepts Object on its input port and handles the type conversion itself. This is how you can do it:

package com.datatorrent.tutorial.csvparser;

import java.util.ArrayList;
import java.util.Collections;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class MedianOperator extends BaseOperator
{
  private static final Logger LOG = LoggerFactory.getLogger(MedianOperator.class);

  private ArrayList<Double> values;

  /**
   * Input data port that takes an Object and keeps it only if it is a Number.
   */
  public final transient DefaultInputPort<Object> data = new DefaultInputPort<Object>()
  {
    /**
     * Collects the double value of each numeric tuple for the current window.
     */
    @Override
    public void process(Object tuple)
    {
      if (tuple instanceof Number) {
        Number numTuple = (Number)tuple;
        values.add(numTuple.doubleValue());
      } else {
        LOG.info("Invalid input format of tuple: {}", tuple);
      }
    }
  };

  /**
   * Output port that emits median of incoming data.
   */
  public final transient DefaultOutputPort<Number> median = new DefaultOutputPort<Number>();

  @Override
  public void beginWindow(long windowId)
  {
    // Start each window with an empty list
    values = new ArrayList<Double>();
  }

  @Override
  public void endWindow()
  {
    if (values.isEmpty()) {
      return;
    }
    if (values.size() == 1) {
      median.emit(values.get(0));
      return;
    }

    // median value
    Collections.sort(values);
    int medianIndex = values.size() / 2;
    if (values.size() % 2 == 0) {
      // Even count: average the two middle values
      Double value = values.get(medianIndex - 1);
      value = (value + values.get(medianIndex)) / 2;
      median.emit(value);
    } else {
      median.emit(values.get(medianIndex));
    }
  }
}

On Mon, Aug 22, 2016 at 5:50 PM, Yogi Devendra <devendra.vyavah...@gmail.com> wrote:

> Define ObjectToNumberConverter operator which accepts Object on the input
> port and emits Number on its output port.
>
> Use this operator in your DAG.
>
> ~ Yogi
>
> On 22 August 2016 at 17:42, Hitesh Goyal <hitesh.go...@nlpcaptcha.com>
> wrote:
>
>> Hi team,
>>
>> Refer to following 3 lines of code.
>>
>> CouchBasePOJOInputOperator inputOperator = dag.addOperator("inputOperator",
>> CouchBasePOJOInputOperator.class);
>>
>> MedianOperator median = dag.addOperator("median", MedianOperator.class);
>>
>> dag.addStream("med", inputOperator.outputPort.getClass(), median.data);
>>
>> It is giving error in the last line that the method is not applicable
>> as these arguments.
>>
>> How should I convert Object to Number so that I can input the data to
>> Median Operator Class
>>
>> Regards,
>>
>> Hitesh Goyal
>>
>> Simpli5d Technologies
>>
>> Cont No.: 9599803307
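
If you want to check the filtering and median logic outside of Apex before wiring it into the DAG, a minimal plain-Java sketch along these lines may help. MedianSketch and its median method are hypothetical names used only for illustration and are not part of Apex or Malhar. The sketch mirrors what the MedianOperator above does for one window: non-Number tuples are skipped, the rest are sorted, and the middle value (or the average of the two middle values) is returned.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalDouble;

// Hypothetical helper, not part of Apex or Malhar: reproduces the per-window
// filtering and median steps of MedianOperator so they can be tested alone.
final class MedianSketch {

  private MedianSketch() {
  }

  // Returns the median of the Number elements in tuples.
  // Elements that are not Numbers (including null) are skipped.
  // Returns an empty OptionalDouble when no numeric element is present.
  static OptionalDouble median(List<?> tuples) {
    List<Double> values = new ArrayList<>();
    for (Object tuple : tuples) {
      if (tuple instanceof Number) {
        values.add(((Number) tuple).doubleValue());
      }
    }
    if (values.isEmpty()) {
      return OptionalDouble.empty();
    }
    Collections.sort(values);
    int mid = values.size() / 2;
    if (values.size() % 2 == 0) {
      // Even count: average the two middle values
      return OptionalDouble.of((values.get(mid - 1) + values.get(mid)) / 2);
    }
    return OptionalDouble.of(values.get(mid));
  }

  public static void main(String[] args) {
    // Mixed tuple types, as they might arrive on an Object port
    List<Object> window = Arrays.asList(5, "not a number", 1.5, 3L, 10);
    System.out.println(median(window));
  }
}
```

In this example the numeric values are 1.5, 3, 5 and 10, so the median is 4.0. The sketch returns an OptionalDouble instead of emitting nothing, which is only a convenience for testing; the operator itself simply skips the emit for an empty window.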