I suspect there is not enough memory to launch the operators. As per the code, we will need 4 containers, may be your cluster doesn't have enough resources. Let's try to set low memory for operators, anyway we are not storing much in memory. You can configure memory using setting:
<property> <name>dt.application.*.operator.*.attr.MEMORY_MB</name> <value>256</value> </property> Refer troubleshooting guide to know more: http://docs.datatorrent.com/troubleshooting/#configuring-memory Also check on UI for number of requested vs allocated containers, and check the hadoop memory settings. -Priyanka On Mon, Aug 29, 2016 at 2:32 PM, Hitesh Goyal <[email protected]> wrote: > Please find the code below. Also find the Application Logs as an > attached file. > > > > MeanOperetor.java > > > > package com.example.myapexapp; > > > > import java.util.ArrayList; > > > > > > import org.slf4j.Logger; > > import org.slf4j.LoggerFactory; > > > > import com.datatorrent.api.DefaultInputPort; > > import com.datatorrent.api.DefaultOutputPort; > > import com.datatorrent.common.util.BaseOperator; > > > > public class MeanOperator extends BaseOperator { > > private static final Logger LOG = LoggerFactory.getLogger( > MeanOperator.class); > > private ArrayList<Double> values; > > > > /** > > * Input data port that takes a number. > > */ > > public final transient DefaultInputPort<Object> meandata = > new DefaultInputPort<Object>() { > > /** > > * Computes sum and count with each tuple > > */ > > @Override > > public void process(Object tuple) { > > if (tuple instanceof > DbData) { > > DbData > dataTuple = (DbData) tuple; > > > values.add(dataTuple.getAbv()); > > } else { > > > LOG.info("Invalid input format of tuple: " + tuple.toString()); > > } > > > > } > > }; > > > > /** > > * Output port that emits median of incoming data. > > */ > > public final transient DefaultOutputPort<Number> mean = > new DefaultOutputPort<Number>(); > > > > @Override > > public void beginWindow(long arg0) { > > values = new ArrayList<Double>(); > > } > > @Override > > public void endWindow() { > > if (values.size() == 0) { > > return; > > } > > if (values.size() == 1) { > > mean.emit(values.get(0)); > > return; > > } > > double sum=0; > > for (Double value : > values) { > > sum += value; > > } > > > mean.emit(sum/values.size()); > > > > } > > > > } > > > > > > > > StandardDeviationOperator.java > > > > > > *package* com.example.myapexapp; > > > > *import* java.util.ArrayList; > > > > *import* org.slf4j.Logger; > > *import* org.slf4j.LoggerFactory; > > > > *import* com.datatorrent.api.DefaultInputPort; > > *import* com.datatorrent.api.DefaultOutputPort; > > *import* com.datatorrent.common.util.BaseOperator; > > > > *public* *class* StandardDeviationOperator *extends* BaseOperator { > > *private* *static* *final* Logger *LOG* = LoggerFactory.*getLogger* > (MeanOperator.*class*); > > *private* ArrayList<Double> values; > > > > /** > > * Input data port that takes a number. > > */ > > *public* *final* *transient* DefaultInputPort<Object> meandata = > *new* DefaultInputPort<Object>() { > > /** > > * Computes sum and count with each tuple > > */ > > @Override > > *public* *void* process(Object tuple) { > > *if* (tuple *instanceof* DbData) { > > DbData dataTuple = (DbData) tuple; > > values.add(dataTuple.getAbv()); > > } *else* { > > *LOG*.info("Invalid input format of tuple: " + > tuple.toString()); > > } > > > > } > > }; > > > > /** > > * Output port that emits median of incoming data. > > */ > > *public* *final* *transient* DefaultOutputPort<Number> deviation = > *new* DefaultOutputPort<Number>(); > > > > @Override > > *public* *void* beginWindow(*long* arg0) { > > values = *new* ArrayList<Double>(); > > } > > @Override > > *public* *void* endWindow() { > > *if* (values.size() == 0) { > > *return*; > > } > > *if* (values.size() == 1) { > > deviation.emit(values.get(0)); > > *return*; > > } > > *double* sum=0,meanvalue=0,temp=0; > > *for* (*int* i=0;i<values.size();i++) { > > sum += values.get(i); > > } > > meanvalue=sum/values.size(); > > *for*(*int* i=0;i<values.size();i++){ > > Double val = values.get(i); > > *double* squrDiffToMean = Math.*pow*(val - meanvalue, > 2); > > temp += squrDiffToMean; > > } > > *double* meanOfDiffs = (*double*) temp / (*double*) ( > values.size()); > > deviation.emit(Math.*sqrt*(meanOfDiffs)); > > > > > > > > } > > > > } > > > > > > > > > > > > > > > > > > *From:* Priyanka Gugale [mailto:[email protected]] > *Sent:* Monday, August 29, 2016 2:09 PM > *To:* [email protected] > *Subject:* Re: Connecting multiple operators > > > > Hitesh, > > > > Can you please share code of your MeanOperator and > StandardDeviationOperator. > > Also please share the application logs. After shutting down application > you can run "yan logs -applicationId <appId> to collect logs. > > > > -Priyanka > > > > On Mon, Aug 29, 2016 at 10:39 AM, Hitesh Goyal < > [email protected]> wrote: > > Hi team, > > > > I am trying to process some data using Operators. > > > > @SuppressWarnings("unchecked") > > @Override > > *public* *void* populateDAG(DAG dag, Configuration conf) { > > System.*setProperty*("viewmode", "production"); > > CouchBasePOJOInputOperator inputOperator = dag.addOperator(" > inputOperator", CouchBasePOJOInputOperator.*class*); > > inputOperator.setStore(*new* CouchBaseStore()); > > MedOperator med = dag.addOperator("median", MedOperator. > *class*); > > MeanOperator mean=dag.addOperator("mean", MeanOperator. > *class*); > > StandardDeviationOperator sdo=dag.addOperator("sdo", > StandardDeviationOperator.*class*); > > ConsoleOutputOperator cons = dag.addOperator("cons", *new* > ConsoleOutputOperator()); > > ConsoleOutputOperator cons1 = dag.addOperator("cons1", *new* > ConsoleOutputOperator()); > > ConsoleOutputOperator cons2 = dag.addOperator("cons2", *new* > ConsoleOutputOperator()); > > dag.addStream("inputFormatter", inputOperator.outputPort, > med.data,mean.meandata,sdo.meandata); > > dag.addStream("cons", med.median, cons.input).setLocality( > Locality.*THREAD_LOCAL*); > > dag.addStream("cons1", mean.mean, cons1.input).setLocality( > Locality.*THREAD_LOCAL*); > > dag.addStream("cons2", sdo.deviation, cons2.input > ).setLocality(Locality.*THREAD_LOCAL*); > > } > > There is no error in the code but when I launch this application in Data > Torrent, the status of operators remains pending instead of Running/Active. > Physical DAG view is clearly showing right connection of one stream to > another. > > > > > > Regards, > > *Hitesh Goyal* > > Simpli5d Technologies > > Cont No.: 9599803307 > > > > >
