There are two ways to do this. The first, as Yogi suggested, is to add a new
operator that converts Object to Number. Here is a code example:

CouchBasePOJOInputOperator inputOperator =
    dag.addOperator("inputOperator", CouchBasePOJOInputOperator.class);
ObjectToNumberConverter converter =
    dag.addOperator("converter", ObjectToNumberConverter.class);
MedianOperator median = dag.addOperator("median", MedianOperator.class);

dag.addStream("inputFormatter", inputOperator.outputPort, converter.in)
    .setLocality(Locality.THREAD_LOCAL);
dag.addStream("med", converter.out, median.data);

//ObjectToNumberConverter Class code

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class ObjectToNumberConverter extends BaseOperator
{
  private static final Logger LOG =
      LoggerFactory.getLogger(ObjectToNumberConverter.class);
  public final transient DefaultOutputPort<Number> out =
      new DefaultOutputPort<>();
  public final transient DefaultInputPort<Object> in =
      new DefaultInputPort<Object>()
  {
    @Override
    public void process(Object tuple)
    {
      if (tuple instanceof Number) {
        Number number = (Number)tuple;
        out.emit(number);
      } else {
        // Parameterized logging also avoids a NullPointerException on a null tuple.
        LOG.warn("Error converting object to number: {}", tuple);
      }
    }
  };

}
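
To try the conversion rule outside a DAG, here is a minimal standalone sketch.
The class and method names (ObjectToNumberSketch, toNumber) are made up for
illustration and are not part of Apex or Malhar; it only mirrors the
instanceof check used in the converter above.

```java
import java.util.Optional;

final class ObjectToNumberSketch
{
  // Hypothetical helper: returns the tuple as a Number if it already is one,
  // and an empty Optional for anything else (including null).
  static Optional<Number> toNumber(Object tuple)
  {
    if (tuple instanceof Number) {
      return Optional.of((Number)tuple);
    }
    return Optional.empty();
  }

  public static void main(String[] args)
  {
    System.out.println(toNumber(42));    // prints Optional[42]
    System.out.println(toNumber("42"));  // prints Optional.empty (a String is not a Number)
    System.out.println(toNumber(null));  // prints Optional.empty
  }
}
```

Note that a numeric String such as "42" is dropped, not parsed, which matches
what the operator does.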


The second way is to extend MedianOperator, or write a new one, so that it
accepts "Object" on its input port and handles the type conversion itself.
Here is how you can do it:

package com.datatorrent.tutorial.csvparser;

import java.util.ArrayList;
import java.util.Collections;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class MedianOperator extends BaseOperator
{
  private static final Logger LOG =
      LoggerFactory.getLogger(MedianOperator.class);
  private ArrayList<Double> values;

  /**
   * Input data port that takes any Object; only Number tuples are used.
   */
  public final transient DefaultInputPort<Object> data =
      new DefaultInputPort<Object>()
  {
    /**
     * Collects the value of each numeric tuple for the current window.
     */
    @Override
    public void process(Object tuple)
    {
      if (tuple instanceof Number) {
        Number numTuple = (Number)tuple;
        values.add(numTuple.doubleValue());
      } else {
        // Parameterized logging also avoids a NullPointerException on a null tuple.
        LOG.warn("Invalid input format of tuple: {}", tuple);
      }

    }
  };

  /**
   * Output port that emits median of incoming data.
   */
  public final transient DefaultOutputPort<Number> median =
      new DefaultOutputPort<Number>();

  @Override
  public void beginWindow(long arg0)
  {
    values = new ArrayList<Double>();
  }

  @Override
  public void endWindow()
  {
    if (values.size() == 0) {
      return;
    }
    if (values.size() == 1) {
      median.emit(values.get(0));
      return;
    }

    // median value
    Collections.sort(values);
    int medianIndex = values.size() / 2;
    if (values.size() % 2 == 0) {
      Double value = values.get(medianIndex - 1);
      value = (value + values.get(medianIndex)) / 2;
      median.emit(value);
    } else {
      median.emit(values.get(medianIndex));
    }
  }

}
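
If you want to check the median logic without deploying the application, the
following is a rough standalone sketch of what one window does: keep only the
Number tuples, sort them, and take the middle value (or the mean of the two
middle values for an even count). MedianSketch and its median method are
hypothetical names used only for this example; nothing here depends on Apex.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalDouble;

final class MedianSketch
{
  // Hypothetical helper mirroring process() + endWindow() for a single window.
  static OptionalDouble median(List<?> tuples)
  {
    List<Double> values = new ArrayList<>();
    for (Object tuple : tuples) {
      if (tuple instanceof Number) {
        values.add(((Number)tuple).doubleValue());
      }
      // Non-numeric tuples are skipped, as in the operator.
    }
    if (values.isEmpty()) {
      return OptionalDouble.empty();
    }
    Collections.sort(values);
    int medianIndex = values.size() / 2;
    if (values.size() % 2 == 0) {
      // Even count: average the two middle values.
      return OptionalDouble.of((values.get(medianIndex - 1) + values.get(medianIndex)) / 2);
    }
    return OptionalDouble.of(values.get(medianIndex));
  }

  public static void main(String[] args)
  {
    // Mixed types: the String is ignored, leaving 1, 2, 3, 4.
    System.out.println(median(Arrays.<Object>asList(3, "x", 1, 2.0, 4L)));  // prints OptionalDouble[2.5]
    System.out.println(median(Arrays.<Object>asList(5, 1, 3)));            // prints OptionalDouble[3.0]
  }
}
```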





On Mon, Aug 22, 2016 at 5:50 PM, Yogi Devendra <devendra.vyavah...@gmail.com> wrote:

> Define  ObjectToNumberConverter operator which accepts Object on the input
> port and emits Number on its output port.
>
> Use this operator in your DAG.
>
> ~ Yogi
>
> On 22 August 2016 at 17:42, Hitesh Goyal <hitesh.go...@nlpcaptcha.com>
> wrote:
>
>> Hi team,
>>
>>           Refer to following 3 lines of code.
>>
>> CouchBasePOJOInputOperator inputOperator = dag.addOperator("inputOperator",
>> CouchBasePOJOInputOperator.class);
>>
>> MedianOperator median = dag.addOperator("median", MedianOperator.class);
>>
>> dag.addStream("med", inputOperator.outputPort.getClass(), median.data);
>>
>>
>>
>>     It gives an error on the last line, saying that the method is not
>> applicable for these arguments.
>>
>>     How should I convert Object to Number so that I can feed the data to
>> the MedianOperator class?
>>
>>
>>
>>
>>
>> Regards,
>>
>> *Hitesh Goyal*
>>
>> Simpli5d Technologies
>>
>> Cont No.: 9599803307
>>
>>
>>
>
>
