Hi

In this case, if you see, t[1] is NOT the file content, as I have added a
"FileType" field. So, this collect is just bringing in the list of file
types, should be fine

On Thu, Oct 6, 2016 at 11:47 PM, Arun Patel <arunp.bigd...@gmail.com> wrote:

> Thanks Ayan.  I am really concerned about the collect.
>
> types = rdd1.map(lambda t: t[1]).distinct().collect()
>
> This will ship all files on to the driver, right?  It must be inefficient.
>
>
> On Thu, Oct 6, 2016 at 7:58 AM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Hi
>>
>> I think you are correct direction. What is missing is: You do not need to
>> create DF for each file. You can scramble files with similar structures
>> together (by doing some filter on file name) and then create a DF for each
>> type of file. Also, creating DF on wholeTextFile seems wasteful to me. I
>> would probably do it like this
>>
>> rdd1 = sc.wholeTextFile(inputpath).map(lambda t:
>> (t[0],generateFileType(t[0]),t[1])
>> types = rdd1.map(lambda t: t[1]).distinct().collect()
>>
>> DFList = []
>>
>> for k in types:
>>      df = rdd1.filter(lambda t: t[1]==k).toDF(schema=getSchemaFor(k))
>>      DFList.append(df)
>>
>>
>>
>> On Thu, Oct 6, 2016 at 10:26 PM, Arun Patel <arunp.bigd...@gmail.com>
>> wrote:
>>
>>> My Pyspark program is currently identifies the list of the files from a
>>> directory (Using Python Popen command taking hadoop fs -ls arguments).  For
>>> each file, a Dataframe is created and processed. This is sequeatial. How to
>>> process all files paralelly?  Please note that every file in the directory
>>> has different schema.  So, depending on the file name, different logic is
>>> used for each file. So, I cannot really create one Dataframe for all these
>>> files and iterate each row.  Using wholeTextFiles seems to be good approach
>>> for me.  But, I am not sure how to create DataFrame from this.  For
>>> example, Is there a way to do this way do something like below.
>>>
>>> def createDFProcess(myrdd):
>>>     df = sqlCtx.read.json(myrdd)
>>>     df.show()
>>>
>>> whfiles = sc.wholeTextFiles('/input/dir1').toDF(['fname', 'fcontent'])
>>> whfiles.map(lambda file: file.fcontent).foreach(createDFProcess)
>>>
>>> Above code does not work.  I get an error 'TypeError: 'JavaPackage'
>>> object is not callable'.  How to make it work?
>>>
>>> Or is there a better approach?
>>>
>>> -Arun
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha

Reply via email to