Thanks, this looks better.

    // parse the lines of data into sensor objects
    val sensorDStream = ssc.textFileStream("/stream").
      map(Sensor.parseSensor)

    sensorDStream.foreachRDD { rdd =>

      // filter sensor data for low psi
      val alertRDD = rdd.filter(sensor => sensor.psi < 5.0)

      // convert sensor data to put object and write to HBase
      rdd.map(Sensor.convertToPut).
        saveAsHadoopDataset(jobConfig)

      // convert alert data to put object and write to HBase CF alert
      alertRDD.map(Sensor.convertToPutAlert).
        saveAsHadoopDataset(jobConfig)
    }
    // Start the computation and block until it terminates
    ssc.start()
    ssc.awaitTermination()
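
The Sensor helpers and jobConfig aren't shown above, so here is a minimal
sketch of what they might look like. The field names, row key scheme,
column family names ("data" and "alert"), and table name are illustrative
assumptions, not the actual tutorial code:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapred.JobConf

    object Sensor {
      // assumed input line format: "resID,date,psi"
      case class Sensor(resid: String, date: String, psi: Double)

      def parseSensor(line: String): Sensor = {
        val p = line.split(",")
        Sensor(p(0), p(1), p(2).toDouble)
      }

      // key by resID_date and write the reading to column family "data"
      // (Put.add is the pre-HBase-1.0 API; newer clients use addColumn)
      def convertToPut(s: Sensor): (ImmutableBytesWritable, Put) = {
        val rowkey = Bytes.toBytes(s.resid + "_" + s.date)
        val put = new Put(rowkey)
        put.add(Bytes.toBytes("data"), Bytes.toBytes("psi"), Bytes.toBytes(s.psi))
        (new ImmutableBytesWritable(rowkey), put)
      }

      // same row key, but write to column family "alert"
      def convertToPutAlert(s: Sensor): (ImmutableBytesWritable, Put) = {
        val rowkey = Bytes.toBytes(s.resid + "_" + s.date)
        val put = new Put(rowkey)
        put.add(Bytes.toBytes("alert"), Bytes.toBytes("psi"), Bytes.toBytes(s.psi))
        (new ImmutableBytesWritable(rowkey), put)
      }
    }

    // old-API JobConf so saveAsHadoopDataset knows which table to write
    val jobConfig = new JobConf(HBaseConfiguration.create())
    jobConfig.setOutputFormat(classOf[TableOutputFormat])
    jobConfig.set(TableOutputFormat.OUTPUT_TABLE, "sensorTable")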

On Fri, Aug 28, 2015 at 10:59 AM, Sean Owen <so...@cloudera.com> wrote:

> Yes, for example "val sensorRDD = rdd.map(Sensor.parseSensor)" is a
> line of code executed on the driver; it's part of the function you
> supplied to foreachRDD. However that line defines an operation on an
> RDD, and the map function you supplied (parseSensor) will ultimately
> be carried out on the cluster.
>
> If you mean, is the bulk of the work (the Sensor.* methods) happening
> on the cluster? Yes.
>
> Ewan's version looks cleaner, though it will ultimately be equivalent
> and doesn't cause operations to happen in a different place.
>
> (PS I don't think you need "new PairRDDFunctions"; the implicit
> conversion that provides it should be automatically in scope.
> "sensorRDD.map(Sensor.convertToPut)" should be sufficient. In slightly
> older versions of Spark you have to import SparkContext._ to get these
> implicits.)
>
> On Fri, Aug 28, 2015 at 3:29 PM, Carol McDonald <cmcdon...@maprtech.com>
> wrote:
> > I would like to make sure that I am using the DStream foreachRDD
> > operation correctly. I would like to read from a DStream, transform
> > the input, and write to HBase. The code below works, but I became
> > confused when I read "Note that the function func is executed in the
> > driver process".
> >
> >
> >     val lines = ssc.textFileStream("/stream")
> >
> >     lines.foreachRDD { rdd =>
> >       // parse the line of data into sensor object
> >       val sensorRDD = rdd.map(Sensor.parseSensor)
> >
> >       // convert sensor data to put object and write to HBase table
> >       // column family data
> >       new PairRDDFunctions(sensorRDD.
> >                   map(Sensor.convertToPut)).
> >                   saveAsHadoopDataset(jobConfig)
> >
> >     }
>
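
To make the driver-vs-cluster point concrete, here is the skeleton from
the original question annotated with where each piece actually executes
(an illustration only, not code from the thread):

    // the function passed to foreachRDD runs on the driver, once per
    // batch interval
    lines.foreachRDD { rdd =>
      // driver: this only defines a map over the RDD; transformations
      // are lazy, so parseSensor does not run here
      val sensorRDD = rdd.map(Sensor.parseSensor)

      // saveAsHadoopDataset is an action: it launches a job, and
      // parseSensor/convertToPut then run on the executors, in
      // parallel across the RDD's partitions
      sensorRDD.map(Sensor.convertToPut).saveAsHadoopDataset(jobConfig)

      // no "new PairRDDFunctions(...)" wrapper needed: the
      // rddToPairRDDFunctions implicit adds saveAsHadoopDataset to any
      // RDD of pairs (on Spark older than 1.3, import
      // org.apache.spark.SparkContext._ first)
    }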
