Hi team,

I want all the tuples emitted by MedianOperator, MeanOperator, and StandardDeviationOperator in a single file (i.e. a Java file) so that I can process them accordingly. How can I do that?
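One possible approach, not confirmed elsewhere in this thread, is a single downstream operator class with three input ports, one fed by each statistics operator, that combines the values it receives in each streaming window. The sketch below is a dependency-free stand-in so it can run outside Apex. The class name `StatsCollector`, the handler methods `onMedian`/`onMean`/`onDeviation`, and the `WindowStats` type are all hypothetical. It assumes that every input port delivers a window's tuples between the same `beginWindow` and `endWindow` calls, and that each upstream operator emits at most once per window, which is what the `endWindow` methods in this thread do.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, dependency-free stand-in for an Apex operator with three input ports.
 * The call order mirrors an Apex operator: beginWindow, then port handlers, then endWindow.
 */
public class StatsCollector {
  /** Combined statistics for one streaming window (hypothetical type). */
  public static final class WindowStats {
    public final long windowId;
    public final Number median;
    public final Number mean;
    public final Number deviation;

    WindowStats(long windowId, Number median, Number mean, Number deviation) {
      this.windowId = windowId;
      this.median = median;
      this.mean = mean;
      this.deviation = deviation;
    }
  }

  private final List<WindowStats> results = new ArrayList<>();
  private long windowId;
  private Number median;
  private Number mean;
  private Number deviation;

  public void beginWindow(long windowId) {
    this.windowId = windowId;
    median = null;
    mean = null;
    deviation = null;
  }

  // In Apex, each of these would be the process() method of a DefaultInputPort<Number>.
  public void onMedian(Number value) { median = value; }
  public void onMean(Number value) { mean = value; }
  public void onDeviation(Number value) { deviation = value; }

  /** Combines whatever arrived in this window; skips windows where nothing was emitted. */
  public void endWindow() {
    if (median == null && mean == null && deviation == null) {
      return;
    }
    results.add(new WindowStats(windowId, median, mean, deviation));
  }

  public List<WindowStats> getResults() {
    return results;
  }

  public static void main(String[] args) {
    StatsCollector collector = new StatsCollector();
    collector.beginWindow(1L);
    collector.onMedian(4.0);
    collector.onMean(5.0);
    collector.onDeviation(2.0);
    collector.endWindow();
    collector.beginWindow(2L); // empty window: the upstream operators emitted nothing
    collector.endWindow();
    for (WindowStats s : collector.getResults()) {
      System.out.println("window " + s.windowId + ": median=" + s.median
          + ", mean=" + s.mean + ", deviation=" + s.deviation);
    }
  }
}
```

Running `main` prints one line, `window 1: median=4.0, mean=5.0, deviation=2.0`, because window 2 received nothing. In the real DAG, each console stream (for example the one starting at `med.median`) would instead end at the corresponding input port of such an operator; the port names on that operator would be your own choice.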
From: Priyanka Gugale [mailto:[email protected]]
Sent: Monday, August 29, 2016 4:33 PM
To: [email protected]
Subject: Re: Connecting multiple operators

I suspect there is not enough memory to launch the operators. As per the code, the application needs 4 containers, and your cluster may not have enough resources for them. Let's try setting lower memory for the operators; we are not storing much in memory anyway. You can configure the memory with this setting:

<property>
  <name>dt.application.*.operator.*.attr.MEMORY_MB</name>
  <value>256</value>
</property>

Refer to the troubleshooting guide for more details: http://docs.datatorrent.com/troubleshooting/#configuring-memory

Also check the number of requested vs. allocated containers in the UI, and check the Hadoop memory settings.

-Priyanka

On Mon, Aug 29, 2016 at 2:32 PM, Hitesh Goyal <[email protected]> wrote:

Please find the code below. The application logs are attached as a file.

MeanOperator.java

package com.example.myapexapp;

import java.util.ArrayList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class MeanOperator extends BaseOperator
{
  private static final Logger LOG = LoggerFactory.getLogger(MeanOperator.class);
  private ArrayList<Double> values;

  /**
   * Input data port that takes a DbData tuple.
   */
  public final transient DefaultInputPort<Object> meandata = new DefaultInputPort<Object>()
  {
    /**
     * Buffers the value of each tuple for the current window.
     */
    @Override
    public void process(Object tuple)
    {
      if (tuple instanceof DbData) {
        DbData dataTuple = (DbData) tuple;
        values.add(dataTuple.getAbv());
      } else {
        LOG.info("Invalid input format of tuple: " + tuple.toString());
      }
    }
  };

  /**
   * Output port that emits the mean of the window's data.
   */
  public final transient DefaultOutputPort<Number> mean = new DefaultOutputPort<Number>();

  @Override
  public void beginWindow(long windowId)
  {
    values = new ArrayList<Double>();
  }

  @Override
  public void endWindow()
  {
    if (values.size() == 0) {
      return;
    }
    if (values.size() == 1) {
      mean.emit(values.get(0));
      return;
    }
    double sum = 0;
    for (Double value : values) {
      sum += value;
    }
    mean.emit(sum / values.size());
  }
}

StandardDeviationOperator.java

package com.example.myapexapp;

import java.util.ArrayList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class StandardDeviationOperator extends BaseOperator
{
  private static final Logger LOG = LoggerFactory.getLogger(StandardDeviationOperator.class);
  private ArrayList<Double> values;

  /**
   * Input data port that takes a DbData tuple.
   */
  public final transient DefaultInputPort<Object> meandata = new DefaultInputPort<Object>()
  {
    /**
     * Buffers the value of each tuple for the current window.
     */
    @Override
    public void process(Object tuple)
    {
      if (tuple instanceof DbData) {
        DbData dataTuple = (DbData) tuple;
        values.add(dataTuple.getAbv());
      } else {
        LOG.info("Invalid input format of tuple: " + tuple.toString());
      }
    }
  };

  /**
   * Output port that emits the population standard deviation of the window's data.
   */
  public final transient DefaultOutputPort<Number> deviation = new DefaultOutputPort<Number>();

  @Override
  public void beginWindow(long windowId)
  {
    values = new ArrayList<Double>();
  }

  @Override
  public void endWindow()
  {
    if (values.size() == 0) {
      return;
    }
    if (values.size() == 1) {
      // A single value has no spread, so its standard deviation is 0
      deviation.emit(0.0);
      return;
    }
    double sum = 0;
    for (Double value : values) {
      sum += value;
    }
    double meanValue = sum / values.size();
    double sumOfSquaredDiffs = 0;
    for (Double value : values) {
      sumOfSquaredDiffs += Math.pow(value - meanValue, 2);
    }
    // Population variance: the average squared difference from the mean
    double variance = sumOfSquaredDiffs / values.size();
    deviation.emit(Math.sqrt(variance));
  }
}

From: Priyanka Gugale [mailto:[email protected]]
Sent: Monday, August 29, 2016 2:09 PM
To: [email protected]
Subject: Re: Connecting multiple operators

Hitesh,

Can you please share the code of your MeanOperator and StandardDeviationOperator? Also please share the application logs. After shutting down the application, you can run "yarn logs -applicationId <appId>" to collect the logs.

-Priyanka

On Mon, Aug 29, 2016 at 10:39 AM, Hitesh Goyal <[email protected]> wrote:

Hi team,

I am trying to process some data using operators:
@SuppressWarnings("unchecked")
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  System.setProperty("viewmode", "production");
  CouchBasePOJOInputOperator inputOperator = dag.addOperator("inputOperator", CouchBasePOJOInputOperator.class);
  inputOperator.setStore(new CouchBaseStore());

  MedOperator med = dag.addOperator("median", MedOperator.class);
  MeanOperator mean = dag.addOperator("mean", MeanOperator.class);
  StandardDeviationOperator sdo = dag.addOperator("sdo", StandardDeviationOperator.class);

  ConsoleOutputOperator cons = dag.addOperator("cons", new ConsoleOutputOperator());
  ConsoleOutputOperator cons1 = dag.addOperator("cons1", new ConsoleOutputOperator());
  ConsoleOutputOperator cons2 = dag.addOperator("cons2", new ConsoleOutputOperator());

  // One stream fans the input tuples out to all three statistics operators
  dag.addStream("inputFormatter", inputOperator.outputPort, med.data, mean.meandata, sdo.meandata);

  // Each result stream is THREAD_LOCAL, so each console operator runs in its upstream operator's thread
  dag.addStream("cons", med.median, cons.input).setLocality(Locality.THREAD_LOCAL);
  dag.addStream("cons1", mean.mean, cons1.input).setLocality(Locality.THREAD_LOCAL);
  dag.addStream("cons2", sdo.deviation, cons2.input).setLocality(Locality.THREAD_LOCAL);
}

There is no error in the code, but when I launch this application in DataTorrent, the operators stay in PENDING status instead of RUNNING/ACTIVE. The physical DAG view clearly shows the streams connected correctly.

Regards,
Hitesh Goyal
Simpli5d Technologies
Cont No.: 9599803307
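The estimate of 4 containers in the reply above can be traced to the stream localities in `populateDAG`. Assuming each operator gets its own container unless a stream locality co-locates it, the three THREAD_LOCAL streams pair each statistics operator with its console operator, while "inputFormatter" has no locality set. That leaves the input operator plus three pairs. The sketch below counts those groups with a small union-find; the operator names match the DAG, but the `ContainerEstimate` helper is hypothetical, and it ignores partitioning and the application master container.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical helper: estimates how many containers a DAG needs, assuming each
 * operator gets its own container unless a stream locality co-locates it.
 * Ignores partitioning and the application master container.
 */
public class ContainerEstimate {
  // Union-find parent links: each operator points towards its group's representative.
  private final Map<String, String> parent = new HashMap<>();

  public void addOperator(String name) {
    parent.putIfAbsent(name, name);
  }

  private String find(String name) {
    String root = name;
    while (!parent.get(root).equals(root)) {
      root = parent.get(root);
    }
    return root;
  }

  /** Records a THREAD_LOCAL stream; both operators must have been added first. */
  public void colocate(String upstream, String downstream) {
    String a = find(upstream);
    String b = find(downstream);
    if (!a.equals(b)) {
      parent.put(a, b);
    }
  }

  /** Number of distinct co-location groups, i.e. the estimated container count. */
  public int containerCount() {
    Set<String> groups = new HashSet<>();
    for (String name : parent.keySet()) {
      groups.add(find(name));
    }
    return groups.size();
  }

  public static void main(String[] args) {
    ContainerEstimate dag = new ContainerEstimate();
    for (String op : new String[] {"inputOperator", "median", "mean", "sdo", "cons", "cons1", "cons2"}) {
      dag.addOperator(op);
    }
    // "inputFormatter" has no locality set, so it co-locates nothing.
    dag.colocate("median", "cons"); // stream "cons", THREAD_LOCAL
    dag.colocate("mean", "cons1");  // stream "cons1", THREAD_LOCAL
    dag.colocate("sdo", "cons2");   // stream "cons2", THREAD_LOCAL
    System.out.println("containers needed: " + dag.containerCount());
  }
}
```

Running `main` prints `containers needed: 4`. If the cluster cannot grant that many containers at the requested memory, the operators would be expected to wait in PENDING, which is consistent with the suggestion to lower `MEMORY_MB`.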

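Both MeanOperator and StandardDeviationOperator above buffer every value of the window in an `ArrayList`, and the deviation operator makes two passes over it in `endWindow`. A single-pass alternative is Welford's online algorithm, which keeps only a count, a running mean, and a running sum of squared differences, so one accumulator yields both the mean and the population standard deviation those operators compute. This is a swapped-in technique, not the code from this thread; the class name `RunningStats` is hypothetical. In an operator, `add` would be called from the input port's `process` and `reset` from `beginWindow`.

```java
/** Single-pass mean and population standard deviation (Welford's online algorithm). */
public class RunningStats {
  private long count;
  private double mean;
  private double m2; // sum of squared differences from the current mean

  /** Clears the accumulator, e.g. at the start of each streaming window. */
  public void reset() {
    count = 0;
    mean = 0.0;
    m2 = 0.0;
  }

  /** Folds one value into the running mean and sum of squared differences. */
  public void add(double x) {
    count++;
    double delta = x - mean;
    mean += delta / count;
    m2 += delta * (x - mean); // uses the updated mean, as Welford's update requires
  }

  public long count() {
    return count;
  }

  public double mean() {
    return mean;
  }

  /** Population standard deviation (divides by n), matching StandardDeviationOperator. */
  public double populationStdDev() {
    return count == 0 ? Double.NaN : Math.sqrt(m2 / count);
  }

  public static void main(String[] args) {
    RunningStats stats = new RunningStats();
    for (double x : new double[] {2, 4, 4, 4, 5, 5, 7, 9}) {
      stats.add(x);
    }
    System.out.println("mean=" + stats.mean() + ", sd=" + stats.populationStdDev());
  }
}
```

For the sample values the mean is 5 and the population standard deviation is 2 (up to floating-point rounding). Memory per window stays constant instead of growing with the number of tuples, which may matter given the memory concerns discussed in this thread.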