-dev +user

In general you cannot create new RDDs inside closures that run on the executors, which is what calling sql() inside of a foreach does.
I think what you want here is something like (a fuller sketch follows below the quoted message):

  sqlContext.parquetFile("Data\\Test\\Parquet\\2").registerTempTable("temp2")
  sql("SELECT col1, col2 FROM temp2").insertInto("temp")

On Mon, Dec 29, 2014 at 7:29 PM, evil <qinggangwa...@gmail.com> wrote:

> Hi All,
> I have a problem when I try to use "insert into" in a loop. This is my code:
>
>   def main(args: Array[String]) {
>     // This is an empty table, schema is (Int, String)
>     sqlContext.parquetFile("Data\\Test\\Parquet\\Temp").registerTempTable("temp")
>     // Not an empty table, schema is (Int, String)
>     val testData = sqlContext.parquetFile("Data\\Test\\Parquet\\2")
>     testData.foreach { x =>
>       sqlContext.sql("INSERT INTO temp SELECT " + x(0) + " ,'" + x(1) + "'")
>     }
>     sqlContext.sql("select * from temp").collect().map(x => (x(0), x(1))).foreach(println)
>   }
>
> When I run the code above in local mode, it does not stop and there is no
> error log. The latest log output is as follows:
>
> 14/12/30 11:07:44 WARN ParquetRecordReader: Can not initialize counter due
> to context is not a instance of TaskInputOutputContext, but is
> org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
> 14/12/30 11:07:44 INFO InternalParquetRecordReader: RecordReader initialized
> will read a total of 200 records.
> 14/12/30 11:07:44 INFO InternalParquetRecordReader: at row 0. reading next block
> 14/12/30 11:07:44 INFO CodecPool: Got brand-new decompressor [.gz]
> 14/12/30 11:07:44 INFO InternalParquetRecordReader: block read in memory in 20 ms. row count = 200
> 14/12/30 11:07:45 INFO SparkContext: Starting job: runJob at ParquetTableOperations.scala:325
> 14/12/30 11:07:45 INFO DAGScheduler: Got job 1 (runJob at ParquetTableOperations.scala:325)
> with 1 output partitions (allowLocal=false)
> 14/12/30 11:07:45 INFO DAGScheduler: Final stage: Stage 1(runJob at ParquetTableOperations.scala:325)
> 14/12/30 11:07:45 INFO DAGScheduler: Parents of final stage: List()
> 14/12/30 11:07:45 INFO DAGScheduler: Missing parents: List()
> 14/12/30 11:07:45 INFO DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[6]
> at mapPartitions at basicOperators.scala:43), which has no missing parents
> 14/12/30 11:07:45 INFO MemoryStore: ensureFreeSpace(53328) called with curMem=239241, maxMem=1013836677
> 14/12/30 11:07:45 INFO MemoryStore: Block broadcast_2 stored as values in
> memory (estimated size 52.1 KB, free 966.6 MB)
> 14/12/30 11:07:45 INFO MemoryStore: ensureFreeSpace(31730) called with curMem=292569, maxMem=1013836677
> 14/12/30 11:07:45 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes
> in memory (estimated size 31.0 KB, free 966.6 MB)
> 14/12/30 11:07:45 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
> on localhost:52533 (size: 31.0 KB, free: 966.8 MB)
> 14/12/30 11:07:45 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
> 14/12/30 11:07:45 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:838
> 14/12/30 11:07:45 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1
> (MapPartitionsRDD[6] at mapPartitions at basicOperators.scala:43)
> 14/12/30 11:07:45 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
>
> Can anyone give me a hand?
>
> Thanks
> evil
>
>
> --
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/A-question-about-using-insert-into-in-rdd-foreach-in-spark-1-2-tp9959.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.