@Ayan - I am creating the temp table name dynamically, based on the dataset
name. I will explore the df.saveAsTable option.
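
For reference, a minimal sketch of what the df.saveAsTable route might look like. It skips the temp table and writes the dataframe straight to the Hive table. It assumes a Spark version where the dataframe exposes df.write (the DataFrameWriter API); ingest_direct and its arguments are hypothetical names, and whether an append into an existing Hive table works depends on the Spark version and the table's storage format, so this is untested against our setup.

```python
def ingest_direct(df, hive_table_name):
    """Append a DataFrame's rows to a Hive table without a temp table.

    Hypothetical helper: `df` is assumed to be a Spark DataFrame and
    `hive_table_name` a table in the current Hive database.
    """
    # mode("append") adds rows to the target table instead of failing
    # or overwriting when the table already exists.
    df.write.mode("append").saveAsTable(hive_table_name)
```

Because no temp table is registered on this path, the INSERT ... SELECT lookup that fails with "temp_table not found" is not involved at all.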


On Mon, Apr 17, 2017 at 9:53 PM, Ryan <ryan.hd....@gmail.com> wrote:

> It shouldn't be a problem then. We've done a similar thing in Scala. I
> don't have much experience with Python threads, but maybe the code that
> reads/writes the temp table isn't thread safe.
>
> On Mon, Apr 17, 2017 at 9:45 PM, Amol Patil <amol4soc...@gmail.com> wrote:
>
>> Thanks Ryan,
>>
>> Each dataset has a separate Hive table. All the Hive tables belong to the
>> same Hive database.
>>
>> The idea is to ingest data in parallel into the respective Hive tables.
>>
>> If I run the code sequentially for each data source, it works fine, but it
>> will take a lot of time. We are planning to process around 30-40 different
>> data sources.
>>
>> Please advise.
>>
>> Thank you,
>> Amol
>>
>>
>>
>> On Monday, April 17, 2017, Ryan <ryan.hd....@gmail.com> wrote:
>>
>>> I don't think you can insert into a Hive table in parallel without dynamic
>>> partitioning. For Hive locking, please refer to
>>> https://cwiki.apache.org/confluence/display/Hive/Locking.
>>>
>>> Other than that, it should work.
>>>
>>> On Mon, Apr 17, 2017 at 6:52 AM, Amol Patil <amol4soc...@gmail.com>
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I'm writing a generic PySpark program to process multiple datasets using
>>>> Spark SQL, for example Traffic Data, Crime Data, Weather Data. Each dataset
>>>> will be in CSV format, and its size may vary from *1 GB* to *10 GB*. Each
>>>> dataset will be available on a different schedule (weekly, monthly, quarterly).
>>>>
>>>> My requirement is to process all the datasets in parallel by triggering
>>>> the job only once.
>>>>
>>>> In the current implementation we are using the Spark CSV package for reading
>>>> CSV files and the Python threading mechanism to trigger multiple threads
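>>>> ----------------------
>>>> import threading
>>>>
>>>> jobs = []
>>>> for dict_key, dict_val in config_dict.items():
>>>>     # one thread per dataset; each runs task(sqlContext, dict_val)
>>>>     t = threading.Thread(target=task, args=(sqlContext, dict_val))
>>>>     jobs.append(t)
>>>>     t.start()
>>>>
>>>> # wait for every dataset to finish
>>>> for x in jobs:
>>>>     x.join()
>>>> -----------------------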
>>>> And defined the task method to process each dataset based on its config values dict
>>>>
>>>> -----------------------------------------
>>>> def task(sqlContext, data_source_dict):
>>>> ..
>>>> ...
>>>> -------------------------------------
>>>>
>>>> The task method:
>>>> 1. creates a dataframe from the CSV file,
>>>> 2. then creates a temporary table from that dataframe,
>>>> 3. finally ingests the data into the Hive table.
>>>>
>>>> *Issue:*
>>>> 1. If I process two datasets in parallel, one dataset goes through
>>>> successfully, but for the other dataset I'm getting the error "*u'temp_table
>>>> not found*" while ingesting data into the Hive table. It happens
>>>> consistently, with either dataset A or dataset B, at this statement:
>>>> sqlContext.sql('INSERT INTO TABLE '+hivetablename+' SELECT * from '+temp_table_name)
>>>>
>>>> I tried the following:
>>>> 1. I'm creating the dataframe name & temporary table name dynamically, based
>>>> on the dataset name.
>>>> 2. Enabled Spark Dynamic allocation (--conf
>>>> spark.dynamicAllocation.enabled=true)
>>>> 3. Set spark.scheduler.mode to FAIR
>>>>
>>>>
>>>> I'd appreciate advice on:
>>>> 1. Is anything wrong in the above implementation?
>>>> 2. Is it a good idea to process those big datasets in parallel in one job?
>>>> 3. Are there any other solutions for processing multiple datasets in parallel?
>>>>
>>>> Thank you,
>>>> Amol Patil
>>>>
>>>
>>>
>
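
For anyone following the thread, here is a rough sketch of a thread driver that gives every dataset its own temp table name and hands back any exception raised in a worker, so a failing dataset is easy to identify. It is only an illustration under assumptions: it uses concurrent.futures (Python 3 standard library) rather than bare threading.Thread, the names unique_temp_name and run_all are made up, and task is assumed to take the temp table name as a third argument, which differs from the signature in the original post.

```python
import re
import uuid
from concurrent.futures import ThreadPoolExecutor


def unique_temp_name(dataset_name):
    """Build a temp table name that is distinct on every call."""
    # Replace anything that is not a letter, digit or underscore.
    safe = re.sub(r"\W+", "_", dataset_name).strip("_").lower()
    # A random suffix keeps two runs of the same dataset apart.
    return "tmp_{}_{}".format(safe, uuid.uuid4().hex[:8])


def run_all(task, sql_context, config_dict, max_workers=4):
    """Run task(sql_context, cfg, temp_name) once per dataset, in threads.

    Returns a dict mapping each dataset key to None on success,
    or to the exception that its task raised.
    """
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            key: pool.submit(task, sql_context, cfg, unique_temp_name(key))
            for key, cfg in config_dict.items()
        }
        for key, future in futures.items():
            # exception() waits for the task and returns None on success.
            results[key] = future.exception()
    return results
```

A call such as run_all(task, sqlContext, config_dict) would then return one entry per dataset, which can be checked after all threads have finished.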
