[jira] [Commented] (FLINK-9697) Provide connector for Kafka 2.0.0
[ https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645998#comment-16645998 ] ASF GitHub Bot commented on FLINK-9697: --- aljoscha commented on issue #6703: [FLINK-9697] Provide connector for Kafka 2.0.0 URL: https://github.com/apache/flink/pull/6703#issuecomment-428828451 @yanghua According to the slf4j doc, `Level` is only available since version `1.7.15`, while Flink uses `1.7.7`: https://www.slf4j.org/api/org/slf4j/event/Level.html. Could you try updating to `1.7.15`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide connector for Kafka 2.0.0 > - > > Key: FLINK-9697 > URL: https://issues.apache.org/jira/browse/FLINK-9697 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > Kafka 2.0.0 would be released soon. > Here is the vote thread: > [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1] > We should provide a connector for Kafka 2.0.0 once it is released. > Upgrade to 2.0 documentation: > http://kafka.apache.org/20/documentation.html#upgrade_2_0_0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
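As context for the version mismatch above: `org.slf4j.event.Level` was only added in slf4j 1.7.15, so code compiled against it fails to link at runtime when an older slf4j jar is on the classpath. A minimal plain-Java sketch of probing for such a class before using it; this is illustrative only and not from the PR, and the slf4j class name is the only real reference here.

```java
public class Slf4jLevelCheck {
    /** Returns true if the named class can be loaded from the current classpath. */
    static boolean isClassAvailable(String className) {
        try {
            // Load without initializing, just to test linkability.
            Class.forName(className, false, Slf4jLevelCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // With slf4j < 1.7.15 (or no slf4j at all) on the classpath,
        // org.slf4j.event.Level is missing and this probe reports false.
        System.out.println("slf4j Level present: "
                + isClassAvailable("org.slf4j.event.Level"));
    }
}
```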
[jira] [Updated] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase
[ https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang updated FLINK-10527: - Issue Type: Sub-task (was: Bug) Parent: FLINK-10392 > Cleanup constant isNewMode in YarnTestBase > -- > > Key: FLINK-10527 > URL: https://issues.apache.org/jira/browse/FLINK-10527 > Project: Flink > Issue Type: Sub-task > Components: YARN >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > This seems to be a residual problem from FLINK-10396, where the constant was set to true. Currently it has three usage scenarios: > 1. In an assumption, where the test is always skipped: > {code:java} > assumeTrue("The new mode does not start TMs upfront.", !isNewMode); > {code} > 2. In if (!isNewMode) blocks, whose logic is never invoked, so the block can be removed. > 3. In if (isNewMode) blocks, which are always invoked, so the if statement can be removed.
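The three scenarios above amount to dead-code elimination around a constant-true flag. A self-contained sketch of scenarios 2 and 3 (the names here are illustrative, not the actual YarnTestBase code; scenario 1 is the `assumeTrue` case, which always skips the test when the flag is true):

```java
public class ConstantFlagCleanup {
    // After FLINK-10396 this constant is effectively always true.
    static final boolean IS_NEW_MODE = true;

    static String describe() {
        StringBuilder sb = new StringBuilder();
        // Scenario 2: this branch can never run, so the whole block is dead code.
        if (!IS_NEW_MODE) {
            sb.append("legacy-only logic; ");
        }
        // Scenario 3: this branch always runs, so the guard can simply be dropped.
        if (IS_NEW_MODE) {
            sb.append("new-mode logic");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```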
[jira] [Commented] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase
[ https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645970#comment-16645970 ] ASF GitHub Bot commented on FLINK-10527: yanghua commented on issue #6816: [FLINK-10527] Cleanup constant isNewMode in YarnTestBase URL: https://github.com/apache/flink/pull/6816#issuecomment-428818907 I will try to remove these test methods.
[jira] [Commented] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase
[ https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645967#comment-16645967 ] ASF GitHub Bot commented on FLINK-10527: yanghua commented on issue #6816: [FLINK-10527] Cleanup constant isNewMode in YarnTestBase URL: https://github.com/apache/flink/pull/6816#issuecomment-428818387 @TisonKun it seems it has not been ignored; it caused my PR to fail, see [here](https://travis-ci.org/apache/flink/jobs/439612829).
[jira] [Closed] (FLINK-7999) Variable Join Window Boundaries
[ https://issues.apache.org/jira/browse/FLINK-7999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng closed FLINK-7999. -- Resolution: Won't Fix > Variable Join Window Boundaries > --- > > Key: FLINK-7999 > URL: https://issues.apache.org/jira/browse/FLINK-7999 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Seth Wiesman >Priority: Major > > Allow window joins with variable length based on row attributes. > Consider two streams joined on an id, where one has start and end dates; it would be useful to join each row during its live duration. Today this can be expressed in the DataStream API using a CoProcessFunction. > left.id = right.id AND (left.time > right.start and left.time < right.end)
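The join condition quoted in the issue can be sketched as a plain boolean predicate; the row fields below are hypothetical illustrations, not Flink types:

```java
public class VariableWindowJoin {
    /**
     * Mirrors the quoted condition:
     * left.id = right.id AND left.time > right.start AND left.time < right.end.
     */
    static boolean joins(long leftId, long leftTime,
                         long rightId, long rightStart, long rightEnd) {
        return leftId == rightId && leftTime > rightStart && leftTime < rightEnd;
    }

    public static void main(String[] args) {
        // A left row at time 50 joins a right row live during (10, 100).
        System.out.println(joins(1, 50, 1, 10, 100));
        // A left row at time 5 falls before the right row's start.
        System.out.println(joins(1, 5, 1, 10, 100));
    }
}
```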
[jira] [Commented] (FLINK-7999) Variable Join Window Boundaries
[ https://issues.apache.org/jira/browse/FLINK-7999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645931#comment-16645931 ] Hequn Cheng commented on FLINK-7999: I think [FLINK-8478|https://issues.apache.org/jira/browse/FLINK-8478] and [FLINK-5725|https://issues.apache.org/jira/browse/FLINK-5725] can both solve the problem. I will close this task.
[jira] [Commented] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase
[ https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645927#comment-16645927 ] TisonKun commented on FLINK-10527: -- How about adding this issue as a subtask of FLINK-10392?
[jira] [Commented] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase
[ https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645924#comment-16645924 ] ASF GitHub Bot commented on FLINK-10527: TisonKun commented on issue #6816: [FLINK-10527] Cleanup constant isNewMode in YarnTestBase URL: https://github.com/apache/flink/pull/6816#issuecomment-428811431 @yanghua thanks for raising this thread! I think `YARNHighAvailabilityITCase`, `YARNSessionCapacitySchedulerITCase#testClientStartup` and `YARNSessionCapacitySchedulerITCase#testTaskManagerFailure` are ignored by `assumeTrue`. Removing the `assumeTrue` statement re-enables them, which looks unintended. Maybe do some analysis to see whether we can remove them (covered by existing tests / no longer supported) or port them to the FLIP-6 code base?
[jira] [Updated] (FLINK-10298) Batch Job Failover Strategy
[ https://issues.apache.org/jira/browse/FLINK-10298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10298: --- Labels: pull-request-available (was: ) > Batch Job Failover Strategy > --- > > Key: FLINK-10298 > URL: https://issues.apache.org/jira/browse/FLINK-10298 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: JIN SUN >Assignee: JIN SUN >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > The new failover strategy needs to consider handling failures according to > different failure types. It orchestrates all the logic we mentioned in this > [document|https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit]; > we can put the logic in the onTaskFailure method of the FailoverStrategy > interface, with the logic inline: > {code:java} > public void onTaskFailure(Execution taskExecution, Throwable cause) { > //1. Get the throwable type > //2. If the type is NonRecoverableError, fail the job > //3. If the type is PartitionDataMissingError, do revocation > //4. If the type is EnvironmentError, check the blacklist > //5. Other failure types are recoverable, but we need to remember the > count of the failures; > //6. if it exceeds the threshold, fail the job > }{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
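The six commented steps above can be sketched as a small dispatch routine. The enum, the return values, and the threshold name below are illustrative assumptions for the sketch, not the actual Flink implementation (the PR's real configuration is MAX_ATTEMPTS_EXECUTION_FAILURE_COUNT with a default of 6):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class FailoverSketch {
    enum FailureType { NON_RECOVERABLE, PARTITION_DATA_MISSING, ENVIRONMENT_ERROR, RECOVERABLE }

    // Mirrors the PR's default threshold of 6 recoverable failures.
    static final int MAX_FAILURES = 6;
    private final AtomicInteger failureCount = new AtomicInteger();

    /** Returns the action taken: "fail-job", "revoke", "check-blacklist" or "restart". */
    String onTaskFailure(FailureType type) {
        switch (type) {
            case NON_RECOVERABLE:
                return "fail-job";          // step 2: fail the job immediately
            case PARTITION_DATA_MISSING:
                return "revoke";            // step 3: revoke the producing task
            case ENVIRONMENT_ERROR:
                return "check-blacklist";   // step 4: consult the blacklist
            default:
                // Steps 5-6: count recoverable failures; past the threshold, fail the job.
                return failureCount.incrementAndGet() > MAX_FAILURES ? "fail-job" : "restart";
        }
    }

    public static void main(String[] args) {
        FailoverSketch s = new FailoverSketch();
        System.out.println(s.onTaskFailure(FailureType.RECOVERABLE));
        System.out.println(s.onTaskFailure(FailureType.NON_RECOVERABLE));
    }
}
```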
[jira] [Commented] (FLINK-10298) Batch Job Failover Strategy
[ https://issues.apache.org/jira/browse/FLINK-10298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645920#comment-16645920 ] ASF GitHub Bot commented on FLINK-10298: isunjin opened a new pull request #6817: [FLINK-10298] [JobManager] Batch Job Failover Strategy URL: https://github.com/apache/flink/pull/6817 ## What is the purpose of the change The new failover strategy needs to consider handling failures according to different failure types. It orchestrates the batch job failover logic in the onTaskFailure method of the FailoverStrategy interface, with the logic inline: public void onTaskFailure(Execution taskExecution, Throwable cause) { //1. Get the throwable type //2. If the type is NonRecoverableError, fail the job //3. If the type is PartitionDataMissingError, do revocation //4. If the type is EnvironmentError, check the blacklist //5. Other failure types are recoverable, but we need to remember the count of the failures //6. if it exceeds the threshold, fail the job } ## Brief change log - *Add a configuration MAX_ATTEMPTS_EXECUTION_FAILURE_COUNT with a default value of 6* - *Add a failure counter in the failed region* ## Verifying this change This change added tests and can be verified as follows: - *Added tests to validate that the job fails immediately and does not restart when it meets a NonRecoverableError* - *Added a test to validate that the job fails after MAX_ATTEMPTS_EXECUTION_FAILURE_COUNT attempts when it meets a RecoverableError* - *Added BatchJobFailoverStrategy, inheriting from RestartPipelinedRegionStrategy* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? ([document](https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit#heading=h.to0clnrw60r))
[jira] [Closed] (FLINK-5584) Support Sliding-count row-window on streaming sql
[ https://issues.apache.org/jira/browse/FLINK-5584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng closed FLINK-5584. -- Resolution: Duplicate > Support Sliding-count row-window on streaming sql > - > > Key: FLINK-5584 > URL: https://issues.apache.org/jira/browse/FLINK-5584 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: hongyuhong >Assignee: hongyuhong >Priority: Major > > Calcite already supports the sliding-count row-window; the grammar looks like: > select sum(amount) over (rows 10 preceding) from Order; > select sum(amount) over (partition by user rows 10 preceding) from Order; > It parses the sql as a LogicalWindow RelNode; the LogicalWindow contains aggregate function info and window info, similar to Flink's LogicalWindowAggregate, so we can add a convert rule to directly convert LogicalWindow into a DataStreamAggregate RelNode, and if Calcite supports more grammar, we can extend the convert rule.
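The semantics of `sum(amount) over (rows 10 preceding)`, i.e. a sum over the current row plus the 10 preceding rows, can be illustrated with a plain-Java sliding buffer. This is only a sketch of the window semantics, not the Calcite/Flink implementation:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class SlidingCountSum {
    private final int windowSize;          // current row + preceding rows
    private final Deque<Long> rows = new ArrayDeque<>();
    private long sum;

    /** For "rows N preceding", the window covers N preceding rows plus the current one. */
    SlidingCountSum(int precedingRows) {
        this.windowSize = precedingRows + 1;
    }

    /** Appends a row and returns the sum over the current window. */
    long add(long amount) {
        rows.addLast(amount);
        sum += amount;
        if (rows.size() > windowSize) {
            sum -= rows.removeFirst();     // evict the oldest row from the window
        }
        return sum;
    }

    public static void main(String[] args) {
        SlidingCountSum window = new SlidingCountSum(10);
        for (long amount : new long[] {5, 7, 3}) {
            System.out.println(window.add(amount));
        }
    }
}
```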
[jira] [Commented] (FLINK-5584) Support Sliding-count row-window on streaming sql
[ https://issues.apache.org/jira/browse/FLINK-5584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645907#comment-16645907 ] Hequn Cheng commented on FLINK-5584: I think this has been solved by [FLINK-4557|https://issues.apache.org/jira/browse/FLINK-4557]. I will close this one.
[jira] [Commented] (FLINK-10292) Generate JobGraph in StandaloneJobClusterEntrypoint only once
[ https://issues.apache.org/jira/browse/FLINK-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645901#comment-16645901 ] ASF GitHub Bot commented on FLINK-10292: yanghua commented on issue #6807: [FLINK-10292] Generate JobGraph in StandaloneJobClusterEntrypoint only once URL: https://github.com/apache/flink/pull/6807#issuecomment-428806504 @tillrohrmann This is very strange: why does this PR change cause the YARN test to time out, with some applications unable to reach the RUNNING state? > Generate JobGraph in StandaloneJobClusterEntrypoint only once > - > > Key: FLINK-10292 > URL: https://issues.apache.org/jira/browse/FLINK-10292 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2 > > > Currently the {{StandaloneJobClusterEntrypoint}} generates the {{JobGraph}} > from the given user code every time it starts/is restarted. This can be > problematic if the {{JobGraph}} generation has side effects. Therefore, > it would be better to generate the {{JobGraph}} only once, store it in HA > storage, and retrieve it from there.
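The generate-only-once idea can be sketched as simple memoization of the side-effecting generation step. The actual fix stores the JobGraph in HA storage rather than in memory, so the class below is only an illustration with hypothetical names:

```java
import java.util.function.Supplier;

public class GenerateOnce<T> {
    private final Supplier<T> generator;
    private volatile T cached;

    GenerateOnce(Supplier<T> generator) {
        this.generator = generator;
    }

    /** Runs the (possibly side-effecting) generator at most once; later calls reuse the result. */
    synchronized T get() {
        if (cached == null) {
            cached = generator.get();
        }
        return cached;
    }

    public static void main(String[] args) {
        GenerateOnce<String> graph = new GenerateOnce<>(() -> "job-graph");
        // A restart would call get() again, but the generation does not repeat.
        System.out.println(graph.get());
        System.out.println(graph.get());
    }
}
```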
[jira] [Commented] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase
[ https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645890#comment-16645890 ] ASF GitHub Bot commented on FLINK-10527: yanghua opened a new pull request #6816: [FLINK-10527] Cleanup constant isNewMode in YarnTestBase URL: https://github.com/apache/flink/pull/6816 ## What is the purpose of the change *This pull request cleans up the constant isNewMode in YarnTestBase* ## Brief change log - *Cleanup constant isNewMode in YarnTestBase* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**)
[jira] [Updated] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase
[ https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10527: --- Labels: pull-request-available (was: )
[jira] [Closed] (FLINK-9200) SQL E2E test failed on travis
[ https://issues.apache.org/jira/browse/FLINK-9200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng closed FLINK-9200. -- Resolution: Duplicate > SQL E2E test failed on travis > - > > Key: FLINK-9200 > URL: https://issues.apache.org/jira/browse/FLINK-9200 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Priority: Critical > > https://travis-ci.org/zentol/flink-ci/builds/367994823 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9200) SQL E2E test failed on travis
[ https://issues.apache.org/jira/browse/FLINK-9200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645879#comment-16645879 ] Hequn Cheng edited comment on FLINK-9200 at 10/11/18 3:02 AM: -- This duplicates [FLINK-10220|https://issues.apache.org/jira/browse/FLINK-10220]. I will close this one and take a look at FLINK-10220. was (Author: hequn8128): Do we still have this problem? Should I close it?
[jira] [Assigned] (FLINK-10220) StreamSQL E2E test fails on travis
[ https://issues.apache.org/jira/browse/FLINK-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng reassigned FLINK-10220: --- Assignee: Hequn Cheng > StreamSQL E2E test fails on travis > -- > > Key: FLINK-10220 > URL: https://issues.apache.org/jira/browse/FLINK-10220 > Project: Flink > Issue Type: Bug > Components: Table API SQL, Tests >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Assignee: Hequn Cheng >Priority: Critical > Fix For: 1.7.0 > > > https://travis-ci.org/zentol/flink-ci/jobs/420972344 > {code} > [FAIL] 'Streaming SQL end-to-end test' failed after 1 minutes and 49 seconds! > Test exited with exit code 0 but the logs contained errors, exceptions or > non-empty .out files > 2018-08-27 07:34:36,311 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- window: > (TumblingGroupWindow('w$, 'rowtime, 2.millis)), select: ($SUM0(correct) > AS correct, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS > w$rowtime, proctime('w$) AS w$proctime) -> select: (correct, w$start AS > rowtime) -> to: Row -> Map -> Sink: Unnamed (1/1) > (97d055e4661ff3361a504626257d406d) switched from RUNNING to FAILED. 
> java.lang.RuntimeException: Exception occurred while processing valve output > watermark: > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265) > at > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189) > at > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) > Caused by: > org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at > org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:65) > at > 
org.apache.flink.table.runtime.aggregate.IncrementalAggregateAllWindowFunction.apply(IncrementalAggregateAllWindowFunction.scala:62) > at > org.apache.flink.table.runtime.aggregate.IncrementalAggregateAllTimeWindowFunction.apply(IncrementalAggregateAllTimeWindowFunction.scala:65) > at > org.apache.flink.table.runtime.aggregate.IncrementalAggregateAllTimeWindowFunction.apply(IncrementalAggregateAllTimeWindowFunction.scala:37) > at > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:46) > at > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:34) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454) > at > org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:746) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262) > ... 7 more
[jira] [Commented] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase
[ https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645881#comment-16645881 ] vinoyang commented on FLINK-10527: -- cc [~till.rohrmann] and [~Tison] > Cleanup constant isNewMode in YarnTestBase > -- > > Key: FLINK-10527 > URL: https://issues.apache.org/jira/browse/FLINK-10527 > Project: Flink > Issue Type: Bug > Components: YARN >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > This seems to be a residual problem from FLINK-10396, which set the constant to true. It currently has three usage scenarios: > 1. in an assertion, which now causes an error: > {code:java} > assumeTrue("The new mode does not start TMs upfront.", !isNewMode); > {code} > 2. in if (!isNewMode) blocks, whose logic is never invoked, so the blocks can be removed > 3. in if (isNewMode) blocks, which are always entered, so the if statements can be removed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9200) SQL E2E test failed on travis
[ https://issues.apache.org/jira/browse/FLINK-9200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645879#comment-16645879 ] Hequn Cheng commented on FLINK-9200: Do we still have this problem? Should I close it? > SQL E2E test failed on travis > - > > Key: FLINK-9200 > URL: https://issues.apache.org/jira/browse/FLINK-9200 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Priority: Critical > > https://travis-ci.org/zentol/flink-ci/builds/367994823 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase
vinoyang created FLINK-10527: Summary: Cleanup constant isNewMode in YarnTestBase Key: FLINK-10527 URL: https://issues.apache.org/jira/browse/FLINK-10527 Project: Flink Issue Type: Bug Components: YARN Reporter: vinoyang Assignee: vinoyang This seems to be a residual problem from FLINK-10396, which set the constant to true. It currently has three usage scenarios: 1. in an assertion, which now causes an error: {code:java} assumeTrue("The new mode does not start TMs upfront.", !isNewMode); {code} 2. in if (!isNewMode) blocks, whose logic is never invoked, so the blocks can be removed 3. in if (isNewMode) blocks, which are always entered, so the if statements can be removed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
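The cleanup described above boils down to dead-branch removal around a boolean constant pinned to true. A minimal Java sketch of the second and third scenarios (class and method names are hypothetical illustrations, not the actual YarnTestBase code):

```java
// Illustrative sketch of the FLINK-10527 cleanup: a constant pinned to true
// makes some branches dead and some guards redundant. Hypothetical names.
class NewModeCleanup {

    // Before the cleanup: the constant that FLINK-10396 pinned to true.
    private static final boolean IS_NEW_MODE = true;

    // Scenario 2: the if (!isNewMode) body is dead code and can be deleted.
    static String legacyGuarded() {
        if (!IS_NEW_MODE) {
            return "legacy"; // never reached once the constant is true
        }
        return "new";
    }

    // Scenario 3: the if (isNewMode) guard is always taken, so the
    // condition can be dropped and the body kept unguarded.
    static String newGuarded() {
        if (IS_NEW_MODE) {
            return "new";
        }
        return "legacy";
    }

    // After the cleanup, both methods collapse to the unguarded body.
    static String cleaned() {
        return "new";
    }
}
```

Both guarded variants already behave identically to the cleaned-up version, which is why the constant and its if statements can simply be removed.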
[jira] [Closed] (FLINK-5077) testStreamTableSink fails unstable
[ https://issues.apache.org/jira/browse/FLINK-5077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng closed FLINK-5077. -- Resolution: Won't Fix > testStreamTableSink fails unstable > -- > > Key: FLINK-5077 > URL: https://issues.apache.org/jira/browse/FLINK-5077 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.1.4 >Reporter: Boris Osipov >Priority: Major > > I've seen several failures of the TableSinkITCase.testStreamTableSink test. > {code} > Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 16.938 sec > <<< FAILURE! - in org.apache.flink.api.scala.stream.TableSinkITCase > testStreamTableSink(org.apache.flink.api.scala.stream.TableSinkITCase) Time > elapsed: 10.534 sec <<< FAILURE! > java.lang.AssertionError: Different number of lines in expected and obtained > result. expected:<8> but was:<4> > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:834) > at org.junit.Assert.assertEquals(Assert.java:645) > at > org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:316) > at > org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:302) > at > org.apache.flink.api.scala.stream.TableSinkITCase.testStreamTableSink(TableSinkITCase.scala:61) > {code} > I did a small investigation: I added an additional StreamITCase.StringSink > {code} > val results = input.toTable(tEnv, 'a, 'b, 'c) > .where('a < 5 || 'a > 17) > .select('c, 'b) > results.writeToSink(new CsvTableSink(path)) > results.toDataStream[Row] > .addSink(new StreamITCase.StringSink) > {code} > and logging. 
I've run the test several times, and on failure the log shows the following results: > {noformat} > -- Actual CsvTableSink: > Comment#13,6 > Comment#14,6 > Comment#15,6 > Hello world, how are you?,3 > Hello world,2 > Hi,1 > -- Stream sink: > Comment#12,6 > Comment#13,6 > Comment#14,6 > Comment#15,6 > Hello world, how are you?,3 > Hello world,2 > Hello,2 > Hi,1 > -- Expected result: > Comment#12,6 > Comment#13,6 > Comment#14,6 > Comment#15,6 > Hello world, how are you?,3 > Hello world,2 > Hello,2 > Hi,1 > {noformat} > Looks like writing to CSV works incorrectly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
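The AssertionError in the report above comes from a line-based, order-insensitive comparison of expected and obtained results. A rough self-contained sketch of that kind of check, loosely modeled on TestBaseUtils.compareResultsByLinesInMemory (the LineComparer helper is hypothetical, not Flink's implementation):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of an order-insensitive, line-based result comparison,
// loosely modeled on TestBaseUtils.compareResultsByLinesInMemory.
class LineComparer {
    // Returns null if the results match, otherwise a failure description.
    static String compare(String expected, String obtained) {
        List<String> exp = new ArrayList<>(Arrays.asList(expected.split("\n")));
        List<String> got = new ArrayList<>(Arrays.asList(obtained.split("\n")));
        // The check that fails in FLINK-5077: line counts must match first.
        if (exp.size() != got.size()) {
            return "Different number of lines in expected and obtained result. "
                + "expected:<" + exp.size() + "> but was:<" + got.size() + ">";
        }
        // Sort both sides so row order does not matter, then compare.
        Collections.sort(exp);
        Collections.sort(got);
        return exp.equals(got) ? null : "Lines differ after sorting";
    }
}
```

In the logs above the CsvTableSink output is missing rows (e.g. "Comment#12,6" and "Hello,2") relative to the expected result, which is exactly the line-count mismatch this style of check reports.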
[jira] [Commented] (FLINK-5077) testStreamTableSink fails unstable
[ https://issues.apache.org/jira/browse/FLINK-5077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645872#comment-16645872 ] Hequn Cheng commented on FLINK-5077: I think we can close this issue now. I will close it. > testStreamTableSink fails unstable > -- > > Key: FLINK-5077 > URL: https://issues.apache.org/jira/browse/FLINK-5077 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.1.4 >Reporter: Boris Osipov >Priority: Major > > I've seen several failures of the TableSinkITCase.testStreamTableSink test. > {code} > Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 16.938 sec > <<< FAILURE! - in org.apache.flink.api.scala.stream.TableSinkITCase > testStreamTableSink(org.apache.flink.api.scala.stream.TableSinkITCase) Time > elapsed: 10.534 sec <<< FAILURE! > java.lang.AssertionError: Different number of lines in expected and obtained > result. expected:<8> but was:<4> > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:834) > at org.junit.Assert.assertEquals(Assert.java:645) > at > org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:316) > at > org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:302) > at > org.apache.flink.api.scala.stream.TableSinkITCase.testStreamTableSink(TableSinkITCase.scala:61) > {code} > I did a small investigation: I added an additional StreamITCase.StringSink > {code} > val results = input.toTable(tEnv, 'a, 'b, 'c) > .where('a < 5 || 'a > 17) > .select('c, 'b) > results.writeToSink(new CsvTableSink(path)) > results.toDataStream[Row] > .addSink(new StreamITCase.StringSink) > {code} > and logging. 
I've run the test several times, and on failure the log shows the following results: > {noformat} > -- Actual CsvTableSink: > Comment#13,6 > Comment#14,6 > Comment#15,6 > Hello world, how are you?,3 > Hello world,2 > Hi,1 > -- Stream sink: > Comment#12,6 > Comment#13,6 > Comment#14,6 > Comment#15,6 > Hello world, how are you?,3 > Hello world,2 > Hello,2 > Hi,1 > -- Expected result: > Comment#12,6 > Comment#13,6 > Comment#14,6 > Comment#15,6 > Hello world, how are you?,3 > Hello world,2 > Hello,2 > Hi,1 > {noformat} > Looks like writing to CSV works incorrectly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645813#comment-16645813 ] ASF GitHub Bot commented on FLINK-10156: sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink() URL: https://github.com/apache/flink/pull/6805#discussion_r224288732 ## File path: docs/dev/table/common.md ## @@ -540,22 +536,17 @@ result.insertInto("CsvSinkTable"); // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) +// register the TableSink with a specific schema +val fieldNames: Array[String] = Array("a", "b", "c") +val fieldTypes: Array[TypeInformation] = Array(Types.INT, Types.STRING, Types.LONG) +tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink) + // compute a result Table using Table API operators and/or SQL queries val result: Table = ... // create a TableSink val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|") Review comment: move this logic to before "registerTableSink". This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Drop the Table.writeToSink() method > --- > > Key: FLINK-10156 > URL: https://issues.apache.org/jira/browse/FLINK-10156 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Major > Labels: pull-request-available > > I am proposing to drop the {{Table.writeToSink()}} method. > > *What is the method doing?* > The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a > {{TableSink}}, for example to a Kafka topic, a file, or a database. > > *Why should it be removed?* > The {{writeToSink()}} method was introduced before the Table API supported > the {{Table.insertInto(String)}} method. 
The {{insertInto()}} method writes a > table into a table that was previously registered with a {{TableSink}} in the > catalog. It is the inverse method to the {{scan()}} method and the equivalent > to an {{INSERT INTO ... SELECT}} SQL query. > > I think we should remove {{writeToSink()}} for the following reasons: > 1. It offers the same functionality as {{insertInto()}}. Removing it would > reduce duplicated API. > 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks > (and TableSources) should only be registered with the {{TableEnvironment}} > and not be exposed to the "query part" of the Table API / SQL. > 3. Registering tables in a catalog and using them for input and output is > more aligned with SQL. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645811#comment-16645811 ] ASF GitHub Bot commented on FLINK-10156: sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink() URL: https://github.com/apache/flink/pull/6805#discussion_r224288252 ## File path: docs/dev/table/common.md ## @@ -540,22 +536,17 @@ result.insertInto("CsvSinkTable"); // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) +// register the TableSink with a specific schema Review comment: Add create `TableSink` instance logic . ``` // create a TableSink val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|") ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Drop the Table.writeToSink() method > --- > > Key: FLINK-10156 > URL: https://issues.apache.org/jira/browse/FLINK-10156 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Major > Labels: pull-request-available > > I am proposing to drop the {{Table.writeToSink()}} method. > > *What is the method doing?* > The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a > {{TableSink}}, for example to a Kafka topic, a file, or a database. > > *Why should it be removed?* > The {{writeToSink()}} method was introduced before the Table API supported > the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a > table into a table that was previously registered with a {{TableSink}} in the > catalog. It is the inverse method to the {{scan()}} method and the equivalent > to an {{INSERT INTO ... SELECT}} SQL query. 
> > I think we should remove {{writeToSink()}} for the following reasons: > 1. It offers the same functionality as {{insertInto()}}. Removing it would > reduce duplicated API. > 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks > (and TableSources) should only be registered with the {{TableEnvironment}} > and not be exposed to the "query part" of the Table API / SQL. > 3. Registering tables in a catalog and using them for input and output is > more aligned with SQL. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645816#comment-16645816 ] ASF GitHub Bot commented on FLINK-10156: sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink() URL: https://github.com/apache/flink/pull/6805#discussion_r224289837 ## File path: docs/dev/table/streaming.md ## @@ -516,7 +516,7 @@ Table result = ... TableSink sink = ... Review comment: Add registerTableSink logic ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Drop the Table.writeToSink() method > --- > > Key: FLINK-10156 > URL: https://issues.apache.org/jira/browse/FLINK-10156 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Major > Labels: pull-request-available > > I am proposing to drop the {{Table.writeToSink()}} method. > > *What is the method doing?* > The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a > {{TableSink}}, for example to a Kafka topic, a file, or a database. > > *Why should it be removed?* > The {{writeToSink()}} method was introduced before the Table API supported > the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a > table into a table that was previously registered with a {{TableSink}} in the > catalog. It is the inverse method to the {{scan()}} method and the equivalent > to an {{INSERT INTO ... SELECT}} SQL query. > > I think we should remove {{writeToSink()}} for the following reasons: > 1. It offers the same functionality as {{insertInto()}}. Removing it would > reduce duplicated API. > 2. {{writeToSink()}} requires a {{TableSink}} instance. 
I think TableSinks > (and TableSources) should only be registered with the {{TableEnvironment}} > and not be exposed to the "query part" of the Table API / SQL. > 3. Registering tables in a catalog and using them for input and output is > more aligned with SQL. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645814#comment-16645814 ] ASF GitHub Bot commented on FLINK-10156: sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink() URL: https://github.com/apache/flink/pull/6805#discussion_r224289531 ## File path: docs/dev/table/connect.md ## @@ -1108,8 +1127,15 @@ val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder() .setParameterTypes(INT_TYPE_INFO) .build() +tableEnv.registerTableSink( + "jdbcOutputTable", + sink, + // specify table schema + Array[String]("id"), + Array[TypeInformation[_]](Types.INT)) + val table: Table = ??? -table.writeToSink(sink) +table.insertInto("jdbcTableSink") Review comment: `jdbcTableSink` -> `jdbcOutputTable` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Drop the Table.writeToSink() method > --- > > Key: FLINK-10156 > URL: https://issues.apache.org/jira/browse/FLINK-10156 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Major > Labels: pull-request-available > > I am proposing to drop the {{Table.writeToSink()}} method. > > *What is the method doing?* > The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a > {{TableSink}}, for example to a Kafka topic, a file, or a database. > > *Why should it be removed?* > The {{writeToSink()}} method was introduced before the Table API supported > the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a > table into a table that was previously registered with a {{TableSink}} in the > catalog. It is the inverse method to the {{scan()}} method and the equivalent > to an {{INSERT INTO ... SELECT}} SQL query. 
> > I think we should remove {{writeToSink()}} for the following reasons: > 1. It offers the same functionality as {{insertInto()}}. Removing it would > reduce duplicated API. > 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks > (and TableSources) should only be registered with the {{TableEnvironment}} > and not be exposed to the "query part" of the Table API / SQL. > 3. Registering tables in a catalog and using them for input and output is > more aligned with SQL. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645812#comment-16645812 ] ASF GitHub Bot commented on FLINK-10156: sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink() URL: https://github.com/apache/flink/pull/6805#discussion_r224289223 ## File path: docs/dev/table/connect.md ## @@ -1047,30 +1047,42 @@ The sink only supports append-only streaming tables. It cannot be used to emit a {% highlight java %} -Table table = ... - -table.writeToSink( - new CsvTableSink( +CsvTableSink sink = new CsvTableSink( path, // output path "|", // optional: delimit files by '|' 1, // optional: write to a single file -WriteMode.OVERWRITE)); // optional: override existing files +WriteMode.OVERWRITE); // optional: override existing files + +tableEnv.registerTableSink( + "csvOutputTable", + sink, + // specify table schema + new String[]{"f0", "f1"}, + new TypeInformation[]{Types.STRING, Types.INT}); +Table table = ... +table.insertInto("csvOutputTable"); {% endhighlight %} {% highlight scala %} -val table: Table = ??? - -table.writeToSink( - new CsvTableSink( +val sink: CsvTableSink = new CsvTableSink( path, // output path fieldDelim = "|", // optional: delimit files by '|' numFiles = 1, // optional: write to a single file -writeMode = WriteMode.OVERWRITE)) // optional: override existing files +writeMode = WriteMode.OVERWRITE), // optional: override existing files Review comment: remove "," This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Drop the Table.writeToSink() method > --- > > Key: FLINK-10156 > URL: https://issues.apache.org/jira/browse/FLINK-10156 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Major > Labels: pull-request-available > > I am proposing to drop the {{Table.writeToSink()}} method. > > *What is the method doing?* > The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a > {{TableSink}}, for example to a Kafka topic, a file, or a database. > > *Why should it be removed?* > The {{writeToSink()}} method was introduced before the Table API supported > the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a > table into a table that was previously registered with a {{TableSink}} in the > catalog. It is the inverse method to the {{scan()}} method and the equivalent > to an {{INSERT INTO ... SELECT}} SQL query. > > I think we should remove {{writeToSink()}} for the following reasons: > 1. It offers the same functionality as {{insertInto()}}. Removing it would > reduce duplicated API. > 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks > (and TableSources) should only be registered with the {{TableEnvironment}} > and not be exposed to the "query part" of the Table API / SQL. > 3. Registering tables in a catalog and using them for input and output is > more aligned with SQL. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645815#comment-16645815 ] ASF GitHub Bot commented on FLINK-10156: sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink() URL: https://github.com/apache/flink/pull/6805#discussion_r224289776 ## File path: docs/dev/table/streaming.md ## @@ -540,7 +540,7 @@ val result: Table = ??? val sink: TableSink[Row] = ??? Review comment: Add `registerTableSink` logic ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Drop the Table.writeToSink() method > --- > > Key: FLINK-10156 > URL: https://issues.apache.org/jira/browse/FLINK-10156 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Major > Labels: pull-request-available > > I am proposing to drop the {{Table.writeToSink()}} method. > > *What is the method doing?* > The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a > {{TableSink}}, for example to a Kafka topic, a file, or a database. > > *Why should it be removed?* > The {{writeToSink()}} method was introduced before the Table API supported > the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a > table into a table that was previously registered with a {{TableSink}} in the > catalog. It is the inverse method to the {{scan()}} method and the equivalent > to an {{INSERT INTO ... SELECT}} SQL query. > > I think we should remove {{writeToSink()}} for the following reasons: > 1. It offers the same functionality as {{insertInto()}}. Removing it would > reduce duplicated API. > 2. {{writeToSink()}} requires a {{TableSink}} instance. 
I think TableSinks > (and TableSources) should only be registered with the {{TableEnvironment}} > and not be exposed to the "query part" of the Table API / SQL. > 3. Registering tables in a catalog and using them for input and output is > more aligned with SQL. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
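The review comments above all push the same refactor: create the TableSink, register it with an explicit schema under a catalog name, then have queries write by name via insertInto(), instead of handing the sink instance to writeToSink(). A tiny self-contained Java sketch of why that decoupling matters (TinyCatalog is hypothetical and only models the registration pattern, not Flink's actual TableEnvironment API):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the registration pattern discussed in FLINK-10156:
// sinks live in a catalog, and the "query side" refers to them only by name.
class TinyCatalog {
    private final Map<String, StringBuilder> sinks = new HashMap<>();

    // Analogous to tableEnv.registerTableSink(name, fieldNames, fieldTypes, sink):
    // the sink instance is handed over once, at registration time.
    void registerSink(String name) {
        sinks.put(name, new StringBuilder());
    }

    // Analogous to table.insertInto(name): queries never touch the sink
    // instance, only the registered name, unlike table.writeToSink(sink).
    void insertInto(String name, String row) {
        StringBuilder sink = sinks.get(name);
        if (sink == null) {
            throw new IllegalArgumentException("No sink registered as " + name);
        }
        sink.append(row).append('\n');
    }

    String contents(String name) {
        return sinks.get(name).toString();
    }
}
```

This mirrors the SQL INSERT INTO ... SELECT analogy from the issue description: input and output tables are both catalog objects, so a query can be expressed entirely in terms of names.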
[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink() URL: https://github.com/apache/flink/pull/6805#discussion_r224289531 ## File path: docs/dev/table/connect.md ## @@ -1108,8 +1127,15 @@ val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder() .setParameterTypes(INT_TYPE_INFO) .build() +tableEnv.registerTableSink( + "jdbcOutputTable", + sink, + // specify table schema + Array[String]("id"), + Array[TypeInformation[_]](Types.INT)) + val table: Table = ??? -table.writeToSink(sink) +table.insertInto("jdbcTableSink") Review comment: `jdbcTableSink` -> `jdbcOutputTable` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink() URL: https://github.com/apache/flink/pull/6805#discussion_r224288252 ## File path: docs/dev/table/common.md ## @@ -540,22 +536,17 @@ result.insertInto("CsvSinkTable"); // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) +// register the TableSink with a specific schema Review comment: Add create `TableSink` instance logic . ``` // create a TableSink val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|") ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink() URL: https://github.com/apache/flink/pull/6805#discussion_r224288732 ## File path: docs/dev/table/common.md ## @@ -540,22 +536,17 @@ result.insertInto("CsvSinkTable"); // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) +// register the TableSink with a specific schema +val fieldNames: Array[String] = Array("a", "b", "c") +val fieldTypes: Array[TypeInformation] = Array(Types.INT, Types.STRING, Types.LONG) +tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink) + // compute a result Table using Table API operators and/or SQL queries val result: Table = ... // create a TableSink val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|") Review comment: move this logic to before "registerTableSink". This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink() URL: https://github.com/apache/flink/pull/6805#discussion_r224289837 ## File path: docs/dev/table/streaming.md ## @@ -516,7 +516,7 @@ Table result = ... TableSink sink = ... Review comment: Add registerTableSink logic ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224289223

File path: docs/dev/table/connect.md

@@ -1047,30 +1047,42 @@ The sink only supports append-only streaming tables. It cannot be used to emit a

{% highlight java %}
-Table table = ...
-
-table.writeToSink(
-  new CsvTableSink(
+CsvTableSink sink = new CsvTableSink(
   path,                  // output path
   "|",                   // optional: delimit files by '|'
   1,                     // optional: write to a single file
-  WriteMode.OVERWRITE)); // optional: override existing files
+  WriteMode.OVERWRITE);  // optional: override existing files
+
+tableEnv.registerTableSink(
+  "csvOutputTable",
+  sink,
+  // specify table schema
+  new String[]{"f0", "f1"},
+  new TypeInformation[]{Types.STRING, Types.INT});
+
+Table table = ...
+table.insertInto("csvOutputTable");
{% endhighlight %}

{% highlight scala %}
-val table: Table = ???
-
-table.writeToSink(
-  new CsvTableSink(
+val sink: CsvTableSink = new CsvTableSink(
   path,                             // output path
   fieldDelim = "|",                 // optional: delimit files by '|'
   numFiles = 1,                     // optional: write to a single file
-  writeMode = WriteMode.OVERWRITE)) // optional: override existing files
+  writeMode = WriteMode.OVERWRITE), // optional: override existing files
{% endhighlight %}

Review comment: Remove the trailing "," after `WriteMode.OVERWRITE)`.
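The pattern this documentation change introduces — create a sink, register it under a name together with its schema, then write to it by name — can be sketched with a small dependency-free stand-in. Everything below (the `Sink` interface, the `catalog` map, and the simplified `registerTableSink`/`insertInto` signatures) is illustrative only, not Flink's actual API:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SinkRegistrationSketch {
    // Hypothetical minimal stand-in for a TableSink: it just collects rows.
    interface Sink { void emit(String row); }

    static class CollectingSink implements Sink {
        final List<String> rows = new ArrayList<>();
        public void emit(String row) { rows.add(row); }
    }

    // The "catalog": sinks are registered under a name...
    static final Map<String, Sink> catalog = new LinkedHashMap<>();

    static void registerTableSink(String name, Sink sink) {
        catalog.put(name, sink);
    }

    // ...and queries then refer to them only by that name.
    static void insertInto(String row, String tableName) {
        Sink sink = catalog.get(tableName);
        if (sink == null) {
            throw new IllegalArgumentException("No table registered as: " + tableName);
        }
        sink.emit(row);
    }

    public static void main(String[] args) {
        CollectingSink sink = new CollectingSink();
        registerTableSink("csvOutputTable", sink);
        insertInto("f0Value|1", "csvOutputTable");
        System.out.println(sink.rows);
    }
}
```

The point of the indirection is that the query side never holds a sink instance; it only knows the catalog name, mirroring SQL's `INSERT INTO csvOutputTable SELECT ...`.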
[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224289776

File path: docs/dev/table/streaming.md

@@ -540,7 +540,7 @@
 val result: Table = ???
 val sink: TableSink[Row] = ???

Review comment: Add `registerTableSink` logic?
[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645785#comment-16645785 ]

ASF GitHub Bot commented on FLINK-10156:

sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224282676

File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala

@@ -307,11 +308,16 @@ class AggregateITCase extends StreamingWithStateTestBase {
 data.+=((4, 3L, "C"))
 data.+=((5, 3L, "C"))

+tEnv.registerTableSink(
+  "testSink",
+  new TestUpsertSink(Array("c"), false).configure(
+    Array[String]("c", "bMax"), Array[TypeInformation[_]](Types.STRING, Types.LONG)))
+
 val t = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
   .groupBy('c)
-  .select('c, 'b.max)
+  .select('c, 'b.max as 'bMax)

Review comment: Do we have to add an alias?

> Drop the Table.writeToSink() method
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Fabian Hueske
> Assignee: Fabian Hueske
> Priority: Major
> Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a {{TableSink}}, for example to a Kafka topic, a file, or a database.
>
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a table into a table that was previously registered with a {{TableSink}} in the catalog. It is the inverse method to the {{scan()}} method and the equivalent to an {{INSERT INTO ... SELECT}} SQL query.
>
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks (and TableSources) should only be registered with the {{TableEnvironment}} and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is more aligned with SQL.

This message was sent by Atlassian JIRA (v7.6.3#76005)
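Point 1 of the rationale — that `writeToSink()` and `insertInto()` offer the same functionality — can be illustrated with a toy model. The classes and method signatures below are invented for the sketch and are not the real Flink classes:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy contrast of the two paths the issue describes: the direct
// writeToSink(sink) path and the catalog-based insertInto(name) path
// end up doing the same work, which is why only one is needed.
public class WriteVsInsert {
    interface Sink { void emit(List<String> rows); }

    static class CollectingSink implements Sink {
        final List<String> collected = new ArrayList<>();
        public void emit(List<String> rows) { collected.addAll(rows); }
    }

    static final Map<String, Sink> catalog = new HashMap<>();

    // Deprecated style: the query side holds the sink instance directly.
    static void writeToSink(List<String> table, Sink sink) {
        sink.emit(table);
    }

    // Preferred style: sinks live in the catalog; queries refer to them by name,
    // which mirrors INSERT INTO ... SELECT and is the inverse of scan().
    static void insertInto(List<String> table, String name) {
        catalog.get(name).emit(table);
    }
}
```

Both paths deliver identical rows to the sink; the difference is purely who owns the sink reference — the query (deprecated) or the catalog (preferred).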
[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645781#comment-16645781 ]

ASF GitHub Bot commented on FLINK-10156:

sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224276866

File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala

@@ -34,11 +34,12 @@ class TableSinksValidationTest extends TableTestBase {
 val util = streamTestUtil()
 val t = util.addTable[(Int, Long, String)]("MyTable", 'id, 'num, 'text)

+util.tableEnv.registerTableSink("testSink",new TestAppendSink)

Review comment: Add a space after the comma.
[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645784#comment-16645784 ]

ASF GitHub Bot commented on FLINK-10156:

sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224281005

File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala

@@ -944,7 +956,12 @@ class Table(
 * @param tableName Name of the registered [[TableSink]] to which the [[Table]] is written.
 */
def insertInto(tableName: String): Unit = {
-  tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
+  this.logicalPlan match {
+    case _: LogicalTableFunctionCall =>
+      throw new ValidationException("TableFunction can only be used in join and leftOuterJoin.")
+    case _ =>
+      tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)

Review comment: Can we call `insertInto(tableName, this.tableEnv.queryConfig)` here, so that the validation logic is shared?
[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645786#comment-16645786 ]

ASF GitHub Bot commented on FLINK-10156:

sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224281405

File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala

@@ -960,7 +977,12 @@ class Table(
 * @param conf The [[QueryConfig]] to use.
 */
def insertInto(tableName: String, conf: QueryConfig): Unit = {
-  tableEnv.insertInto(this, tableName, conf)
+  this.logicalPlan match {
+    case _: LogicalTableFunctionCall =>
+      throw new ValidationException("TableFunction can only be used in join and leftOuterJoin.")

Review comment: `new ValidationException(..)` -> `ValidationException(...)`
[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645789#comment-16645789 ]

ASF GitHub Bot commented on FLINK-10156:

sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224279958

File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala

@@ -891,7 +891,13 @@ class Table(
 *
 * @param sink The [[TableSink]] to which the [[Table]] is written.
 * @tparam T The data type that the [[TableSink]] expects.
+*
+* @deprecated Will be removed in a future release. Please register the TableSink and use
+*             [[insertInto()]].

Review comment: For the javadoc to render correctly, we need to remove the "[[" brackets: [[insertInto()]] -> insertInto()

![image](https://user-images.githubusercontent.com/22488084/46772885-efc94400-cd2c-11e8-81e2-9c488fa20785.png)
![image](https://user-images.githubusercontent.com/22488084/46772899-01aae700-cd2d-11e8-96f3-842f65058d31.png)
[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645783#comment-16645783 ]

ASF GitHub Bot commented on FLINK-10156:

sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224276816

File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSinkValidationTest.scala

@@ -35,10 +35,11 @@ class TableSinkValidationTest extends TableTestBase {
 val tEnv = TableEnvironment.getTableEnvironment(env)
 val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'id, 'num, 'text)

+tEnv.registerTableSink("testSink",new TestAppendSink)

Review comment: Add a space after the comma.
[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645787#comment-16645787 ]

ASF GitHub Bot commented on FLINK-10156:

sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224283829

File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala

@@ -177,10 +195,16 @@ class TableSinkITCase extends AbstractTestBase {
 .assignAscendingTimestamps(_._1.toLong)
 .toTable(tEnv, 'id, 'num, 'text)

+tEnv.registerTableSink(
+  "retractSink",
+  new TestRetractSink().configure(
+    Array[String]("len", "icnt", "nsum"),
+    Array[TypeInformation[_]](Types.INT, Types.LONG, Types.LONG)))
+
 t.select('id, 'num, 'text.charLength() as 'len)
   .groupBy('len)
-  .select('len, 'id.count, 'num.sum)
-  .writeToSink(new TestRetractSink)
+  .select('len, 'id.count as 'icnt, 'num.sum as 'nsum)

Review comment: In order to show that the column names of the source table can differ from the column names of the sink table, I recommend removing the aliases. What do you think?
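The review comment above turns on whether query column names must match the sink's configured field names (`"len"`, `"icnt"`, `"nsum"` in the diff). A minimal sketch of one plausible resolution — mapping query columns to the sink's configured fields purely by position — is below; the `Sink` class and the positional-mapping rule are assumptions of this sketch, not a statement of Flink's actual name-resolution behavior:

```java
import java.util.ArrayList;
import java.util.List;

public class ConfiguredSinkSketch {
    // Hypothetical sink that, like the test's TestRetractSink().configure(...),
    // is told its own field names up front.
    static class Sink {
        final String[] fieldNames;
        final List<String> records = new ArrayList<>();
        Sink(String... fieldNames) { this.fieldNames = fieldNames; }

        // In this sketch, query columns are matched to sink fields purely by
        // position, so the query-side names never need to agree with these.
        void emit(Object... row) {
            StringBuilder rec = new StringBuilder();
            for (int i = 0; i < fieldNames.length; i++) {
                if (i > 0) rec.append(", ");
                rec.append(fieldNames[i]).append('=').append(row[i]);
            }
            records.add(rec.toString());
        }
    }

    public static void main(String[] args) {
        Sink sink = new Sink("len", "icnt", "nsum");
        // The query produced these values under different (or no) names;
        // positional mapping still lands them in len/icnt/nsum.
        sink.emit(3, 2L, 10L);
        System.out.println(sink.records);
    }
}
```

Under this assumption, dropping the aliases would indeed demonstrate that source and sink column names are independent, which is the reviewer's point.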
[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645782#comment-16645782 ]

ASF GitHub Bot commented on FLINK-10156:

sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224282913

File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala

@@ -26,6 +26,7 @@
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, Strea
 import org.junit.Assert._

Review comment: Please remove the unused [[TableException]] import at line 23.
[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645788#comment-16645788 ]

ASF GitHub Bot commented on FLINK-10156:

sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224276845

File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSinkValidationTest.scala

@@ -70,10 +72,11 @@ class TableSinkValidationTest extends TableTestBase {
 val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
 val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)

+tEnv.registerTableSink("testSink",new TestAppendSink)

Review comment: Add a space after the comma.
[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink() URL: https://github.com/apache/flink/pull/6805#discussion_r224282676 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala ## @@ -307,11 +308,16 @@ class AggregateITCase extends StreamingWithStateTestBase { data.+=((4, 3L, "C")) data.+=((5, 3L, "C")) +tEnv.registerTableSink( + "testSink", + new TestUpsertSink(Array("c"), false).configure( +Array[String]("c", "bMax"), Array[TypeInformation[_]](Types.STRING, Types.LONG))) + val t = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c) .groupBy('c) -.select('c, 'b.max) +.select('c, 'b.max as 'bMax) Review comment: Do we have to add alias ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink() URL: https://github.com/apache/flink/pull/6805#discussion_r224282913 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala ## @@ -26,6 +26,7 @@ import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, Strea import org.junit.Assert._ Review comment: Please remove useless [[TableException]] import at line 23. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink() URL: https://github.com/apache/flink/pull/6805#discussion_r224281405 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ## @@ -960,7 +977,12 @@ class Table( * @param conf The [[QueryConfig]] to use. */ def insertInto(tableName: String, conf: QueryConfig): Unit = { -tableEnv.insertInto(this, tableName, conf) +this.logicalPlan match { + case _: LogicalTableFunctionCall => +throw new ValidationException("TableFunction can only be used in join and leftOuterJoin.") Review comment: new ValidationException(..) -> ValidationException(...) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink() URL: https://github.com/apache/flink/pull/6805#discussion_r224279958 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ## @@ -891,7 +891,13 @@ class Table( * * @param sink The [[TableSink]] to which the [[Table]] is written. * @tparam T The data type that the [[TableSink]] expects. +* +* @deprecated Will be removed in a future release. Please register the TableSink and use +* [[insertInto()]]. Review comment: For the complete javadoc, we need to remove "[[", [[insertInto()]] -> insertInto() ![image](https://user-images.githubusercontent.com/22488084/46772885-efc94400-cd2c-11e8-81e2-9c488fa20785.png) ![image](https://user-images.githubusercontent.com/22488084/46772899-01aae700-cd2d-11e8-96f3-842f65058d31.png) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink() URL: https://github.com/apache/flink/pull/6805#discussion_r224276816 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSinkValidationTest.scala ## @@ -35,10 +35,11 @@ class TableSinkValidationTest extends TableTestBase { val tEnv = TableEnvironment.getTableEnvironment(env) val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'id, 'num, 'text) +tEnv.registerTableSink("testSink",new TestAppendSink) Review comment: Add a space This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink() URL: https://github.com/apache/flink/pull/6805#discussion_r224276866 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala ## @@ -34,11 +34,12 @@ class TableSinksValidationTest extends TableTestBase { val util = streamTestUtil() val t = util.addTable[(Int, Long, String)]("MyTable", 'id, 'num, 'text) +util.tableEnv.registerTableSink("testSink",new TestAppendSink) Review comment: Add a space after the comma: `registerTableSink("testSink", new TestAppendSink)`
[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink() URL: https://github.com/apache/flink/pull/6805#discussion_r224283829 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala ## @@ -177,10 +195,16 @@ class TableSinkITCase extends AbstractTestBase { .assignAscendingTimestamps(_._1.toLong) .toTable(tEnv, 'id, 'num, 'text) +tEnv.registerTableSink( + "retractSink", + new TestRetractSink().configure( +Array[String]("len", "icnt", "nsum"), +Array[TypeInformation[_]](Types.INT, Types.LONG, Types.LONG))) + t.select('id, 'num, 'text.charLength() as 'len) .groupBy('len) - .select('len, 'id.count, 'num.sum) - .writeToSink(new TestRetractSink) + .select('len, 'id.count as 'icnt, 'num.sum as 'nsum) Review comment: In order to show that the column name of the source table can be inconsistent with the column name of the sink table, I recommend removing the alias. What do you think?
[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink() URL: https://github.com/apache/flink/pull/6805#discussion_r224281005 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ## @@ -944,7 +956,12 @@ class Table( * * @param tableName Name of the registered [[TableSink]] to which the [[Table]] is written. */ def insertInto(tableName: String): Unit = { -tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig) +this.logicalPlan match { + case _: LogicalTableFunctionCall => +throw new ValidationException("TableFunction can only be used in join and leftOuterJoin.") + case _ => +tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig) Review comment: Can we use `insertInto(tableName, this.tableEnv.queryConfig)` here, so that the validation logic is shared?
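The suggestion above, having the convenience `insertInto(tableName)` delegate to `insertInto(tableName, queryConfig)` so the `LogicalTableFunctionCall` check is written once, is the standard delegating-overload pattern. A minimal self-contained Java sketch of the idea follows; the class and parameter names here are illustrative stand-ins, not Flink's actual Table API:

```java
class TableSketch {
    // Stand-in for "this.logicalPlan match { case _: LogicalTableFunctionCall => ... }".
    private final boolean isTableFunctionCall;

    TableSketch(boolean isTableFunctionCall) {
        this.isTableFunctionCall = isTableFunctionCall;
    }

    // Convenience overload: delegates, so the validation below is not duplicated.
    void insertInto(String tableName) {
        insertInto(tableName, "defaultQueryConfig");
    }

    // The single overload that validates the logical plan before writing.
    void insertInto(String tableName, String queryConfig) {
        if (isTableFunctionCall) {
            throw new IllegalStateException(
                "TableFunction can only be used in join and leftOuterJoin.");
        }
        // ... emit the table into the sink registered under tableName ...
    }
}
```

With this shape, adding another validation rule later touches only the two-argument overload, and both entry points stay consistent.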
[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink() URL: https://github.com/apache/flink/pull/6805#discussion_r224276845 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSinkValidationTest.scala ## @@ -70,10 +72,11 @@ class TableSinkValidationTest extends TableTestBase { val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) +tEnv.registerTableSink("testSink",new TestAppendSink) Review comment: Add a space after the comma: `registerTableSink("testSink", new TestAppendSink)`
[jira] [Closed] (FLINK-10526) Hadoop FileSystem not initialized properly on Yarn
[ https://issues.apache.org/jira/browse/FLINK-10526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yan Yan closed FLINK-10526. --- Resolution: Duplicate > Hadoop FileSystem not initialized properly on Yarn > -- > > Key: FLINK-10526 > URL: https://issues.apache.org/jira/browse/FLINK-10526 > Project: Flink > Issue Type: Bug > Components: YARN > Affects Versions: 1.4.0 > Reporter: Yan Yan > Priority: Blocker > > When we were bumping Flink from 1.3.2 to 1.4.0, we noticed that > HadoopFileSystem no longer reads flinkConfig. > This prevents users from passing in custom HDFS configuration (e.g. core-site.xml) > via _fs.hdfs.hadoopconf_. > Specifically, it is due to the cached _HadoopFsFactory_ being initialized with a > dummy Configuration > ([code|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java#L387]), > which prevents a future _flinkConfig_ from getting passed in > ([code|https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java#L84]). > We have verified that calling _FileSystem.initialize(flinkConfig)_ explicitly > in _YarnApplicationMasterRunner.run_ solves the issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10526) Hadoop FileSystem not initialized properly on Yarn
Yan Yan created FLINK-10526: --- Summary: Hadoop FileSystem not initialized properly on Yarn Key: FLINK-10526 URL: https://issues.apache.org/jira/browse/FLINK-10526 Project: Flink Issue Type: Bug Components: YARN Affects Versions: 1.4.0 Reporter: Yan Yan When we were bumping Flink from 1.3.2 to 1.4.0, we noticed that HadoopFileSystem no longer reads flinkConfig. This prevents users from passing in custom HDFS configuration (e.g. core-site.xml) via _fs.hdfs.hadoopconf_. Specifically, it is due to the cached _HadoopFsFactory_ being initialized with a dummy Configuration ([code|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java#L387]), which prevents a future _flinkConfig_ from getting passed in ([code|https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java#L84]). We have verified that calling _FileSystem.initialize(flinkConfig)_ explicitly in _YarnApplicationMasterRunner.run_ solves the issue.
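The failure mode described in the issue, a factory that is cached on first use with a dummy configuration so that a later `flinkConfig` never reaches it, reduces to a few lines. The following Java sketch illustrates the caching pattern and the `FileSystem.initialize(flinkConfig)` workaround; the class names and the string-map "configuration" are simplifications for illustration, not Flink's actual classes:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for HadoopFsFactory: remembers whatever config it was given.
class FsFactorySketch {
    private Map<String, String> config = new HashMap<>(); // dummy until configured

    void configure(Map<String, String> flinkConfig) {
        this.config = flinkConfig;
    }

    String get(String key) {
        return config.getOrDefault(key, "<unset>");
    }
}

// Illustrative stand-in for the static FileSystem factory cache.
class FileSystemSketch {
    private static final Map<String, FsFactorySketch> CACHE = new HashMap<>();
    private static Map<String, String> globalConfig = new HashMap<>(); // dummy default

    // The workaround: push the real config to factories created later *and*
    // to factories that were already cached with the dummy config.
    static void initialize(Map<String, String> flinkConfig) {
        globalConfig = flinkConfig;
        for (FsFactorySketch factory : CACHE.values()) {
            factory.configure(flinkConfig);
        }
    }

    // The bug: whatever globalConfig holds at first access is frozen into the cache,
    // so a config arriving after the first getFactory() call is silently ignored.
    static FsFactorySketch getFactory(String scheme) {
        return CACHE.computeIfAbsent(scheme, s -> {
            FsFactorySketch factory = new FsFactorySketch();
            factory.configure(globalConfig);
            return factory;
        });
    }
}
```

This mirrors why calling `FileSystem.initialize(flinkConfig)` explicitly before the first file-system access (as the reporter verified) fixes the symptom: it replaces the dummy configuration before any factory is frozen into the cache.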
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645678#comment-16645678 ] ASF GitHub Bot commented on FLINK-7243: --- fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r224259148 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java ## @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E>
+	extends FileInputFormat<E>
+	implements CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class);
+
+	private transient Counter recordConsumed;
+
+	private final TypeInformation[] fieldTypes;
+
+	private final String[] fieldNames;
+
+	private boolean skipThisSplit = false;
+
+	private transient ParquetRecordReader<Row> parquetRecordReader;
+
+	private transient long recordsReadSinceLastSync;
+
+	private long lastSyncedBlock = -1L;
+
+	/**
+	 * Read parquet files with given result parquet schema.
+	 *
+	 * @param path The path of the file to read.
+	 * @param messageType schema of read result
+	 */
+
+	protected ParquetInputFormat(Path path, MessageType messageType) {
+		super(path);
+		RowTypeInfo readType = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(messageType);
+		this.fieldTypes = readType.getFieldTypes();
+		this.fieldNames = readType.getFieldNames();
+		// read whole parquet file as one file split
+		this.unsplittable = true;
+	}
+
+	/**
+	 * Read parquet files with given result field names and types.
+	 *
+	 * @param path The path of the file to read.
+	 * @param fieldTypes field types of read result of fields
+	 * @param fieldNames field names to read, which can be subset of the parquet schema
+	 */
+	protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, String[] fieldNames) {
+		super(path);
+		this.fieldTypes = fieldTypes;
+		this.fieldNames = fieldNames;
+		// read whole parquet file as one file split
+		this.unsplittable = true;
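The fields `lastSyncedBlock` and `recordsReadSinceLastSync` quoted above implement the usual block-plus-offset checkpoint for a `CheckpointableInputFormat`: remember which block the reader is in and how many records were consumed since entering it, so that a restore can seek to that block and skip that many records. A small illustrative Java sketch of that bookkeeping, where the names mirror the quoted fields but the class itself is hypothetical:

```java
// Illustrative sketch of block-plus-offset checkpoint state, not Flink's actual class.
class ReadPositionSketch {
    private long lastSyncedBlock = -1L;         // block the reader is currently in
    private long recordsReadSinceLastSync = 0L; // records consumed inside that block

    // Called whenever the reader advances to a new block.
    void enterBlock(long blockIndex) {
        lastSyncedBlock = blockIndex;
        recordsReadSinceLastSync = 0L;
    }

    // Called once per record handed to the caller.
    void recordRead() {
        recordsReadSinceLastSync++;
    }

    // Corresponds to getCurrentState() returning a Tuple2<Long, Long>:
    // (block index, records read since entering that block).
    long[] snapshot() {
        return new long[] {lastSyncedBlock, recordsReadSinceLastSync};
    }
}
```

On restore, the format would seek to block `snapshot()[0]` and discard `snapshot()[1]` records before resuming, which is exactly what the `Tuple2<Long, Long>` state makes possible.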
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645670#comment-16645670 ] ASF GitHub Bot commented on FLINK-7243: --- fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r224251082 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that support start read from particular position.
+ */
+public class ParquetRecordReader<T> {
+	private static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReader.class);
+
+	private ColumnIOFactory columnIOFactory;
+	private Filter filter;
+
+	private MessageType readSchema;
+	private MessageType fileSchema;
+	private ReadSupport<T> readSupport;
+
+	private RecordMaterializer<T> recordMaterializer;
+	private T currentValue;
+	private long total;
+	private long current = 0;
+	private int currentBlock = -1;
+	private ParquetFileReader reader;
+	private RecordReader<T> recordReader;
+	private boolean strictTypeChecking = true;
+	private long totalCountLoadedSoFar = 0;
+
+	public ParquetRecordReader(ReadSupport<T> readSupport, MessageType readSchema, Filter filter) {
+		this.readSupport = readSupport;
+		this.readSchema = readSchema;
+		this.filter = checkNotNull(filter, "filter");
+	}
+
+	public ParquetRecordReader(ReadSupport<T> readSupport, MessageType readSchema) {
+		this(readSupport, readSchema, FilterCompat.NOOP);
+	}
+
+	public void initialize(ParquetFileReader reader, Configuration configuration) {
+		this.reader = reader;
+		FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData();
+		this.fileSchema = parquetFileMetadata.getSchema();
+		Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData();
+		ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
+			configuration, toSetMultiMap(fileMetadata), readSchema));
+
+		this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+		this.readSchema = readContext.getRequestedSchema();
+		this.recordMaterializer = readSupport.prepareForRead(
+			configuration, fileMetadata, readSchema, readContext);
+		this.total = reader.getRecordCount();
+		reader.setRequestedSchema(readSchema);
+	}
+
+	private void checkRead() throws IOException {
+		if (current == totalCountLoadedSoFar) {
+			PageReadStore pages =
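The truncated `checkRead()` above follows Parquet's row-group iteration pattern: records are materialized from the row group currently in memory, and only when `current` catches up with `totalCountLoadedSoFar` is the next row group pulled from the file reader. A self-contained Java sketch of that control flow, with row groups reduced to plain record counts rather than the real Parquet `PageReadStore` API:

```java
// Illustrative sketch of the checkRead()/nextRecord() loading pattern.
class RowGroupIterationSketch {
    private final long[] rowGroupSizes;     // record count per row group in the file
    private int currentBlock = -1;          // index of the loaded row group
    private long current = 0;               // records handed out so far
    private long totalCountLoadedSoFar = 0; // records contained in loaded row groups

    RowGroupIterationSketch(long[] rowGroupSizes) {
        this.rowGroupSizes = rowGroupSizes;
    }

    // Mirrors checkRead(): fetch the next row group only when the loaded one is drained.
    boolean nextRecord() {
        if (current == totalCountLoadedSoFar) {
            if (currentBlock + 1 >= rowGroupSizes.length) {
                return false; // no more row groups in the file
            }
            currentBlock++;
            totalCountLoadedSoFar += rowGroupSizes[currentBlock];
        }
        current++;
        return true;
    }

    long recordsRead() {
        return current;
    }
}
```

Keeping `current` and `totalCountLoadedSoFar` as cumulative counters is what lets the reader later support "start read from particular position": seeking means loading row groups until the counters pass the target offset.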
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645674#comment-16645674 ] ASF GitHub Bot commented on FLINK-7243: --- fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r224244074 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet.utils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.compat.FilterCompat.Filter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.api.InitContext; +import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.io.ColumnIOFactory; +import org.apache.parquet.io.MessageColumnIO; +import org.apache.parquet.io.RecordReader; +import org.apache.parquet.io.api.RecordMaterializer; +import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException; +import org.apache.parquet.schema.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.CheckReturnValue; +import javax.annotation.meta.When; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.parquet.Preconditions.checkNotNull; + +/** + * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that support start read from particular position. 
+ */ +public class ParquetRecordReader { + private static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReader.class); + + private ColumnIOFactory columnIOFactory; + private Filter filter; + + private MessageType readSchema; + private MessageType fileSchema; + private ReadSupport readSupport; + + private RecordMaterializer recordMaterializer; + private T currentValue; + private long total; + private long current = 0; + private int currentBlock = -1; + private ParquetFileReader reader; + private RecordReader recordReader; + private boolean strictTypeChecking = true; + private long totalCountLoadedSoFar = 0; + + public ParquetRecordReader(ReadSupport readSupport, MessageType readSchema, Filter filter) { + this.readSupport = readSupport; + this.readSchema = readSchema; + this.filter = checkNotNull(filter, "filter"); Review comment: Flink has its own `org.apache.flink.util.Preconditions` class that could be used here This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add ParquetInputFormat > -- > > Key: FLINK-7243 > URL: https://issues.apache.org/jira/browse/FLINK-7243 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: godfrey he >Assignee: Zhenqiu Huang >Priority: Major > Labels: pull-request-available > > Add a {{ParquetInputFormat}} to read data from a Apache Parquet file. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645665#comment-16645665 ]

ASF GitHub Bot commented on FLINK-7243:
---------------------------------------

fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224259648

## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
## @@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * <p>Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behavior.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E>
+	extends FileInputFormat<E>
+	implements CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class);
+
+	private transient Counter recordConsumed;
+
+	private final TypeInformation<?>[] fieldTypes;
+
+	private final String[] fieldNames;
+
+	private boolean skipThisSplit = false;
+
+	private transient ParquetRecordReader<Row> parquetRecordReader;
+
+	private transient long recordsReadSinceLastSync;

Review comment:
   remove the counters and keep track of the read position in the `ParquetRecordReader`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ParquetInputFormat
> ----------------------
>
>                 Key: FLINK-7243
>                 URL: https://issues.apache.org/jira/browse/FLINK-7243
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: godfrey he
>            Assignee: Zhenqiu Huang
>            Priority: Major
>              Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from an Apache Parquet file.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
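The review suggestion above (drop the format-level counters and let the reader own its read position) can be sketched without any Flink or Parquet dependencies. This is a hypothetical illustration, not the PR's actual code: `PositionTrackingReader` and its record lists stand in for `ParquetRecordReader` and Parquet row groups.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch of the review suggestion: the reader, not the input
 * format, tracks its position as (current block, records read in that block),
 * so the format can checkpoint by asking the reader for its state instead of
 * maintaining its own counters.
 */
class PositionTrackingReader {
    private final List<List<String>> blocks; // stand-in for Parquet row groups
    private int currentBlock = -1;
    private long recordsInBlock = 0;         // records read since the current block started
    private List<String> loaded = null;
    private int indexInBlock = 0;

    PositionTrackingReader(List<List<String>> blocks) {
        this.blocks = blocks;
    }

    /** Returns the next record, advancing to the next block when needed, or null at EOF. */
    String nextRecord() {
        while (loaded == null || indexInBlock >= loaded.size()) {
            currentBlock++;
            if (currentBlock >= blocks.size()) {
                return null;
            }
            loaded = blocks.get(currentBlock);
            indexInBlock = 0;
            recordsInBlock = 0;
        }
        recordsInBlock++;
        return loaded.get(indexInBlock++);
    }

    /** Position exposed for checkpointing, instead of counters kept by the caller. */
    long[] position() {
        return new long[] {currentBlock, recordsInBlock};
    }
}
```

With this shape, the input format's `getCurrentState` could simply forward to `position()`, which is presumably what the reviewer had in mind.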
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645664#comment-16645664 ]

ASF GitHub Bot commented on FLINK-7243:
---------------------------------------

fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224255261

## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
## @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that supports starting to read from a particular position.
+ */
+public class ParquetRecordReader<T> {
+	private static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReader.class);
+
+	private ColumnIOFactory columnIOFactory;
+	private Filter filter;
+
+	private MessageType readSchema;
+	private MessageType fileSchema;
+	private ReadSupport<T> readSupport;
+
+	private RecordMaterializer<T> recordMaterializer;
+	private T currentValue;
+	private long total;
+	private long current = 0;
+	private int currentBlock = -1;
+	private ParquetFileReader reader;
+	private RecordReader<T> recordReader;
+	private boolean strictTypeChecking = true;
+	private long totalCountLoadedSoFar = 0;
+
+	public ParquetRecordReader(ReadSupport<T> readSupport, MessageType readSchema, Filter filter) {
+		this.readSupport = readSupport;
+		this.readSchema = readSchema;
+		this.filter = checkNotNull(filter, "filter");
+	}
+
+	public ParquetRecordReader(ReadSupport<T> readSupport, MessageType readSchema) {
+		this(readSupport, readSchema, FilterCompat.NOOP);
+	}
+
+	public void initialize(ParquetFileReader reader, Configuration configuration) {
+		this.reader = reader;
+		FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData();
+		this.fileSchema = parquetFileMetadata.getSchema();
+		Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData();
+		ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
+			configuration, toSetMultiMap(fileMetadata), readSchema));
+
+		this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+		this.readSchema = readContext.getRequestedSchema();
+		this.recordMaterializer = readSupport.prepareForRead(
+			configuration, fileMetadata, readSchema, readContext);
+		this.total = reader.getRecordCount();
+		reader.setRequestedSchema(readSchema);
+	}
+
+	private void checkRead() throws IOException {
+		if (current == totalCountLoadedSoFar) {
+			PageReadStore pages =
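The `initialize(...)` method in the excerpt passes the file's key/value metadata through a `toSetMultiMap` helper before handing it to `ReadSupport.init(...)`, since parquet-mr's `InitContext` takes the metadata as a `Map<String, Set<String>>`. The helper's body is not shown in the excerpt, so the following is an assumed reconstruction of what such a conversion looks like:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class MultiMaps {
    /**
     * Assumed sketch of the toSetMultiMap helper referenced in initialize():
     * wraps each metadata value in an immutable singleton set so a plain
     * Map<K, V> can be consumed where a Map<K, Set<V>> is expected.
     */
    static <K, V> Map<K, Set<V>> toSetMultiMap(Map<K, V> map) {
        Map<K, Set<V>> setMultiMap = new HashMap<>();
        for (Map.Entry<K, V> entry : map.entrySet()) {
            Set<V> set = new HashSet<>();
            set.add(entry.getValue());
            setMultiMap.put(entry.getKey(), Collections.unmodifiableSet(set));
        }
        return Collections.unmodifiableMap(setMultiMap);
    }
}
```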
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645666#comment-16645666 ]

ASF GitHub Bot commented on FLINK-7243:
---------------------------------------

fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224247519

## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645667#comment-16645667 ]

ASF GitHub Bot commented on FLINK-7243:
---------------------------------------

fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224232330

## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java

+	private long lastSyncedBlock = -1L;
+
+	/**
+	 * Reads Parquet files with the given result Parquet schema.
+	 *
+	 * @param path The path of the file to read.
+	 * @param messageType schema of the read result
+	 */
+	protected ParquetInputFormat(Path path, MessageType messageType) {
+		super(path);
+		RowTypeInfo readType = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(messageType);
+		this.fieldTypes = readType.getFieldTypes();
+		this.fieldNames = readType.getFieldNames();
+		// read the whole parquet file as one file split
+		this.unsplittable = true;
+	}
+
+	/**
+	 * Reads Parquet files with the given result field names and types.
+	 *
+	 * @param path The path of the file to read.
+	 * @param fieldTypes field types of the read result
+	 * @param fieldNames field names to read, which can be a subset of the parquet schema
+	 */
+	protected ParquetInputFormat(Path path, TypeInformation<?>[] fieldTypes, String[] fieldNames) {
+		super(path);
+		this.fieldTypes = fieldTypes;
+		this.fieldNames = fieldNames;
+		// read the whole parquet file as one file split
+		this.unsplittable = true;
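The second constructor quoted above accepts "field names to read, which can be a subset of the parquet schema". What that projection means can be illustrated with a dependency-free sketch; `FieldProjection` and the record-as-map representation are hypothetical, not the Flink API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FieldProjection {
    /**
     * Illustrative sketch only: given a full record, keep just the requested
     * fields, in the requested order, failing fast on an unknown field name
     * (analogous to requesting a subset of the Parquet schema).
     */
    static List<Object> project(Map<String, Object> fullRecord, String[] fieldNames) {
        List<Object> projected = new ArrayList<>(fieldNames.length);
        for (String name : fieldNames) {
            if (!fullRecord.containsKey(name)) {
                throw new IllegalArgumentException("Unknown field: " + name);
            }
            projected.add(fullRecord.get(name));
        }
        return projected;
    }
}
```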
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645668#comment-16645668 ]

ASF GitHub Bot commented on FLINK-7243:
---------------------------------------

fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224265334

## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645679#comment-16645679 ]

ASF GitHub Bot commented on FLINK-7243:
---------------------------------------

fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224264555

## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645661#comment-16645661 ]

ASF GitHub Bot commented on FLINK-7243:
---------------------------------------

fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224252224

## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645676#comment-16645676 ]

ASF GitHub Bot commented on FLINK-7243:
---------------------------------------

fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224262433

## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645669#comment-16645669 ] ASF GitHub Bot commented on FLINK-7243: --- fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r224246278 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
[remaining quoted ParquetRecordReader.java context identical to the excerpt above, through the `public class ParquetRecordReader` declaration]

Review comment: a few more comments would be good in this class.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Add ParquetInputFormat
> ----------------------
>
>                 Key: FLINK-7243
>                 URL: https://issues.apache.org/jira/browse/FLINK-7243
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: godfrey he
>            Assignee: Zhenqiu Huang
>            Priority: Major
>              Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from an Apache Parquet file.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645675#comment-16645675 ] ASF GitHub Bot commented on FLINK-7243: --- fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r224249073 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java ## @@ -0,0 +1,196 @@
[license header and quoted ParquetRecordReader.java context identical to the excerpt above, up to the `recordMaterializer` field]

Review comment: Add comments for the fields
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645662#comment-16645662 ] ASF GitHub Bot commented on FLINK-7243: --- fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r224250345 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java ## @@ -0,0 +1,196 @@
[license header and quoted ParquetRecordReader.java context identical to the excerpt above]
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645654#comment-16645654 ] ASF GitHub Bot commented on FLINK-7243: --- fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r224229460 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java ## @@ -0,0 +1,191 @@
[license header identical to the one above]
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+	CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class);
+
+	private transient Counter recordConsumed;
+
+	protected RowTypeInfo readType;
+
+	protected boolean isStandard;
+
+	protected final TypeInformation[] fieldTypes;
+
+	protected final String[] fieldNames;
+
+	protected transient ParquetRecordReader<Row> parquetRecordReader;
+
+	protected transient long recordsReadSinceLastSync;
+
+	protected long lastSyncedBlock = -1L;
+
+	protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, String[] fieldNames, boolean isStandard) {
+		super(path);
+		this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+		this.fieldTypes = readType.getFieldTypes();
+		this.fieldNames = readType.getFieldNames();
+		this.unsplittable = true;

Review comment: Addressing this in a follow-up issue is fine. However, reading the footer of each file during split generation might not be a good idea since this is a single-threaded operation. We could also split the file in any way (e.g., HDFS blocks) and identify the row groups that intersect with the split (given that the metadata includes the offsets and lengths of the groups).
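The reviewer's alternative to footer-driven split generation can be sketched in a few lines: cut the file at arbitrary byte boundaries, and let each parallel reader later consult the footer to claim the row groups whose starting offset falls inside its split, so every group is read by exactly one split. This is a hypothetical, self-contained illustration (the `RowGroupAssignment` helper is not code from the PR):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the reviewer's suggestion (not code from the PR):
// splits are generated without touching file metadata, and each reader
// assigns to itself the row groups whose start offset lies inside its split,
// so split generation stays cheap and no group is read twice.
class RowGroupAssignment {

	/** Indexes of the row groups whose start offset falls in [splitStart, splitStart + splitLength). */
	static List<Integer> groupsForSplit(long[] groupStartOffsets, long splitStart, long splitLength) {
		List<Integer> assigned = new ArrayList<>();
		long splitEnd = splitStart + splitLength;
		for (int i = 0; i < groupStartOffsets.length; i++) {
			if (groupStartOffsets[i] >= splitStart && groupStartOffsets[i] < splitEnd) {
				assigned.add(i);
			}
		}
		return assigned;
	}
}
```

Anchoring on the start offset (rather than any overlap) is the usual convention, because a group spanning two splits would otherwise be claimed by both.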
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645657#comment-16645657 ] ASF GitHub Bot commented on FLINK-7243: --- fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r224246563 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java ## @@ -0,0 +1,196 @@
[license header and quoted ParquetRecordReader.java context identical to the excerpt above, up to `this.fileSchema = parquetFileMetadata.getSchema();` in initialize()]

Review comment: I don't quite follow how the different schemas are used and overwritten. Can you please add some comments?
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645658#comment-16645658 ] ASF GitHub Bot commented on FLINK-7243: --- fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r224255589 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java ## @@ -0,0 +1,196 @@
[license header and quoted ParquetRecordReader.java context identical to the excerpt above]
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645671#comment-16645671 ] ASF GitHub Bot commented on FLINK-7243: --- fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r224248653 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java ## @@ -0,0 +1,241 @@
[license header, imports, and class Javadoc identical to the ParquetInputFormat.java excerpt above]
+public abstract class ParquetInputFormat<E>
+	extends FileInputFormat<E>
+	implements CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class);
+
+	private transient Counter recordConsumed;
+
+	private final TypeInformation[] fieldTypes;
+
+	private final String[] fieldNames;
+
+	private boolean skipThisSplit = false;
+
+	private transient ParquetRecordReader<Row> parquetRecordReader;
+
+	private transient long recordsReadSinceLastSync;
+
+	private long lastSyncedBlock = -1L;
+
+	/**
+	 * Read parquet files with given result parquet schema.
+	 *
+	 * @param path The path of the file to read.
+	 * @param messageType schema of the read result
+	 */
+	protected ParquetInputFormat(Path path, MessageType messageType) {
+		super(path);
+		RowTypeInfo readType = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(messageType);
+		this.fieldTypes = readType.getFieldTypes();
+		this.fieldNames = readType.getFieldNames();
+		// read whole parquet file as one file split
+		this.unsplittable = true;
+	}
+
+	/**
+	 * Read parquet files with given result field names and types.
+	 *
+	 * @param path The path of the file to read.
+	 * @param fieldTypes field types of the read result
+	 * @param fieldNames field names to read, which can be a subset of the parquet schema
+	 */
+	protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, String[] fieldNames) {
+		super(path);
+		this.fieldTypes = fieldTypes;
+		this.fieldNames = fieldNames;
+		// read whole parquet file as one file split
+		this.unsplittable = true;
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645660#comment-16645660 ] ASF GitHub Bot commented on FLINK-7243: --- fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r224235315 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java ## @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.common.io.CheckpointableInputFormat; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.parquet.utils.ParquetRecordReader; +import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter; +import org.apache.flink.formats.parquet.utils.RowReadSupport; +import org.apache.flink.metrics.Counter; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * The base InputFormat class to read from Parquet files. + * For specific return types the {@link #convert(Row)} method need to be implemented. + * + * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream}, + * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors. + * + * @param The type of record to read. 
+ */ +public abstract class ParquetInputFormat + extends FileInputFormat + implements CheckpointableInputFormat> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class); + + private transient Counter recordConsumed; + + private final TypeInformation[] fieldTypes; + + private final String[] fieldNames; + + private boolean skipThisSplit = false; + + private transient ParquetRecordReader parquetRecordReader; + + private transient long recordsReadSinceLastSync; + + private long lastSyncedBlock = -1L; + + /** +* Read parquet files with given result parquet schema. +* +* @param path The path of the file to read. +* @param messageType schema of read result +*/ + + protected ParquetInputFormat(Path path, MessageType messageType) { + super(path); + RowTypeInfo readType = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(messageType); + this.fieldTypes = readType.getFieldTypes(); Review comment: This uses the message type to define which fields to read, but not the overall schema of the files. In my earlier comments I was thinking of something like this:

```
// define input format with full schema of files (in whatever schema representation Parquet uses).
ParquetRowInputFormat pif = new ParquetRowInputFormat(new Path(...), "name: String, age: Int, city: String, ...");
// select which fields to read. This is used to build the readSchema.
pif.selectFields("city", "name");
```

The idea is similar to DDL in a DBMS: the input format is defined with all fields, and which fields to read is selected afterwards. This should be useful because the full Parquet schema should usually be available, and when selecting the fields to read, we would not need to
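To make the proposed "define the full schema, then project" API concrete, here is a minimal plain-Java sketch of the selection step. The class and method names (`FullSchemaProjection`, `selectFields`, `readTypes`) are illustrative assumptions, not code from the PR; types are modeled as plain strings rather than Flink `TypeInformation`.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch of the reviewer's proposal: construct with the full
 * file schema, then select a subset of fields to read, DDL-style.
 */
public class FullSchemaProjection {
    private final String[] allNames;
    private final String[] allTypes;
    private String[] selectedNames;

    public FullSchemaProjection(String[] allNames, String[] allTypes) {
        this.allNames = allNames;
        this.allTypes = allTypes;
        this.selectedNames = allNames; // default: read every field
    }

    /** Restrict the read schema to the given fields, like a SQL projection. */
    public void selectFields(String... names) {
        for (String n : names) {
            if (indexOf(n) < 0) {
                throw new IllegalArgumentException("Unknown field: " + n);
            }
        }
        this.selectedNames = names;
    }

    /** Types of the selected fields, in selection order (the "readSchema"). */
    public String[] readTypes() {
        String[] types = new String[selectedNames.length];
        for (int i = 0; i < selectedNames.length; i++) {
            types[i] = allTypes[indexOf(selectedNames[i])];
        }
        return types;
    }

    private int indexOf(String name) {
        return Arrays.asList(allNames).indexOf(name);
    }
}
```

The point of the design is that the caller never restates field types when projecting: selecting `"city", "name"` from a `name/age/city` schema derives the read types from the full schema automatically.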
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645672#comment-16645672 ] ASF GitHub Bot commented on FLINK-7243: --- fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r224252848 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet.utils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.compat.FilterCompat.Filter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.api.InitContext; +import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.io.ColumnIOFactory; +import org.apache.parquet.io.MessageColumnIO; +import org.apache.parquet.io.RecordReader; +import org.apache.parquet.io.api.RecordMaterializer; +import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException; +import org.apache.parquet.schema.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.CheckReturnValue; +import javax.annotation.meta.When; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.parquet.Preconditions.checkNotNull; + +/** + * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that support start read from particular position. 
+ */ +public class ParquetRecordReader { + private static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReader.class); + + private ColumnIOFactory columnIOFactory; + private Filter filter; + + private MessageType readSchema; + private MessageType fileSchema; + private ReadSupport readSupport; + + private RecordMaterializer recordMaterializer; + private T currentValue; + private long total; + private long current = 0; + private int currentBlock = -1; + private ParquetFileReader reader; + private RecordReader recordReader; + private boolean strictTypeChecking = true; + private long totalCountLoadedSoFar = 0; + + public ParquetRecordReader(ReadSupport readSupport, MessageType readSchema, Filter filter) { + this.readSupport = readSupport; + this.readSchema = readSchema; + this.filter = checkNotNull(filter, "filter"); + } + + public ParquetRecordReader(ReadSupport readSupport, MessageType readSchema) { + this(readSupport, readSchema, FilterCompat.NOOP); + } + + public void initialize(ParquetFileReader reader, Configuration configuration) { + this.reader = reader; + FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData(); + this.fileSchema = parquetFileMetadata.getSchema(); + Map fileMetadata = parquetFileMetadata.getKeyValueMetaData(); + ReadSupport.ReadContext readContext = readSupport.init(new InitContext( + configuration, toSetMultiMap(fileMetadata), readSchema)); + + this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy()); + this.readSchema = readContext.getRequestedSchema(); + this.recordMaterializer = readSupport.prepareForRead( + configuration, fileMetadata, readSchema, readContext); + this.total = reader.getRecordCount(); + reader.setRequestedSchema(readSchema); + } + + private void checkRead() throws IOException { Review comment: This method is only called once. Can we inline the logic?
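The `checkRead()` logic under discussion advances to the next row group whenever the running record count reaches the count loaded so far. A plain-Java model of that boundary check, with the Parquet I/O stubbed out as an array of row-group sizes (the counter names mirror the fields in the quoted `ParquetRecordReader`; everything else is illustrative):

```java
/**
 * Toy model of row-group advancement: the boundary check that
 * checkRead() performs, inlined into the record-reading loop.
 */
public class RowGroupCursor {
    private final long[] rowGroupSizes; // stand-in for reader.readNextRowGroup()
    private long current = 0;
    private long totalCountLoadedSoFar = 0;
    private int currentBlock = -1;

    public RowGroupCursor(long[] rowGroupSizes) {
        this.rowGroupSizes = rowGroupSizes;
    }

    /** Read one record; loads the next row group when a boundary is hit. */
    public boolean nextRecord() {
        if (current == totalCountLoadedSoFar) { // inlined checkRead() condition
            if (currentBlock + 1 >= rowGroupSizes.length) {
                return false; // no more row groups in the file
            }
            currentBlock++;
            totalCountLoadedSoFar += rowGroupSizes[currentBlock];
        }
        current++;
        return true;
    }

    public int currentBlock() {
        return currentBlock;
    }

    public long recordsRead() {
        return current;
    }
}
```

With row groups of sizes 2 and 3, the cursor yields exactly five records and reports block 1 after the third read, which is the invariant the real reader's counters maintain.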
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645659#comment-16645659 ] ASF GitHub Bot commented on FLINK-7243: --- fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r224259294 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java ## @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.common.io.CheckpointableInputFormat; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.parquet.utils.ParquetRecordReader; +import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter; +import org.apache.flink.formats.parquet.utils.RowReadSupport; +import org.apache.flink.metrics.Counter; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * The base InputFormat class to read from Parquet files. + * For specific return types the {@link #convert(Row)} method need to be implemented. + * + * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream}, + * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors. + * + * @param The type of record to read. 
+ */ +public abstract class ParquetInputFormat + extends FileInputFormat + implements CheckpointableInputFormat> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class); + + private transient Counter recordConsumed; + + private final TypeInformation[] fieldTypes; + + private final String[] fieldNames; + + private boolean skipThisSplit = false; + + private transient ParquetRecordReader parquetRecordReader; + + private transient long recordsReadSinceLastSync; + + private long lastSyncedBlock = -1L; + + /** +* Read parquet files with given result parquet schema. +* +* @param path The path of the file to read. +* @param messageType schema of read result +*/ + + protected ParquetInputFormat(Path path, MessageType messageType) { + super(path); + RowTypeInfo readType = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(messageType); + this.fieldTypes = readType.getFieldTypes(); + this.fieldNames = readType.getFieldNames(); + // read whole parquet file as one file split + this.unsplittable = true; + } + + /** +* Read parquet files with given result field names and types. +* +* @param path The path of the file to read. +* @param fieldTypes field types of read result of fields +* @param fieldNames field names to read, which can be subset of the parquet schema +*/ + protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, String[] fieldNames) { + super(path); + this.fieldTypes = fieldTypes; + this.fieldNames = fieldNames; + // read whole parquet file as one file split + this.unsplittable = true;
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645663#comment-16645663 ] ASF GitHub Bot commented on FLINK-7243: --- fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r224263227 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet.utils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.compat.FilterCompat.Filter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.api.InitContext; +import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.io.ColumnIOFactory; +import org.apache.parquet.io.MessageColumnIO; +import org.apache.parquet.io.RecordReader; +import org.apache.parquet.io.api.RecordMaterializer; +import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException; +import org.apache.parquet.schema.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.CheckReturnValue; +import javax.annotation.meta.When; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.parquet.Preconditions.checkNotNull; + +/** + * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that support start read from particular position. 
+ */ +public class ParquetRecordReader { + private static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReader.class); + + private ColumnIOFactory columnIOFactory; + private Filter filter; + + private MessageType readSchema; + private MessageType fileSchema; + private ReadSupport readSupport; + + private RecordMaterializer recordMaterializer; + private T currentValue; + private long total; + private long current = 0; + private int currentBlock = -1; + private ParquetFileReader reader; + private RecordReader recordReader; + private boolean strictTypeChecking = true; + private long totalCountLoadedSoFar = 0; + + public ParquetRecordReader(ReadSupport readSupport, MessageType readSchema, Filter filter) { + this.readSupport = readSupport; + this.readSchema = readSchema; + this.filter = checkNotNull(filter, "filter"); + } + + public ParquetRecordReader(ReadSupport readSupport, MessageType readSchema) { + this(readSupport, readSchema, FilterCompat.NOOP); + } + + public void initialize(ParquetFileReader reader, Configuration configuration) { + this.reader = reader; + FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData(); + this.fileSchema = parquetFileMetadata.getSchema(); + Map fileMetadata = parquetFileMetadata.getKeyValueMetaData(); + ReadSupport.ReadContext readContext = readSupport.init(new InitContext( + configuration, toSetMultiMap(fileMetadata), readSchema)); + + this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy()); + this.readSchema = readContext.getRequestedSchema(); + this.recordMaterializer = readSupport.prepareForRead( + configuration, fileMetadata, readSchema, readContext); + this.total = reader.getRecordCount(); + reader.setRequestedSchema(readSchema); + } + + private void checkRead() throws IOException { + if (current == totalCountLoadedSoFar) { + PageReadStore pages =
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645655#comment-16645655 ] ASF GitHub Bot commented on FLINK-7243: --- fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r224230977 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java ## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.common.io.CheckpointableInputFormat; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.parquet.utils.ParquetRecordReader; +import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter; +import org.apache.flink.formats.parquet.utils.RowReadSupport; +import org.apache.flink.metrics.Counter; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * The base InputFormat class to read from Parquet files. + * For specific return types the {@link #convert(Row)} method need to be implemented. + * + * Using {@link ParquetRecordReader} to Read files instead of {@link org.apache.flink.core.fs.FSDataInputStream}, + * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors. + * + * @param The type of record to read. 
+ */ +public abstract class ParquetInputFormat extends FileInputFormat implements + CheckpointableInputFormat> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class); + + private transient Counter recordConsumed; + + protected RowTypeInfo readType; + + protected boolean isStandard; + + protected final TypeInformation[] fieldTypes; + + protected final String[] fieldNames; + + protected transient ParquetRecordReader parquetRecordReader; + + protected transient long recordsReadSinceLastSync; + + protected long lastSyncedBlock = -1L; + + protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, String[] fieldNames, boolean isStandard) { + super(path); + this.readType = new RowTypeInfo(fieldTypes, fieldNames); + this.fieldTypes = readType.getFieldTypes(); + this.fieldNames = readType.getFieldNames(); + this.unsplittable = true; + this.isStandard = isStandard; + } + + @Override + public Tuple2 getCurrentState() { + return new Tuple2<>(this.lastSyncedBlock, this.recordsReadSinceLastSync); + } + + @Override + public void open(FileInputSplit split) throws IOException { Review comment: Yes, I saw that. I'm just worried what happens if we open a new split and a checkpoint is triggered before the first row is read. The chance is small (and might not even exist at all if split opening and reading of the first record are done in the same synchronized block), but it is also a very simple change to guarantee correctness here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go
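The correctness concern here is that `getCurrentState()` returns the pair `(lastSyncedBlock, recordsReadSinceLastSync)`, so a checkpoint taken between `open(split)` and the first `nextRecord()` must not observe stale state from a previous split. A toy model of the fix being suggested, where `open()` resets both fields (only the two state fields come from the quoted code; the rest is illustrative):

```java
/**
 * Toy model of the checkpointable split state discussed in the review:
 * open() initializes the pair so a checkpoint taken immediately after
 * opening a split restores to the start of that split.
 */
public class SplitState {
    private long lastSyncedBlock = -1L;
    private long recordsReadSinceLastSync = 0L;

    /** Opening a split resets the state before any record is read. */
    public void open(long firstBlock) {
        this.lastSyncedBlock = firstBlock;
        this.recordsReadSinceLastSync = 0L;
    }

    public void recordRead() {
        recordsReadSinceLastSync++;
    }

    /** Equivalent of getCurrentState(): (block, records read since its start). */
    public long[] snapshot() {
        return new long[] {lastSyncedBlock, recordsReadSinceLastSync};
    }
}
```

Immediately after `open(0)`, the snapshot is a valid restore point `(0, 0)` even though no record has been read yet, which is exactly the small-window case the reviewer wants guaranteed.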
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645656#comment-16645656 ] ASF GitHub Bot commented on FLINK-7243: --- fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r224232221 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java ## @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.common.io.CheckpointableInputFormat; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.parquet.utils.ParquetRecordReader; +import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter; +import org.apache.flink.formats.parquet.utils.RowReadSupport; +import org.apache.flink.metrics.Counter; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * The base InputFormat class to read from Parquet files. + * For specific return types the {@link #convert(Row)} method need to be implemented. + * + * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream}, + * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors. + * + * @param The type of record to read. 
+ */ +public abstract class ParquetInputFormat + extends FileInputFormat + implements CheckpointableInputFormat> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class); + + private transient Counter recordConsumed; + + private final TypeInformation[] fieldTypes; + + private final String[] fieldNames; + + private boolean skipThisSplit = false; + + private transient ParquetRecordReader parquetRecordReader; + + private transient long recordsReadSinceLastSync; + + private long lastSyncedBlock = -1L; + + /** +* Read parquet files with given result parquet schema. +* +* @param path The path of the file to read. +* @param messageType schema of read result +*/ + + protected ParquetInputFormat(Path path, MessageType messageType) { + super(path); + RowTypeInfo readType = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(messageType); + this.fieldTypes = readType.getFieldTypes(); + this.fieldNames = readType.getFieldNames(); + // read whole parquet file as one file split + this.unsplittable = true; + } + + /** +* Read parquet files with given result field names and types. +* +* @param path The path of the file to read. +* @param fieldTypes field types of read result of fields +* @param fieldNames field names to read, which can be subset of the parquet schema +*/ + protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, String[] fieldNames) { + super(path); + this.fieldTypes = fieldTypes; + this.fieldNames = fieldNames; + // read whole parquet file as one file split + this.unsplittable = true;
[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r224262756 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet.utils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.compat.FilterCompat.Filter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.api.InitContext; +import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.io.ColumnIOFactory; +import org.apache.parquet.io.MessageColumnIO; +import org.apache.parquet.io.RecordReader; +import org.apache.parquet.io.api.RecordMaterializer; +import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException; +import org.apache.parquet.schema.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.CheckReturnValue; +import javax.annotation.meta.When; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.parquet.Preconditions.checkNotNull; + +/** + * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that support start read from particular position. 
+ */ +public class ParquetRecordReader { + private static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReader.class); + + private ColumnIOFactory columnIOFactory; + private Filter filter; + + private MessageType readSchema; + private MessageType fileSchema; + private ReadSupport readSupport; + + private RecordMaterializer recordMaterializer; + private T currentValue; + private long total; + private long current = 0; + private int currentBlock = -1; + private ParquetFileReader reader; + private RecordReader recordReader; + private boolean strictTypeChecking = true; + private long totalCountLoadedSoFar = 0; + + public ParquetRecordReader(ReadSupport readSupport, MessageType readSchema, Filter filter) { + this.readSupport = readSupport; + this.readSchema = readSchema; + this.filter = checkNotNull(filter, "filter"); + } + + public ParquetRecordReader(ReadSupport readSupport, MessageType readSchema) { + this(readSupport, readSchema, FilterCompat.NOOP); + } + + public void initialize(ParquetFileReader reader, Configuration configuration) { + this.reader = reader; + FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData(); + this.fileSchema = parquetFileMetadata.getSchema(); + Map fileMetadata = parquetFileMetadata.getKeyValueMetaData(); + ReadSupport.ReadContext readContext = readSupport.init(new InitContext( + configuration, toSetMultiMap(fileMetadata), readSchema)); + + this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy()); + this.readSchema = readContext.getRequestedSchema(); + this.recordMaterializer = readSupport.prepareForRead( + configuration, fileMetadata, readSchema, readContext); + this.total = reader.getRecordCount(); + reader.setRequestedSchema(readSchema); + } + + private void checkRead() throws IOException { + if (current == totalCountLoadedSoFar) { + PageReadStore pages = reader.readNextRowGroup(); + recordReader = createRecordReader(pages); + totalCountLoadedSoFar += pages.getRowCount(); + currentBlock++; 
+ } + } + + public
[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r224255261 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet.utils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.compat.FilterCompat.Filter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.api.InitContext; +import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.io.ColumnIOFactory; +import org.apache.parquet.io.MessageColumnIO; +import org.apache.parquet.io.RecordReader; +import org.apache.parquet.io.api.RecordMaterializer; +import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException; +import org.apache.parquet.schema.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.CheckReturnValue; +import javax.annotation.meta.When; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.parquet.Preconditions.checkNotNull; + +/** + * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that support start read from particular position. 
+ */
+public class ParquetRecordReader<T> {
+	private static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReader.class);
+
+	private ColumnIOFactory columnIOFactory;
+	private Filter filter;
+
+	private MessageType readSchema;
+	private MessageType fileSchema;
+	private ReadSupport<T> readSupport;
+
+	private RecordMaterializer<T> recordMaterializer;
+	private T currentValue;
+	private long total;
+	private long current = 0;
+	private int currentBlock = -1;
+	private ParquetFileReader reader;
+	private RecordReader<T> recordReader;
+	private boolean strictTypeChecking = true;
+	private long totalCountLoadedSoFar = 0;
+
+	public ParquetRecordReader(ReadSupport<T> readSupport, MessageType readSchema, Filter filter) {
+		this.readSupport = readSupport;
+		this.readSchema = readSchema;
+		this.filter = checkNotNull(filter, "filter");
+	}
+
+	public ParquetRecordReader(ReadSupport<T> readSupport, MessageType readSchema) {
+		this(readSupport, readSchema, FilterCompat.NOOP);
+	}
+
+	public void initialize(ParquetFileReader reader, Configuration configuration) {
+		this.reader = reader;
+		FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData();
+		this.fileSchema = parquetFileMetadata.getSchema();
+		Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData();
+		ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
+			configuration, toSetMultiMap(fileMetadata), readSchema));
+
+		this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+		this.readSchema = readContext.getRequestedSchema();
+		this.recordMaterializer = readSupport.prepareForRead(
+			configuration, fileMetadata, readSchema, readContext);
+		this.total = reader.getRecordCount();
+		reader.setRequestedSchema(readSchema);
+	}
+
+	private void checkRead() throws IOException {
+		if (current == totalCountLoadedSoFar) {
+			PageReadStore pages = reader.readNextRowGroup();
+			recordReader = createRecordReader(pages);
+			totalCountLoadedSoFar += pages.getRowCount();
+			currentBlock++;
+		}
+	}
+
+	public
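The checkRead() logic quoted above loads the next row group only once every record loaded so far has been consumed, i.e. when current == totalCountLoadedSoFar. A self-contained sketch of that counting scheme, with hard-coded simulated row-group sizes standing in for the real ParquetFileReader.readNextRowGroup():

```java
// Standalone sketch of the row-group counting in checkRead(): a new row
// group is loaded only when the number of records read so far (current)
// has caught up with the number of records loaded so far.
public class RowGroupCounting {
    static long current = 0;
    static long totalCountLoadedSoFar = 0;
    static int currentBlock = -1;

    // Simulated row-group record counts, standing in for pages.getRowCount().
    static final long[] ROW_GROUP_SIZES = {3, 2};

    static void checkRead() {
        if (current == totalCountLoadedSoFar) {
            // In the real reader this is reader.readNextRowGroup() plus
            // createRecordReader(pages); here we only track the counts.
            totalCountLoadedSoFar += ROW_GROUP_SIZES[currentBlock + 1];
            currentBlock++;
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            checkRead();
            current++; // one record materialized per iteration
        }
        // After 5 records, both simulated row groups (3 + 2) are exhausted.
        System.out.println(currentBlock);          // prints 1
        System.out.println(totalCountLoadedSoFar); // prints 5
    }
}
```

This also shows why currentBlock, together with the records read since the last sync, is sufficient checkpoint state: replaying the same counts restores the same position.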
[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224230977

## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E> extends FileInputFormat<E> implements
+	CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class);
+
+	private transient Counter recordConsumed;
+
+	protected RowTypeInfo readType;
+
+	protected boolean isStandard;
+
+	protected final TypeInformation[] fieldTypes;
+
+	protected final String[] fieldNames;
+
+	protected transient ParquetRecordReader<Row> parquetRecordReader;
+
+	protected transient long recordsReadSinceLastSync;
+
+	protected long lastSyncedBlock = -1L;
+
+	protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, String[] fieldNames, boolean isStandard) {
+		super(path);
+		this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+		this.fieldTypes = readType.getFieldTypes();
+		this.fieldNames = readType.getFieldNames();
+		this.unsplittable = true;
+		this.isStandard = isStandard;
+	}
+
+	@Override
+	public Tuple2<Long, Long> getCurrentState() {
+		return new Tuple2<>(this.lastSyncedBlock, this.recordsReadSinceLastSync);
+	}
+
+	@Override
+	public void open(FileInputSplit split) throws IOException {

Review comment:
   Yes, I saw that. I'm just worried what happens if we open a new split and a checkpoint is triggered before the first row is read. The chance is small (and might not even exist at all if split opening and reading of the first record are done in the same synchronized block), but it is also a very simple change to guarantee correctness here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
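The worry in the review comment above is the window between open() and the first record read: with lastSyncedBlock initialized to -1L, a checkpoint taken in that window would persist the sentinel state (-1, 0). A standalone model of the "very simple change" the reviewer alludes to (names and structure are illustrative, not the PR's actual code): initialize the sync position inside open(), so the checkpointed tuple is always a valid restore position.

```java
// Standalone model of the checkpoint-before-first-read window discussed
// above. The fields mirror the (lastSyncedBlock, recordsReadSinceLastSync)
// tuple returned by getCurrentState(); the guard in open() is a sketch,
// not the PR's actual implementation.
public class CheckpointBeforeFirstRead {
    long lastSyncedBlock = -1L;          // sentinel: no split opened yet
    long recordsReadSinceLastSync = 0L;

    // Set the sync position as soon as the split is opened, so a checkpoint
    // taken before the first record still restores to the start of the split.
    void open() {
        lastSyncedBlock = 0L;
        recordsReadSinceLastSync = 0L;
    }

    long[] getCurrentState() {
        return new long[] {lastSyncedBlock, recordsReadSinceLastSync};
    }

    public static void main(String[] args) {
        CheckpointBeforeFirstRead format = new CheckpointBeforeFirstRead();
        format.open();
        long[] state = format.getCurrentState();
        // With the guard, the state is (0, 0): start of the first block,
        // rather than the un-restorable sentinel (-1, 0).
        System.out.println(state[0] + "," + state[1]); // prints 0,0
    }
}
```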
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224255589

## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224259148

## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E>
+	extends FileInputFormat<E>
+	implements CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class);
+
+	private transient Counter recordConsumed;
+
+	private final TypeInformation[] fieldTypes;
+
+	private final String[] fieldNames;
+
+	private boolean skipThisSplit = false;
+
+	private transient ParquetRecordReader<Row> parquetRecordReader;
+
+	private transient long recordsReadSinceLastSync;
+
+	private long lastSyncedBlock = -1L;
+
+	/**
+	 * Read parquet files with given result parquet schema.
+	 *
+	 * @param path The path of the file to read.
+	 * @param messageType schema of read result
+	 */
+	protected ParquetInputFormat(Path path, MessageType messageType) {
+		super(path);
+		RowTypeInfo readType = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(messageType);
+		this.fieldTypes = readType.getFieldTypes();
+		this.fieldNames = readType.getFieldNames();
+		// read whole parquet file as one file split
+		this.unsplittable = true;
+	}
+
+	/**
+	 * Read parquet files with given result field names and types.
+	 *
+	 * @param path The path of the file to read.
+	 * @param fieldTypes field types of read result of fields
+	 * @param fieldNames field names to read, which can be a subset of the parquet schema
+	 */
+	protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, String[] fieldNames) {
+		super(path);
+		this.fieldTypes = fieldTypes;
+		this.fieldNames = fieldNames;
+		// read whole parquet file as one file split
+		this.unsplittable = true;
+	}
+
+	@Override
+	public Tuple2<Long, Long> getCurrentState() {
+		return new Tuple2<>(this.lastSyncedBlock, this.recordsReadSinceLastSync);
+	}
+
+	@Override
+	public void open(FileInputSplit split)
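The second constructor above lets the caller request a subset of the file's columns by name. A self-contained sketch of that projection idea, using a plain map of field name to type string in place of the real Parquet MessageType (names here are illustrative, not the PR's API):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

// Standalone sketch of the column projection implied by the second
// constructor: the requested field names may be a subset of the file's
// schema, and only those columns are resolved, in the requested order.
public class SchemaProjection {
    static List<String> project(LinkedHashMap<String, String> fileSchema,
                                String[] requestedFields) {
        List<String> projected = new ArrayList<>();
        for (String field : requestedFields) {
            String type = fileSchema.get(field);
            if (type == null) {
                // Mirrors the kind of validation a real format would need
                // when a requested field is missing from the file schema.
                throw new IllegalArgumentException("Field not in file schema: " + field);
            }
            projected.add(field + ":" + type);
        }
        return projected;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, String> schema = new LinkedHashMap<>();
        schema.put("id", "INT64");
        schema.put("name", "BINARY");
        schema.put("score", "DOUBLE");
        // Request a subset of the columns, as the constructor allows.
        System.out.println(project(schema, new String[] {"id", "score"}));
        // prints [id:INT64, score:DOUBLE]
    }
}
```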
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224252224

## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224249073

## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
@@ -0,0 +1,196 @@
+public class ParquetRecordReader<T> {
+	private static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReader.class);
+
+	private ColumnIOFactory columnIOFactory;
+	private Filter filter;
+
+	private MessageType readSchema;
+	private MessageType fileSchema;
+	private ReadSupport<T> readSupport;
+
+	private RecordMaterializer<T> recordMaterializer;

Review comment:
   Add comments for the fields
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224232330

## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224265721

## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r224263227 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that supports starting reads from a particular position.
+ */
+public class ParquetRecordReader<T> {
+	private static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReader.class);
+
+	private ColumnIOFactory columnIOFactory;
+	private Filter filter;
+
+	private MessageType readSchema;
+	private MessageType fileSchema;
+	private ReadSupport<T> readSupport;
+
+	private RecordMaterializer<T> recordMaterializer;
+	private T currentValue;
+	private long total;
+	private long current = 0;
+	private int currentBlock = -1;
+	private ParquetFileReader reader;
+	private RecordReader<T> recordReader;
+	private boolean strictTypeChecking = true;
+	private long totalCountLoadedSoFar = 0;
+
+	public ParquetRecordReader(ReadSupport<T> readSupport, MessageType readSchema, Filter filter) {
+		this.readSupport = readSupport;
+		this.readSchema = readSchema;
+		this.filter = checkNotNull(filter, "filter");
+	}
+
+	public ParquetRecordReader(ReadSupport<T> readSupport, MessageType readSchema) {
+		this(readSupport, readSchema, FilterCompat.NOOP);
+	}
+
+	public void initialize(ParquetFileReader reader, Configuration configuration) {
+		this.reader = reader;
+		FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData();
+		this.fileSchema = parquetFileMetadata.getSchema();
+		Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData();
+		ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
+			configuration, toSetMultiMap(fileMetadata), readSchema));
+
+		this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+		this.readSchema = readContext.getRequestedSchema();
+		this.recordMaterializer = readSupport.prepareForRead(
+			configuration, fileMetadata, readSchema, readContext);
+		this.total = reader.getRecordCount();
+		reader.setRequestedSchema(readSchema);
+	}
+
+	private void checkRead() throws IOException {
+		if (current == totalCountLoadedSoFar) {
+			PageReadStore pages = reader.readNextRowGroup();
+			recordReader = createRecordReader(pages);
+			totalCountLoadedSoFar += pages.getRowCount();
+			currentBlock++;
+		}
+	}
+
+	public
[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224264555

## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224248653

## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
##
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E>
+	extends FileInputFormat<E>
+	implements CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class);
+
+	private transient Counter recordConsumed;
+
+	private final TypeInformation<?>[] fieldTypes;
+
+	private final String[] fieldNames;
+
+	private boolean skipThisSplit = false;
+
+	private transient ParquetRecordReader<Row> parquetRecordReader;
+
+	private transient long recordsReadSinceLastSync;
+
+	private long lastSyncedBlock = -1L;
+
+	/**
+	 * Read parquet files with given result parquet schema.
+	 *
+	 * @param path The path of the file to read.
+	 * @param messageType schema of read result
+	 */
+	protected ParquetInputFormat(Path path, MessageType messageType) {
+		super(path);
+		RowTypeInfo readType = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(messageType);
+		this.fieldTypes = readType.getFieldTypes();
+		this.fieldNames = readType.getFieldNames();
+		// read whole parquet file as one file split
+		this.unsplittable = true;
+	}
+
+	/**
+	 * Read parquet files with given result field names and types.
+	 *
+	 * @param path The path of the file to read.
+	 * @param fieldTypes field types of read result of fields
+	 * @param fieldNames field names to read, which can be a subset of the parquet schema
+	 */
+	protected ParquetInputFormat(Path path, TypeInformation<?>[] fieldTypes, String[] fieldNames) {
+		super(path);
+		this.fieldTypes = fieldTypes;
+		this.fieldNames = fieldNames;
+		// read whole parquet file as one file split
+		this.unsplittable = true;
+	}
+
+	@Override
+	public Tuple2<Long, Long> getCurrentState() {
+		return new Tuple2<>(this.lastSyncedBlock, this.recordsReadSinceLastSync);
+	}
+
+	@Override
+	public void open(FileInputSplit split)
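In the quoted `ParquetInputFormat`, `getCurrentState()` returns the pair (`lastSyncedBlock`, `recordsReadSinceLastSync`), so a restore presumably re-seeks to that block and skips the records already emitted. A hedged, self-contained sketch of that checkpoint bookkeeping, with a plain 2-D array standing in for the block/record structure (all names here are illustrative, not the Flink API):

```java
// Sketch of the (block, records-read-in-block) checkpoint idea: the state is
// small because only the block index and an offset within it are stored;
// restoring replays and discards the already-read records of that block.
public class CheckpointableCursor {
    private final String[][] blocks; // stand-in for row groups of records
    private long block = -1;         // analogous to lastSyncedBlock
    private long inBlock = 0;        // analogous to recordsReadSinceLastSync

    public CheckpointableCursor(String[][] blocks) {
        this.blocks = blocks;
    }

    /** Snapshot: current block index and records read since its start. */
    public long[] getCurrentState() {
        return new long[] {block, inBlock};
    }

    /** Restore: seek to the checkpointed block, then skip replayed records. */
    public void restore(long[] state) {
        block = state[0];
        inBlock = 0;
        for (long i = 0; i < state[1]; i++) {
            next(); // discard records that were already emitted downstream
        }
    }

    public String next() {
        if (block >= blocks.length) {
            return null; // already exhausted
        }
        if (block < 0 || inBlock >= blocks[(int) block].length) {
            block++; // advance to the next block, like currentBlock++
            inBlock = 0;
            if (block >= blocks.length) {
                return null;
            }
        }
        return blocks[(int) block][(int) inBlock++];
    }
}
```

The design trade-off this illustrates: the checkpoint stays tiny (two longs, matching `Tuple2<Long, Long>`), at the cost of re-reading part of one block on recovery.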