Thanks Ryan,

Each dataset has separate hive table. All hive tables belongs to same hive
database.

The idea is to ingest data in parallel in respective hive tables.

If I run code sequentially for each data source, it works fine but I will
take lot of time. We are planning to process around 30-40 different data
sources.

Please advise.

Thank you,
Amol



On Monday, April 17, 2017, Ryan <ryan.hd....@gmail.com> wrote:

> I don't think you can parallel insert into a hive table without dynamic
> partition, for hive locking please refer to https://cwiki.apache.org/
> confluence/display/Hive/Locking.
>
> Other than that, it should work.
>
> On Mon, Apr 17, 2017 at 6:52 AM, Amol Patil <amol4soc...@gmail.com
> <javascript:_e(%7B%7D,'cvml','amol4soc...@gmail.com');>> wrote:
>
>> Hi All,
>>
>> I'm writing generic pyspark program to process multiple datasets using
>> Spark SQL. For example Traffic Data, Crime Data, Weather Data. Dataset will
>> be in csv format & size may vary from *1 GB* to *10 GB*. Each dataset
>> will be available at different timeframe (weekly,monthly,quarterly).
>>
>> My requirement is to process all the datasets in parallel by triggering
>> job only once.
>>
>> In Current implementation we are using Spark CSV package for reading csv
>> files & using python treading mechanism to trigger multiple threads
>> ----------------------
>> jobs = []
>> for dict_key, dict_val in config_dict.items():
>> t = threading.Thread(target=task,args=(sqlContext,dict_val))
>> jobs.append(t)
>> t.start()
>>
>> for x in jobs:
>> x.join()
>> -----------------------
>> And Defind task mehtod to process each dataset based config values dict
>>
>> -----------------------------------------
>> def task(sqlContex, data_source_dict):
>> ..
>> ...
>> -------------------------------------
>>
>> task method,
>> 1. create dataframe from csv file
>> 2. then create temporary table from that dataframe.
>> 3. finally it ingest data in to Hive table.
>>
>> *Issue:*
>> 1. If I process two datasets in parallel, one dataset goes through
>> successfully but for other dataset I'm getting error "*u'temp_table not
>> found*" while ingesting data in to hive table. Its happening
>> consistently either with dataset A or Dataset B
>> sqlContext.sql('INSERT INTO TABLE '+hivetablename+' SELECT * from
>> '+temp_table_name)
>>
>> I tried below things
>> 1. I'm creating dataframe name & temporary tabel name dynamically based
>> in dataset name.
>> 2. Enabled Spark Dynamic allocation (--conf spark.dynamicAllocation.enable
>> d=true)
>> 3. Set spark.scheduler.mode to FAIR
>>
>>
>> I appreciate advise on
>> 1. Is anything wrong in above implementation?
>> 2. Is it good idea to process those big datasets in parallel in one job?
>> 3. Any other solution to process multiple datasets in parallel?
>>
>> Thank you,
>> Amol Patil
>>
>
>

Reply via email to