[jira] [Commented] (FLINK-20174) Make BulkFormat more extensible
[ https://issues.apache.org/jira/browse/FLINK-20174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17336028#comment-17336028 ]

Flink Jira Bot commented on FLINK-20174:

This issue was labeled "stale-major" 7 days ago and has not received any updates, so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion.

> Make BulkFormat more extensible
> ---
>
> Key: FLINK-20174
> URL: https://issues.apache.org/jira/browse/FLINK-20174
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem
> Affects Versions: 1.12.0
> Reporter: Steven Zhen Wu
> Priority: Major
> Labels: stale-major
>
> Right now, BulkFormat has the generic `SplitT` type extending from
> `FileSourceSplit`. We can make BulkFormat take a generic `SplitT` type
> extending from `SourceSplit` instead. This way, IcebergSourceSplit doesn't
> have to extend `FileSourceSplit`, and the Iceberg source can reuse the
> BulkFormat interface, as [~lzljs3620320] suggested. This allows the Iceberg
> source to take advantage of the high-performance
> `ParquetVectorizedInputFormat` provided by Flink.
>
> [~sewen] [~lzljs3620320] if you are on board with the change, I would be
> happy to submit a PR. Since it is a breaking change, maybe we can only add
> it to the master branch after the 1.12 release branch is cut?
>
> The other related question is about the two APIs `createReader` and
> `restoreReader`. I understand the motivation. I am just wondering if the
> separation is necessary. If the SplitT carries the checkpointed position,
> the seek operation can be handled internally in `createReader`. We can also
> define an abstract `FileSourceSplitBase` that adds a
> `getCheckpointedPosition` API to the `SourceSplit`.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
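The description's second question, whether `createReader` and `restoreReader` need to be separate, can be made concrete with a small sketch. Everything below is a simplified stand-in written for illustration: `SourceSplit`, `BulkFormat` and `FileSourceSplitBase` only mimic the names used in the ticket and are not Flink's real types, while `RecordReader`, `ListSplit` and `ListFormat` are hypothetical helpers. The checkpointed position is assumed to be a plain record index.

```java
import java.util.List;
import java.util.Optional;

// Stand-in types for illustration only; they mimic, but are NOT, Flink's real interfaces.
interface SourceSplit {
    String splitId();
}

// Hypothetical base type named in the description: a split that can report
// the position it was checkpointed at (assumed here to be a record index).
abstract class FileSourceSplitBase implements SourceSplit {
    abstract Optional<Long> getCheckpointedPosition();
}

// Minimal reader: returns the next record, or null when the split is exhausted.
interface RecordReader<T> {
    T read();
}

// A format with a single entry point; there is no separate restoreReader.
interface BulkFormat<T, SplitT extends SourceSplit> {
    RecordReader<T> createReader(SplitT split);
}

// Toy split backed by an in-memory list instead of a file.
final class ListSplit extends FileSourceSplitBase {
    final List<String> records;
    private final Long checkpointedPosition; // null means "fresh split"

    ListSplit(List<String> records, Long checkpointedPosition) {
        this.records = records;
        this.checkpointedPosition = checkpointedPosition;
    }

    @Override
    public String splitId() {
        return "list-split";
    }

    @Override
    Optional<Long> getCheckpointedPosition() {
        return Optional.ofNullable(checkpointedPosition);
    }
}

final class ListFormat implements BulkFormat<String, ListSplit> {
    @Override
    public RecordReader<String> createReader(ListSplit split) {
        // The "seek" happens here: start at the checkpointed position if there is one.
        final int start = split.getCheckpointedPosition().orElse(0L).intValue();
        return new RecordReader<String>() {
            private int next = start;

            @Override
            public String read() {
                return next < split.records.size() ? split.records.get(next++) : null;
            }
        };
    }
}
```

In this toy setup a fresh split and a restored split go through the same `createReader` call, and the only difference is whether the split reports a position. Whether that holds for real formats, where seeking may depend on format-specific state, is exactly the open question in the ticket.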
[jira] [Commented] (FLINK-20174) Make BulkFormat more extensible
[ https://issues.apache.org/jira/browse/FLINK-20174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17327537#comment-17327537 ]

Flink Jira Bot commented on FLINK-20174:

This major issue is unassigned, and neither it nor any of its Sub-Tasks has been updated for 30 days, so it has been labeled "stale-major". If this ticket is indeed "major", please either assign yourself or give an update. Afterwards, please remove the label. In 7 days the issue will be deprioritized.
[jira] [Commented] (FLINK-20174) Make BulkFormat more extensible
[ https://issues.apache.org/jira/browse/FLINK-20174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17267590#comment-17267590 ]

Steven Zhen Wu commented on FLINK-20174:

[~sewen] [~lzljs3620320] Can you take a look and tell me whether this is the right direction for fixing this problem? [https://github.com/stevenzwu/flink/commit/f5bb05ef74a1ae10e20b13c8796731378da076c1]
[jira] [Commented] (FLINK-20174) Make BulkFormat more extensible
[ https://issues.apache.org/jira/browse/FLINK-20174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17236672#comment-17236672 ]

Stephan Ewen commented on FLINK-20174:

I am a bit hesitant to put this change into 1.12, since it is already quite a bit after feature freeze. I think relaxing generic bounds (making them broader) is possible later without breaking the API, so let's look at this in the upcoming release?
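The point about relaxing generic bounds can be illustrated with a minimal sketch. The types below are stand-ins that reuse the names from the ticket and are not Flink's or Iceberg's real classes; `describe` is a hypothetical placeholder for the reader-creating methods, and `FileFormat` and `IcebergFormat` are invented for the example. It only shows that an implementation written against the narrower `FileSourceSplit` bound still compiles once the bound is widened to `SourceSplit`.

```java
// Stand-in types for illustration only; they are NOT Flink's or Iceberg's real classes.
interface SourceSplit {
    String splitId();
}

class FileSourceSplit implements SourceSplit {
    private final String path;

    FileSourceSplit(String path) {
        this.path = path;
    }

    String path() {
        return path;
    }

    @Override
    public String splitId() {
        return path;
    }
}

// Before: interface BulkFormat<T, SplitT extends FileSourceSplit>
// After, with the relaxed bound discussed in the ticket:
interface BulkFormat<T, SplitT extends SourceSplit> {
    T describe(SplitT split); // hypothetical placeholder for the reader-creating methods
}

// An implementation written against the old, narrower bound compiles unchanged.
final class FileFormat implements BulkFormat<String, FileSourceSplit> {
    @Override
    public String describe(FileSourceSplit split) {
        return "file:" + split.path();
    }
}

// A split that does not extend FileSourceSplit can now be used as well.
final class IcebergSourceSplit implements SourceSplit {
    private final int fileCount;

    IcebergSourceSplit(int fileCount) {
        this.fileCount = fileCount;
    }

    int fileCount() {
        return fileCount;
    }

    @Override
    public String splitId() {
        return "iceberg-" + fileCount;
    }
}

final class IcebergFormat implements BulkFormat<String, IcebergSourceSplit> {
    @Override
    public String describe(IcebergSourceSplit split) {
        return "iceberg:" + split.fileCount() + " files";
    }
}
```

The sketch covers implementers only. Generic code that relies on `SplitT` being a `FileSourceSplit`, for example by calling `path()` on it, would no longer compile against the wider bound, which may be why the reporter described the change as breaking.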
[jira] [Commented] (FLINK-20174) Make BulkFormat more extensible
[ https://issues.apache.org/jira/browse/FLINK-20174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17233755#comment-17233755 ]

Steven Zhen Wu commented on FLINK-20174:

I tried the change of setting up the InputFiles map for DeleteFilter per FileScanTask. I was wrong earlier: it actually works fine. You can see the change here: https://github.com/stevenzwu/iceberg/pull/4

So I guess the only reason left is the throughput benefit of combining small files into a decent-sized CombinedScanTask, which is still an important reason.
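The throughput argument for combining small files can be sketched as a simple grouping step. This is a deliberately naive, order-preserving greedy grouping written for illustration; it is not Iceberg's actual planning logic, and `SplitCombiner`, `combine` and `targetBytes` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class SplitCombiner {
    private SplitCombiner() {}

    /**
     * Groups file sizes, in order, into tasks. A task is closed as soon as
     * adding the next file would push its total size above targetBytes.
     * A single file larger than targetBytes ends up in a task of its own.
     */
    static List<List<Long>> combine(List<Long> fileSizes, long targetBytes) {
        List<List<Long>> tasks = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentBytes = 0;
        for (long size : fileSizes) {
            if (!current.isEmpty() && currentBytes + size > targetBytes) {
                tasks.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(size);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            tasks.add(current);
        }
        return tasks;
    }
}
```

With file sizes 10, 20, 30, 40 and 100 and a target of 64, this yields three tasks instead of five splits, so a reader that pulls one split at a time makes fewer round trips for the small files.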
[jira] [Commented] (FLINK-20174) Make BulkFormat more extensible
[ https://issues.apache.org/jira/browse/FLINK-20174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17233272#comment-17233272 ]

Jingsong Lee commented on FLINK-20174:

[~stevenz3wu] Thanks for your explanation.

For now, the workaround is to pass a dummy filePath/offset/length to {{FileSourceSplit}}. Yes, it looks ugly.

Yes, we can keep CombinedScanTask to have more freedom. (Maybe in the future we can optimize how data files are combined with delete files, so that the delete files are shared.)
[jira] [Commented] (FLINK-20174) Make BulkFormat more extensible
[ https://issues.apache.org/jira/browse/FLINK-20174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17233260#comment-17233260 ]

Steven Zhen Wu commented on FLINK-20174:

[~lzljs3620320] thanks a lot for sharing your thoughts.

Regarding the hostname, it would be information derived from the path of the Iceberg DataFile. How to extract the hostname depends on the file system. I would imagine LocalityAwareSplitAssigner probably needs to take a HostnameExtractor function to extract the hostname from IcebergSourceSplit. I am wondering if the hostname should be a constructor arg for IcebergSourceSplit.

Regarding the fine-grained split, here are my concerns about splitting a CombinedScanTask into fine-grained FileScanTasks.

* In the first try of the PoC, I tried a flatMap of the CombinedScanTask into CombinedScanTasks (each with a single FileScanTask). If I remember correctly, DeleteFilter doesn't work in this case. DataIterator creates the InputFile map for the whole CombinedScanTask. Maybe there is a valid reason for that. I can double-check that with a unit test.
* One of the main reasons for having CombinedScanTask is to combine small files/splits into a decent size. Because readers pull one split at a time, avoiding small splits is good for throughput.
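The HostnameExtractor idea from the comment above might look roughly like the following sketch. `HostnameExtractor` is a hypothetical interface, `IcebergSourceSplit` is a one-field stand-in rather than the real class, and the example extractor simply takes the host part of the data file URI. That is an assumption made for illustration: for HDFS the URI host typically names the namenode, not the nodes holding the data, so a real extractor would have to ask the file system for locality information.

```java
import java.net.URI;

// Hypothetical stand-in for the split discussed above; not the real Iceberg class.
final class IcebergSourceSplit {
    private final String dataFilePath;

    IcebergSourceSplit(String dataFilePath) {
        this.dataFilePath = dataFilePath;
    }

    String dataFilePath() {
        return dataFilePath;
    }
}

// Hypothetical function type that a locality-aware assigner could accept.
interface HostnameExtractor<SplitT> {
    String[] extractHostnames(SplitT split);
}

final class HostnameExtractors {
    private HostnameExtractors() {}

    // Toy extractor: uses the host of the file URI, or no hostnames if the URI has none.
    static final HostnameExtractor<IcebergSourceSplit> FROM_URI_HOST = split -> {
        String host = URI.create(split.dataFilePath()).getHost();
        return host == null ? new String[0] : new String[] {host};
    };
}
```

Passing the extractor as a function keeps the file-system-specific logic out of the split itself, which is the alternative to making the hostname a constructor argument of IcebergSourceSplit.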
[jira] [Commented] (FLINK-20174) Make BulkFormat more extensible
[ https://issues.apache.org/jira/browse/FLINK-20174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17233211#comment-17233211 ]

Jingsong Lee commented on FLINK-20174:

I'm not sure. We can have a workaround, and the hostnames in the file source split are also useful for Iceberg (also for LocalityAwareSplitAssigner).

(And I think in Flink, maybe, we don't need to combine FileScanTasks. We can keep fine-grained splits; then the Iceberg split can extend FileSourceSplit.)