Re: How to use collections inside foreach block
Hi Rishi and Ted,

Thank you for the responses. I'm now using Accumulators and getting results. I have another question: how can I run this code in parallel?

Example: var listOfIds is a ListBuffer with 2 records. I'm creating batches of 500 ids each, which gives 40 batches in total.

    import scala.collection.mutable.ListBuffer
    import org.apache.spark.sql.Row
    import sqlContext.implicits._

    listOfIds.grouped(500).foreach { x =>
      val r = sc.parallelize(x).toDF("id")
      r.registerTempTable("r")
      // collection accumulator: rows added on the executors become visible on the driver
      val acc = sc.accumulableCollection(ListBuffer[Row]())
      val result = sqlContext.sql("SELECT r.id, t.data FROM r, t WHERE r.id = t.id")
      result.foreach { y =>
        acc += y
      }
      acc.value.foreach { f =>
        // save values to other db
      }
    }

The above code runs the batches in sequence. I want to run these 40 batches in parallel. *How can I start these 40 batches in parallel instead of in sequence?*

Could you please help me resolve this use case?

Regards,
Rajesh

On Wed, Dec 9, 2015 at 4:46 PM, Ted Yu wrote:
> To add onto what Rishi said, you can use foreachPartition() on result
> where you can save values to DB.
>
> Cheers
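One possible direction for the parallel-batches question, sketched under assumptions: Spark's scheduler can run jobs simultaneously when they are submitted from separate threads of the same SparkContext, so each batch could be launched from its own Future on a driver-side thread pool. The sketch below is plain Scala with no Spark dependency. `ParallelBatches`, `processBatch`, `runAll`, the pool size of 8 and the 20,000 example ids (chosen so that 500-id batches give 40 batches) are all hypothetical, and `processBatch` only stands in for the per-batch query and DB save. If the real batches run concurrently, each one would also need its own temp-table name, because every thread registering "r" would overwrite the others.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParallelBatches {
  // Hypothetical stand-in for one Spark batch. In a real job this is where the
  // per-batch DataFrame, a temp table with a unique name (e.g. built from batchNo),
  // the join against t and the DB save would go.
  def processBatch(batchNo: Int, ids: Seq[Int]): Int =
    ids.size // pretend "rows saved" equals the number of ids in the batch

  // Runs every batch on its own Future, with at most `threads` running at once.
  def runAll(ids: Seq[Int], batchSize: Int, threads: Int): Seq[Int] = {
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val futures = ids.grouped(batchSize).zipWithIndex.map { case (batch, i) =>
        Future(processBatch(i, batch)) // each batch runs on a pool thread
      }.toList // forces creation of all futures before waiting
      Await.result(Future.sequence(futures), 10.minutes) // wait for every batch
    } finally {
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    val savedPerBatch = runAll(1 to 20000, batchSize = 500, threads = 8)
    println(s"batches = ${savedPerBatch.size}, rows saved = ${savedPerBatch.sum}")
  }
}
```

`Future.sequence` keeps the batch order, so the result holds one count per batch. The fixed-size pool caps how many jobs are in flight at once; concurrent jobs still share the same executors, and Spark's FAIR scheduler mode exists for sharing them more evenly than the default FIFO mode.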
Re: How to use collections inside foreach block
To add to what Rishi said, you can call foreachPartition() on result and save the values to the DB from inside it.

Cheers

On Wed, Dec 9, 2015 at 12:51 AM, Rishi Mishra wrote:
> Your list is defined on the driver, whereas function specified in forEach
> will be evaluated on each executor.
> You might want to add an accumulator or handle a Sequence of list from
> each partition.
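Ted's suggestion moves the DB write onto the executors. A common reason to prefer foreachPartition over foreach for this is that the function receives a whole partition as an Iterator, so a connection can be opened once per partition rather than once per row. The sketch below only simulates that shape in plain Scala: `FakeDb`, `FakeConnection`, `savePartition` and `saveAll` are hypothetical, and in a real job the body of `savePartition` would sit inside `result.foreachPartition { rows => ... }` using an actual DB client.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical in-memory "database"; not a real client library.
final class FakeDb {
  val rows = ArrayBuffer.empty[String]
  var connectionsOpened = 0
  def connect(): FakeConnection = { connectionsOpened += 1; new FakeConnection(this) }
}

// Hypothetical connection handle that appends rows to the FakeDb.
final class FakeConnection(db: FakeDb) {
  def insert(row: String): Unit = db.rows += row
  def close(): Unit = ()
}

object ForeachPartitionSketch {
  // Same shape as the function passed to foreachPartition:
  // it receives an Iterator over the rows of ONE partition.
  def savePartition(rows: Iterator[String], db: FakeDb): Unit = {
    val conn = db.connect() // opened once per partition, not once per row
    try rows.foreach(conn.insert)
    finally conn.close()
  }

  // Local simulation: each inner Seq plays the role of one partition.
  def saveAll(partitions: Seq[Seq[String]]): FakeDb = {
    val db = new FakeDb
    partitions.foreach(p => savePartition(p.iterator, db))
    db
  }

  def main(args: Array[String]): Unit = {
    val db = saveAll(Seq(Seq("a", "b"), Seq("c"), Seq("d", "e", "f")))
    println(s"connections = ${db.connectionsOpened}, rows = ${db.rows.mkString(",")}")
  }
}
```

With three partitions and six rows, only three connections are opened; a per-row foreach would open one per row.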
Re: How to use collections inside foreach block
Your list is defined on the driver, whereas the function passed to foreach is evaluated on each executor. You might want to use an accumulator, or return a sequence (list) from each partition instead.

On Wed, Dec 9, 2015 at 11:19 AM, Madabhattula Rajesh Kumar <mrajaf...@gmail.com> wrote:
> Hi,
>
> I have the following question. Please help me solve it.
>
> I have 2 ids. I want to join these ids to a table. This table contains
> some blob data, so I cannot join these 2000 ids to this table in one step.
>
> I'm planning to join this table in chunks. For example, in each step I will
> join 5000 ids.
>
> The code below is not working. I'm not able to add the results to a
> ListBuffer: the result is always ZERO.
>
> *Code Block:*
>
>     import scala.collection.mutable.ListBuffer
>     import sqlContext.implicits._
>
>     // listOfIds is a ListBuffer with 2 records
>     listOfIds.grouped(5000).foreach { x =>
>       val v1 = new ListBuffer[String]()
>       val r = sc.parallelize(x).toDF("id")
>       r.registerTempTable("r")
>       val result = sqlContext.sql("SELECT r.id, t.data FROM r, t WHERE r.id = t.id")
>       result.foreach { y =>
>         v1 += y.toString
>       }
>       println(" SIZE OF V1 === " + v1.size) // *THIS VALUE PRINTS AS ZERO*
>       // Save v1 values to other db
>     }
>
> Please help me with this.
>
> Regards,
> Rajesh

--
Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)
https://in.linkedin.com/in/rishiteshmishra
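To illustrate Rishi's point without a cluster, the sketch below imitates what happens to v1 in the original code: Spark serializes what the foreach closure captures, and each task works on its own deserialized copy, so the driver's buffer never grows. `ClosureCopySketch`, `serializedCopy`, `brokenPattern` and `fixedPattern` are hypothetical names, and Java serialization is used here only as a stand-in for Spark shipping tasks between JVMs. The fixed version follows the "sequence from each partition" idea: results travel back as return values and are merged on the driver, which in Spark would correspond to something like `result.collect()` when a batch fits in driver memory.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable.ListBuffer

object ClosureCopySketch {
  // Stand-in for shipping data to another JVM: serialize, then deserialize
  // into an independent copy.
  def serializedCopy(buf: ListBuffer[String]): ListBuffer[String] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(buf) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[ListBuffer[String]] finally in.close()
  }

  // Pattern from the question: append to a driver-side buffer inside foreach.
  def brokenPattern(partitions: Seq[Seq[String]]): Int = {
    val v1 = ListBuffer[String]()          // lives on the driver
    partitions.foreach { rows =>
      val taskCopy = serializedCopy(v1)    // each task gets its own copy of v1
      rows.foreach(row => taskCopy += row) // only the copy grows
    }
    v1.size                                // the driver's v1 was never touched
  }

  // Alternative: each partition returns its own sequence, which is shipped
  // back and concatenated on the driver.
  def fixedPattern(partitions: Seq[Seq[String]]): List[String] = {
    val perPartition = partitions.map { rows =>
      val local = ListBuffer[String]()     // built inside the task
      rows.foreach(row => local += row)
      serializedCopy(local)                // the task's result travels back
    }
    perPartition.flatten.toList            // merged on the driver
  }

  def main(args: Array[String]): Unit = {
    val data = Seq(Seq("1", "2"), Seq("3"))
    println(s"broken: ${brokenPattern(data)}") // prints "broken: 0"
    println(s"fixed: ${fixedPattern(data)}")   // prints "fixed: List(1, 2, 3)"
  }
}
```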