@Ayan - I'm creating the temp table dynamically based on the dataset name. I will explore the df.saveAsTable option.
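Something along these lines is what I plan to try - a rough sketch only, assuming Spark 1.x with the spark-csv package; the config keys ('path', 'hive_table') are placeholders for whatever is actually in our config dict:

----------------------
def task(sqlContext, data_source_dict):
    path = data_source_dict['path']                    # placeholder key
    hive_table_name = data_source_dict['hive_table']   # placeholder key

    # 1. create the dataframe from the csv file
    df = sqlContext.read \
        .format('com.databricks.spark.csv') \
        .option('header', 'true') \
        .load(path)

    # 2. write straight into the Hive table, skipping the temp table
    #    entirely, so threads never share a registered table name
    df.write.mode('append').saveAsTable(hive_table_name)
    # or, if the Hive table already exists with the right schema:
    # df.write.mode('append').insertInto(hive_table_name)
----------------------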
On Mon, Apr 17, 2017 at 9:53 PM, Ryan <ryan.hd....@gmail.com> wrote:

> It shouldn't be a problem then. We've done a similar thing in Scala. I
> don't have much experience with Python threads, but maybe the code related
> to reading/writing the temp table isn't thread safe.
>
> On Mon, Apr 17, 2017 at 9:45 PM, Amol Patil <amol4soc...@gmail.com> wrote:
>
>> Thanks Ryan,
>>
>> Each dataset has a separate Hive table. All Hive tables belong to the
>> same Hive database.
>>
>> The idea is to ingest the data in parallel into the respective Hive
>> tables.
>>
>> If I run the code sequentially for each data source it works fine, but it
>> will take a lot of time. We are planning to process around 30-40
>> different data sources.
>>
>> Please advise.
>>
>> Thank you,
>> Amol
>>
>> On Monday, April 17, 2017, Ryan <ryan.hd....@gmail.com> wrote:
>>
>>> I don't think you can insert into a Hive table in parallel without
>>> dynamic partitioning; for Hive locking please refer to
>>> https://cwiki.apache.org/confluence/display/Hive/Locking.
>>>
>>> Other than that, it should work.
>>>
>>> On Mon, Apr 17, 2017 at 6:52 AM, Amol Patil <amol4soc...@gmail.com>
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I'm writing a generic PySpark program to process multiple datasets
>>>> using Spark SQL, for example traffic data, crime data, weather data.
>>>> Each dataset will be in CSV format and its size may vary from *1 GB*
>>>> to *10 GB*. Each dataset will be available on a different timeframe
>>>> (weekly, monthly, quarterly).
>>>>
>>>> My requirement is to process all the datasets in parallel by triggering
>>>> the job only once.
>>>>
>>>> In the current implementation we are using the Spark CSV package for
>>>> reading the CSV files, and Python's threading mechanism to trigger
>>>> multiple threads:
>>>> ----------------------
>>>> import threading
>>>>
>>>> jobs = []
>>>> for dict_key, dict_val in config_dict.items():
>>>>     t = threading.Thread(target=task, args=(sqlContext, dict_val))
>>>>     jobs.append(t)
>>>>     t.start()
>>>>
>>>> for x in jobs:
>>>>     x.join()
>>>> -----------------------
>>>> And defined a task method to process each dataset based on its config
>>>> values dict:
>>>> -----------------------------------------
>>>> def task(sqlContext, data_source_dict):
>>>>     ..
>>>>     ...
>>>> -------------------------------------
>>>>
>>>> The task method:
>>>> 1. creates a dataframe from the CSV file,
>>>> 2. then creates a temporary table from that dataframe,
>>>> 3. finally ingests the data into the Hive table.
>>>>
>>>> *Issue:*
>>>> 1. If I process two datasets in parallel, one dataset goes through
>>>> successfully, but for the other dataset I'm getting the error
>>>> "*u'temp_table not found*" while ingesting data into the Hive table.
>>>> It happens consistently with either dataset A or dataset B, on this
>>>> statement:
>>>> sqlContext.sql('INSERT INTO TABLE ' + hivetablename + ' SELECT * FROM '
>>>> + temp_table_name)
>>>>
>>>> I tried the below things:
>>>> 1. I'm creating the dataframe name & temporary table name dynamically
>>>> based on the dataset name.
>>>> 2. Enabled Spark dynamic allocation (--conf
>>>> spark.dynamicAllocation.enabled=true).
>>>> 3. Set spark.scheduler.mode to FAIR.
>>>>
>>>> I would appreciate advice on:
>>>> 1. Is anything wrong in the above implementation?
>>>> 2. Is it a good idea to process those big datasets in parallel in one
>>>> job?
>>>> 3. Any other solution to process multiple datasets in parallel?
>>>>
>>>> Thank you,
>>>> Amol Patil
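For reference, the temp-table flow described in the quoted message presumably looks roughly like the sketch below (reconstructed from the description only; the function name and parameters are illustrative, not the actual code). The statement in step 3 is the one that intermittently fails when two threads run at once:

-----------------------------------------
def ingest_via_temp_table(sqlContext, df, hive_table_name, temp_table_name):
    # df is the dataframe already built from the csv file (step 1);
    # temp_table_name is derived from the dataset name, so each thread
    # registers its own name.

    # 2. register a temporary table from the dataframe
    df.registerTempTable(temp_table_name)

    # 3. ingest into the Hive table; this is the statement that
    #    intermittently fails with "temp_table not found" when two
    #    threads run concurrently
    sqlContext.sql('INSERT INTO TABLE ' + hive_table_name +
                   ' SELECT * FROM ' + temp_table_name)
-------------------------------------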