Re: [PR] [FLINK-23687][table-planner] Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys [flink]
yunfan123 commented on PR #24104:
URL: https://github.com/apache/flink/pull/24104#issuecomment-1908175907

@LB-Yu Hello. The code style suggestion has been adopted in the latest commit.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-23687][table-planner] Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys [flink]
LB-Yu commented on code in PR #24104:
URL: https://github.com/apache/flink/pull/24104#discussion_r1464628624

## docs/content/docs/dev/table/sql/queries/joins.md: ##
@@ -326,6 +326,17 @@ FROM Orders AS o
 In the example above, the Orders table is enriched with data from the Customers table which resides in a MySQL database. The `FOR SYSTEM_TIME AS OF` clause with the subsequent processing time attribute ensures that each row of the `Orders` table is joined with those Customers rows that match the join predicate at the point in time when the `Orders` row is processed by the join operator. It also prevents that the join result is updated when a joined `Customer` row is updated in the future. The lookup join also requires a mandatory equality join predicate, in the example above `o.customer_id = c.id`.
+### Hash Shuffle Lookup Join

Review Comment:
Shuffle Hash Lookup Join?
Re: [PR] [FLINK-23687][table-planner] Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys [flink]
LB-Yu commented on code in PR #24104:
URL: https://github.com/apache/flink/pull/24104#discussion_r1464627984

## docs/content/docs/dev/table/sql/queries/joins.md: ##
@@ -326,6 +326,17 @@ FROM Orders AS o
 In the example above, the Orders table is enriched with data from the Customers table which resides in a MySQL database. The `FOR SYSTEM_TIME AS OF` clause with the subsequent processing time attribute ensures that each row of the `Orders` table is joined with those Customers rows that match the join predicate at the point in time when the `Orders` row is processed by the join operator. It also prevents that the join result is updated when a joined `Customer` row is updated in the future. The lookup join also requires a mandatory equality join predicate, in the example above `o.customer_id = c.id`.
+### Hash Shuffle Lookup Join
+Some lookup source connectors use cache to reduce RPC call times. In order to raise cache hit ratio for those connectors, user could use a hint to enable partitioned lookup join which enforces input of lookup join to hash shuffle by look up keys.
+
+```sql
+-- enable partitioned lookup join by SHUFFLE_HASH hint

Review Comment:
My concern is whether we should use the term SHUFFLE_HASH uniformly, instead of having "partitioned join" appear in the documentation and code.
Re: [PR] [FLINK-23687][table-planner] Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys [flink]
LB-Yu commented on code in PR #24104:
URL: https://github.com/apache/flink/pull/24104#discussion_r1464624954

## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLookupJoinRule.scala: ##
@@ -73,7 +75,19 @@ object BatchPhysicalLookupJoinRule {
     val cluster = join.getCluster
     val providedTrait = join.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
-    val requiredTrait = input.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
+    var requiredTrait = input.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
+    val partitionJoinHint = join.getHints
+      .stream()
+      .filter(hint => JoinStrategy.isShuffleHashHint(hint.hintName))
+      .findFirst()
+    // if partitioning enabled, use the join key as partition key
+    if (
+      partitionJoinHint.isPresent &&
+      partitionJoinHint.get().listOptions.contains(FlinkHints.RIGHT_INPUT) &&
+      !joinInfo.pairs().isEmpty
+    ) {

Review Comment:
To keep the code here clean, I suggest moving the logic that decides whether to enable SHUFFLE_HASH into `JoinUtil`.
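A minimal sketch of the kind of helper suggested above, written in plain Scala outside Flink so it runs standalone. `JoinHint`, `ShuffleHashLookupJoinUtil`, `enableShuffleHash` and the `"RIGHT"` marker are hypothetical stand-ins for Calcite's `RelHint`, the proposed `JoinUtil` method and `FlinkHints.RIGHT_INPUT` (whose actual value is assumed here). The condition mirrors the batch rule's check: a SHUFFLE_HASH hint, the right input listed in its options, and at least one equi-join key pair. Unlike the reviewed code, it checks every SHUFFLE_HASH hint rather than only the first.

```scala
// Hypothetical stand-in for Calcite's RelHint: a hint name and its list options.
final case class JoinHint(hintName: String, listOptions: List[String])

// Hypothetical helper in the spirit of the suggested JoinUtil method (not Flink's API).
object ShuffleHashLookupJoinUtil {
  // Assumed marker for "the lookup (right) input"; value chosen for illustration only.
  val RightInput = "RIGHT"
  // Assumed hint name; matched exactly here for simplicity.
  val ShuffleHash = "SHUFFLE_HASH"

  /**
   * True only if some SHUFFLE_HASH hint targets the right input and the join
   * has at least one equi-join key pair (left index, right index) to hash by.
   */
  def enableShuffleHash(hints: Seq[JoinHint], joinKeyPairs: Seq[(Int, Int)]): Boolean =
    joinKeyPairs.nonEmpty &&
      hints.exists(h => h.hintName == ShuffleHash && h.listOptions.contains(RightInput))
}
```

With the decision in one place, `BatchPhysicalLookupJoinRule` and `StreamPhysicalLookupJoinRule` could both call the same method before replacing `requiredTrait` with a hash distribution, so the two rules cannot drift apart (the stream-rule version under review, for instance, omits the right-input check).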
Re: [PR] [FLINK-23687][table-planner] Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys [flink]
yunfan123 commented on PR #24104:
URL: https://github.com/apache/flink/pull/24104#issuecomment-1907264935

@LB-Yu Hello. Thanks for your comment. I have resolved the issue in the latest commit and added more unit tests.
Re: [PR] [FLINK-23687][table-planner] Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys [flink]
LB-Yu commented on code in PR #24104:
URL: https://github.com/apache/flink/pull/24104#discussion_r1463174712

## docs/content.zh/docs/dev/table/sql/queries/joins.md: ##
@@ -324,6 +324,16 @@ FROM Orders AS o
 In the example above, the Orders table is enriched with data from the Customers table stored in a MySQL database. The `FOR SYSTEM_TIME AS OF` clause with the subsequent process time attribute ensures that, when the join operator processes an `Orders` row, each row of `Orders` is joined with the Customer rows that match the join condition. It also prevents the join result from changing when the joined `Customer` table is updated in the future. The lookup join also requires a mandatory equality join condition, which in the example above is `o.customer_id = c.id`.
+### Hash Shuffle Lookup Join
+Some lookup join sources use a cache to reduce the number of accesses to the dimension table. To raise the cache hit ratio of these connectors, users can use an SQL hint to enable pre-partitioning, which forces a hash by the join key before the lookup join.
+```sql
+-- enable partitioned lookup join with the shuffle hash hint

Review Comment:
Should the wording in the document uniformly use SHUFFLE_HASH?
Re: [PR] [FLINK-23687][table-planner] Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys [flink]
LB-Yu commented on code in PR #24104:
URL: https://github.com/apache/flink/pull/24104#discussion_r1463173501

## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLookupJoinRule.scala: ##
@@ -76,8 +77,15 @@ object StreamPhysicalLookupJoinRule {
     val cluster = join.getCluster
     val providedTrait = join.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
-    val requiredTrait = input.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
-
+    var requiredTrait = input.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
+    val partitionJoinHint = join.getHints
+      .stream()
+      .filter(hint => JoinStrategy.isShuffleHashHint(hint.hintName))
+      .findFirst()
+    // if partitioning enabled, use the join key as partition key
+    if (partitionJoinHint.isPresent && !joinInfo.pairs().isEmpty) {

Review Comment:
I think the way this code determines whether SHUFFLE_HASH is enabled is wrong. If I mistakenly write SHUFFLE_HASH('left_table'), the shuffle is still enabled instead of the hint being ignored. At minimum, you should check whether the table named in the hint is the right table, and the tests should cover this case.
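The failure mode described in this comment can be reproduced with a small standalone Scala sketch. `QueryHint`, `ShuffleHashHintCheck` and both methods are hypothetical, and modeling the hint options as the raw table names written in the query is an assumption about how the hint reaches the rule. `enabledByPresenceOnly` mirrors the reviewed stream-rule condition; `enabledForRightTable` adds the check the reviewer asks for.

```scala
// Hypothetical model of a query hint: its name and the table names listed in it.
final case class QueryHint(name: String, tables: List[String])

object ShuffleHashHintCheck {
  private val ShuffleHash = "SHUFFLE_HASH"

  // Mirrors the reviewed condition: any SHUFFLE_HASH hint plus non-empty join keys.
  def enabledByPresenceOnly(hints: Seq[QueryHint], hasJoinKeys: Boolean): Boolean =
    hasJoinKeys && hints.exists(_.name == ShuffleHash)

  // Stricter variant: the hint must name the lookup (right) table of this join.
  def enabledForRightTable(
      hints: Seq[QueryHint],
      rightTable: String,
      hasJoinKeys: Boolean): Boolean =
    hasJoinKeys && hints.exists(h => h.name == ShuffleHash && h.tables.contains(rightTable))
}
```

With `SHUFFLE_HASH(left_table)` on a join whose right side is `dim_table`, the presence-only check still returns `true`, while the stricter check returns `false` and the hint is ignored. A plan test for the real rule could use the same mis-targeted hint and expect no hash exchange before the lookup join.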
Re: [PR] [FLINK-23687][table-planner] Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys [flink]
yunfan123 commented on PR #24104:
URL: https://github.com/apache/flink/pull/24104#issuecomment-1905561284

Hello @LB-Yu. I have implemented the feature with query hints as described in FLIP-204 and rebased onto the latest master branch.

This PR also produces large diffs in the test plan XML files, although the actual modifications are quite small. My suggestion for reviewing these XML files: the new test results (with this feature enabled) are at the top, and the old test results (with this feature disabled) are at the bottom. The old test results are unchanged, so the review should focus on the differences between the new and old results. These are easy to identify if you first remove the old test results locally; a comparison tool will then clearly highlight the differences.
Re: [PR] [FLINK-23687][table-planner] Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys [flink]
yunfan123 commented on code in PR #24104:
URL: https://github.com/apache/flink/pull/24104#discussion_r1462659082

## docs/content/docs/dev/table/sql/queries/joins.md: ##
@@ -326,6 +326,17 @@ FROM Orders AS o
 In the example above, the Orders table is enriched with data from the Customers table which resides in a MySQL database. The `FOR SYSTEM_TIME AS OF` clause with the subsequent processing time attribute ensures that each row of the `Orders` table is joined with those Customers rows that match the join predicate at the point in time when the `Orders` row is processed by the join operator. It also prevents that the join result is updated when a joined `Customer` row is updated in the future. The lookup join also requires a mandatory equality join predicate, in the example above `o.customer_id = c.id`.
+### Partitioned Lookup Join
+Some lookup source connectors use cache to reduce RPC call times. In order to raise cache hit ratio for those connectors, user could use a hint to enable partitioned lookup join which enforces input of lookup join to hash shuffle by look up keys.
+
+```sql
+-- enable partitioned lookup join by partitioned hint

Review Comment:
Thanks for your comments. I will use the `SHUFFLE_HASH` hint.
Re: [PR] [FLINK-23687][table-planner] Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys [flink]
LB-Yu commented on code in PR #24104:
URL: https://github.com/apache/flink/pull/24104#discussion_r1461708094

## docs/content/docs/dev/table/sql/queries/joins.md: ##
@@ -326,6 +326,17 @@ FROM Orders AS o
 In the example above, the Orders table is enriched with data from the Customers table which resides in a MySQL database. The `FOR SYSTEM_TIME AS OF` clause with the subsequent processing time attribute ensures that each row of the `Orders` table is joined with those Customers rows that match the join predicate at the point in time when the `Orders` row is processed by the join operator. It also prevents that the join result is updated when a joined `Customer` row is updated in the future. The lookup join also requires a mandatory equality join predicate, in the example above `o.customer_id = c.id`.
+### Partitioned Lookup Join
+Some lookup source connectors use cache to reduce RPC call times. In order to raise cache hit ratio for those connectors, user could use a hint to enable partitioned lookup join which enforces input of lookup join to hash shuffle by look up keys.
+
+```sql
+-- enable partitioned lookup join by partitioned hint

Review Comment:
Maybe the syntax in FLIP-204 is a better choice? When joining multiple dimension tables, we may only need to specify SHUFFLE_HASH for one of them, like this:

```sql
SELECT /*+ SHUFFLE_HASH(D1) */ *
FROM t AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
```

In addition, `SHUFFLE_HASH` is consistent with the [existing hint name](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#shuffle_hash) for batch joins.
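To illustrate why naming the dimension table matters in a chain of lookup joins, here is a toy Scala model of the query in this comment (hypothetical names throughout, not planner code). Each `LookupJoinStep` records the dimension-table alias and the probe-side key, and only the steps whose alias appears in the hint get a hash requirement on their input.

```scala
// Hypothetical description of one lookup join in a chain: the dimension-table
// alias (right side) and the probe-side key used in the equality predicate.
final case class LookupJoinStep(dimAlias: String, probeKey: String)

object PerJoinShuffle {
  /**
   * For each lookup join, returns Some(key) if its input should be hash-shuffled
   * by that key, or None if it keeps whatever distribution its input already has.
   */
  def requiredHashKeys(
      steps: Seq[LookupJoinStep],
      shuffleHashAliases: Set[String]): Seq[Option[String]] =
    steps.map { step =>
      if (shuffleHashAliases.contains(step.dimAlias)) Some(step.probeKey) else None
    }
}
```

For the query above, `requiredHashKeys(Seq(LookupJoinStep("D1", "T.a"), LookupJoinStep("D2", "T.b")), Set("D1"))` yields `Seq(Some("T.a"), None)`: only the join with `dim1` would get an exchange on `T.a`, and the join with `dim2` is left alone.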
Re: [PR] [FLINK-23687][table-planner] Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys [flink]
flinkbot commented on PR #24104:
URL: https://github.com/apache/flink/pull/24104#issuecomment-1893726942

## CI report:

* 243473d41af7715fb788f7c87a4dae9147e9e8fd UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-23687][table-planner] Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys [flink]
yunfan123 opened a new pull request, #24104:
URL: https://github.com/apache/flink/pull/24104

## What is the purpose of the change

To raise the cache hit ratio, we can enforce a hash shuffle of the lookup join's input by the join key. This PR introduces a hint to enable partitioned lookup join. It is primarily derived from https://github.com/apache/flink/pull/18174/files, with the unit tests adapted to the latest version.

## Brief change log

- *Update `FlinkHints` and `FlinkHintStrategies` to introduce the partitioned join hint*
- *Update the related rules, `BatchPhysicalLookupJoinRule` and `StreamPhysicalLookupJoinRule`, to enforce hash distribution on the input*

## Verifying this change

- *Plan tests in batch `LookupJoinTest` and stream `LookupJoinTest`*
- *IT tests in batch `LookupJoinITCase` and stream `LookupJoinITCase` and `AsyncLookupJoinITCase`*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (docs)
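The cache-hit argument in the PR description can be checked with a toy Scala simulation (not Flink code). It assumes each parallel lookup subtask keeps its own unbounded cache and uses `floorMod(hashCode, parallelism)` as a stand-in for Flink's real hash partitioner; `LookupCacheSimulation` and its methods are hypothetical. A cache miss corresponds to a remote lookup.

```scala
import scala.collection.mutable

// Toy model (not Flink code): each parallel lookup subtask owns an unbounded cache.
object LookupCacheSimulation {
  // A routing function maps (lookup key, row index) to a subtask index.
  type Route = (String, Int) => Int

  // Round-robin routing that ignores the key, similar to a rebalance.
  def roundRobin(parallelism: Int): Route = (_, rowIndex) => rowIndex % parallelism

  // Hash routing by lookup key; floorMod keeps the result non-negative.
  def hashByKey(parallelism: Int): Route =
    (key, _) => java.lang.Math.floorMod(key.hashCode, parallelism)

  /** Counts cache misses, i.e. remote lookups, for the given key sequence. */
  def misses(keys: Seq[String], parallelism: Int, route: Route): Int = {
    val caches = Array.fill(parallelism)(mutable.Set.empty[String])
    keys.zipWithIndex.count { case (key, rowIndex) =>
      // Set.add returns true only when the key was not cached yet (a miss).
      caches(route(key, rowIndex)).add(key)
    }
  }
}
```

With the keys `c1, c2, c1, c2, c1, c2, c1, c2` and parallelism 4, round-robin routing causes 4 misses because each key is fetched once on two different subtasks, while hashing by key causes 2 misses, one per distinct key. With an unbounded cache, hashing by the lookup key makes misses equal to the number of distinct keys, whereas a key-agnostic distribution can miss once per key per subtask.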