There are two ways to do this. The first, as Yogi suggested, is to add a new
operator that converts Object to Number. Here is a code example:
CouchBasePOJOInputOperator inputOperator =
    dag.addOperator("inputOperator", CouchBasePOJOInputOperator.class);
ObjectToNumberConverter converter =
    dag.addOperator("converter", ObjectToNumberConverter.class);
MedianOperator median = dag.addOperator("median", MedianOperator.class);
dag.addStream("inputFormatter", inputOperator.outputPort, converter.in)
    .setLocality(Locality.THREAD_LOCAL);
dag.addStream("med", converter.out, median.data);
// ObjectToNumberConverter class code
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class ObjectToNumberConverter extends BaseOperator
{
  private static final Logger LOG =
      LoggerFactory.getLogger(ObjectToNumberConverter.class);

  // Emits every incoming tuple that is a Number.
  public final transient DefaultOutputPort<Number> out =
      new DefaultOutputPort<>();

  // Accepts any Object and forwards it only if it is a Number.
  public final transient DefaultInputPort<Object> in =
      new DefaultInputPort<Object>()
  {
    @Override
    public void process(Object tuple)
    {
      if (tuple instanceof Number) {
        Number number = (Number)tuple;
        out.emit(number);
      } else {
        // Parameterized logging is safe even if tuple is null,
        // where tuple.toString() would throw.
        LOG.warn("Error converting object to number: {}", tuple);
      }
    }
  };
}
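If you want to try the conversion check outside a DAG first, here is a
minimal plain-Java sketch of the same instanceof test. It does not use any
Apex classes; the class name NumberFilterSketch and the method keepNumbers
are made up for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Apex or Malhar.
class NumberFilterSketch
{
  // Mirrors the check in ObjectToNumberConverter.process():
  // Number tuples pass through, everything else (including null) is dropped.
  static List<Number> keepNumbers(List<Object> tuples)
  {
    List<Number> result = new ArrayList<>();
    for (Object tuple : tuples) {
      if (tuple instanceof Number) {
        result.add((Number)tuple);
      }
    }
    return result;
  }

  public static void main(String[] args)
  {
    List<Object> tuples = Arrays.<Object>asList(3, 4.5, "seven", 10L);
    System.out.println(keepNumbers(tuples)); // prints [3, 4.5, 10]
  }
}
```

Note that a String such as "seven" (or even "7") is dropped, not parsed. If
your Couchbase fields arrive as Strings, you would need to parse them
yourself before emitting.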
The second way is to extend MedianOperator, or create a new one, so that it
accepts Object on its input port and handles the type conversion itself.
This is how you can do it:
package com.datatorrent.tutorial.csvparser;

import java.util.ArrayList;
import java.util.Collections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class MedianOperator extends BaseOperator
{
  private static final Logger LOG =
      LoggerFactory.getLogger(MedianOperator.class);

  // Values collected during the current window.
  private ArrayList<Double> values;

  /**
   * Input data port that accepts any Object; only Number tuples are used.
   */
  public final transient DefaultInputPort<Object> data =
      new DefaultInputPort<Object>()
  {
    /**
     * Collects each numeric tuple for the median computed in endWindow().
     */
    @Override
    public void process(Object tuple)
    {
      if (tuple instanceof Number) {
        Number numTuple = (Number)tuple;
        values.add(numTuple.doubleValue());
      } else {
        // Parameterized logging is safe even if tuple is null.
        LOG.warn("Invalid input format of tuple: {}", tuple);
      }
    }
  };

  /**
   * Output port that emits median of incoming data.
   */
  public final transient DefaultOutputPort<Number> median =
      new DefaultOutputPort<Number>();

  @Override
  public void beginWindow(long windowId)
  {
    // Start each window with an empty list.
    values = new ArrayList<Double>();
  }

  @Override
  public void endWindow()
  {
    // Nothing to emit if no numeric tuple arrived in this window.
    if (values.size() == 0) {
      return;
    }
    if (values.size() == 1) {
      median.emit(values.get(0));
      return;
    }
    // median value
    Collections.sort(values);
    int medianIndex = values.size() / 2;
    if (values.size() % 2 == 0) {
      // Even count: average the two middle values.
      Double value = values.get(medianIndex - 1);
      value = (value + values.get(medianIndex)) / 2;
      median.emit(value);
    } else {
      // Odd count: the middle value is the median.
      median.emit(values.get(medianIndex));
    }
  }
}
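To check the median arithmetic from endWindow() without deploying the
application, the same steps can be sketched in plain Java. This is only an
illustration: the class name MedianSketch and the method median are
hypothetical, and returning null stands in for "emit nothing" when the window
is empty.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Apex or Malhar.
class MedianSketch
{
  // Same logic as endWindow(): sort, then take the middle value,
  // or the average of the two middle values for an even count.
  static Double median(List<Double> window)
  {
    if (window.isEmpty()) {
      return null; // the operator emits nothing in this case
    }
    List<Double> sorted = new ArrayList<>(window); // leave the input untouched
    Collections.sort(sorted);
    int medianIndex = sorted.size() / 2;
    if (sorted.size() % 2 == 0) {
      return (sorted.get(medianIndex - 1) + sorted.get(medianIndex)) / 2;
    }
    return sorted.get(medianIndex);
  }

  public static void main(String[] args)
  {
    System.out.println(median(Arrays.asList(4.0, 1.0, 3.0, 2.0))); // prints 2.5
    System.out.println(median(Arrays.asList(5.0, 1.0, 3.0)));      // prints 3.0
  }
}
```

The single-element case needs no special handling here, because for a list
of size 1 the middle index is 0.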
On Mon, Aug 22, 2016 at 5:50 PM, Yogi Devendra <[email protected]> wrote:
> Define ObjectToNumberConverter operator which accepts Object on the input
> port and emits Number on its output port.
>
> Use this operator in your DAG.
>
> ~ Yogi
>
> On 22 August 2016 at 17:42, Hitesh Goyal <[email protected]>
> wrote:
>
>> Hi team,
>>
>> Refer to following 3 lines of code.
>>
>> CouchBasePOJOInputOperator inputOperator = dag.addOperator("inputOperator",
>> CouchBasePOJOInputOperator.class);
>>
>> MedianOperator median = dag.addOperator("median", MedianOperator.class);
>>
>> dag.addStream("med", inputOperator.outputPort.getClass(), median.data);
>>
>>
>>
>> It is giving error in the last line that the method is not applicable
>> as these arguments.
>>
>> How should I convert Object to Number so that I can input the data to
>> Median Operator Class
>>
>>
>>
>>
>>
>> Regards,
>>
>> *Hitesh Goyal*
>>
>> Simpli5d Technologies
>>
>> Cont No.: 9599803307
>>
>>
>>
>
>