Re: How to use disk instead of just InMemoryRelation when use JDBC datasource in SPARKSQL?

2018-04-12 Thread Takeshi Yamamuro
Do you want to use `Dataset.persist(StorageLevel.MEMORY_AND_DISK)`? That storage level keeps cached partitions in memory where possible and spills the ones that do not fit to disk instead of failing.
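A minimal sketch of that suggestion, reading a table like `region` over JDBC and persisting it with an explicit storage level (the JDBC URL, database name, and credentials below are placeholders, not values from this thread):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object PersistJdbcExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("persist-jdbc-example")
      .getOrCreate()

    // Read the `region` table over JDBC; URL and credentials are placeholders.
    val region = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/tpch")
      .option("dbtable", "region")
      .option("user", "user")
      .option("password", "password")
      .load()

    // MEMORY_AND_DISK caches partitions in memory when they fit and
    // spills the rest to local disk rather than raising an OOM.
    region.persist(StorageLevel.MEMORY_AND_DISK)

    region.createOrReplaceTempView("region")
    spark.sql("SELECT r_regionkey FROM region WHERE r_name = 'AFRICA'").show()

    spark.stop()
  }
}
```

Note that `Dataset.cache()` already defaults to `MEMORY_AND_DISK`, which matches the `StorageLevel(disk, memory, 1 replicas)` visible in the plan below.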

On Thu, Apr 12, 2018 at 1:12 PM, Louis Hust  wrote:

> We want to extract data from MySQL and run calculations in Spark SQL.
> The SQL explain output is below.
>
>
> REGIONKEY#177,N_COMMENT#178] PushedFilters: [], ReadSchema:
> struct
>   +- *(20) Sort [r_regionkey#203 ASC NULLS FIRST], false, 0
>  +- Exchange(coordinator id: 266374831)
> hashpartitioning(r_regionkey#203, 200), coordinator[target post-shuffle
> partition size: 67108864]
> +- *(19) Project [R_REGIONKEY#203]
>+- *(19) Filter ((isnotnull(r_name#204) &&
> (r_name#204 = AFRICA)) && isnotnull(r_regionkey#203))
>   +- InMemoryTableScan [R_REGIONKEY#203,
> r_name#204], [isnotnull(r_name#204), (r_name#204 = AFRICA),
> isnotnull(r_regionkey#203)]
> +- InMemoryRelation [R_REGIONKEY#203,
> R_NAME#204, R_COMMENT#205], true, 1, StorageLevel(disk, memory, 1
> replicas)
>   +- *(1) Scan
> JDBCRelation(region) [numPartitions=1] 
> [R_REGIONKEY#203,R_NAME#204,R_COMMENT#205]
> PushedFilters: [], ReadSchema: struct string,R_COMMENT:string>
>
>
> As you can see, every JDBCRelation is converted to an InMemoryRelation.
> Because the JDBC table is so big, all of the data cannot fit in memory,
> and an OOM occurs. Is there an option to make Spark SQL use disk when
> memory is not enough?
>



-- 
---
Takeshi Yamamuro
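A separate knob worth noting: the plans above show `JDBCRelation(region) [numPartitions=1]`, so each table is pulled through a single partition and a single task must hold it. A sketch of a partitioned JDBC read, which bounds the memory needed per task (the URL, credentials, and key bounds here are illustrative, not from the thread):

```scala
// Partition the JDBC read on a numeric column so that no single task
// has to materialize the whole table; bounds are illustrative.
val orders = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/tpch")  // placeholder URL
  .option("dbtable", "orders")
  .option("user", "user")
  .option("password", "password")
  .option("partitionColumn", "o_orderkey")
  .option("lowerBound", "1")
  .option("upperBound", "6000000")  // illustrative max key
  .option("numPartitions", "8")
  .load()
```

With `numPartitions=8`, Spark issues eight range-bounded queries against MySQL instead of one unbounded scan.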


How to use disk instead of just InMemoryRelation when use JDBC datasource in SPARKSQL?

2018-04-11 Thread Louis Hust


How to use disk instead of just InMemoryRelation when use JDBC datasource in SPARKSQL?

2018-04-10 Thread Louis Hust
We want to extract data from MySQL and run calculations in Spark SQL.
The SQL explain output is below.

== Parsed Logical Plan ==
'Sort ['revenue DESC NULLS LAST], true
+- 'Aggregate ['n_name], ['n_name, 'SUM(('l_extendedprice * (1 -
'l_discount))) AS revenue#329]
   +- 'Filter ('c_custkey = 'o_custkey) && ('l_orderkey =
'o_orderkey)) && ('l_suppkey = 's_suppkey)) && (('c_nationkey =
's_nationkey) && ('s_nationkey = 'n_nationkey))) && ((('n_regionkey =
'r_regionkey) && ('r_name = AFRICA)) && (('o_orderdate >= 1993-01-01) &&
('o_orderdate < 1994-01-01
      +- 'Join Inner
         :- 'Join Inner
         :  :- 'Join Inner
         :  :  :- 'Join Inner
         :  :  :  :- 'Join Inner
         :  :  :  :  :- 'UnresolvedRelation `customer`
         :  :  :  :  +- 'UnresolvedRelation `orders`
         :  :  :  +- 'UnresolvedRelation `lineitem`
         :  :  +- 'UnresolvedRelation `supplier`
         :  +- 'UnresolvedRelation `nation`
         +- 'UnresolvedRelation `region`
== Analyzed Logical Plan ==
n_name: string, revenue: decimal(38,4)
Sort [revenue#329 DESC NULLS LAST], true
+- Aggregate [n_name#176], [n_name#176,
sum(CheckOverflow((promote_precision(cast(l_extendedprice#68 as
decimal(16,2))) *
promote_precision(cast(CheckOverflow((promote_precision(cast(cast(1 as
decimal(1,0)) as decimal(16,2))) - promote_precision(cast(l_discount#69 as
decimal(16,2, DecimalType(16,2)) as decimal(16,2,
DecimalType(32,4))) AS revenue#329]
   +- Filter (c_custkey#273 = o_custkey#1) && (l_orderkey#63 =
o_orderkey#0)) && (l_suppkey#65 = s_suppkey#224)) && ((c_nationkey#276 =
s_nationkey#227) && (s_nationkey#227 = n_nationkey#175))) &&
(((n_regionkey#177 = r_regionkey#203) && (r_name#204 = AFRICA)) &&
((cast(o_orderdate#4 as string) >= 1993-01-01) && (cast(o_orderdate#4 as
string) < 1994-01-01
      +- Join Inner
         :- Join Inner
         :  :- Join Inner
         :  :  :- Join Inner
         :  :  :  :- Join Inner
         :  :  :  :  :- SubqueryAlias customer
         :  :  :  :  :  +-
Relation[C_CUSTKEY#273,C_NAME#274,C_ADDRESS#275,C_NATIONKEY#276,C_PHONE#277,C_ACCTBAL#278,C_MKTSEGMENT#279,C_COMMENT#280]
JDBCRelation(customer) [numPartitions=1]
         :  :  :  :  +- SubqueryAlias orders
         :  :  :  : +-
Relation[O_ORDERKEY#0,O_CUSTKEY#1,O_ORDERSTATUS#2,O_TOTALPRICE#3,O_ORDERDATE#4,O_ORDERPRIORITY#5,O_CLERK#6,O_SHIPPRIORITY#7,O_COMMENT#8]
JDBCRelation(orders) [numPartitions=1]
         :  :  :  +- SubqueryAlias lineitem
         :  :  : +-
Relation[L_ORDERKEY#63,L_PARTKEY#64,L_SUPPKEY#65,L_LINENUMBER#66,L_QUANTITY#67,L_EXTENDEDPRICE#68,L_DISCOUNT#69,L_TAX#70,L_RETURNFLAG#71,L_LINESTATUS#72,L_SHIPDATE#73,L_COMMITDATE#74,L_RECEIPTDATE#75,L_SHIPINSTRUCT#76,L_SHIPMODE#77,L_COMMENT#78]
JDBCRelation(lineitem) [numPartitions=1]
         :  :  +- SubqueryAlias supplier
         :  : +-
Relation[S_SUPPKEY#224,S_NAME#225,S_ADDRESS#226,S_NATIONKEY#227,S_PHONE#228,S_ACCTBAL#229,S_COMMENT#230]
JDBCRelation(supplier) [numPartitions=1]
         :  +- SubqueryAlias nation
         : +-
Relation[N_NATIONKEY#175,N_NAME#176,N_REGIONKEY#177,N_COMMENT#178]
JDBCRelation(nation) [numPartitions=1]
         +- SubqueryAlias region
            +- Relation[R_REGIONKEY#203,R_NAME#204,R_COMMENT#205]
JDBCRelation(region) [numPartitions=1]
== Optimized Logical Plan ==
Sort [revenue#329 DESC NULLS LAST], true
+- Aggregate [n_name#176], [n_name#176,
sum(CheckOverflow((promote_precision(cast(l_extendedprice#68 as
decimal(16,2))) * promote_precision(CheckOverflow((1.00 -
promote_precision(cast(l_discount#69 as decimal(16,2,
DecimalType(16,2, DecimalType(32,4))) AS revenue#329]
   +- Project [L_EXTENDEDPRICE#68, L_DISCOUNT#69, N_NAME#176]
      +- Join Inner, (n_regionkey#177 = r_regionkey#203)
         :- Project [L_EXTENDEDPRICE#68, L_DISCOUNT#69, N_NAME#176,
N_REGIONKEY#177]
         :  +- Join Inner, (s_nationkey#227 = n_nationkey#175)
         : :- Project [L_EXTENDEDPRICE#68, L_DISCOUNT#69,
S_NATIONKEY#227]
         : :  +- Join Inner, ((l_suppkey#65 = s_suppkey#224) &&
(c_nationkey#276 = s_nationkey#227))
         : : :- Project [C_NATIONKEY#276, L_SUPPKEY#65,
L_EXTENDEDPRICE#68, L_DISCOUNT#69]
         : : :  +- Join Inner, (l_orderkey#63 = o_orderkey#0)
         : : : :- Project [C_NATIONKEY#276, O_ORDERKEY#0]
         : : : :  +- Join Inner, (c_custkey#273 = o_custkey#1)
         : : : : :- Project [C_CUSTKEY#273,
C_NATIONKEY#276]
         : : : : :  +- Filter (isnotnull(c_custkey#273) &&
isnotnull(c_nationkey#276))
         : : : : : +- InMemoryRelation [C_CUSTKEY#273,
C_NAME#274, C_ADDRESS#275, C_NATIONKEY#276, C_PHONE#277, C_ACCTBAL#278,
C_MKTSEGMENT#279, C_COMMENT#280], true, 1, StorageLevel(disk, memory, 1