Hi Hitesh,

Your Median, Mean and StandardDeviation operators can't all write directly to the same file. In general, only one process or thread should write to a given file at a time. To achieve your use case, you can try one of the following options:
1. Forward the output from your three processing operators to a single operator that writes the data to a file.
2. Write to different output files and merge the files to get the final result.

I would suggest you try the first approach.

-Priyanka

On Tue, Aug 30, 2016 at 10:15 AM, Hitesh Goyal <[email protected]> wrote:

> Hi team,
>
> I want all the tuples emitted by MedianOperator, MeanOperator and
> StandardDeviationOperator in a single file (i.e. a java file) so that I can
> process these accordingly. How can I do that?
>
> *From:* Priyanka Gugale [mailto:[email protected]]
> *Sent:* Monday, August 29, 2016 4:33 PM
> *To:* [email protected]
> *Subject:* Re: Connecting multiple operators
>
> I suspect there is not enough memory to launch the operators. As per the
> code, we will need 4 containers; maybe your cluster doesn't have enough
> resources. Let's try setting low memory for the operators; we are not
> storing much in memory anyway. You can configure memory using this setting:
>
> <property>
>   <name>dt.application.*.operator.*.attr.MEMORY_MB</name>
>   <value>256</value>
> </property>
>
> Refer to the troubleshooting guide to know more:
> http://docs.datatorrent.com/troubleshooting/#configuring-memory
>
> Also check the UI for the number of requested vs. allocated containers, and
> check the Hadoop memory settings.
>
> -Priyanka
>
> On Mon, Aug 29, 2016 at 2:32 PM, Hitesh Goyal <[email protected]>
> wrote:
>
> Please find the code below. Also find the application logs as an
> attached file.
>
> MeanOperator.java
>
> package com.example.myapexapp;
>
> import java.util.ArrayList;
>
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import com.datatorrent.api.DefaultInputPort;
> import com.datatorrent.api.DefaultOutputPort;
> import com.datatorrent.common.util.BaseOperator;
>
> public class MeanOperator extends BaseOperator {
>     private static final Logger LOG = LoggerFactory.getLogger(MeanOperator.class);
>     private ArrayList<Double> values;
>
>     /**
>      * Input data port that takes a number.
>      */
>     public final transient DefaultInputPort<Object> meandata = new DefaultInputPort<Object>() {
>         /**
>          * Collects the value of each incoming tuple for the current window.
>          */
>         @Override
>         public void process(Object tuple) {
>             if (tuple instanceof DbData) {
>                 DbData dataTuple = (DbData) tuple;
>                 values.add(dataTuple.getAbv());
>             } else {
>                 LOG.info("Invalid input format of tuple: " + tuple.toString());
>             }
>         }
>     };
>
>     /**
>      * Output port that emits the mean of incoming data.
>      */
>     public final transient DefaultOutputPort<Number> mean = new DefaultOutputPort<Number>();
>
>     @Override
>     public void beginWindow(long arg0) {
>         values = new ArrayList<Double>();
>     }
>
>     @Override
>     public void endWindow() {
>         if (values.size() == 0) {
>             return;
>         }
>         if (values.size() == 1) {
>             mean.emit(values.get(0));
>             return;
>         }
>         double sum = 0;
>         for (Double value : values) {
>             sum += value;
>         }
>         mean.emit(sum / values.size());
>     }
> }
>
> StandardDeviationOperator.java
>
> package com.example.myapexapp;
>
> import java.util.ArrayList;
>
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import com.datatorrent.api.DefaultInputPort;
> import com.datatorrent.api.DefaultOutputPort;
> import com.datatorrent.common.util.BaseOperator;
>
> public class StandardDeviationOperator extends BaseOperator {
>     private static final Logger LOG = LoggerFactory.getLogger(StandardDeviationOperator.class);
>     private ArrayList<Double> values;
>
>     /**
>      * Input data port that takes a number.
>      */
>     public final transient DefaultInputPort<Object> meandata = new DefaultInputPort<Object>() {
>         /**
>          * Collects the value of each incoming tuple for the current window.
>          */
>         @Override
>         public void process(Object tuple) {
>             if (tuple instanceof DbData) {
>                 DbData dataTuple = (DbData) tuple;
>                 values.add(dataTuple.getAbv());
>             } else {
>                 LOG.info("Invalid input format of tuple: " + tuple.toString());
>             }
>         }
>     };
>
>     /**
>      * Output port that emits the standard deviation of incoming data.
>      */
>     public final transient DefaultOutputPort<Number> deviation = new DefaultOutputPort<Number>();
>
>     @Override
>     public void beginWindow(long arg0) {
>         values = new ArrayList<Double>();
>     }
>
>     @Override
>     public void endWindow() {
>         if (values.size() == 0) {
>             return;
>         }
>         if (values.size() == 1) {
>             deviation.emit(values.get(0));
>             return;
>         }
>         double sum = 0, meanvalue = 0, temp = 0;
>         for (int i = 0; i < values.size(); i++) {
>             sum += values.get(i);
>         }
>         meanvalue = sum / values.size();
>         for (int i = 0; i < values.size(); i++) {
>             Double val = values.get(i);
>             double squrDiffToMean = Math.pow(val - meanvalue, 2);
>             temp += squrDiffToMean;
>         }
>         double meanOfDiffs = (double) temp / (double) (values.size());
>         deviation.emit(Math.sqrt(meanOfDiffs));
>     }
> }
>
> *From:* Priyanka Gugale [mailto:[email protected]]
> *Sent:* Monday, August 29, 2016 2:09 PM
> *To:* [email protected]
> *Subject:* Re: Connecting multiple operators
>
> Hitesh,
>
> Can you please share the code of your MeanOperator and
> StandardDeviationOperator?
>
> Also please share the application logs. After shutting down the application,
> you can run "yarn logs -applicationId <appId>" to collect the logs.
>
> -Priyanka
>
> On Mon, Aug 29, 2016 at 10:39 AM, Hitesh Goyal <
> [email protected]> wrote:
>
> Hi team,
>
> I am trying to process some data using Operators.
>
> @SuppressWarnings("unchecked")
> @Override
> public void populateDAG(DAG dag, Configuration conf) {
>     System.setProperty("viewmode", "production");
>     CouchBasePOJOInputOperator inputOperator = dag.addOperator("inputOperator", CouchBasePOJOInputOperator.class);
>     inputOperator.setStore(new CouchBaseStore());
>     MedOperator med = dag.addOperator("median", MedOperator.class);
>     MeanOperator mean = dag.addOperator("mean", MeanOperator.
>         class);
>     StandardDeviationOperator sdo = dag.addOperator("sdo", StandardDeviationOperator.class);
>     ConsoleOutputOperator cons = dag.addOperator("cons", new ConsoleOutputOperator());
>     ConsoleOutputOperator cons1 = dag.addOperator("cons1", new ConsoleOutputOperator());
>     ConsoleOutputOperator cons2 = dag.addOperator("cons2", new ConsoleOutputOperator());
>     dag.addStream("inputFormatter", inputOperator.outputPort, med.data, mean.meandata, sdo.meandata);
>     dag.addStream("cons", med.median, cons.input).setLocality(Locality.THREAD_LOCAL);
>     dag.addStream("cons1", mean.mean, cons1.input).setLocality(Locality.THREAD_LOCAL);
>     dag.addStream("cons2", sdo.deviation, cons2.input).setLocality(Locality.THREAD_LOCAL);
> }
>
> There is no error in the code, but when I launch this application in
> DataTorrent, the status of the operators remains Pending instead of
> Running/Active. The physical DAG view clearly shows the right connection of
> one stream to another.
>
> Regards,
> *Hitesh Goyal*
> Simpli5d Technologies
> Cont No.: 9599803307
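
The first option suggested at the top of this thread (forward the three outputs to one operator that owns the file) can be sketched in plain Java, without the Apex API. This is only an illustration under assumptions: SingleFileSink, its write method and the "source,value" line format are hypothetical names made up for this sketch, not part of Apex or Malhar. In a real application the same logic would sit in a single output operator that receives the streams from the median, mean and deviation operators.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

/** Hypothetical single writer: every result goes through this one object. */
class SingleFileSink implements AutoCloseable {
    private final BufferedWriter writer;

    SingleFileSink(Path file) throws IOException {
        // One writer owns the file for the lifetime of the sink.
        this.writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    /** Appends one "source,value" line; synchronized so lines never interleave. */
    synchronized void write(String source, Number value) throws IOException {
        writer.write(source + "," + value);
        writer.newLine();
    }

    @Override
    public synchronized void close() throws IOException {
        writer.close();
    }

    public static void main(String[] args) throws IOException {
        Path out = Files.createTempFile("stats", ".csv");
        try (SingleFileSink sink = new SingleFileSink(out)) {
            // Stand-ins for the values the three operators emit in one window.
            sink.write("median", 5.0);
            sink.write("mean", 5.5);
            sink.write("deviation", 1.25);
        }
        List<String> lines = Files.readAllLines(out, StandardCharsets.UTF_8);
        System.out.println(lines);
    }
}
```

Tagging each line with its source keeps the three kinds of result distinguishable in the single file, which is what makes the merged output usable for later processing.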
