Re: [PR] [FLINK-23687][table-planner] Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys [flink]

2024-01-24 Thread via GitHub


yunfan123 commented on PR #24104:
URL: https://github.com/apache/flink/pull/24104#issuecomment-1908175907

   @LB-Yu Hello. The code style change has been adopted in the latest commit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-23687][table-planner] Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys [flink]

2024-01-24 Thread via GitHub


LB-Yu commented on code in PR #24104:
URL: https://github.com/apache/flink/pull/24104#discussion_r1464628624


##
docs/content/docs/dev/table/sql/queries/joins.md:
##
@@ -326,6 +326,17 @@ FROM Orders AS o
 
 In the example above, the Orders table is enriched with data from the 
Customers table which resides in a MySQL database. The `FOR SYSTEM_TIME AS OF` 
clause with the subsequent processing time attribute ensures that each row of 
the `Orders` table is joined with those Customers rows that match the join 
predicate at the point in time when the `Orders` row is processed by the join 
operator. It also prevents that the join result is updated when a joined 
`Customer` row is updated in the future. The lookup join also requires a 
mandatory equality join predicate, in the example above `o.customer_id = c.id`.
 
+### Hash Shuffle Lookup Join

Review Comment:
   Shuffle Hash Lookup Join?






Re: [PR] [FLINK-23687][table-planner] Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys [flink]

2024-01-24 Thread via GitHub


LB-Yu commented on code in PR #24104:
URL: https://github.com/apache/flink/pull/24104#discussion_r1464627984


##
docs/content/docs/dev/table/sql/queries/joins.md:
##
@@ -326,6 +326,17 @@ FROM Orders AS o
 
 In the example above, the Orders table is enriched with data from the 
Customers table which resides in a MySQL database. The `FOR SYSTEM_TIME AS OF` 
clause with the subsequent processing time attribute ensures that each row of 
the `Orders` table is joined with those Customers rows that match the join 
predicate at the point in time when the `Orders` row is processed by the join 
operator. It also prevents that the join result is updated when a joined 
`Customer` row is updated in the future. The lookup join also requires a 
mandatory equality join predicate, in the example above `o.customer_id = c.id`.
 
+### Hash Shuffle Lookup Join
+Some lookup source connectors use cache to reduce RPC call times. In order to 
raise cache hit ratio for those connectors, user could use a hint to enable 
partitioned lookup join which enforces input of lookup join to hash shuffle by 
look up keys.
+
+```sql
+-- enable partitioned lookup join by SHUFFLE_HASH hint

Review Comment:
   My concern is whether we should use the term SHUFFLE_HASH uniformly, instead 
of having "partitioned join" appear in the documents and code.






Re: [PR] [FLINK-23687][table-planner] Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys [flink]

2024-01-24 Thread via GitHub


LB-Yu commented on code in PR #24104:
URL: https://github.com/apache/flink/pull/24104#discussion_r1464624954


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLookupJoinRule.scala:
##
@@ -73,7 +75,19 @@ object BatchPhysicalLookupJoinRule {
 val cluster = join.getCluster
 
 val providedTrait = 
join.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
-val requiredTrait = 
input.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
+var requiredTrait = 
input.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
+val partitionJoinHint = join.getHints
+  .stream()
+  .filter(hint => JoinStrategy.isShuffleHashHint(hint.hintName))
+  .findFirst()
+// if partitioning enabled, use the join key as partition key
+if (
+  partitionJoinHint.isPresent &&
+  partitionJoinHint.get().listOptions.contains(FlinkHints.RIGHT_INPUT) &&
+  !joinInfo.pairs().isEmpty
+) {

Review Comment:
   To keep the code here clean, my suggestion is to abstract the code that 
determines whether to enable SHUFFLE_HASH into `JoinUtil`.






Re: [PR] [FLINK-23687][table-planner] Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys [flink]

2024-01-23 Thread via GitHub


yunfan123 commented on PR #24104:
URL: https://github.com/apache/flink/pull/24104#issuecomment-1907264935

   @LB-Yu Hello. Thanks for your comment. I have resolved the issue in the 
latest commit and supplemented it with additional unit tests.





Re: [PR] [FLINK-23687][table-planner] Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys [flink]

2024-01-23 Thread via GitHub


LB-Yu commented on code in PR #24104:
URL: https://github.com/apache/flink/pull/24104#discussion_r1463174712


##
docs/content.zh/docs/dev/table/sql/queries/joins.md:
##
@@ -324,6 +324,16 @@ FROM Orders AS o
 
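 In the example above, the Orders table is enriched with data from the Customers 
table stored in a MySQL database. The `FOR SYSTEM_TIME AS OF` clause with the 
subsequent processing time attribute ensures that each row of `Orders` is joined 
with the Customer rows that match the join condition at the time the join 
operator processes the `Orders` row. It also prevents the join result from 
changing when the joined `Customer` table is updated in the future. The lookup 
join also requires a mandatory equality join condition, which in the example 
above is `o.customer_id = c.id`.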
 
+### Hash Shuffle Lookup Join
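+Some lookup join sources use a cache to reduce the number of accesses to the 
dimension table. To improve the cache hit ratio of these connectors, users can 
use a SQL hint to enable pre-partitioning, which forces a hash on the join key 
before the lookup join.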
+```sql
+-- enable partitioned lookup join with the shuffle hash hint

Review Comment:
   Should the wording in the document uniformly use SHUFFLE_HASH?






Re: [PR] [FLINK-23687][table-planner] Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys [flink]

2024-01-23 Thread via GitHub


LB-Yu commented on code in PR #24104:
URL: https://github.com/apache/flink/pull/24104#discussion_r1463173501


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLookupJoinRule.scala:
##
@@ -76,8 +77,15 @@ object StreamPhysicalLookupJoinRule {
 val cluster = join.getCluster
 
 val providedTrait = 
join.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
-val requiredTrait = 
input.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
-
+var requiredTrait = 
input.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
+val partitionJoinHint = join.getHints
+  .stream()
+  .filter(hint => JoinStrategy.isShuffleHashHint(hint.hintName))
+  .findFirst()
+// if partitioning enabled, use the join key as partition key
+if (partitionJoinHint.isPresent && !joinInfo.pairs().isEmpty) {

Review Comment:
   I think there is something wrong with how this determines whether 
SHUFFLE_HASH is enabled. If I mistakenly write SHUFFLE_HASH('left_table'), 
SHUFFLE_HASH is still enabled instead of the hint being ignored. You should at 
least check whether the table named in the hint is the right-side table, and 
this should be reflected in the test.
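
The stricter check described in this comment could look roughly like the following. This is a minimal, standalone sketch in plain Java, not the planner code: `ShuffleHashHintCheck`, `isShuffleHashEnabled`, and its parameters are hypothetical names, and treating the hint name as case-insensitive while matching the table name exactly is an assumption, not confirmed Flink behavior.

```java
import java.util.List;

/** Hypothetical helper for illustration only; not part of Flink's API. */
public final class ShuffleHashHintCheck {

    private ShuffleHashHintCheck() {}

    /**
     * Returns true only if the hint is SHUFFLE_HASH, names the lookup (right) table,
     * and the join has at least one equality key that can serve as the partition key.
     *
     * Assumptions: hint names compare case-insensitively, table names compare exactly.
     */
    public static boolean isShuffleHashEnabled(
            String hintName, List<String> hintOptions, String rightTable, int joinKeyCount) {
        if (!"SHUFFLE_HASH".equalsIgnoreCase(hintName)) {
            return false; // some other hint
        }
        if (joinKeyCount <= 0) {
            return false; // nothing to hash by
        }
        // Ignore the hint unless it explicitly names the right-side table.
        return hintOptions.contains(rightTable);
    }

    public static void main(String[] args) {
        // Hint names the lookup table: enabled.
        System.out.println(isShuffleHashEnabled("SHUFFLE_HASH", List.of("D1"), "D1", 1));
        // Hint names the left table by mistake: ignored.
        System.out.println(isShuffleHashEnabled("SHUFFLE_HASH", List.of("left_table"), "D1", 1));
    }
}
```

Under these assumptions, `SHUFFLE_HASH('left_table')` on a join whose lookup side is `D1` leaves the feature off, which is the case a plan test would need to cover.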






Re: [PR] [FLINK-23687][table-planner] Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys [flink]

2024-01-23 Thread via GitHub


yunfan123 commented on PR #24104:
URL: https://github.com/apache/flink/pull/24104#issuecomment-1905561284

   Hello @LB-Yu.
   I have implemented the feature as described in FLIP-204 using query hints and 
have rebased onto the latest master branch. This PR also touches many lines in 
the XML test plan files, although the actual modifications are quite small. 
   My suggestion for reviewing these XML files is as follows: the new test 
results (with this feature enabled) are at the top, and the old test results 
(with this feature disabled) are at the bottom. 
   The old test results are unchanged, so the focus should be on the diffs 
between the new and old test results. 
   These diffs are easiest to identify by first removing the old test results 
locally and then using a comparison tool to highlight the differences.





Re: [PR] [FLINK-23687][table-planner] Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys [flink]

2024-01-22 Thread via GitHub


yunfan123 commented on code in PR #24104:
URL: https://github.com/apache/flink/pull/24104#discussion_r1462659082


##
docs/content/docs/dev/table/sql/queries/joins.md:
##
@@ -326,6 +326,17 @@ FROM Orders AS o
 
 In the example above, the Orders table is enriched with data from the 
Customers table which resides in a MySQL database. The `FOR SYSTEM_TIME AS OF` 
clause with the subsequent processing time attribute ensures that each row of 
the `Orders` table is joined with those Customers rows that match the join 
predicate at the point in time when the `Orders` row is processed by the join 
operator. It also prevents that the join result is updated when a joined 
`Customer` row is updated in the future. The lookup join also requires a 
mandatory equality join predicate, in the example above `o.customer_id = c.id`.
 
+### Partitioned Lookup Join
+Some lookup source connectors use cache to reduce RPC call times. In order to 
raise cache hit ratio for those connectors, user could use a hint to enable 
partitioned lookup join which enforces input of lookup join to hash shuffle by 
look up keys.
+
+```sql
+-- enable partitioned lookup join by partitioned hint

Review Comment:
   Thanks for your comments. I will use the `SHUFFLE_HASH` hint.






Re: [PR] [FLINK-23687][table-planner] Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys [flink]

2024-01-22 Thread via GitHub


LB-Yu commented on code in PR #24104:
URL: https://github.com/apache/flink/pull/24104#discussion_r1461708094


##
docs/content/docs/dev/table/sql/queries/joins.md:
##
@@ -326,6 +326,17 @@ FROM Orders AS o
 
 In the example above, the Orders table is enriched with data from the 
Customers table which resides in a MySQL database. The `FOR SYSTEM_TIME AS OF` 
clause with the subsequent processing time attribute ensures that each row of 
the `Orders` table is joined with those Customers rows that match the join 
predicate at the point in time when the `Orders` row is processed by the join 
operator. It also prevents that the join result is updated when a joined 
`Customer` row is updated in the future. The lookup join also requires a 
mandatory equality join predicate, in the example above `o.customer_id = c.id`.
 
+### Partitioned Lookup Join
+Some lookup source connectors use cache to reduce RPC call times. In order to 
raise cache hit ratio for those connectors, user could use a hint to enable 
partitioned lookup join which enforces input of lookup join to hash shuffle by 
look up keys.
+
+```sql
+-- enable partitioned lookup join by partitioned hint

Review Comment:
   Maybe the syntax in FLIP-204 is a better choice? When joining multiple 
dimension tables, we may only need to specify hash join for one of the 
dimension tables, like this:
   ```sql
   SELECT /*+ SHUFFLE_HASH(D1) */ *
   FROM t AS T 
   LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
   LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
   ```
   
   Also, `SHUFFLE_HASH` is consistent with the [existing hint 
name](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#shuffle_hash)
 of batch join.









Re: [PR] [FLINK-23687][table-planner] Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys [flink]

2024-01-16 Thread via GitHub


flinkbot commented on PR #24104:
URL: https://github.com/apache/flink/pull/24104#issuecomment-1893726942

   
   ## CI report:
   
   * 243473d41af7715fb788f7c87a4dae9147e9e8fd UNKNOWN
   
   
   





[PR] [FLINK-23687][table-planner] Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys [flink]

2024-01-16 Thread via GitHub


yunfan123 opened a new pull request, #24104:
URL: https://github.com/apache/flink/pull/24104

   ## What is the purpose of the change
   
   To raise the cache hit ratio, we can force the input of a lookup join to be 
shuffled by the join key.
   This PR introduces a hint to enable the partitioned lookup join.
   It is primarily derived from this PR: 
https://github.com/apache/flink/pull/18174/files, with the unit tests adapted to the 
latest version.
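
The intuition behind the cache-hit claim can be illustrated with a toy simulation. The sketch below is standalone Java and uses no Flink API: `LookupCacheSimulation`, the per-subtask LRU cache, round-robin routing as the unhinted baseline, and `key % parallelism` as the hash shuffle are all simplifying assumptions. Flink's actual partitioning and connector caches work differently.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntBinaryOperator;

/** Toy model for illustration only; not Flink code. */
public final class LookupCacheSimulation {

    /** Minimal LRU cache built on LinkedHashMap's access-order mode. */
    static final class LruCache extends LinkedHashMap<Integer, Boolean> {
        private final int capacity;

        LruCache(int capacity) {
            super(16, 0.75f, true); // true = order entries by most recent access
            this.capacity = capacity;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<Integer, Boolean> eldest) {
            return size() > capacity;
        }
    }

    /** Sends record i (key = i % distinctKeys) to a subtask chosen by router; counts cache hits. */
    static int countHits(
            int records, int distinctKeys, int parallelism, int capacity, IntBinaryOperator router) {
        List<LruCache> caches = new ArrayList<>();
        for (int s = 0; s < parallelism; s++) {
            caches.add(new LruCache(capacity));
        }
        int hits = 0;
        for (int i = 0; i < records; i++) {
            int key = i % distinctKeys;
            LruCache cache = caches.get(router.applyAsInt(i, key));
            if (cache.get(key) != null) { // get() also refreshes recency
                hits++;
            } else {
                cache.put(key, Boolean.TRUE); // miss: simulate the remote lookup, then cache
            }
        }
        return hits;
    }

    /** Baseline assumption: records are spread round-robin, regardless of key. */
    public static int roundRobinHits(int records, int distinctKeys, int parallelism, int capacity) {
        return countHits(records, distinctKeys, parallelism, capacity, (i, key) -> i % parallelism);
    }

    /** Hash shuffle assumption: the same key always lands on the same subtask. */
    public static int hashShuffleHits(int records, int distinctKeys, int parallelism, int capacity) {
        return countHits(records, distinctKeys, parallelism, capacity,
                (i, key) -> Math.floorMod(Integer.hashCode(key), parallelism));
    }

    public static void main(String[] args) {
        System.out.println("round-robin hits:  " + roundRobinHits(900, 9, 4, 3));
        System.out.println("hash-shuffle hits: " + hashShuffleHits(900, 9, 4, 3));
    }
}
```

With 900 records over 9 distinct keys, 4 subtasks, and 3 cache entries per subtask, the round-robin run gets no hits because every subtask cycles through all 9 keys, while the hash-routed run misses only once per key.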
   
   ## Brief change log
   
 - *Update `FlinkHints` and `FlinkHintStrategies` to introduce partitioned 
join hint*
 - *Update the related rules to enforce hash distribution on the input, including 
`BatchPhysicalLookupJoinRule` and `StreamPhysicalLookupJoinRule`*
   
   ## Verifying this change
 - *Plan tests in batch `LookupJoinTest` and stream `LookupJoinTest`*
 - *IT tests in batch `LookupJoinITCase`, stream `LookupJoinITCase`, and 
`AsyncLookupJoinITCase`*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (docs)
   

