What happens if you do not use the temp table, but directly do
df.saveAsTable with mode append? If I had to guess without looking at the
code of your task function, I would think the name of the temp table is
evaluated statically, so all threads are referring to the same table. In other
words, your app is not thread-safe.
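
To make the suggestion concrete, here is a minimal sketch of a task function that skips the temp table and appends straight to the Hive table. It is only an illustration, not your actual code: the dict keys `path` and `hive_table`, the function names `ingest` and `ingest_all`, and the `header` option are assumptions, and `concurrent.futures` assumes Python 3 (or the `futures` backport). Every name the task uses comes from its own arguments, so nothing is shared between threads.

```python
from concurrent.futures import ThreadPoolExecutor


def ingest(sql_context, source):
    """Load one CSV and append it to its own Hive table, without a temp table.

    `source` is a per-dataset dict; the keys "path" and "hive_table" are
    hypothetical names chosen for this sketch.
    """
    df = (sql_context.read
          .format("com.databricks.spark.csv")  # the Spark CSV package
          .option("header", "true")            # assumption: files have a header row
          .load(source["path"]))
    # Append directly; no temp table name that threads could share.
    df.write.mode("append").saveAsTable(source["hive_table"])
    return source["hive_table"]


def ingest_all(sql_context, config_dict, max_workers=4):
    """Run ingest() for every dataset in parallel and wait for all of them."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(ingest, sql_context, src)
                   for src in config_dict.values()]
        # result() re-raises any exception from the worker thread, so a
        # failed dataset is not silently lost.
        return [f.result() for f in futures]
```

Calling `ingest_all(sqlContext, config_dict)` would replace the manual start/join loop. If the target tables already exist as Hive tables and `saveAsTable` complains about the table format on your Spark version, `df.write.insertInto(...)` may be worth trying instead.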

Best
Ayan
On Mon, 17 Apr 2017 at 11:45 pm, Amol Patil <amol4soc...@gmail.com> wrote:

> Thanks Ryan,
>
> Each dataset has a separate Hive table. All Hive tables belong to the same Hive
> database.
>
> The idea is to ingest data in parallel into the respective Hive tables.
>
> If I run the code sequentially for each data source, it works fine, but it will
> take a lot of time. We are planning to process around 30-40 different data
> sources.
>
> Please advise.
>
> Thank you,
> Amol
>
>
>
> On Monday, April 17, 2017, Ryan <ryan.hd....@gmail.com> wrote:
>
>> I don't think you can insert into a Hive table in parallel without dynamic
>> partitioning. For Hive locking, please refer to
>> https://cwiki.apache.org/confluence/display/Hive/Locking.
>>
>> Other than that, it should work.
>>
>> On Mon, Apr 17, 2017 at 6:52 AM, Amol Patil <amol4soc...@gmail.com>
>> wrote:
>>
>>> Hi All,
>>>
>>> I'm writing a generic PySpark program to process multiple datasets using
>>> Spark SQL, for example Traffic Data, Crime Data, Weather Data. Each dataset will
>>> be in CSV format & its size may vary from *1 GB* to *10 GB*. Each dataset
>>> will be available on a different schedule (weekly, monthly, quarterly).
>>>
>>> My requirement is to process all the datasets in parallel by triggering
>>> the job only once.
>>>
>>> In the current implementation we are using the Spark CSV package for reading CSV
>>> files & using the Python threading mechanism to trigger multiple threads
>>> ----------------------
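>>> import threading
>>>
>>> jobs = []
>>> # Start one thread per dataset configuration.
>>> for dict_key, dict_val in config_dict.items():
>>>     t = threading.Thread(target=task, args=(sqlContext, dict_val))
>>>     jobs.append(t)
>>>     t.start()
>>>
>>> # Wait for all threads to finish.
>>> for x in jobs:
>>>     x.join()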
>>> -----------------------
>>> And defined a task method to process each dataset based on its config values dict
>>>
>>> -----------------------------------------
>>> def task(sqlContext, data_source_dict):
>>> ..
>>> ...
>>> -------------------------------------
>>>
>>> The task method:
>>> 1. creates a dataframe from the csv file
>>> 2. then creates a temporary table from that dataframe
>>> 3. finally ingests the data into the Hive table
>>>
>>> *Issue:*
>>> 1. If I process two datasets in parallel, one dataset goes through
>>> successfully, but for the other dataset I get the error "*u'temp_table not
>>> found*" while ingesting data into the Hive table. It happens
>>> consistently with either dataset A or dataset B:
>>> sqlContext.sql('INSERT INTO TABLE '+hivetablename+' SELECT * from
>>> '+temp_table_name)
>>>
>>> I tried the things below:
>>> 1. I'm creating the dataframe name & temporary table name dynamically based
>>> on the dataset name.
>>> 2. Enabled Spark Dynamic allocation (--conf
>>> spark.dynamicAllocation.enabled=true)
>>> 3. Set spark.scheduler.mode to FAIR
>>>
>>>
>>> I would appreciate advice on:
>>> 1. Is anything wrong in the above implementation?
>>> 2. Is it a good idea to process those big datasets in parallel in one job?
>>> 3. Is there any other solution to process multiple datasets in parallel?
>>>
>>> Thank you,
>>> Amol Patil
>>>
>>
>> --
Best Regards,
Ayan Guha
