Thanks, this looks better
// parse the lines of data into sensor objects
val sensorDStream = ssc.textFileStream("/stream").map(Sensor.parseSensor)

sensorDStream.foreachRDD { rdd =>
  // filter sensor data for low psi
  val alertRDD = rdd.filter(sensor => sensor.psi < 5.0)

  // convert sensor data to put objects and write to HBase
  rdd.map(Sensor.convertToPut).saveAsHadoopDataset(jobConfig)

  // convert alert data to put objects and write to HBase CF alert
  // (write from the filtered alertRDD, not the full rdd)
  alertRDD.map(Sensor.convertToPutAlert).saveAsHadoopDataset(jobConfig)
}

// Start the computation
ssc.start()

On Fri, Aug 28, 2015 at 10:59 AM, Sean Owen <so...@cloudera.com> wrote:
> Yes, for example "val sensorRDD = rdd.map(Sensor.parseSensor)" is a
> line of code executed on the driver; it's part of the function you
> supplied to foreachRDD. However, that line defines an operation on an
> RDD, and the map function you supplied (parseSensor) will ultimately
> be carried out on the cluster.
>
> If you mean, is the bulk of the work (the Sensor.* methods) happening
> on the cluster? Yes.
>
> Ewan's version looks cleaner, though it will ultimately be equivalent
> and doesn't cause operations to happen in a different place.
>
> (PS: I don't think you need "new PairRDDFunctions"; the implicits it
> defines should be automatically available.
> "sensorRDD.map(Sensor.convertToPut)" should be sufficient. In slightly
> older versions of Spark you have to import SparkContext._ to get these
> implicits.)
>
> On Fri, Aug 28, 2015 at 3:29 PM, Carol McDonald <cmcdon...@maprtech.com>
> wrote:
> > I would like to make sure that I am using the DStream foreachRDD
> > operation correctly. I would like to read from a DStream, transform
> > the input, and write to HBase. The code below works, but I became
> > confused when I read "Note that the function func is executed in the
> > driver process" ?
> >
> > val lines = ssc.textFileStream("/stream")
> >
> > lines.foreachRDD { rdd =>
> >   // parse the line of data into sensor object
> >   val sensorRDD = rdd.map(Sensor.parseSensor)
> >
> >   // convert sensor data to put object and write to HBase table
> >   // column family data
> >   new PairRDDFunctions(sensorRDD.map(Sensor.convertToPut)).
> >     saveAsHadoopDataset(jobConfig)
> > }
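Sean's PS refers to Spark's implicit conversion to PairRDDFunctions, which wraps any RDD of key/value pairs so that pair-specific methods such as saveAsHadoopDataset appear to be defined on the RDD itself, with no explicit "new PairRDDFunctions(...)" needed. A minimal pure-Scala sketch of that enrichment pattern, using plain collections in place of RDDs (PairOps and groupValues are hypothetical names for illustration, not Spark API):

```scala
object PairOpsDemo {
  // Hypothetical stand-in for PairRDDFunctions: an implicit class that
  // adds an extra method to any sequence of key/value pairs.
  implicit class PairOps[K, V](pairs: Seq[(K, V)]) {
    // Stand-in for a pair-only method like saveAsHadoopDataset;
    // here it just groups the values by key.
    def groupValues: Map[K, Seq[V]] =
      pairs.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2)) }
  }

  def main(args: Array[String]): Unit = {
    val readings = Seq(("pumpA", 4.2), ("pumpB", 7.1), ("pumpA", 3.9))
    // groupValues is not declared on Seq; the implicit class in scope
    // supplies it, just as Spark's implicits supply saveAsHadoopDataset
    // on an RDD[(K, V)].
    println(readings.groupValues("pumpA").sorted.mkString(","))
  }
}
```

The compiler rewrites readings.groupValues into PairOps(readings).groupValues, which is why sensorRDD.map(Sensor.convertToPut).saveAsHadoopDataset(jobConfig) compiles on its own once the implicits are in scope (automatically in current Spark; via import SparkContext._ in older versions).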