Perfect, that's exactly what I was looking for. Thank you!
On Mon, Dec 15, 2014 at 3:32 AM, Yanbo Liang <yanboha...@gmail.com> wrote:
>
> Hi Nathan,
>
> #1
>
> The Spark SQL DSL can satisfy your requirement. You can refer to the
> following code snippet:
>
>     jdata.select(Star(None), 'seven.getField("mod"), 'eleven.getField("mod"))
>
> You need to import org.apache.spark.sql.catalyst.analysis.Star in advance.
>
> #2
>
> After you make the transform above, you do not need to build the SchemaRDD
> manually, because jdata.select() returns a SchemaRDD and you can operate
> on it directly.
>
> For example, the following code snippet will return a new SchemaRDD with
> a longer Row:
>
>     val t1 = jdata.select(Star(None),
>       'seven.getField("mod") + 'eleven.getField("mod") as 'mod_sum)
>
> You can use t1.printSchema() to print the schema of this SchemaRDD and
> check whether it satisfies your requirements.
>
>
>
> 2014-12-13 0:00 GMT+08:00 Nathan Kronenfeld <nkronenf...@oculusinfo.com>:
>>
>> (1) I understand about immutability, that's why I said I wanted a new
>> SchemaRDD.
>> (2) I specifically asked for a non-SQL solution that takes a SchemaRDD,
>> and results in a new SchemaRDD with one new function.
>> (3) The DSL stuff is a big clue, but I can't find adequate documentation
>> for it.
>>
>> What I'm looking for is something like:
>>
>>     import org.apache.spark.sql._
>>
>>     val sqlc = new SQLContext(sc)
>>     import sqlc._
>>
>>     val data = sc.parallelize(0 to 99).map(n =>
>>       ("{\"seven\": {\"mod\": %d, \"times\": %d}, "+
>>        "\"eleven\": {\"mod\": %d, \"times\": %d}}").format(n % 7, n * 7, n % 11, n * 11))
>>     val jdata = sqlc.jsonRDD(data)
>>     jdata.registerTempTable("jdata")
>>
>>     val sqlVersion = sqlc.sql("SELECT *, (seven.mod + eleven.mod) AS modsum FROM jdata")
>>
>> This sqlVersion works fine, but if I try to do the same thing with a
>> programmatic function, I'm missing a bunch of pieces:
>>
>> - I assume I'd need to start with something like:
>>       jdata.select('*, 'seven.mod, 'eleven.mod)
>>   and then get and process the last two elements. The problems are:
>>    - I can't select '* - there seems to be no way to get the complete row
>>    - I can't select 'seven.mod or 'eleven.mod - the symbol evaluation
>>      seems to be only one deep.
>> - Assuming I could do that, I don't see a way to make the result into
>>   a SchemaRDD. I assume I would have to do something like:
>>    1. take my row and value, and create a new, slightly longer row
>>    2. take my old schema, and create a new schema with one more field
>>       at the end, named and typed appropriately
>>    3. combine the two into a SchemaRDD
>>   I think I see how to do 3, but 1 and 2 elude me.
>>
>> Is there more complete documentation somewhere for the DSL portion?
>> Anyone have a clue about any of the above?
>>
>>
>>
>> On Fri, Dec 12, 2014 at 6:01 AM, Yanbo Liang <yanboha...@gmail.com>
>> wrote:
>>
>>> An RDD is immutable, so you cannot modify it.
>>> If you want to modify some value or the schema of an RDD, use map to
>>> generate a new RDD.
>>> The following code is for your reference:
>>>
>>>     def add(a:Int,b:Int):Int = {
>>>       a + b
>>>     }
>>>
>>>     val d1 = sc.parallelize(1 to 10).map { i => (i, i+1, i+2) }
>>>     val d2 = d1.map { i => (i._1, i._2, add(i._1, i._2))}
>>>     d2.foreach(println)
>>>
>>>
>>> Otherwise, if your user-defined function is straightforward and you can
>>> represent it in SQL, using Spark SQL or the DSL is also a good choice.
>>>
>>>     case class Person(id: Int, score: Int, value: Int)
>>>
>>>     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>>
>>>     import sqlContext._
>>>
>>>     val d1 = sc.parallelize(1 to 10).map { i => Person(i,i+1,i+2)}
>>>     val d2 = d1.select('id, 'score, 'id + 'score)
>>>     d2.foreach(println)
>>>
>>>
>>> 2014-12-12 14:11 GMT+08:00 Nathan Kronenfeld <nkronenf...@oculusinfo.com>:
>>>
>>>> Hi, there.
>>>>
>>>> I'm trying to understand how to augment data in a SchemaRDD.
>>>>
>>>> I can see how to do it if I can express the added values in SQL - just
>>>> run "SELECT *,valueCalculation AS newColumnName FROM table"
>>>>
>>>> I've been searching all over for how to do this if my added value is a
>>>> Scala function, with no luck.
>>>>
>>>> Let's say I have a SchemaRDD with columns A, B, and C, and I want to
>>>> add a new column, D, calculated using Utility.process(b, c), and I want
>>>> (of course) to pass in the values B and C from each row, ending up with
>>>> a new SchemaRDD with columns A, B, C, and D.
>>>>
>>>> Is this possible? If so, how?
>>>>
>>>> Thanks,
>>>>               -Nathan
>>>>
>>>> --
>>>> Nathan Kronenfeld
>>>> Senior Visualization Developer
>>>> Oculus Info Inc
>>>> 2 Berkeley Street, Suite 600,
>>>> Toronto, Ontario M5A 4J5
>>>> Phone:  +1-416-203-3003 x 238
>>>> Email:  nkronenf...@oculusinfo.com
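
For the case in the original question, where column D comes from an arbitrary Scala function rather than a DSL expression, the three manual steps listed in the thread (longer row, longer schema, combine) could be sketched roughly as below. This is only a sketch under assumptions: it targets the Spark 1.2-era SchemaRDD API in a spark-shell session where `sc` is already defined, the `Record` case class is a hypothetical stand-in for the A, B, C columns, and `Utility.process` is given a made-up body because the thread never shows the real one.

```scala
// Assumes a Spark 1.2-era spark-shell, where `sc` (the SparkContext) is predefined.
import org.apache.spark.sql._

val sqlc = new SQLContext(sc)
import sqlc.createSchemaRDD  // implicit conversion: RDD of case classes => SchemaRDD

// Hypothetical stand-ins for columns A, B, C and for Utility.process.
case class Record(a: Int, b: Int, c: Int)
object Utility { def process(b: Int, c: Int): Int = b * c }

val abc: SchemaRDD = sc.parallelize(1 to 10).map(i => Record(i, i + 1, i + 2))

// 1. Build a slightly longer row: the old values plus the computed one.
val longerRows = abc.map { row =>
  val d = Utility.process(row.getInt(1), row.getInt(2))
  Row((row.toSeq :+ d): _*)
}

// 2. Extend the old schema with one more field at the end.
val longerSchema =
  StructType(abc.schema.fields :+ StructField("d", IntegerType, false))

// 3. Combine the rows and the schema into a new SchemaRDD.
val abcd: SchemaRDD = sqlc.applySchema(longerRows, longerSchema)
abcd.printSchema()
```

The new field's type has to match what the function returns (IntegerType here because the stand-in returns Int). When the computation can be written as a DSL expression, the select-based approach from the reply above is shorter and avoids handling rows by position.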