[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible

2017-10-26 Thread Will Dumaresq (JIRA)

[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221468#comment-16221468 ]

Will Dumaresq commented on PHOENIX-3999:


{quote}Just to confirm though, would the first query that selects from 
COMPLETED_BATCHES also timeout if not executed by ranges?{quote}

Probably not if executed by itself, but if we attempted to process the batches 
returned from this query while stepping through the result set, I think there 
would be some risk of a timeout (from waiting too long between calls to 
next()).

So that's why we are doing that one in chunks as well: the result set is 
consumed and closed, and the results are held in memory while the batches are 
processed.
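As an illustration of that "consume and close, then process" idea (not Phoenix code; the {{Source}} interface and helper names below are hypothetical stand-ins for a JDBC ResultSet), a minimal Java sketch:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of "consume and close, then process": drain the result set into
// memory first, so slow per-row work never happens between next() calls.
public class BufferThenProcess {
    // Hypothetical stand-in for a JDBC ResultSet over rows of type T;
    // next() returns null when exhausted (like rs.next() == false).
    interface Source<T> extends AutoCloseable {
        T next();
        void close();
    }

    // Read every row in a tight loop, then close the source.
    static <T> List<T> drain(Source<T> source) {
        List<T> rows = new ArrayList<>();
        try (source) {
            for (T row = source.next(); row != null; row = source.next()) {
                rows.add(row);
            }
        }
        return rows;
    }

    // Only after the source is fully consumed and closed does the
    // (possibly slow) per-row processing run, against memory only.
    static <T> void run(Source<T> source, Consumer<T> slowProcessor) {
        for (T row : drain(source)) {
            slowProcessor.accept(row);
        }
    }
}
```

The point is that next() is only ever called in a tight loop, so no slow per-row work can open a long gap between calls.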



> Optimize inner joins as SKIP-SCAN-JOIN when possible
> 
>
> Key: PHOENIX-3999
> URL: https://issues.apache.org/jira/browse/PHOENIX-3999
> Project: Phoenix
>  Issue Type: Bug
>Reporter: James Taylor
>
> Semi joins on the leading part of the primary key end up doing batches of 
> point queries (as opposed to a broadcast hash join), however inner joins do 
> not.
> Here's a set of example schemas for a query that executes a skip scan on the inner query:
> {code}
> CREATE TABLE COMPLETED_BATCHES (
> BATCH_SEQUENCE_NUM BIGINT NOT NULL,
> BATCH_ID   BIGINT NOT NULL,
> CONSTRAINT PK PRIMARY KEY
> (
> BATCH_SEQUENCE_NUM,
> BATCH_ID
> )
> );
> CREATE TABLE ITEMS (
>BATCH_ID BIGINT NOT NULL,
>ITEM_ID BIGINT NOT NULL,
>ITEM_TYPE BIGINT,
>ITEM_VALUE VARCHAR,
>CONSTRAINT PK PRIMARY KEY
>(
> BATCH_ID,
> ITEM_ID
>)
> );
> CREATE TABLE COMPLETED_ITEMS (
>ITEM_TYPE  BIGINT NOT NULL,
>BATCH_SEQUENCE_NUM BIGINT NOT NULL,
>ITEM_IDBIGINT NOT NULL,
>ITEM_VALUE VARCHAR,
>CONSTRAINT PK PRIMARY KEY
>(
>   ITEM_TYPE,
>   BATCH_SEQUENCE_NUM,  
>   ITEM_ID
>)
> );
> {code}
> The explain plan for this query indicates that a dynamic filter will be 
> performed, like this:
> {code}
> UPSERT SELECT
> CLIENT PARALLEL 1-WAY FULL SCAN OVER ITEMS
> SKIP-SCAN-JOIN TABLE 0
> CLIENT PARALLEL 1-WAY RANGE SCAN OVER COMPLETED_BATCHES [1] - [2]
> SERVER FILTER BY FIRST KEY ONLY
> SERVER AGGREGATE INTO DISTINCT ROWS BY [BATCH_ID]
> CLIENT MERGE SORT
> DYNAMIC SERVER FILTER BY I.BATCH_ID IN ($8.$9)
> {code}
> We should also be able to leverage this optimization when an inner join is 
> used such as this:
> {code}
> UPSERT INTO COMPLETED_ITEMS (ITEM_TYPE, BATCH_SEQUENCE_NUM, ITEM_ID, 
> ITEM_VALUE)
>SELECT i.ITEM_TYPE, b.BATCH_SEQUENCE_NUM, i.ITEM_ID, i.ITEM_VALUE   
>FROM  ITEMS i, COMPLETED_BATCHES b
>WHERE b.BATCH_ID = i.BATCH_ID AND  
>b.BATCH_SEQUENCE_NUM > 1000 AND b.BATCH_SEQUENCE_NUM < 2000;
> {code}
> A complete unit test looks like this:
> {code}
> @Test
> public void testNestedLoopJoin() throws Exception {
> try (Connection conn = DriverManager.getConnection(getUrl())) {
> String t1="COMPLETED_BATCHES";
> String ddl1 = "CREATE TABLE " + t1 + " (\n" + 
> "BATCH_SEQUENCE_NUM BIGINT NOT NULL,\n" + 
> "BATCH_ID   BIGINT NOT NULL,\n" + 
> "CONSTRAINT PK PRIMARY KEY\n" + 
> "(\n" + 
> "BATCH_SEQUENCE_NUM,\n" + 
> "BATCH_ID\n" + 
> ")\n" + 
> ")" + 
> "";
> conn.createStatement().execute(ddl1);
> 
> String t2="ITEMS";
> String ddl2 = "CREATE TABLE " + t2 + " (\n" + 
> "   BATCH_ID BIGINT NOT NULL,\n" + 
> "   ITEM_ID BIGINT NOT NULL,\n" + 
> "   ITEM_TYPE BIGINT,\n" + 
> "   ITEM_VALUE VARCHAR,\n" + 
> "   CONSTRAINT PK PRIMARY KEY\n" + 
> "   (\n" + 
> "BATCH_ID,\n" + 
> "ITEM_ID\n" + 
> "   )\n" + 
> ")";
> conn.createStatement().execute(ddl2);
> String t3="COMPLETED_ITEMS";
> String ddl3 = "CREATE TABLE " + t3 + "(\n" + 
> "   ITEM_TYPE  BIGINT NOT NULL,\n" + 
> "   BATCH_SEQUENCE_NUM BIGINT NOT NULL,\n" + 
> "   ITEM_IDBIGINT NOT NULL,\n" + 
> "   ITEM_VALUE VARCHAR,\n" + 
> "   CONSTRAINT PK PRIMARY KEY\n" + 
> "   (\n" + 
> "  ITEM_TYPE,\n" + 
> "  BATCH_SEQUENCE_NUM,  \n" + 
> "  ITEM_ID\n" + 
> "   )\n" + 
> ")";
> conn.createStatement().execute(ddl3);
> conn.createStatement().execute("UPSERT INTO 
> "+t1+"(BATCH_SEQUENCE_NUM, batch_id) VALUES (1,2)");

[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible

2017-10-23 Thread Maryann Xue (JIRA)

[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216078#comment-16216078 ]

Maryann Xue commented on PHOENIX-3999:

[~jamestaylor], it would still be a hash join, but executed on the client side 
instead (a nested loop join is sure to be more costly). In theory, the 
difference between a server-side and a client-side hash join is that the 
former is (or should be) done in parallel but has the extra cost of 
broadcasting the RHS over the network to all servers. I suspect that if 
transferring the RHS gets costly enough to undo the benefit of executing the 
join in parallel, it's probably not a good idea to do a hash join anyway.


[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible

2017-10-23 Thread Ethan Wang (JIRA)

[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16215840#comment-16215840 ]

Ethan Wang commented on PHOENIX-3999:

(Maybe irrelevant) Is the timeout enforced for the query as a whole? If we 
only care about the timeout of each RPC call for each parallel scan, wouldn't 
it be more efficient to leave it to the parallel scanner to decide how to 
chunk the scans?


[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible

2017-10-23 Thread James Taylor (JIRA)

[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16215796#comment-16215796 ]

James Taylor commented on PHOENIX-3999:

bq. This new approach caches BATCH_SEQUENCE_NUM as variables in Java so no 
join operation is needed now
Right - so this might be a general, viable solution: chunk it up and execute 
the join on the client instead of doing a broadcast join. Is it like a nested 
loop join? I'm not sure under what conditions this could be done or when it'd 
be more efficient.


[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible

2017-10-23 Thread Maryann Xue (JIRA)

[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16215785#comment-16215785 ]

Maryann Xue commented on PHOENIX-3999:

I think the purpose of the above client logic is, as [~wdumaresq] said, to 
avoid timeouts: the whole query is divided up and executed in small ranges. 
Just to confirm though, would the first query that selects from 
COMPLETED_BATCHES also timeout if not executed by ranges?
[~jamestaylor], "IN" queries like the ones in Will's case have already been 
optimized. The only thing that triggers the broadcast join operation in the 
original query is "BATCH_SEQUENCE_NUM", which is not available from the ITEMS 
table and has to be joined in from COMPLETED_BATCHES. This new approach caches 
BATCH_SEQUENCE_NUM as variables in Java, so no join operation is needed now.


[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible

2017-10-20 Thread Will Dumaresq (JIRA)

[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16213415#comment-16213415 ]

Will Dumaresq commented on PHOENIX-3999:


The query pattern is:


{code:java}
do {
  SELECT BATCH_ID, BATCH_SEQUENCE_NUM FROM COMPLETED_BATCHES
  WHERE (BATCH_SEQUENCE_NUM, BATCH_ID) > (previous_batch_sequence_num, previous_batch_id)
    AND BATCH_SEQUENCE_NUM > starting_batch_num
  ORDER BY BATCH_SEQUENCE_NUM, BATCH_ID
  LIMIT chunk_size

  for (batch_id in batch_ids) {
    do {
      SELECT * FROM ITEMS
      WHERE BATCH_ID = batch_id AND
        (BATCH_ID, ITEM_ID) > (batch_id, previous_item_id)
      ORDER BY BATCH_ID, ITEM_ID
      LIMIT chunk_size

      set previous_item_id to last ITEM_ID in results
    } while there are still more ITEMS to get
  }

  set prev_* variables to last BATCH in results
} while there are still more BATCHes to get
{code}
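For illustration only, here is roughly how those nested do/while loops look as plain Java, with pre-sorted in-memory data standing in for the LIMITed Phoenix queries (ChunkedJoin, fetchBatches and fetchItems are hypothetical names, not Phoenix APIs):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the chunked client-side join pattern above.
// fetchBatches/fetchItems stand in for the LIMIT chunk_size queries;
// they page through pre-sorted in-memory data instead of Phoenix tables.
public class ChunkedJoin {
    static final int CHUNK_SIZE = 2; // chunk_size in the pattern above

    // Up to CHUNK_SIZE batch ids strictly greater than afterBatch, in order.
    static List<Long> fetchBatches(List<Long> sortedBatches, long afterBatch) {
        List<Long> chunk = new ArrayList<>();
        for (long id : sortedBatches) {
            if (id > afterBatch) {
                chunk.add(id);
                if (chunk.size() == CHUNK_SIZE) break;
            }
        }
        return chunk;
    }

    // Up to CHUNK_SIZE item ids of one batch strictly greater than afterItem.
    static List<Long> fetchItems(Map<Long, List<Long>> itemsByBatch,
                                 long batchId, long afterItem) {
        List<Long> chunk = new ArrayList<>();
        for (long item : itemsByBatch.getOrDefault(batchId, List.of())) {
            if (item > afterItem) {
                chunk.add(item);
                if (chunk.size() == CHUNK_SIZE) break;
            }
        }
        return chunk;
    }

    // The two nested do/while loops: page over batches, and for each batch
    // page over its items, emitting (batch_id, item_id) pairs.
    static List<long[]> join(List<Long> sortedBatches,
                             Map<Long, List<Long>> itemsByBatch) {
        List<long[]> result = new ArrayList<>();
        long prevBatch = Long.MIN_VALUE;
        List<Long> batchChunk;
        do {
            batchChunk = fetchBatches(sortedBatches, prevBatch);
            for (long batchId : batchChunk) {
                long prevItem = Long.MIN_VALUE;
                List<Long> itemChunk;
                do {
                    itemChunk = fetchItems(itemsByBatch, batchId, prevItem);
                    for (long itemId : itemChunk) {
                        result.add(new long[]{batchId, itemId});
                        prevItem = itemId; // last ITEM_ID in results
                    }
                } while (itemChunk.size() == CHUNK_SIZE); // more ITEMS to get
                prevBatch = batchId; // last BATCH in results
            }
        } while (batchChunk.size() == CHUNK_SIZE); // more BATCHes to get
        return result;
    }
}
```

Because each fetch returns at most chunk_size rows keyed strictly after the previous position, every query stays small, and the loops terminate as soon as a short chunk comes back.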



[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible

2017-10-20 Thread James Taylor (JIRA)

[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16213334#comment-16213334 ]

James Taylor commented on PHOENIX-3999:

Thanks, [~wdumaresq]. So from the outer query you collect up (BATCH_ID, 
ITEM_ID) pairs and then query into the ITEMS table with those for the inner 
query?

I wonder if this type of logic can be generalized in a new type of join 
strategy, [~maryannxue]?


[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible

2017-10-20 Thread Will Dumaresq (JIRA)

[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16213295#comment-16213295 ]

Will Dumaresq commented on PHOENIX-3999:


Hi [~jamestaylor],

The original use case was to be able to execute this query, so no, 
BATCH_SEQUENCE_NUM would not be held constant in each query:

{code:sql}
UPSERT INTO COMPLETED_ITEMS (ITEM_TYPE, BATCH_SEQUENCE_NUM, ITEM_ID, ITEM_VALUE)
   SELECT i.ITEM_TYPE, b.BATCH_SEQUENCE_NUM, i.ITEM_ID, i.ITEM_VALUE   
   FROM  ITEMS i, COMPLETED_BATCHES b
   WHERE b.BATCH_ID = i.BATCH_ID AND  
   b.BATCH_SEQUENCE_NUM > 1000 AND b.BATCH_SEQUENCE_NUM < 2000;
{code}

However, we have moved on from trying to use the query in this way, and 
probably would not return to it even if it could be made very efficient, for a 
couple of reasons:

1) No matter how efficient it is, there would never be a guarantee that it 
would complete in less than n seconds, and our Phoenix environment requires 
that our queries complete in less than n seconds (where n=120 at the moment).

2) We have modified our architecture so that ITEMS/COMPLETED_BATCHES and 
COMPLETED_ITEMS are hidden behind two separate abstractions, so we now (in Java 
code) query from the COMPLETED_BATCHES table to find COMPLETED_BATCHES, then 
have an inner loop that queries for ITEMS matching the COMPLETED_BATCHES, then 
we write those to our COMPLETED_ITEMS abstraction. We do all these queries in 
small batches to avoid running into the 2-minute timeout.


[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible

2017-10-20 Thread James Taylor (JIRA)

[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16213191#comment-16213191 ]

James Taylor commented on PHOENIX-3999:
---

I believe in this case the user is holding BATCH_SEQUENCE_NUM constant in the 
query, essentially processing the same BATCH_SEQUENCE_NUM per query, but let 
me try to get clarification.


[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible

2017-08-28 Thread Maryann Xue (JIRA)

[ 
https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16144780#comment-16144780
 ] 

Maryann Xue commented on PHOENIX-3999:
--

The reason a hash cache is used in this query is that "b.BATCH_SEQUENCE_NUM" 
appears in the SELECT clause, so we have to perform the actual join operation 
for the referenced fields (in this case, only one field) from the RHS. In some 
sense the join is still driven by the client side through the skip-scan 
filter; we just cannot omit the join operation as we would for semi-joins.


[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible

2017-08-28 Thread James Taylor (JIRA)

[ 
https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16144542#comment-16144542
 ] 

James Taylor commented on PHOENIX-3999:
---

bq. only during inner join: ITEMS table as parent will receive a HashCacheImpl 
from RHS in order to look up the b.BATCH_SEQUENCE_NUM

It's good that a skip scan is used during the scan of ITEMS. However, rather 
than scanning the RHS and broadcasting it, it'd be more efficient to take the 
same approach as with the semi join and drive the join on the client side as 
the LHS is being scanned.

WDYT, [~maryannxue]?


[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible

2017-08-23 Thread Ethan Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138023#comment-16138023
 ] 

Ethan Wang commented on PHOENIX-3999:
-

(I just realized in the last comment I mistakenly used the term "Dynamic 
Filter" interchangeably with "HashCacheImpl".  Actually "Dynamic Filter" means 
"SkipScanFilter".)

So for the inner join query you quoted above, it seems to me it _does_ use a 
skip scan coming from COMPLETED_BATCHES, just like the semi join.

In both cases, the RHS returns a skip-scan filter, B.BATCH_ID IN (2, 4), which 
translates to I.BATCH_ID IN (2, 4) and is passed to the parent. In this case, 
the skip-scan filter's slots (a List<List<KeyRange>>) include two tiny key 
ranges:
{code:java}
0 = {KeyRange@4041} "\x80\x00\x00\x00\x00\x00\x00\x02"
1 = {KeyRange@4042} "\x80\x00\x00\x00\x00\x00\x00\x04"
{code}


To sum up the difference between this inner join and the semi join:
-both use a skip scan to reduce the scan length on the ITEMS table.
-only during the inner join: the ITEMS table, as parent, receives a 
HashCacheImpl from the RHS in order to look up b.BATCH_SEQUENCE_NUM.




[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible

2017-08-22 Thread James Taylor (JIRA)

[ 
https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16137065#comment-16137065
 ] 

James Taylor commented on PHOENIX-3999:
---

Just to confirm, [~aertoria], this query:
{code}
SELECT 
i.ITEM_TYPE, b.BATCH_SEQUENCE_NUM, i.ITEM_ID, i.ITEM_VALUE   
FROM  
ITEMS i, COMPLETED_BATCHES b
WHERE 
   b.BATCH_ID = i.BATCH_ID
   AND b.BATCH_SEQUENCE_NUM > 0 
   AND b.BATCH_SEQUENCE_NUM < 2;
{code}
does not use a skip scan from COMPLETED_BATCHES into ITEMS?

Yes, it'll be way more efficient to do a skip scan here, as is done in the case 
of the semi join.

FYI, [~maryannxue].


[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible

2017-08-20 Thread Ethan Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134342#comment-16134342
 ] 

Ethan Wang commented on PHOENIX-3999:
-

Thanks [~maryannxue]. I think the cost-based Calcite optimization is the way 
to go. For now, here at Salesforce we have some imminent use cases related to 
this ticket, and I'm taking a look.

As for step one, I'm checking whether the inner join and the semi join of the 
same query (the one [~jamestaylor] posted above) end up executing as EXPLAIN 
says. Here are my findings; basically both behave as [~maryannxue] rightly 
pointed out.

1, For Inner join, i.e.,
{code:java}
SELECT 
i.ITEM_TYPE, b.BATCH_SEQUENCE_NUM, i.ITEM_ID, i.ITEM_VALUE   
FROM  
ITEMS i, COMPLETED_BATCHES b
WHERE 
   b.BATCH_ID = i.BATCH_ID
   AND b.BATCH_SEQUENCE_NUM > 0 
   AND b.BATCH_SEQUENCE_NUM < 2;
{code}


The plan explanation is accurate.

What happens is: the query turns into a HashJoinPlan. The parent LHS is 
"ITEMS"; the RHS is a hash-join subplan for "COMPLETED_BATCHES". This subplan 
first executes a regular scan (even before the user starts calling rs.next()). 
When it finishes, the result turns into a "Dynamic Filter" that is broadcast 
to all "ITEMS" regions and persisted as a HashCacheImpl object. On the region 
server I was able to observe that the hash cache object contains the two 
COMPLETED_BATCHES rows. Later, when the LHS skip scan starts, 
HashJoinRegionScanner consults this cached dynamic filter as it fetches each 
result tuple into its resultQueue and sends it back to the user.


bq. while in the query you mentioned above b.BATCH_SEQUENCE_NUM is from RHS and 
is not part of the join key, so SKIP-SCAN-JOIN is not used. Still this query 
satisfies the child-parent join optimization conditions and should be fired, 
which mean "dynamic filter" should appear in the plan. Does it work that way 
now?

So the answer should be yes.



2, when written in semi join, i.e.,
{code:java}
SELECT 
ITEM_TYPE, 1, ITEM_ID, ITEM_VALUE   
FROM  
ITEMS i
WHERE EXISTS (  
SELECT 1 FROM COMPLETED_BATCHES b 
WHERE b.BATCH_ID = i.BATCH_ID 
AND   b.BATCH_SEQUENCE_NUM > 0 
AND   b.BATCH_SEQUENCE_NUM < 2
)
{code}

It still becomes a HashJoinPlan whose parent is the "ITEMS" table scan, but 
the RHS subplan becomes a point lookup (TupleProjectionPlan). No ServerCache 
(dynamic filter) object is ever generated or broadcast. The RHS subplan now 
becomes a static, constant key range and is merged into the parent's WHERE 
clause.

So, if I understand correctly, the semi join is supposed to be more 
"optimized" than the inner join, because less information is required for the 
final result. Also, a "SKIP-SCAN-JOIN" should *always* outperform a regular 
"DYNAMIC FILTER", because a "DYNAMIC FILTER" cannot reduce the scan length on 
the parent, whereas a "SKIP-SCAN-JOIN" sometimes can. As a result, the semi 
join should always perform better in every scenario. (Right?)


[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible

2017-08-08 Thread Maryann Xue (JIRA)

[ 
https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16118630#comment-16118630
 ] 

Maryann Xue commented on PHOENIX-3999:
--

Good question, [~aertoria]! It's actually part of our cost-based optimization 
which is being developed on the "calcite" branch. We should also be able to 
choose between hash-join and sort-merge-join based on the sizes of the LHS and 
RHS. You can take a look at 
https://www.slideshare.net/slideshow/embed_code/key/M3VyIu89dywJn0 if you are 
interested.
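For illustration, a cost-based planner of that kind might choose a strategy from rough cardinality estimates. A toy sketch (the function, threshold, and strategy names here are hypothetical, not Phoenix's actual cost model):

```python
def choose_join_strategy(lhs_rows: int, rhs_rows: int,
                         broadcast_limit: int = 100_000) -> str:
    """Pick a join strategy from estimated row counts (toy illustration)."""
    if rhs_rows <= broadcast_limit:
        # Small RHS: broadcast it as a hash cache / skip-scan filter.
        return "hash-join"
    # Both sides large: sort-merge avoids holding either side in memory.
    return "sort-merge-join"

print(choose_join_strategy(10_000_000, 50))         # hash-join
print(choose_join_strategy(10_000_000, 5_000_000))  # sort-merge-join
```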


[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible

2017-08-08 Thread James Taylor (JIRA)

[ 
https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16118046#comment-16118046
 ] 

James Taylor commented on PHOENIX-3999:
---

[~maryannxue] - do you know the answer to the question above?


[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible

2017-08-08 Thread Ethan Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117999#comment-16117999
 ] 

Ethan Wang commented on PHOENIX-3999:
-

bq. A SKIP-SCAN-JOIN usually scans RHS to get a list of join key values, and 
then performs a skip scan on LHS using these values. 

Is it true that the RHS is always scanned first and its result is then used as 
a "dynamic filter" to perform a SKIP-SCAN on the LHS, and never the other way 
around, regardless of table size etc.?


[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible

2017-07-19 Thread Maryann Xue (JIRA)

[ 
https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092778#comment-16092778
 ] 

Maryann Xue commented on PHOENIX-3999:
--

Yes, I think so. Maybe we need to refine the names in the explain plan to make 
it clearer to users. Please feel free to assign the task to me.


[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible

2017-07-14 Thread James Taylor (JIRA)

[ 
https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16088354#comment-16088354
 ] 

James Taylor commented on PHOENIX-3999:
---

Ah, I see. So, yes, the plan had a DYNAMIC FILTER indicated. But I don't think 
it was causing a skip scan to occur. I can double check this, though. You think 
it should, [~maryannxue]? Thanks for the info!


[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible

2017-07-14 Thread Maryann Xue (JIRA)

[ 
https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16087434#comment-16087434
 ] 

Maryann Xue commented on PHOENIX-3999:
--

I was trying to clarify that a hash-join that performs a skip-scan on the LHS 
based on the result from the RHS is what we call "child-parent join 
optimization" and is usually indicated by "DYNAMIC FILTER" in the explain plan, 
while "SKIP-SCAN-JOIN" is a special case of "child-parent join optimization" 
that avoids the join operation altogether, since none is needed.
Does the plan of the above example show "DYNAMIC FILTER"?


[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible

2017-07-13 Thread James Taylor (JIRA)

[ 
https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16086686#comment-16086686
 ] 

James Taylor commented on PHOENIX-3999:
---

Thanks for the response, [~maryannxue]! We have both rows, the RHS and LHS. Why 
can't we include a reference to additional columns and just combine the two 
rows together (much like we do for our HashJoin)? We're driving from a scan of 
the RHS, right? So don't we have whatever rows we need there? And then on the 
LHS, if need be, we could project the columns we need in the scan doing the 
skip-scan filter.


[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible

2017-07-12 Thread Maryann Xue (JIRA)

[ 
https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16083513#comment-16083513
 ] 

Maryann Xue commented on PHOENIX-3999:
--

DYNAMIC FILTER indicates that the child-parent join optimization will be 
performed, which means a skip-scan filter will be used while scanning the LHS; 
SKIP-SCAN-JOIN, on the other hand, means that there will be no join operation 
at all, since no columns from the RHS (other than the join key) will be 
referenced. A SKIP-SCAN-JOIN usually scans the RHS to get a list of join key 
values, and then performs a skip scan on the LHS using these values. I just 
realized that the name SKIP-SCAN-JOIN might be confusing here.
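
The distinction can be illustrated with the schemas from the description; as a 
sketch, a semi join that references no RHS columns should be the eligible shape 
(the exact plan output may vary by version):

{code}
-- Only ITEMS columns are referenced, so the RHS (COMPLETED_BATCHES) is
-- needed only for its join key values: the SKIP-SCAN-JOIN case.
SELECT i.ITEM_ID, i.ITEM_VALUE
FROM ITEMS i
WHERE i.BATCH_ID IN (
    SELECT BATCH_ID
    FROM COMPLETED_BATCHES
    WHERE BATCH_SEQUENCE_NUM > 1000 AND BATCH_SEQUENCE_NUM < 2000
);
{code}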


[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible

2017-07-09 Thread Maryann Xue (JIRA)

[ 
https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16079616#comment-16079616
 ] 

Maryann Xue commented on PHOENIX-3999:
--

A semi join can use SKIP-SCAN-JOIN because it will not include any columns from 
the RHS in the SELECT list; in the query you mentioned above, 
{{b.BATCH_SEQUENCE_NUM}} is from the RHS and is not part of the join key, so 
SKIP-SCAN-JOIN is not used. Still, this query satisfies the child-parent join 
optimization conditions, so that optimization should fire, which means "dynamic 
filter" should appear in the plan. Does it work that way now? However, you've 
made a good point: we should optimize those inner joins with no RHS columns 
referenced in the SELECT list.
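
For comparison, a hypothetical variant of the query from the description that 
drops the RHS column from the SELECT list (and so, by the rule above, should 
qualify for SKIP-SCAN-JOIN):

{code}
-- Hypothetical rewrite: b.BATCH_SEQUENCE_NUM is no longer selected, so no
-- RHS columns beyond the join key are referenced.
SELECT i.ITEM_TYPE, i.ITEM_ID, i.ITEM_VALUE
FROM ITEMS i
WHERE i.BATCH_ID IN (
    SELECT BATCH_ID
    FROM COMPLETED_BATCHES
    WHERE BATCH_SEQUENCE_NUM > 1000 AND BATCH_SEQUENCE_NUM < 2000
);
{code}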


[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible

2017-07-07 Thread James Taylor (JIRA)

[ 
https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16078349#comment-16078349
 ] 

James Taylor commented on PHOENIX-3999:
---

[~maryannxue] - any thoughts on this? Would this be easy to do?
