[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221468#comment-16221468 ] Will Dumaresq commented on PHOENIX-3999:

{quote}Just to confirm though, would the first query that selects from COMPLETED_BATCHES also timeout if not executed by ranges?{quote}

Probably not if executed by itself. But if we attempt to process the batches returned from this query while stepping through the result set, I think there is some risk of that leading to a timeout (because of waiting too long between calls to next()). That's why we are doing that one in chunks as well: the result set is consumed and closed, and the results are held in memory while the batches are processed.

> Optimize inner joins as SKIP-SCAN-JOIN when possible
>
> Key: PHOENIX-3999
> URL: https://issues.apache.org/jira/browse/PHOENIX-3999
> Project: Phoenix
> Issue Type: Bug
> Reporter: James Taylor
>
> Semi joins on the leading part of the primary key end up doing batches of point queries (as opposed to a broadcast hash join), however inner joins do not.
> Here's a set of example schemas that executes a skip scan on the inner query:
> {code}
> CREATE TABLE COMPLETED_BATCHES (
>     BATCH_SEQUENCE_NUM BIGINT NOT NULL,
>     BATCH_ID BIGINT NOT NULL,
>     CONSTRAINT PK PRIMARY KEY
>     (
>         BATCH_SEQUENCE_NUM,
>         BATCH_ID
>     )
> );
>
> CREATE TABLE ITEMS (
>     BATCH_ID BIGINT NOT NULL,
>     ITEM_ID BIGINT NOT NULL,
>     ITEM_TYPE BIGINT,
>     ITEM_VALUE VARCHAR,
>     CONSTRAINT PK PRIMARY KEY
>     (
>         BATCH_ID,
>         ITEM_ID
>     )
> );
>
> CREATE TABLE COMPLETED_ITEMS (
>     ITEM_TYPE BIGINT NOT NULL,
>     BATCH_SEQUENCE_NUM BIGINT NOT NULL,
>     ITEM_ID BIGINT NOT NULL,
>     ITEM_VALUE VARCHAR,
>     CONSTRAINT PK PRIMARY KEY
>     (
>         ITEM_TYPE,
>         BATCH_SEQUENCE_NUM,
>         ITEM_ID
>     )
> );
> {code}
> The explain plan for these indicates that a dynamic filter will be performed like this:
> {code}
> UPSERT SELECT
> CLIENT PARALLEL 1-WAY FULL SCAN OVER ITEMS
>     SKIP-SCAN-JOIN TABLE 0
>         CLIENT PARALLEL 1-WAY RANGE SCAN OVER COMPLETED_BATCHES [1] - [2]
>             SERVER FILTER BY FIRST KEY ONLY
>             SERVER AGGREGATE INTO DISTINCT ROWS BY [BATCH_ID]
>         CLIENT MERGE SORT
>     DYNAMIC SERVER FILTER BY I.BATCH_ID IN ($8.$9)
> {code}
> We should also be able to leverage this optimization when an inner join is used such as this:
> {code}
> UPSERT INTO COMPLETED_ITEMS (ITEM_TYPE, BATCH_SEQUENCE_NUM, ITEM_ID, ITEM_VALUE)
>    SELECT i.ITEM_TYPE, b.BATCH_SEQUENCE_NUM, i.ITEM_ID, i.ITEM_VALUE
>    FROM ITEMS i, COMPLETED_BATCHES b
>    WHERE b.BATCH_ID = i.BATCH_ID AND
>    b.BATCH_SEQUENCE_NUM > 1000 AND b.BATCH_SEQUENCE_NUM < 2000;
> {code}
> A complete unit test looks like this:
> {code}
> @Test
> public void testNestedLoopJoin() throws Exception {
>     try (Connection conn = DriverManager.getConnection(getUrl())) {
>         String t1 = "COMPLETED_BATCHES";
>         String ddl1 = "CREATE TABLE " + t1 + " (\n" +
>             "BATCH_SEQUENCE_NUM BIGINT NOT NULL,\n" +
>             "BATCH_ID BIGINT NOT NULL,\n" +
>             "CONSTRAINT PK PRIMARY KEY\n" +
>             "(\n" +
>             "BATCH_SEQUENCE_NUM,\n" +
>             "BATCH_ID\n" +
>             ")\n" +
>             ")";
>         conn.createStatement().execute(ddl1);
>
>         String t2 = "ITEMS";
>         String ddl2 = "CREATE TABLE " + t2 + " (\n" +
>             " BATCH_ID BIGINT NOT NULL,\n" +
>             " ITEM_ID BIGINT NOT NULL,\n" +
>             " ITEM_TYPE BIGINT,\n" +
>             " ITEM_VALUE VARCHAR,\n" +
>             " CONSTRAINT PK PRIMARY KEY\n" +
>             " (\n" +
>             "BATCH_ID,\n" +
>             "ITEM_ID\n" +
>             " )\n" +
>             ")";
>         conn.createStatement().execute(ddl2);
>
>         String t3 = "COMPLETED_ITEMS";
>         String ddl3 = "CREATE TABLE " + t3 + " (\n" +
>             " ITEM_TYPE BIGINT NOT NULL,\n" +
>             " BATCH_SEQUENCE_
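The SKIP-SCAN-JOIN plan in the description can be read as a two-step process: range-scan the RHS for distinct BATCH_IDs, then use them as a point-lookup filter on the LHS. A minimal, runnable Java sketch of that flow, using plain collections in place of Phoenix scans (the class and method names are illustrative, not Phoenix internals):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of what the SKIP-SCAN-JOIN plan effectively does: the RHS range scan
// collects the distinct BATCH_IDs, which then drive a point-lookup (skip-scan)
// filter over ITEMS instead of a broadcast hash join.
public class SkipScanJoinSketch {
    // completedBatches rows: {batchSequenceNum, batchId}; items rows: {batchId, itemId}.
    public static long[][] join(long[][] completedBatches, long[][] items,
                                long seqLow, long seqHigh) {
        // Step 1: RANGE SCAN OVER COMPLETED_BATCHES + DISTINCT BATCH_ID.
        Set<Long> batchIds = new HashSet<>();
        for (long[] b : completedBatches) {
            if (b[0] > seqLow && b[0] < seqHigh) batchIds.add(b[1]);
        }
        // Step 2: DYNAMIC SERVER FILTER BY I.BATCH_ID IN (...) on the ITEMS scan.
        List<long[]> out = new ArrayList<>();
        for (long[] i : items) {
            if (batchIds.contains(i[0])) out.add(i);
        }
        return out.toArray(new long[0][]);
    }
}
```

The issue is that Phoenix currently only chooses this shape for semi joins on the leading PK, not for the equivalent inner join.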
[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16216078#comment-16216078 ] Maryann Xue commented on PHOENIX-3999:

[~jamestaylor], it would still be a hash join, but executed on the client side instead (a nested loop join is sure to be more costly). In theory, the difference between a server-side and a client-side hash join is that the former is (or should be) done in parallel, but carries the extra cost of broadcasting the RHS over the network to all servers. If the transfer of the RHS gets so costly that it undoes the benefit of executing the join in parallel, it's probably not a good idea to do a hash join anyway.
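Both flavors share the same build/probe pattern; the difference discussed above is only where the build side lives. A runnable sketch of that pattern (server-side, Phoenix broadcasts the built RHS to every region server; client-side, it would be built once in the client JVM; names here are illustrative, not Phoenix internals):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal build/probe hash join over in-memory rows.
public class HashJoinSketch {
    // rhs rows: {joinKey, seqNum}; lhs rows: {joinKey, itemId}.
    // Returns one {seqNum, itemId} row per match.
    public static long[][] hashJoin(long[][] rhs, long[][] lhs) {
        // Build phase: hash the RHS by join key (this is the "hash cache").
        Map<Long, Long> built = new HashMap<>();
        for (long[] row : rhs) built.put(row[0], row[1]);
        // Probe phase: stream the LHS and look each join key up.
        List<long[]> out = new ArrayList<>();
        for (long[] row : lhs) {
            Long seq = built.get(row[0]);
            if (seq != null) out.add(new long[]{seq, row[1]});
        }
        return out.toArray(new long[0][]);
    }
}
```

The trade-off in the comment is exactly where the build map is materialized: once per region server (broadcast cost) versus once on the client (no parallel probe).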
[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16215840#comment-16215840 ] Ethan Wang commented on PHOENIX-3999:

(Maybe irrelevant) Is the timeout enforced for the query as a whole? If we only care about the timeout of each RPC call for each parallel scan, wouldn't it be more efficient to leave it to the parallel scanner to decide how to chunk the scans?
[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16215796#comment-16215796 ] James Taylor commented on PHOENIX-3999:

bq. This new approach caches BATCH_SEQUENCE_NUM as variables in Java so no join operation is needed now

Right - so this might be a general, viable solution: chunk it up and execute the join on the client instead of doing a broadcast join. Is it like a nested loop join? I'm not sure under what conditions this could be done or when it'd be more efficient.
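The strategy being asked about can be sketched as a block nested loop driven from the client: walk the RHS join keys in chunks and issue one batched point lookup (in Phoenix, a skip scan) against the LHS per chunk. A runnable illustration, with the lookup callback standing in for a hypothetical chunked query (all names here are invented for the sketch):

```java
import java.util.ArrayList;
import java.util.List;

// Client-driven block nested loop: one batched LHS lookup per RHS key chunk.
public class ClientNestedLoopJoin {
    // Stand-in for a chunked "SELECT ... WHERE key IN (...)" query.
    public interface Lookup {
        List<Long> fetch(List<Long> keys);
    }

    public static List<Long> join(List<Long> rhsKeys, int chunkSize, Lookup lhsLookup) {
        List<Long> results = new ArrayList<>();
        for (int i = 0; i < rhsKeys.size(); i += chunkSize) {
            // One batched point lookup per chunk of RHS join keys.
            List<Long> chunk = rhsKeys.subList(i, Math.min(i + chunkSize, rhsKeys.size()));
            results.addAll(lhsLookup.fetch(chunk));
        }
        return results;
    }
}
```

Compared with a broadcast hash join, nothing is shipped to the servers; the cost moves to one round trip per chunk.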
[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16215785#comment-16215785 ] Maryann Xue commented on PHOENIX-3999:

I think the purpose of the above client logic is, as [~wdumaresq] said, to avoid timeouts: the whole query is divided and executed in small ranges. Just to confirm though, would the first query that selects from COMPLETED_BATCHES also timeout if not executed by ranges?

[~jamestaylor], "IN" queries like the one in Will's case have already been optimized. The only thing that triggered the broadcast join operation in the original query is "BATCH_SEQUENCE_NUM", which is not available from the ITEMS table and has to be joined from COMPLETED_BATCHES. This new approach caches BATCH_SEQUENCE_NUM as variables in Java, so no join operation is needed now.
[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16213415#comment-16213415 ] Will Dumaresq commented on PHOENIX-3999:

The query pattern is:

{code:java}
do {
    SELECT BATCH_ID, BATCH_SEQUENCE_NUM FROM COMPLETED_BATCHES
    WHERE (BATCH_SEQUENCE_NUM, BATCH_ID) > (previous_batch_sequence_num, previous_batch_id)
    AND BATCH_SEQUENCE_NUM > starting_batch_num
    ORDER BY BATCH_SEQUENCE_NUM, BATCH_ID
    LIMIT chunk_size

    for (batch_id in batch_ids) {
        do {
            SELECT * FROM ITEMS
            WHERE BATCH_ID = batch_id AND (BATCH_ID, ITEM_ID) > (batch_id, previous_item_id)
            ORDER BY BATCH_ID, ITEM_ID
            LIMIT chunk_size

            set previous_item_id to last ITEM_ID in results
        } while there are still more ITEMS to get
    }

    set prev_* variables to last BATCH in results
} while there are still more BATCHes to get
{code}
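The pagination predicate in this pattern is a row value constructor: (BATCH_SEQUENCE_NUM, BATCH_ID) > (prev_seq, prev_id) is a lexicographic comparison over the leading PK columns, which is what lets each chunk resume exactly where the previous one stopped. A runnable sketch of just that comparison (plain arrays standing in for PK-sorted scan results):

```java
import java.util.ArrayList;
import java.util.List;

// Keyset pagination over PK-sorted rows, mirroring the RVC + LIMIT pattern.
public class KeysetPager {
    // rows: {batchSequenceNum, batchId}, assumed sorted like a Phoenix PK scan.
    public static long[][] nextChunk(long[][] rows, long prevSeq, long prevId, int chunkSize) {
        List<long[]> chunk = new ArrayList<>();
        for (long[] r : rows) {
            // Row value constructor comparison: strictly after (prevSeq, prevId).
            boolean after = r[0] > prevSeq || (r[0] == prevSeq && r[1] > prevId);
            if (after) {
                chunk.add(r);
                if (chunk.size() == chunkSize) break;  // LIMIT chunk_size
            }
        }
        return chunk.toArray(new long[0][]);
    }
}
```

Because the predicate matches the PK order, each chunk query is itself a bounded range scan rather than a restart from the beginning.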
[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16213334#comment-16213334 ] James Taylor commented on PHOENIX-3999:

Thanks, [~wdumaresq]. So from the outer query you collect up (BATCH_ID, ITEM_ID) pairs and then query into the ITEMS table with those for the inner query? I wonder if this type of logic can be generalized into a new type of join strategy, [~maryannxue]?
[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16213295#comment-16213295 ] Will Dumaresq commented on PHOENIX-3999:

Hi [~jamestaylor],

The original use case was to be able to execute this query, so no, BATCH_SEQUENCE_NUM would not be held constant in each query:

{code:sql}
UPSERT INTO COMPLETED_ITEMS (ITEM_TYPE, BATCH_SEQUENCE_NUM, ITEM_ID, ITEM_VALUE)
    SELECT i.ITEM_TYPE, b.BATCH_SEQUENCE_NUM, i.ITEM_ID, i.ITEM_VALUE
    FROM ITEMS i, COMPLETED_BATCHES b
    WHERE b.BATCH_ID = i.BATCH_ID AND
    b.BATCH_SEQUENCE_NUM > 1000 AND b.BATCH_SEQUENCE_NUM < 2000;
{code}

However, we have moved on from trying to use the query in this way, and probably would not return to it even if it could be made very efficient, for a couple of reasons:
1) No matter how efficient it is, there would never be a guarantee that it would complete in less than n seconds, and our Phoenix environment places a restriction on us that our queries have to complete in less than n seconds (where n=120 at the moment).
2) We have modified our architecture so that ITEMS/COMPLETED_BATCHES and COMPLETED_ITEMS are hidden behind two separate abstractions. We now (in Java code) query the COMPLETED_BATCHES table to find completed batches, then run an inner loop that queries for ITEMS matching those batches, and write the results to our COMPLETED_ITEMS abstraction. We do all of these queries in small batches to avoid running into the 2-minute timeout.
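The restructured flow described above (outer pass over completed batches, inner pass fetching the matching items, results written out as completed items) can be sketched in memory. The real code issues chunked JDBC queries against two separate abstractions; everything below, including the names, is purely illustrative:

```java
import java.util.ArrayList;
import java.util.List;

// In-memory stand-in for the two-phase copy: batches drive item lookups,
// and each match becomes a COMPLETED_ITEMS-style row.
public class TwoPhaseCopy {
    // completedBatches rows: {batchId, batchSequenceNum}; items rows: {batchId, itemId}.
    // Returns {batchSequenceNum, itemId} rows destined for COMPLETED_ITEMS.
    public static long[][] copyCompletedItems(long[][] completedBatches, long[][] items) {
        List<long[]> completedItems = new ArrayList<>();
        for (long[] b : completedBatches) {        // outer: completed batches
            for (long[] i : items) {               // inner: items for this batch
                if (i[0] == b[0]) {
                    completedItems.add(new long[]{b[1], i[1]});
                }
            }
        }
        return completedItems.toArray(new long[0][]);
    }
}
```

This is the same join the original UPSERT SELECT expressed, just executed application-side in bounded pieces so no single statement risks the 2-minute timeout.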
[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16213191#comment-16213191 ] James Taylor commented on PHOENIX-3999:

I believe in this case the user is holding BATCH_SEQUENCE_NUM constant in their query, essentially processing the same BATCH_SEQUENCE_NUM per query, but let me try to get clarification.
[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144780#comment-16144780 ] Maryann Xue commented on PHOENIX-3999: -- The reason a hash cache is used in this query is that "b.BATCH_SEQUENCE_NUM" appears in the SELECT clause, so we have to perform the actual join operation for the referenced fields (in this case, only one field) from the RHS. In some sense the join is still driven by the client side through the skip-scan filter; it's just that we cannot omit the join operation as we would for semi-joins.
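A minimal Python sketch of the point above (illustrative only, not Phoenix internals; the table contents are assumed sample data): the skip-scan key set alone can decide which ITEMS rows survive, but because b.BATCH_SEQUENCE_NUM is in the SELECT clause, a hash cache built from the RHS must still be probed per row.

```python
completed_batches = [  # (BATCH_SEQUENCE_NUM, BATCH_ID)
    (1, 2),
    (1, 4),
]
items = [  # (BATCH_ID, ITEM_ID, ITEM_VALUE)
    (2, 100, "a"),
    (3, 101, "b"),
    (4, 102, "c"),
]

# One RHS scan produces both the skip-scan key set and the hash cache.
skip_scan_keys = {batch_id for _, batch_id in completed_batches}
hash_cache = {batch_id: seq for seq, batch_id in completed_batches}

# LHS scan: the skip-scan filter restricts rows; the hash cache supplies
# the RHS column referenced in the SELECT clause.
joined = [
    (batch_id, item_id, hash_cache[batch_id], value)
    for batch_id, item_id, value in items
    if batch_id in skip_scan_keys
]
print(joined)  # rows for BATCH_ID 2 and 4 only, each carrying BATCH_SEQUENCE_NUM
```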
[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144542#comment-16144542 ] James Taylor commented on PHOENIX-3999: --- bq. only during inner join: ITEMS table as parent will receive a HashCacheImpl from RHS in order to look up the b.BATCH_SEQUENCE_NUM
It's good that a skip scan is used during the scan of ITEMS. However, rather than scan the RHS and broadcast it, it'd be more efficient to take the same approach as with the semi join and drive the join on the client side as the LHS is being scanned. WDYT, [~maryannxue]?
[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16138023#comment-16138023 ] Ethan Wang commented on PHOENIX-3999: - (I just realized in the last comment I mistakenly used the term "Dynamic Filter" interchangeably with "HashCacheImpl". Actually "Dynamic Filter" means "SkipScanFilter".) So for the inner join query you quoted above, it seems to me it _does_ use a skip scan coming from COMPLETED_BATCHES, just like the semi join. In both cases the RHS returns a skip scan filter, B.BATCH_ID IN (2, 4), which translates to I.BATCH_ID IN (2, 4) and is passed to the parent. In this case the skip scan filter's slots (a List<List<KeyRange>>) contain two tiny key ranges:
{code:java}
0 = {KeyRange@4041} "\x80\x00\x00\x00\x00\x00\x00\x02"
1 = {KeyRange@4042} "\x80\x00\x00\x00\x00\x00\x00\x04"
{code}
To sum up the difference between this inner join and the semi join: both use a skip scan to reduce the length of the ITEMS table scan; only during the inner join does the ITEMS table, as parent, receive a HashCacheImpl from the RHS in order to look up b.BATCH_SEQUENCE_NUM.
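Those two byte strings are the Phoenix row-key encoding of the BIGINT values 2 and 4. A small Python sketch (assuming Phoenix's standard PLong layout: 8-byte big-endian two's complement with the sign bit flipped, so unsigned byte-wise comparison preserves signed ordering) reproduces them:

```python
import struct

def encode_phoenix_bigint(value: int) -> bytes:
    """Encode a signed 64-bit value as 8 big-endian bytes with the sign
    bit flipped, matching how a Phoenix BIGINT appears in the row key."""
    return struct.pack(">Q", (value ^ (1 << 63)) & ((1 << 64) - 1))

# The two KeyRange point keys quoted in the comment above:
print(encode_phoenix_bigint(2))  # b'\x80\x00\x00\x00\x00\x00\x00\x02'
print(encode_phoenix_bigint(4))  # b'\x80\x00\x00\x00\x00\x00\x00\x04'
```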
[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137065#comment-16137065 ] James Taylor commented on PHOENIX-3999: --- Just to confirm, [~aertoria], this query:
{code}
SELECT i.ITEM_TYPE, b.BATCH_SEQUENCE_NUM, i.ITEM_ID, i.ITEM_VALUE
FROM ITEMS i, COMPLETED_BATCHES b
WHERE b.BATCH_ID = i.BATCH_ID AND b.BATCH_SEQUENCE_NUM > 0 AND b.BATCH_SEQUENCE_NUM < 2;
{code}
does not use a skip scan from COMPLETED_BATCHES into ITEMS? Yes, it'll be way more efficient to do a skip scan here, as is done in the case of the semi join. FYI, [~maryannxue].
[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134342#comment-16134342 ] Ethan Wang commented on PHOENIX-3999: - Thanks [~maryannxue]. I think the cost-based Calcite optimization is the way to go. For now, here at Salesforce we have some imminent use cases related to this ticket, and I'm taking a look. As a first step, I'm checking whether the inner join and the semi join of the same query (the one [~jamestaylor] posted above) end up executing as their explain plans say. Here are my findings; basically both behave as [~maryannxue] rightly pointed out.
1. For the inner join, i.e.,
{code:java}
SELECT i.ITEM_TYPE, b.BATCH_SEQUENCE_NUM, i.ITEM_ID, i.ITEM_VALUE
FROM ITEMS i, COMPLETED_BATCHES b
WHERE b.BATCH_ID = i.BATCH_ID AND b.BATCH_SEQUENCE_NUM > 0 AND b.BATCH_SEQUENCE_NUM < 2;
{code}
the plan explanation is accurate. What happens is: the query turns into a HashJoinPlan whose parent LHS is ITEMS and whose RHS is a hash join subplan for COMPLETED_BATCHES. This subplan first executes a regular scan (even before the user starts calling rs.next()). When it finishes, it turns into a "dynamic filter" that is broadcast to all ITEMS regions and persisted as a HashCacheImpl object. On the region server I was able to observe that the hash cache object contains the two COMPLETED_BATCHES rows. Later, when the LHS skip scan starts, HashJoinRegionScanner consults this cached dynamic filter while fetching each result tuple into its resultQueue to send back to the user.
bq. while in the query you mentioned above b.BATCH_SEQUENCE_NUM is from RHS and is not part of the join key, so SKIP-SCAN-JOIN is not used. Still this query satisfies the child-parent join optimization conditions and should be fired, which mean "dynamic filter" should appear in the plan. Does it work that way now?
The answer should be yes.
2. When written as a semi join, i.e.,
{code:java}
SELECT ITEM_TYPE, 1, ITEM_ID, ITEM_VALUE FROM ITEMS i
WHERE EXISTS (
    SELECT 1 FROM COMPLETED_BATCHES b
    WHERE b.BATCH_ID = i.BATCH_ID AND b.BATCH_SEQUENCE_NUM > 0 AND b.BATCH_SEQUENCE_NUM < 2
)
{code}
it still becomes a HashJoinPlan whose parent is the ITEMS table scan, but the RHS subplan becomes a point lookup (TupleProjectionPlan). No ServerCache (dynamic filter) object is ever generated or broadcast. The RHS subplan becomes a static constant key range that is merged into the parent's WHERE clause.
So, if I understand correctly, the semi join is more "optimized" than the inner join because less information is required for the final result. Also, a SKIP-SCAN-JOIN should *always* outperform a plain DYNAMIC FILTER, because a DYNAMIC FILTER at best cannot reduce the scan length on the parent, whereas a SKIP-SCAN-JOIN sometimes can. As a result, the semi join should "always" perform better in every scenario. (Right?)
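The performance claim in that last paragraph can be sketched in plain Python (illustrative sample data, not Phoenix code): a broadcast dynamic filter still touches every parent row and discards non-matches, while a skip scan seeks directly to the join keys obtained from the RHS.

```python
import bisect

# 2000 LHS rows sorted by (BATCH_ID, ITEM_ID), mimicking the ITEMS primary key.
items = sorted((batch_id, item_id)
               for batch_id in range(1, 1001)
               for item_id in (1, 2))
rhs_keys = [2, 4]  # BATCH_IDs returned by the COMPLETED_BATCHES scan

# Hash-join style: every LHS row is touched, then filtered by the cache.
cache = set(rhs_keys)
touched_full = 0
matches_full = []
for row in items:
    touched_full += 1
    if row[0] in cache:
        matches_full.append(row)

# Skip-scan style: seek (here, binary search) to each key's slot instead
# of scanning; only the matching rows are touched.
touched_skip = 0
matches_skip = []
for key in rhs_keys:
    i = bisect.bisect_left(items, (key,))
    while i < len(items) and items[i][0] == key:
        matches_skip.append(items[i])
        touched_skip += 1
        i += 1

assert matches_full == matches_skip      # same join result either way
print(touched_full, touched_skip)        # 2000 vs 4 rows touched
```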
[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16118630#comment-16118630 ] Maryann Xue commented on PHOENIX-3999: -- Good question, [~aertoria]! It's actually part of our cost-based optimization which is being developed on the "calcite" branch. We should also be able to choose between hash-join and sort-merge-join based on the size of LHS and RHS. You can take a look at https://www.slideshare.net/slideshow/embed_code/key/M3VyIu89dywJn0 if you are interested.
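The kind of size-based choice described above can be sketched as a toy rule (hypothetical threshold and function name, not the Calcite cost model): broadcast a hash join only when the build side is small enough to cache, otherwise fall back to a sort-merge join.

```python
def choose_join(rhs_rows: int, memory_limit_rows: int = 100_000) -> str:
    """Pick a join strategy from a rough RHS row-count estimate.
    The threshold is an assumed placeholder, not a Phoenix setting."""
    if rhs_rows <= memory_limit_rows:
        return "hash-join"        # RHS small enough to broadcast and cache
    return "sort-merge-join"      # too big to cache; stream both sides sorted

print(choose_join(500))           # hash-join
print(choose_join(50_000_000))    # sort-merge-join
```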
[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16118046#comment-16118046 ] James Taylor commented on PHOENIX-3999: --- [~maryannxue] - do you know the answer to the question above?
[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16117999#comment-16117999 ] Ethan Wang commented on PHOENIX-3999: - bq. A SKIP-SCAN-JOIN usually scans RHS to get a list of join key values, and then performs a skip scan on LHS using these values.
Is it true that the RHS always gets scanned first, with the result then used as a "dynamic filter" to perform a skip scan on the LHS, and never the other way around, regardless of table size etc.?
[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16092778#comment-16092778 ] Maryann Xue commented on PHOENIX-3999:

Yes, I think so. Maybe we need to refine the names in the explain plan to make it clearer to users. Please feel free to assign the task to me.
[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16088354#comment-16088354 ] James Taylor commented on PHOENIX-3999:

Ah, I see. So, yes, the plan had a DYNAMIC FILTER indicated. But I don't think it was causing a skip scan to occur. I can double check this, though. You think it should, [~maryannxue]? Thanks for the info!
[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16087434#comment-16087434 ] Maryann Xue commented on PHOENIX-3999:

I was trying to clarify that a hash join which performs a skip scan on the LHS based on the result from the RHS is what we call the "child-parent join optimization", usually indicated by "DYNAMIC FILTER" in the explain plan; "SKIP-SCAN-JOIN" is a special case of the child-parent join optimization that avoids the join operation altogether, since there is no need for it. Does the plan of the above example show "DYNAMIC FILTER"?
[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16086686#comment-16086686 ] James Taylor commented on PHOENIX-3999:

Thanks for the response, [~maryannxue]! We have both rows, the RHS and LHS. Why can't we include a reference to additional columns and just combine the two rows together (much like we do for our HashJoin)? We're driving from a scan of the RHS, right? So don't we have whatever rows we need there? And then on the LHS, if need be, we could project the columns we need in the scan doing the skip scan filter.
[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16083513#comment-16083513 ] Maryann Xue commented on PHOENIX-3999:

DYNAMIC FILTER indicates that the child-parent join optimization will be performed, which means a skip-scan filter will be used while scanning the LHS. SKIP-SCAN-JOIN, on the other hand, means that there will be no join operation at all, since no columns from the RHS (other than the join key) will be referenced. A SKIP-SCAN-JOIN usually scans the RHS to get a list of join key values, and then performs a skip scan on the LHS using these values. I just realized that the name SKIP-SCAN-JOIN might be confusing here.
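The distinction drawn in the comment above can be sketched as follows. This is an illustrative Python model only (names and data are invented, and Phoenix's actual implementation is server-side Java): when the SELECT list references no RHS columns, the collected join keys act as a pure filter and no join step is needed (the SKIP-SCAN-JOIN case); once an RHS column such as b.BATCH_SEQUENCE_NUM is projected, the executor must also keep a key-to-RHS-row mapping and combine rows, hash-join style (the DYNAMIC FILTER / child-parent join case).

```python
# Illustrative contrast between a pure dynamic filter (no RHS columns
# projected, so no join step) and a child-parent join that must attach RHS
# columns to each matching LHS row; all rows here are invented.

# RHS: COMPLETED_BATCHES rows as (BATCH_SEQUENCE_NUM, BATCH_ID)
rhs = [(1500, 2), (1600, 4)]
# LHS: ITEMS rows as (BATCH_ID, ITEM_ID)
lhs = [(2, 100), (4, 200), (5, 300)]

# Case 1 -- only join keys are needed from the RHS: the scan result collapses
# to a set of keys and LHS rows simply pass or fail the filter; no join
# operation ever runs.
keys = {bid for (_, bid) in rhs}
semi = [row for row in lhs if row[0] in keys]

# Case 2 -- the query projects b.BATCH_SEQUENCE_NUM: the same keys still
# drive the skip scan, but a key -> RHS-row map must be kept so each
# surviving LHS row can be combined with its RHS match (as in a hash join).
rhs_by_key = {bid: seq for (seq, bid) in rhs}
inner = [(bid, iid, rhs_by_key[bid]) for (bid, iid) in lhs if bid in rhs_by_key]

print(semi)
print(inner)
```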
[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16080567#comment-16080567 ] James Taylor commented on PHOENIX-3999:

Why the limitation of not pulling over columns from the RHS, [~maryannxue]?
[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16079616#comment-16079616 ] Maryann Xue commented on PHOENIX-3999:

A semi join can use SKIP-SCAN-JOIN because it will not include any columns from the RHS in the SELECT list, while in the query you mentioned above {{b.BATCH_SEQUENCE_NUM}} is from the RHS and is not part of the join key, so SKIP-SCAN-JOIN is not used. Still, this query satisfies the child-parent join optimization conditions, so that optimization should fire, which means "DYNAMIC FILTER" should appear in the plan. Does it work that way now? However, you've made a good point: we should optimize those inner joins with no RHS columns referenced in the SELECT list.
[jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078349#comment-16078349 ] James Taylor commented on PHOENIX-3999:

[~maryannxue] - any thoughts on this? Would this be easy to do?