Hi

In this case, t[1] is NOT the file content: after the map, each record is (path, fileType, content), so t[1] is the "FileType" field I added. The collect() only brings back the list of distinct file types, which should be fine.
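To make it concrete, here is a minimal end-to-end sketch of what I mean (generateFileType and getSchemaFor are placeholders for your own logic, and I am assuming the file content is JSON, as in your original example):

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="per-type-dataframes")
sqlCtx = SQLContext(sc)

def generateFileType(path):
    # placeholder: derive a type tag from the file name
    return path.rsplit('/', 1)[-1].split('_')[0]

# (path, fileType, content) triples; the file contents stay on the executors
rdd1 = sc.wholeTextFiles('/input/dir1') \
         .map(lambda t: (t[0], generateFileType(t[0]), t[1]))
rdd1.cache()  # reused once per file type below

# collect() ships only the small set of distinct type strings to the driver
types = rdd1.map(lambda t: t[1]).distinct().collect()

DFList = []
for k in types:
    # bind k as a default argument: RDDs are evaluated lazily, so a plain
    # closure over the loop variable would see only its last value
    contents = rdd1.filter(lambda t, k=k: t[1] == k).map(lambda t: t[2])
    # use sqlCtx.read.schema(getSchemaFor(k)).json(contents) to supply an
    # explicit schema instead of letting Spark infer one
    DFList.append(sqlCtx.read.json(contents))

This is also why your original foreach approach fails: sqlCtx exists only on the driver, so calling sqlCtx.read.json inside a function shipped to the executors cannot work (that is where the "TypeError: 'JavaPackage' object is not callable" comes from). Create the DataFrames on the driver, as above, and let the RDD operations do the distributed work.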
On Thu, Oct 6, 2016 at 11:47 PM, Arun Patel <arunp.bigd...@gmail.com> wrote:

> Thanks Ayan. I am really concerned about the collect.
>
> types = rdd1.map(lambda t: t[1]).distinct().collect()
>
> This will ship all the files to the driver, right? It must be inefficient.
>
> On Thu, Oct 6, 2016 at 7:58 AM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Hi
>>
>> I think you are heading in the correct direction. What is missing is: you
>> do not need to create a DF for each file. You can group files with similar
>> structures together (by doing some filter on the file name) and then
>> create a DF for each type of file. Also, creating a DF per file on
>> wholeTextFiles seems wasteful to me. I would probably do it like this:
>>
>> rdd1 = sc.wholeTextFiles(inputpath).map(lambda t:
>> (t[0], generateFileType(t[0]), t[1]))
>> types = rdd1.map(lambda t: t[1]).distinct().collect()
>>
>> DFList = []
>>
>> for k in types:
>>     df = rdd1.filter(lambda t: t[1] == k).toDF(schema=getSchemaFor(k))
>>     DFList.append(df)
>>
>> On Thu, Oct 6, 2016 at 10:26 PM, Arun Patel <arunp.bigd...@gmail.com>
>> wrote:
>>
>>> My Pyspark program currently identifies the list of files in a
>>> directory (using the Python Popen command with hadoop fs -ls arguments).
>>> For each file, a DataFrame is created and processed. This is sequential.
>>> How can I process all files in parallel? Please note that every file in
>>> the directory has a different schema, so depending on the file name,
>>> different logic is used for each file. That means I cannot create one
>>> DataFrame for all these files and iterate over each row. Using
>>> wholeTextFiles seems to be a good approach for me, but I am not sure how
>>> to create a DataFrame from it. For example, is there a way to do
>>> something like below?
>>>
>>> def createDFProcess(myrdd):
>>>     df = sqlCtx.read.json(myrdd)
>>>     df.show()
>>>
>>> whfiles = sc.wholeTextFiles('/input/dir1').toDF(['fname', 'fcontent'])
>>> whfiles.map(lambda file: file.fcontent).foreach(createDFProcess)
>>>
>>> The above code does not work. I get the error "TypeError: 'JavaPackage'
>>> object is not callable". How do I make it work?
>>>
>>> Or is there a better approach?
>>>
>>> -Arun
>>
>> --
>> Best Regards,
>> Ayan Guha

--
Best Regards,
Ayan Guha