Re: Spark - “min key = null, max key = null” while reading ORC file
Thank you very much.

On Mon, Jun 20, 2016 at 3:38 PM, Jörn Franke <jornfra...@gmail.com> wrote:

> If you insert the data sorted then there is no need to bucket the data.
> You can even create an index in Spark. Simply set the output format
> configuration orc.create.index = true.
>
> On 20 Jun 2016, at 09:10, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
> Right, your concern is that you expect the stored index in the ORC file to
> help the optimizer.
>
> Frankly I do not know what write().mode(SaveMode.Overwrite).orc("orcFileToRead")
> actually does under the bonnet. From my experience, for the ORC index to be
> used you need to bucket the table. I have explained this before here
> <https://www.linkedin.com/pulse/apache-hive-data-warehouse-proposal-improve-external-mich?trk=pulse_spock-articles>
>
> It is also possible that you have not updated statistics on the table.
>
> Even with Spark I tend to create my ORC table explicitly through Spark SQL.
>
> You stated that the join scans the entire underlying ORC table. Your "id"
> column, I assume, is unique, so I would bucket the table on the id column.
>
> HTH
>
> Dr Mich Talebzadeh
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> http://talebzadehmich.wordpress.com
>
> On 20 June 2016 at 07:07, Mohanraj Ragupathiraj <mohanaug...@gmail.com> wrote:
>
>> Hi Mich,
>>
>> Thank you for your reply. Let me explain more clearly.
>>
>> A file with 100 records needs to be joined with a big lookup file created
>> in ORC format (500 million records). The Spark process I wrote is returning
>> the matching records and is working fine. My concern is that it loads the
>> entire file (500 million records) and matches it against the 100 records,
>> instead of loading only the stripes with matching keys. I read that ORC
>> files provide indexes (https://orc.apache.org/docs/indexes.html) and I
>> assumed that when I join using DataFrames the indexes would be used,
>> resulting in only the matching records/stripes being loaded for processing
>> instead of the whole table.
>>
>> On Mon, Jun 20, 2016 at 1:00 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> To start: when you store the data in the ORC file, can you verify that
>>> the data is there?
>>>
>>> For example, register it as a temporary table:
>>>
>>> processDF.registerTempTable("tmp")
>>> sql("select count(1) from tmp").show
>>>
>>> Also, what do you mean by the index file in ORC?
>>>
>>> HTH
>>>
>>> On 20 June 2016 at 05:01, Mohanraj Ragupathiraj <mohanaug...@gmail.com> wrote:
>>>
>>>> I am trying to join a DataFrame (say 100 records) with an ORC file of
>>>> 500 million records through Spark (this can increase to 4-5 billion
>>>> records, 25 bytes each).
>>>>
>>>> I used the Spark HiveContext API.
>>>>
>>>> *ORC File Creation Code*
>>>>
>>>> // fsdtRdd is a JavaRDD, fsdtSchema is a StructType schema
>>>> DataFrame fsdtDf = hiveContext.createDataFrame(fsdtRdd, fsdtSchema);
>>>> fsdtDf.write().mode(SaveMode.Overwrite).orc("orcFileToRead");
>>>>
>>>> *ORC File Reading Code*
>>>>
>>>> HiveContext hiveContext = new HiveContext(sparkContext);
>>>> DataFrame orcFileData = hiveContext.read().orc("orcFileToRead");
>>>> // allRecords is a DataFrame
>>>> DataFrame processDf = allRecords.join(orcFileData,
>>>>     allRecords.col("id").equalTo(orcFileData.col("id").as("ID")), "left_outer");
>>>> processDf.show();
>>>>
>>>> When I read the ORC file, I get the following in my Spark logs:
>>>>
>>>> Input split: file:/C:/AOD_PID/PVP.vincir_frst_seen_tran_dt_ORC/part-r-00024-b708c946-0d49-4073-9cd1-5cc46bd5972b.orc:0+3163348 *min key = null, max key = null*
>>>> Reading ORC rows from file:/C:/AOD_PID/PVP.vincir_frst_seen_tran_dt_ORC/part-r-00024-b708c946-0d49-4073-9cd1-5cc46bd5972b.orc with {include: [true, true, true], offset: 0, length: 9223372036854775807}
>>>> Finished task 55.0 in stage 2.0 (TID 59). 2455 bytes result sent to driver
>>>> Starting task 56.0 in stage 2.0 (TID 60, localhost, partition 56, PROCESS_LOCAL, 2220 bytes)
>>>> Finished task 55.0 in stage 2.0 (TID 59) in 5846 ms on localhost (56/84)
>>>> Running task 56.0 in stage 2.0 (TID 60)
>>>>
>>>> Although the Spark job completes successfully, I think it is not able to
>>>> utilize the ORC index file capability and thus scans the entire block of
>>>> ORC data before moving on.
>>>>
>>>> *Questions*
>>>>
>>>> -- Is this normal behaviour, or do I have to set some configuration
>>>> before saving the data in ORC format?
>>>>
>>>> -- If it is *NORMAL*, what is the best way to join so that non-matching
>>>> records are discarded at the disk level (maybe only the index file for
>>>> the ORC data is loaded)?

--
Thanks and Regards
Mohan
VISA Pte Limited, Singapore.
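Jörn's and Mich's advice above can be pulled together into a short configuration sketch for the Spark 1.x Java API used in this thread. This is a hedged sketch, not a confirmed fix: `orc.create.index` is the ORC writer setting Jörn names, and `spark.sql.orc.filterPushdown` is the Spark SQL setting that allows ORC indexes to be consulted for pushed-down filters; whether `option(...)` actually forwards the setting to the ORC writer should be verified against your Spark release.

```java
// Hedged sketch, Spark 1.x Java API (HiveContext/DataFrame era).

// 1. Ask the ORC writer to create row-group indexes (Jörn's suggestion).
//    Whether option() reaches the ORC writer is version-dependent; verify.
fsdtDf.write()
      .mode(SaveMode.Overwrite)
      .option("orc.create.index", "true")
      .orc("orcFileToRead");

// 2. ORC predicate pushdown is off by default in Spark 1.x; without it,
//    any indexes in the file are never consulted at read time.
hiveContext.setConf("spark.sql.orc.filterPushdown", "true");
```

Note that pushdown only applies to explicit filter predicates; a join key on its own gives the reader nothing to prune with.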
Spark - “min key = null, max key = null” while reading ORC file
I am trying to join a DataFrame (say 100 records) with an ORC file of 500 million records through Spark (this can increase to 4-5 billion records, 25 bytes each).

I used the Spark HiveContext API.

*ORC File Creation Code*

// fsdtRdd is a JavaRDD, fsdtSchema is a StructType schema
DataFrame fsdtDf = hiveContext.createDataFrame(fsdtRdd, fsdtSchema);
fsdtDf.write().mode(SaveMode.Overwrite).orc("orcFileToRead");

*ORC File Reading Code*

HiveContext hiveContext = new HiveContext(sparkContext);
DataFrame orcFileData = hiveContext.read().orc("orcFileToRead");
// allRecords is a DataFrame
DataFrame processDf = allRecords.join(orcFileData,
    allRecords.col("id").equalTo(orcFileData.col("id").as("ID")), "left_outer");
processDf.show();

When I read the ORC file, I get the following in my Spark logs:

Input split: file:/C:/AOD_PID/PVP.vincir_frst_seen_tran_dt_ORC/part-r-00024-b708c946-0d49-4073-9cd1-5cc46bd5972b.orc:0+3163348 *min key = null, max key = null*
Reading ORC rows from file:/C:/AOD_PID/PVP.vincir_frst_seen_tran_dt_ORC/part-r-00024-b708c946-0d49-4073-9cd1-5cc46bd5972b.orc with {include: [true, true, true], offset: 0, length: 9223372036854775807}
Finished task 55.0 in stage 2.0 (TID 59). 2455 bytes result sent to driver
Starting task 56.0 in stage 2.0 (TID 60, localhost, partition 56, PROCESS_LOCAL, 2220 bytes)
Finished task 55.0 in stage 2.0 (TID 59) in 5846 ms on localhost (56/84)
Running task 56.0 in stage 2.0 (TID 60)

Although the Spark job completes successfully, I think it is not able to utilize the ORC index file capability and thus scans the entire block of ORC data before moving on.

*Questions*

-- Is this normal behaviour, or do I have to set some configuration before saving the data in ORC format?

-- If it is *NORMAL*, what is the best way to join so that non-matching records are discarded at the disk level (maybe only the index file for the ORC data is loaded)?
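The "min key = null, max key = null" line in the log means the reader found no stripe statistics to prune with, so every stripe has to be read. What those statistics buy you when they are present, and the data is written sorted, can be shown with a small self-contained sketch; this is plain Java with no Spark or ORC libraries, and the `Stripe` class and the key ranges are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch: models how per-stripe min/max key statistics let a
// reader skip stripes during a point lookup, the way ORC row-group
// indexes are intended to work.
public class StripePruningSketch {

    // Hypothetical stand-in for the statistics a writer records per stripe.
    static class Stripe {
        final long minKey;
        final long maxKey;
        Stripe(long minKey, long maxKey) {
            this.minKey = minKey;
            this.maxKey = maxKey;
        }
    }

    // Returns the indexes of the stripes whose [minKey, maxKey] range could
    // contain lookupKey; every other stripe is skipped without being read.
    static List<Integer> stripesToRead(List<Stripe> stripes, long lookupKey) {
        List<Integer> toRead = new ArrayList<>();
        for (int i = 0; i < stripes.size(); i++) {
            Stripe s = stripes.get(i);
            if (lookupKey >= s.minKey && lookupKey <= s.maxKey) {
                toRead.add(i);
            }
        }
        return toRead;
    }

    public static void main(String[] args) {
        // Data written sorted by key, so stripe ranges are disjoint.
        List<Stripe> stripes = new ArrayList<>();
        stripes.add(new Stripe(1, 1000));
        stripes.add(new Stripe(1001, 2000));
        stripes.add(new Stripe(2001, 3000));
        System.out.println(stripesToRead(stripes, 1500)); // prints [1]
    }
}
```

With "min key = null, max key = null" there is no range to test against, so the loop above would have to return every stripe, which matches the full scan seen in the logs.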
SPARK - DataFrame for BulkLoad
I have 100 million records to be inserted into an HBase table (Phoenix) as the result of a Spark job. I would like to know: if I convert the records to a DataFrame and save it, will that perform a bulk load, or is it not an efficient way to write data to an HBase table?

--
Thanks and Regards
Mohan
Load Table as DataFrame
I have created a DataFrame from an HBase table (Phoenix) which has 500 million rows. From the DataFrame I created an RDD of a JavaBean and use it for joining with data from a file.

Map<String, String> phoenixInfoMap = new HashMap<String, String>();
phoenixInfoMap.put("table", tableName);
phoenixInfoMap.put("zkUrl", zkURL);
DataFrame df = sqlContext.read()
    .format("org.apache.phoenix.spark")
    .options(phoenixInfoMap)
    .load();
JavaRDD<Row> tableRows = df.toJavaRDD();
JavaPairRDD<String, String> dbData = tableRows.mapToPair(
    new PairFunction<Row, String, String>() {
        @Override
        public Tuple2<String, String> call(Row row) throws Exception {
            return new Tuple2<String, String>(row.getAs("ID"), row.getAs("NAME"));
        }
    });

Now my question: say the file has 2 million unique entries matching the table. Is the entire table loaded into memory as an RDD, or will only the 2 million matching records from the table be loaded into memory as an RDD?

http://stackoverflow.com/questions/37289849/phoenix-spark-load-table-as-dataframe

--
Thanks and Regards
Mohan
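Regarding the question above: in Spark's data source API, a join by itself gives the source no predicate to push down, so in general the full table is scanned into the RDD; only an explicit filter can be pushed to the source and applied during the scan. The effect of filtering with the small file's key set, rather than joining against the full table, can be sketched in plain Java; no Spark or Phoenix here, and the data and names are purely illustrative:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java sketch: the predicate a data source could apply while
// scanning, so only rows whose key appears in the small file survive.
public class SmallKeyFilterSketch {

    // Keeps only the table rows whose key is in the (small) file's key set.
    static List<String> matchingNames(Map<String, String> table, Set<String> fileKeys) {
        List<String> matches = new ArrayList<>();
        for (Map.Entry<String, String> row : table.entrySet()) {
            if (fileKeys.contains(row.getKey())) {
                matches.add(row.getValue());
            }
        }
        matches.sort(null); // natural order, for a deterministic result
        return matches;
    }

    public static void main(String[] args) {
        Map<String, String> table = new HashMap<>();
        table.put("1", "alice");
        table.put("2", "bob");
        table.put("3", "carol");
        Set<String> fileKeys = new HashSet<>(Arrays.asList("1", "3"));
        System.out.println(matchingNames(table, fileKeys)); // prints [alice, carol]
    }
}
```

Whether Phoenix can apply such a predicate server-side depends on the filter reaching the source as a pushed-down predicate; a plain RDD join keeps the filtering on the Spark side, after the rows have already been loaded.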