Re: Spark - “min key = null, max key = null” while reading ORC file

2016-06-20 Thread Mohanraj Ragupathiraj
Thank you very much.

On Mon, Jun 20, 2016 at 3:38 PM, Jörn Franke <jornfra...@gmail.com> wrote:

> If you insert the data sorted then there is no need to bucket the data.
> You can even create an index in Spark. Simply set the output format
> configuration orc.create.index = true
>
>
> On 20 Jun 2016, at 09:10, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
> Right, your concern is that you expect the stored index in the ORC file to
> help the optimizer.
>
> Frankly I do not know what
> write().mode(SaveMode.Overwrite).orc("orcFileToRead") actually does under
> the bonnet. From my experience, in order for the ORC index to be used you
> need to bucket the table. I have explained this before here
> <https://www.linkedin.com/pulse/apache-hive-data-warehouse-proposal-improve-external-mich?trk=pulse_spock-articles>
>
> Now it is possible that you have not updated the statistics on the table.
>
> Even with Spark I tend to create my ORC table explicitly through Spark SQL.
>
> You stated that the join scans the entire underlying ORC table. Your "id"
> column, I assume, is unique, so I would bucket the table on the id column.
>
>
> HTH
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 20 June 2016 at 07:07, Mohanraj Ragupathiraj <mohanaug...@gmail.com>
> wrote:
>
>> Hi Mich,
>>
>> Thank you for your reply.
>>
>> Let me explain more clearly.
>>
>> A file with 100 records needs to be joined with a big lookup file created
>> in ORC format (500 million records). The Spark process I wrote returns the
>> matching records and works fine. My concern is that it loads the entire
>> file (500 million records) and matches it against the 100 records, instead
>> of loading only the stripes with matching keys. I read that ORC files
>> provide indexes (https://orc.apache.org/docs/indexes.html), and I assumed
>> that when I join using DataFrames the indexes would be used, so that only
>> the matching records/stripes would be loaded for processing instead of the
>> whole table.
>>
>> On Mon, Jun 20, 2016 at 1:00 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> To start, when you store the data in the ORC file, can you verify that
>>> the data is there?
>>>
>>> For example, register it as a temp table:
>>>
>>> processDF.registerTempTable("tmp")
>>> sql("select count(1) from tmp").show
>>>
>>> Also what do you mean by index file in ORC?
>>>
>>> HTH
>>>
>>>
>>>
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 20 June 2016 at 05:01, Mohanraj Ragupathiraj <mohanaug...@gmail.com>
>>> wrote:
>>>
>>>> I am trying to join a DataFrame (say 100 records) with an ORC file with
>>>> 500 million records through Spark (this can increase to 4-5 billion
>>>> records, 25 bytes each).
>>>>
>>>> I used the Spark HiveContext API.
>>>>
>>>> *ORC File Creation Code*
>>>>
>>>> //fsdtRdd is JavaRDD, fsdtSchema is StructType schema
>>>> DataFrame fsdtDf = hiveContext.createDataFrame(fsdtRdd,fsdtSchema);
>>>> fsdtDf.write().mode(SaveMode.Overwrite).orc("orcFileToRead");
>>>>
>>>> *ORC File Reading Code*
>>>>
>>>> HiveContext hiveContext = new HiveContext(sparkContext);
>>>> DataFrame orcFileData= hiveContext.read().orc("orcFileToRead");
>>>> // allRecords is dataframe
>>>> DataFrame processDf =
>>>> allRecords.join(orcFileData, allRecords.col("id").equalTo(orcFileData.col("id").as("ID")), "left_outer");
>>>> processDf.show();
>>>>
>>>> When I read the ORC file, I get the following in my Spark logs:
>>>>
>>>> Input split: 
>>>> file:/C:/AOD_PID/PVP.vincir_frst_seen_tran_dt_ORC/part-r-00024-b708c946-0d49-4073-9cd1-5cc

Re: Spark - “min key = null, max key = null” while reading ORC file

2016-06-20 Thread Mohanraj Ragupathiraj
Thank you very much.

On Mon, Jun 20, 2016 at 3:10 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Right, your concern is that you expect the stored index in the ORC file to
> help the optimizer.
>
> Frankly I do not know what
> write().mode(SaveMode.Overwrite).orc("orcFileToRead") actually does under
> the bonnet. From my experience, in order for the ORC index to be used you
> need to bucket the table. I have explained this before here
> <https://www.linkedin.com/pulse/apache-hive-data-warehouse-proposal-improve-external-mich?trk=pulse_spock-articles>
>
> Now it is possible that you have not updated the statistics on the table.
>
> Even with Spark I tend to create my ORC table explicitly through Spark SQL.
>
> You stated that the join scans the entire underlying ORC table. Your "id"
> column, I assume, is unique, so I would bucket the table on the id column.
>
>
> HTH
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 20 June 2016 at 07:07, Mohanraj Ragupathiraj <mohanaug...@gmail.com>
> wrote:
>
>> Hi Mich,
>>
>> Thank you for your reply.
>>
>> Let me explain more clearly.
>>
>> A file with 100 records needs to be joined with a big lookup file created
>> in ORC format (500 million records). The Spark process I wrote returns the
>> matching records and works fine. My concern is that it loads the entire
>> file (500 million records) and matches it against the 100 records, instead
>> of loading only the stripes with matching keys. I read that ORC files
>> provide indexes (https://orc.apache.org/docs/indexes.html), and I assumed
>> that when I join using DataFrames the indexes would be used, so that only
>> the matching records/stripes would be loaded for processing instead of the
>> whole table.
>>
>> On Mon, Jun 20, 2016 at 1:00 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> To start, when you store the data in the ORC file, can you verify that
>>> the data is there?
>>>
>>> For example, register it as a temp table:
>>>
>>> processDF.registerTempTable("tmp")
>>> sql("select count(1) from tmp").show
>>>
>>> Also what do you mean by index file in ORC?
>>>
>>> HTH
>>>
>>>
>>>
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 20 June 2016 at 05:01, Mohanraj Ragupathiraj <mohanaug...@gmail.com>
>>> wrote:
>>>
>>>> I am trying to join a DataFrame (say 100 records) with an ORC file with
>>>> 500 million records through Spark (this can increase to 4-5 billion
>>>> records, 25 bytes each).
>>>>
>>>> I used the Spark HiveContext API.
>>>>
>>>> *ORC File Creation Code*
>>>>
>>>> //fsdtRdd is JavaRDD, fsdtSchema is StructType schema
>>>> DataFrame fsdtDf = hiveContext.createDataFrame(fsdtRdd,fsdtSchema);
>>>> fsdtDf.write().mode(SaveMode.Overwrite).orc("orcFileToRead");
>>>>
>>>> *ORC File Reading Code*
>>>>
>>>> HiveContext hiveContext = new HiveContext(sparkContext);
>>>> DataFrame orcFileData= hiveContext.read().orc("orcFileToRead");
>>>> // allRecords is dataframe
>>>> DataFrame processDf =
>>>> allRecords.join(orcFileData, allRecords.col("id").equalTo(orcFileData.col("id").as("ID")), "left_outer");
>>>> processDf.show();
>>>>
>>>> When I read the ORC file, I get the following in my Spark logs:
>>>>
>>>> Input split: 
>>>> file:/C:/AOD_PID/PVP.vincir_frst_seen_tran_dt_ORC/part-r-00024-b708c946-0d49-4073-9cd1-5cc46bd5972b.orc:0+3163348*min
>>>>  key = null, max key = null*
>>>> Reading ORC rows from 
>>>> file:/C:/AOD_PID/PVP.vincir_frst_seen_tran_dt_ORC/part-r-00024-b708c946-0d49-4073-9cd1-5cc46bd5972b.orc
>>>>  with {include: [true, true, true], offset: 0, length: 9223372036854775807}
>>>> Finished task 55.0 in stage 2.0 (TID 59). 2455 bytes result sent to driver
>>>> Starting task 56.0 in stage 2.0 (TID 60, localhost, partition 
>>>> 56,PROCESS_LOCAL, 2220 bytes)
>>>> Finished task 55.0 in stage 2.0 (TID 59) in 5846 ms on localhost (56/84)
>>>> Running task 56.0 in stage 2.0 (TID 60)
>>>>
>>>> Although the Spark job completes successfully, I think it is not able to
>>>> utilize the ORC index capability and thus scans through the entire block
>>>> of ORC data before moving on.
>>>>
>>>> *Question*
>>>>
>>>> -- Is this normal behaviour, or do I have to set any configuration before
>>>> saving the data in ORC format?
>>>>
>>>> -- If it is *NORMAL*, what is the best way to join so that we discard
>>>> non-matching records at the disk level (maybe only the index data for the
>>>> ORC file is loaded)?
>>>>
>>>
>>>
>>
>>
>> --
>> Thanks and Regards
>> Mohan
>> VISA Pte Limited, Singapore.
>>
>
>


-- 
Thanks and Regards
Mohan
VISA Pte Limited, Singapore.


Re: Spark - “min key = null, max key = null” while reading ORC file

2016-06-20 Thread Mohanraj Ragupathiraj
Hi Mich,

Thank you for your reply.

Let me explain more clearly.

A file with 100 records needs to be joined with a big lookup file created in
ORC format (500 million records). The Spark process I wrote returns the
matching records and works fine. My concern is that it loads the entire file
(500 million records) and matches it against the 100 records, instead of
loading only the stripes with matching keys. I read that ORC files provide
indexes (https://orc.apache.org/docs/indexes.html), and I assumed that when I
join using DataFrames the indexes would be used, so that only the matching
records/stripes would be loaded for processing instead of the whole table.
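
As far as I understand, the ORC row-group (min/max) indexes only come into play
when a literal predicate is pushed down to the ORC reader; join keys are not
pushed down. A rough, untested sketch of what I mean, assuming Spark 1.5/1.6
(spark.sql.orc.filterPushdown is off by default there, and Column.isin needs
1.5+); the names reuse the code further down in this thread:

// (assumed imports: java.util.ArrayList, java.util.List,
//  org.apache.spark.sql.DataFrame, org.apache.spark.sql.Row)
// Enable ORC predicate pushdown so stripe/row-group statistics can be used.
hiveContext.setConf("spark.sql.orc.filterPushdown", "true");

// Collect the 100 join keys from the small side (assumed to fit on the driver).
List<Row> keyRows = allRecords.select("id").collectAsList();
List<Object> keys = new ArrayList<Object>();
for (Row r : keyRows) {
    keys.add(r.get(0));
}

// Filter the ORC-backed DataFrame with a literal IN list before joining, so the
// reader has a chance to skip stripes whose min/max do not cover any key.
DataFrame candidates = orcFileData.filter(orcFileData.col("id").isin(keys.toArray()));
DataFrame processDf = allRecords.join(candidates,
        allRecords.col("id").equalTo(candidates.col("id")), "left_outer");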

On Mon, Jun 20, 2016 at 1:00 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi,
>
> To start, when you store the data in the ORC file, can you verify that the
> data is there?
>
> For example, register it as a temp table:
>
> processDF.registerTempTable("tmp")
> sql("select count(1) from tmp").show
>
> Also what do you mean by index file in ORC?
>
> HTH
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 20 June 2016 at 05:01, Mohanraj Ragupathiraj <mohanaug...@gmail.com>
> wrote:
>
>> I am trying to join a DataFrame (say 100 records) with an ORC file with
>> 500 million records through Spark (this can increase to 4-5 billion
>> records, 25 bytes each).
>>
>> I used the Spark HiveContext API.
>>
>> *ORC File Creation Code*
>>
>> //fsdtRdd is JavaRDD, fsdtSchema is StructType schema
>> DataFrame fsdtDf = hiveContext.createDataFrame(fsdtRdd,fsdtSchema);
>> fsdtDf.write().mode(SaveMode.Overwrite).orc("orcFileToRead");
>>
>> *ORC File Reading Code*
>>
>> HiveContext hiveContext = new HiveContext(sparkContext);
>> DataFrame orcFileData= hiveContext.read().orc("orcFileToRead");
>> // allRecords is dataframe
>> DataFrame processDf =
>> allRecords.join(orcFileData, allRecords.col("id").equalTo(orcFileData.col("id").as("ID")), "left_outer");
>> processDf.show();
>>
>> When I read the ORC file, I get the following in my Spark logs:
>>
>> Input split: 
>> file:/C:/AOD_PID/PVP.vincir_frst_seen_tran_dt_ORC/part-r-00024-b708c946-0d49-4073-9cd1-5cc46bd5972b.orc:0+3163348*min
>>  key = null, max key = null*
>> Reading ORC rows from 
>> file:/C:/AOD_PID/PVP.vincir_frst_seen_tran_dt_ORC/part-r-00024-b708c946-0d49-4073-9cd1-5cc46bd5972b.orc
>>  with {include: [true, true, true], offset: 0, length: 9223372036854775807}
>> Finished task 55.0 in stage 2.0 (TID 59). 2455 bytes result sent to driver
>> Starting task 56.0 in stage 2.0 (TID 60, localhost, partition 
>> 56,PROCESS_LOCAL, 2220 bytes)
>> Finished task 55.0 in stage 2.0 (TID 59) in 5846 ms on localhost (56/84)
>> Running task 56.0 in stage 2.0 (TID 60)
>>
>> Although the Spark job completes successfully, I think it is not able to
>> utilize the ORC index capability and thus scans through the entire block of
>> ORC data before moving on.
>>
>> *Question*
>>
>> -- Is this normal behaviour, or do I have to set any configuration before
>> saving the data in ORC format?
>>
>> -- If it is *NORMAL*, what is the best way to join so that we discard
>> non-matching records at the disk level (maybe only the index data for the
>> ORC file is loaded)?
>>
>
>


-- 
Thanks and Regards
Mohan
VISA Pte Limited, Singapore.


Spark - “min key = null, max key = null” while reading ORC file

2016-06-19 Thread Mohanraj Ragupathiraj
I am trying to join a DataFrame (say 100 records) with an ORC file with 500
million records through Spark (this can increase to 4-5 billion records, 25
bytes each).

I used the Spark HiveContext API.

*ORC File Creation Code*

//fsdtRdd is JavaRDD, fsdtSchema is StructType schema
DataFrame fsdtDf = hiveContext.createDataFrame(fsdtRdd,fsdtSchema);
fsdtDf.write().mode(SaveMode.Overwrite).orc("orcFileToRead");
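
Following Jörn Franke's suggestion in the replies above (write the data sorted
and set orc.create.index = true), a rough, untested sketch of what the write
could look like; whether the Spark 1.x ORC writer actually picks the property
up from the Hadoop configuration is an assumption here (sparkContext being the
same context used for the HiveContext below):

// Sketch only: ask the ORC writer to create row-group indexes and write the
// data sorted by the join key so the min/max statistics stay selective.
sparkContext.hadoopConfiguration().set("orc.create.index", "true");

fsdtDf.sort("id")
      .write()
      .mode(SaveMode.Overwrite)
      .orc("orcFileToRead");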

*ORC File Reading Code*

HiveContext hiveContext = new HiveContext(sparkContext);
DataFrame orcFileData= hiveContext.read().orc("orcFileToRead");
// allRecords is dataframe
DataFrame processDf =
allRecords.join(orcFileData, allRecords.col("id").equalTo(orcFileData.col("id").as("ID")), "left_outer");
processDf.show();
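
One quick way to check whether any predicate actually reaches the ORC relation
is to look at the query plan; how much detail (e.g. pushed filters) shows up
there varies by Spark version, so treat this only as a diagnostic aid:

// Print the logical and physical plans. With a plain join and no literal
// filter on the ORC side, I would not expect any pushed filters to appear.
processDf.explain(true);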

When I read the ORC file, I get the following in my Spark logs:

Input split: 
file:/C:/AOD_PID/PVP.vincir_frst_seen_tran_dt_ORC/part-r-00024-b708c946-0d49-4073-9cd1-5cc46bd5972b.orc:0+3163348*min
key = null, max key = null*
Reading ORC rows from
file:/C:/AOD_PID/PVP.vincir_frst_seen_tran_dt_ORC/part-r-00024-b708c946-0d49-4073-9cd1-5cc46bd5972b.orc
with {include: [true, true, true], offset: 0, length:
9223372036854775807}
Finished task 55.0 in stage 2.0 (TID 59). 2455 bytes result sent to driver
Starting task 56.0 in stage 2.0 (TID 60, localhost, partition
56,PROCESS_LOCAL, 2220 bytes)
Finished task 55.0 in stage 2.0 (TID 59) in 5846 ms on localhost (56/84)
Running task 56.0 in stage 2.0 (TID 60)

Although the Spark job completes successfully, I think it is not able to
utilize the ORC index capability and thus scans through the entire block of
ORC data before moving on.

*Question*

-- Is this normal behaviour, or do I have to set any configuration before
saving the data in ORC format?

-- If it is *NORMAL*, what is the best way to join so that we discard
non-matching records at the disk level (maybe only the index data for the ORC
file is loaded)?
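
As suggested in the replies earlier in this thread, one option is to create a
bucketed ORC table explicitly through Spark SQL and populate it from the
DataFrame. A rough, untested sketch; the table name, column types and bucket
count are made up, and whether Spark 1.x actually enforces the bucketing on
INSERT (rather than needing the load to run through Hive itself) is something
to verify:

// Illustrative only: bucketed ORC table keyed on id, with the ORC index enabled.
fsdtDf.registerTempTable("fsdt_staging");
hiveContext.sql("SET hive.enforce.bucketing = true");
hiveContext.sql(
    "CREATE TABLE IF NOT EXISTS lookup_orc (id STRING, payload STRING) " +
    "CLUSTERED BY (id) INTO 256 BUCKETS " +
    "STORED AS ORC TBLPROPERTIES (\"orc.create.index\"=\"true\")");
hiveContext.sql("INSERT OVERWRITE TABLE lookup_orc SELECT id, payload FROM fsdt_staging");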


SPARK - DataFrame for BulkLoad

2016-05-17 Thread Mohanraj Ragupathiraj
I have 100 million records to be inserted into an HBase table (Phoenix) as the
result of a Spark job. I would like to know: if I convert them to a DataFrame
and save it, will that do a bulk load, or is that not an efficient way to write
data to an HBase table?
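
For reference, the plain "convert to a DataFrame and save it" path with the
phoenix-spark integration would look roughly like the sketch below (as far as I
know this goes through Phoenix upserts rather than an HFile bulk load, but that
is worth confirming). The DataFrame name, target table and ZooKeeper quorum are
placeholders:

// resultDf is the DataFrame produced by the Spark job (hypothetical name).
resultDf.write()
        .format("org.apache.phoenix.spark")
        .mode(SaveMode.Overwrite)
        .option("table", "TARGET_TABLE")
        .option("zkUrl", "zkhost1,zkhost2,zkhost3:2181")
        .save();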

-- 
Thanks and Regards
Mohan


Load Table as DataFrame

2016-05-17 Thread Mohanraj Ragupathiraj
I have created a DataFrame from an HBase table (Phoenix) which has 500 million
rows. From the DataFrame I created an RDD of JavaBeans and use it for joining
with data from a file.

// Generic type parameters below were stripped in the original post; the ones
// shown (and ID/NAME being String columns) are assumptions to make it compile.
Map<String, String> phoenixInfoMap = new HashMap<String, String>();
phoenixInfoMap.put("table", tableName);
phoenixInfoMap.put("zkUrl", zkURL);
DataFrame df = sqlContext.read().format("org.apache.phoenix.spark")
        .options(phoenixInfoMap).load();
JavaRDD<Row> tableRows = df.toJavaRDD();
JavaPairRDD<String, String> dbData = tableRows.mapToPair(
    new PairFunction<Row, String, String>()
    {
        @Override
        public Tuple2<String, String> call(Row row) throws Exception
        {
            return new Tuple2<String, String>((String) row.getAs("ID"), (String) row.getAs("NAME"));
        }
    });
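
The joining step mentioned above ("joining with data from a file") is not
shown; a minimal sketch of what it might look like, assuming the file is a
plain text file of comma-separated key,value lines read through a
JavaSparkContext (here called sparkContext); the path and field layout are
assumptions:

// Hedged sketch: load the file as a pair RDD keyed the same way, then join.
JavaPairRDD<String, String> fileData = sparkContext
        .textFile("/path/to/input/file")
        .mapToPair(new PairFunction<String, String, String>() {
            @Override
            public Tuple2<String, String> call(String line) throws Exception {
                String[] parts = line.split(",", 2);
                return new Tuple2<String, String>(parts[0], parts[1]);
            }
        });

// Inner join: only the keys present on both sides survive.
JavaPairRDD<String, Tuple2<String, String>> joined = dbData.join(fileData);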

Now my question: let's say the file has 2 million unique entries matching the
table. Is the entire table loaded into memory as an RDD, or will only the
matching 2 million records from the table be loaded into memory as an RDD?


http://stackoverflow.com/questions/37289849/phoenix-spark-load-table-as-dataframe

-- 
Thanks and Regards
Mohan

