[jira] [Commented] (FLINK-9697) Provide connector for Kafka 2.0.0

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645998#comment-16645998
 ] 

ASF GitHub Bot commented on FLINK-9697:
---

aljoscha commented on issue #6703: [FLINK-9697] Provide connector for Kafka 
2.0.0
URL: https://github.com/apache/flink/pull/6703#issuecomment-428828451
 
 
   @yanghua According to the slf4j doc `Level` is only available since version 
`1.7.15` while Flink uses `1.7.7`: 
https://www.slf4j.org/api/org/slf4j/event/Level.html. Could you try updating to 
`1.7.15`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide connector for Kafka 2.0.0
> -
>
> Key: FLINK-9697
> URL: https://issues.apache.org/jira/browse/FLINK-9697
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Kafka 2.0.0 would be released soon.
> Here is vote thread:
> [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1]
> We should provide connector for Kafka 2.0.0 once it is released.
> Upgrade to 2.0 documentation : 
> http://kafka.apache.org/20/documentation.html#upgrade_2_0_0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] aljoscha commented on issue #6703: [FLINK-9697] Provide connector for Kafka 2.0.0

2018-10-10 Thread GitBox
aljoscha commented on issue #6703: [FLINK-9697] Provide connector for Kafka 
2.0.0
URL: https://github.com/apache/flink/pull/6703#issuecomment-428828451
 
 
   @yanghua According to the slf4j doc `Level` is only available since version 
`1.7.15` while Flink uses `1.7.7`: 
https://www.slf4j.org/api/org/slf4j/event/Level.html. Could you try updating to 
`1.7.15`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase

2018-10-10 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang updated FLINK-10527:
-
Issue Type: Sub-task  (was: Bug)
Parent: FLINK-10392

> Cleanup constant isNewMode in YarnTestBase
> --
>
> Key: FLINK-10527
> URL: https://issues.apache.org/jira/browse/FLINK-10527
> Project: Flink
>  Issue Type: Sub-task
>  Components: YARN
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> This seems to be a residual problem with FLINK-10396. It is set to true in 
> that PR. Currently it has three usage scenarios:
> 1. assert, caused an error
> {code:java}
> assumeTrue("The new mode does not start TMs upfront.", !isNewMode);
> {code}
> 2. if (!isNewMode) the logic in the block would not have invoked, the if 
> block can be removed
> 3. if (isNewMode) always been invoked, the if statement can be removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645970#comment-16645970
 ] 

ASF GitHub Bot commented on FLINK-10527:


yanghua commented on issue #6816: [FLINK-10527] Cleanup constant isNewMode in 
YarnTestBase
URL: https://github.com/apache/flink/pull/6816#issuecomment-428818907
 
 
   I try to remove these test methods.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Cleanup constant isNewMode in YarnTestBase
> --
>
> Key: FLINK-10527
> URL: https://issues.apache.org/jira/browse/FLINK-10527
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> This seems to be a residual problem with FLINK-10396. It is set to true in 
> that PR. Currently it has three usage scenarios:
> 1. assert, caused an error
> {code:java}
> assumeTrue("The new mode does not start TMs upfront.", !isNewMode);
> {code}
> 2. if (!isNewMode) the logic in the block would not have invoked, the if 
> block can be removed
> 3. if (isNewMode) always been invoked, the if statement can be removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6816: [FLINK-10527] Cleanup constant isNewMode in YarnTestBase

2018-10-10 Thread GitBox
yanghua commented on issue #6816: [FLINK-10527] Cleanup constant isNewMode in 
YarnTestBase
URL: https://github.com/apache/flink/pull/6816#issuecomment-428818907
 
 
   I try to remove these test methods.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645967#comment-16645967
 ] 

ASF GitHub Bot commented on FLINK-10527:


yanghua commented on issue #6816: [FLINK-10527] Cleanup constant isNewMode in 
YarnTestBase
URL: https://github.com/apache/flink/pull/6816#issuecomment-428818387
 
 
   @TisonKun it seems it is not been ignored, it causes my PR failed, see 
[here](https://travis-ci.org/apache/flink/jobs/439612829).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Cleanup constant isNewMode in YarnTestBase
> --
>
> Key: FLINK-10527
> URL: https://issues.apache.org/jira/browse/FLINK-10527
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> This seems to be a residual problem with FLINK-10396. It is set to true in 
> that PR. Currently it has three usage scenarios:
> 1. assert, caused an error
> {code:java}
> assumeTrue("The new mode does not start TMs upfront.", !isNewMode);
> {code}
> 2. if (!isNewMode) the logic in the block would not have invoked, the if 
> block can be removed
> 3. if (isNewMode) always been invoked, the if statement can be removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6816: [FLINK-10527] Cleanup constant isNewMode in YarnTestBase

2018-10-10 Thread GitBox
yanghua commented on issue #6816: [FLINK-10527] Cleanup constant isNewMode in 
YarnTestBase
URL: https://github.com/apache/flink/pull/6816#issuecomment-428818387
 
 
   @TisonKun it seems it is not been ignored, it causes my PR failed, see 
[here](https://travis-ci.org/apache/flink/jobs/439612829).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-7999) Variable Join Window Boundaries

2018-10-10 Thread Hequn Cheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng closed FLINK-7999.
--
Resolution: Won't Fix

> Variable Join Window Boundaries
> ---
>
> Key: FLINK-7999
> URL: https://issues.apache.org/jira/browse/FLINK-7999
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Seth Wiesman
>Priority: Major
>
> Allow window joins with variable length based on row attributes. 
> Consider a two streams joined on an id, where one has start and end dates, it 
> would be useful to be able to join each row during is live durations. Today 
> this can be expressed in the datastream api using a CoProcessFunction. 
>  left.id = right.id AND (left.time > right.start and left.time < right.end)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7999) Variable Join Window Boundaries

2018-10-10 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645931#comment-16645931
 ] 

Hequn Cheng commented on FLINK-7999:


I think [FLINK-8478|https://issues.apache.org/jira/browse/FLINK-8478] and 
[FLINK-5725|https://issues.apache.org/jira/browse/FLINK-5725] both can solve 
the problem. I will close this task.

> Variable Join Window Boundaries
> ---
>
> Key: FLINK-7999
> URL: https://issues.apache.org/jira/browse/FLINK-7999
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Seth Wiesman
>Priority: Major
>
> Allow window joins with variable length based on row attributes. 
> Consider a two streams joined on an id, where one has start and end dates, it 
> would be useful to be able to join each row during is live durations. Today 
> this can be expressed in the datastream api using a CoProcessFunction. 
>  left.id = right.id AND (left.time > right.start and left.time < right.end)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase

2018-10-10 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645927#comment-16645927
 ] 

TisonKun commented on FLINK-10527:
--

How about adding this issue as a subtask of FLINK-10392?

> Cleanup constant isNewMode in YarnTestBase
> --
>
> Key: FLINK-10527
> URL: https://issues.apache.org/jira/browse/FLINK-10527
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> This seems to be a residual problem with FLINK-10396. It is set to true in 
> that PR. Currently it has three usage scenarios:
> 1. assert, caused an error
> {code:java}
> assumeTrue("The new mode does not start TMs upfront.", !isNewMode);
> {code}
> 2. if (!isNewMode) the logic in the block would not have invoked, the if 
> block can be removed
> 3. if (isNewMode) always been invoked, the if statement can be removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645924#comment-16645924
 ] 

ASF GitHub Bot commented on FLINK-10527:


TisonKun commented on issue #6816: [FLINK-10527] Cleanup constant isNewMode in 
YarnTestBase
URL: https://github.com/apache/flink/pull/6816#issuecomment-428811431
 
 
   @yanghua thanks for raising this thread! I think 
`YARNHighAvailabilityITCase`, 
`YARNSessionCapacitySchedulerITCase#testClientStartup` and 
`YARNSessionCapacitySchedulerITCase#testTaskManagerFailure` are ignored by 
`assumeTrue`.
   
   You remove the `assumeTrue` statement and re-enable them. It looks like 
unintended. Maybe do some analysis to see if we can remove them as (covered by 
existing tests/not supported any more) or port them to FLIP-6 code base?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Cleanup constant isNewMode in YarnTestBase
> --
>
> Key: FLINK-10527
> URL: https://issues.apache.org/jira/browse/FLINK-10527
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> This seems to be a residual problem with FLINK-10396. It is set to true in 
> that PR. Currently it has three usage scenarios:
> 1. assert, caused an error
> {code:java}
> assumeTrue("The new mode does not start TMs upfront.", !isNewMode);
> {code}
> 2. if (!isNewMode) the logic in the block would not have invoked, the if 
> block can be removed
> 3. if (isNewMode) always been invoked, the if statement can be removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on issue #6816: [FLINK-10527] Cleanup constant isNewMode in YarnTestBase

2018-10-10 Thread GitBox
TisonKun commented on issue #6816: [FLINK-10527] Cleanup constant isNewMode in 
YarnTestBase
URL: https://github.com/apache/flink/pull/6816#issuecomment-428811431
 
 
   @yanghua thanks for raising this thread! I think 
`YARNHighAvailabilityITCase`, 
`YARNSessionCapacitySchedulerITCase#testClientStartup` and 
`YARNSessionCapacitySchedulerITCase#testTaskManagerFailure` are ignored by 
`assumeTrue`.
   
   You remove the `assumeTrue` statement and re-enable them. It looks like 
unintended. Maybe do some analysis to see if we can remove them as (covered by 
existing tests/not supported any more) or port them to FLIP-6 code base?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10298) Batch Job Failover Strategy

2018-10-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10298:
---
Labels: pull-request-available  (was: )

> Batch Job Failover Strategy
> ---
>
> Key: FLINK-10298
> URL: https://issues.apache.org/jira/browse/FLINK-10298
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The new failover strategy needs to consider handling failures according to 
> different failure types. It orchestrates all the logics we mentioned in this 
> [document|https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit],
>  we can put the logic in onTaskFailure method of the FailoverStrategy 
> interface, with the logic inline:
> {code:java}
> public void onTaskFailure(Execution taskExecution, Throwable cause) {  
>     //1. Get the throwable type
>     //2. If the type is NonrecoverableType fail the job
>     //3. If the type is PatritionDataMissingError, do revocation
>     //4. If the type is EnvironmentError, do check blacklist
> //5. Other failure types are recoverable, but we need to remember the 
> count of the failure,
> //6. if it exceeds the threshold, fail the job
> }{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10298) Batch Job Failover Strategy

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645920#comment-16645920
 ] 

ASF GitHub Bot commented on FLINK-10298:


isunjin opened a new pull request #6817: [FLINK-10298] [JobManager] Batch Job 
Failover Strategy
URL: https://github.com/apache/flink/pull/6817
 
 
   ## What is the purpose of the change
   
   The new failover strategy needs to consider handling failures according to 
different failure types. It orchestrates the batch job failover logic in 
onTaskFailure method of the FailoverStrategy interface, with the logic inline:
   
   public void onTaskFailure(Execution taskExecution, Throwable cause) {
   
   //1. Get the throwable type
   
   //2. If the type is NonRecoverableError fail the job
   
   //3. If the type is PatritionDataMissingError, do revocation
   
   //4. If the type is EnvironmentError, do check blacklist
   
   //5. Other failure types are recoverable, but we need to remember 
the count
of the failure
   
   //6. if it exceeds the threshold, fail the job
   }
   
   ## Brief change log
 - *Add a configuration MAX_ATTEMPTS_EXECUTION_FAILURE_COUNT, the default 
value is 6 *
 - *Add a failure counter in the failed region*
 - **
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Added Tests to validate job fail immediately and not restart if meet a 
NoRecoverableError*
 - *Added a test to validate job fail after 
MAX_ATTEMPTS_EXECUTION_FAILURE_COUNT times if meet a RecoverableError"
 - *Added BatchJobFailoverStrategy that inherited from 
RestartPipelinedRegionStrategy*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no )
 - The runtime per-record code paths (performance sensitive): (no )
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no )
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes )
 - If yes, how is the feature documented? 
([document](https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit#heading=h.to0clnrw60r))
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Batch Job Failover Strategy
> ---
>
> Key: FLINK-10298
> URL: https://issues.apache.org/jira/browse/FLINK-10298
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The new failover strategy needs to consider handling failures according to 
> different failure types. It orchestrates all the logics we mentioned in this 
> [document|https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit],
>  we can put the logic in onTaskFailure method of the FailoverStrategy 
> interface, with the logic inline:
> {code:java}
> public void onTaskFailure(Execution taskExecution, Throwable cause) {  
>     //1. Get the throwable type
>     //2. If the type is NonrecoverableType fail the job
>     //3. If the type is PatritionDataMissingError, do revocation
>     //4. If the type is EnvironmentError, do check blacklist
> //5. Other failure types are recoverable, but we need to remember the 
> count of the failure,
> //6. if it exceeds the threshold, fail the job
> }{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] isunjin opened a new pull request #6817: [FLINK-10298] [JobManager] Batch Job Failover Strategy

2018-10-10 Thread GitBox
isunjin opened a new pull request #6817: [FLINK-10298] [JobManager] Batch Job 
Failover Strategy
URL: https://github.com/apache/flink/pull/6817
 
 
   ## What is the purpose of the change
   
   The new failover strategy needs to consider handling failures according to 
different failure types. It orchestrates the batch job failover logic in 
onTaskFailure method of the FailoverStrategy interface, with the logic inline:
   
   public void onTaskFailure(Execution taskExecution, Throwable cause) {
   
   //1. Get the throwable type
   
   //2. If the type is NonRecoverableError fail the job
   
   //3. If the type is PatritionDataMissingError, do revocation
   
   //4. If the type is EnvironmentError, do check blacklist
   
   //5. Other failure types are recoverable, but we need to remember 
the count
of the failure
   
   //6. if it exceeds the threshold, fail the job
   }
   
   ## Brief change log
 - *Add a configuration MAX_ATTEMPTS_EXECUTION_FAILURE_COUNT, the default 
value is 6 *
 - *Add a failure counter in the failed region*
 - **
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Added Tests to validate job fail immediately and not restart if meet a 
NoRecoverableError*
 - *Added a test to validate job fail after 
MAX_ATTEMPTS_EXECUTION_FAILURE_COUNT times if meet a RecoverableError"
 - *Added BatchJobFailoverStrategy that inherited from 
RestartPipelinedRegionStrategy*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no )
 - The runtime per-record code paths (performance sensitive): (no )
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no )
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes )
 - If yes, how is the feature documented? 
([document](https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit#heading=h.to0clnrw60r))
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-5584) Support Sliding-count row-window on streaming sql

2018-10-10 Thread Hequn Cheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-5584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng closed FLINK-5584.
--
Resolution: Duplicate

> Support Sliding-count row-window on streaming sql
> -
>
> Key: FLINK-5584
> URL: https://issues.apache.org/jira/browse/FLINK-5584
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>Priority: Major
>
> Calcite has already support sliding-count row-window, the grammar look like:
> select sum(amount) over (rows 10 preceding) from Order;
> select sum(amount) over (partition by user rows 10 preceding) from Order;
> And it will parse the sql as a LogicalWindow relnode, the logical Window 
> contains aggregate func info and window info, it's similar to Flink 
> LogicalWIndowAggregate, so we can add an convert rule to directly convert 
> LogicalWindow into DataStreamAggregate relnode, and if Calcite support more 
> grammar, we can extend the convert rule.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5584) Support Sliding-count row-window on streaming sql

2018-10-10 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645907#comment-16645907
 ] 

Hequn Cheng commented on FLINK-5584:


I think this has been solved by 
[FLINK-4557|https://issues.apache.org/jira/browse/FLINK-4557]. I will close 
this one.

> Support Sliding-count row-window on streaming sql
> -
>
> Key: FLINK-5584
> URL: https://issues.apache.org/jira/browse/FLINK-5584
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>Priority: Major
>
> Calcite has already support sliding-count row-window, the grammar look like:
> select sum(amount) over (rows 10 preceding) from Order;
> select sum(amount) over (partition by user rows 10 preceding) from Order;
> And it will parse the sql as a LogicalWindow relnode, the logical Window 
> contains aggregate func info and window info, it's similar to Flink 
> LogicalWIndowAggregate, so we can add an convert rule to directly convert 
> LogicalWindow into DataStreamAggregate relnode, and if Calcite support more 
> grammar, we can extend the convert rule.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6807: [FLINK-10292] Generate JobGraph in StandaloneJobClusterEntrypoint only once

2018-10-10 Thread GitBox
yanghua commented on issue #6807: [FLINK-10292] Generate JobGraph in 
StandaloneJobClusterEntrypoint only once
URL: https://github.com/apache/flink/pull/6807#issuecomment-428806504
 
 
   @tillrohrmann This is very strange, why this PR change causes the yarn test 
to time out, and some applications cannot reach the RUNNING state.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10292) Generate JobGraph in StandaloneJobClusterEntrypoint only once

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645901#comment-16645901
 ] 

ASF GitHub Bot commented on FLINK-10292:


yanghua commented on issue #6807: [FLINK-10292] Generate JobGraph in 
StandaloneJobClusterEntrypoint only once
URL: https://github.com/apache/flink/pull/6807#issuecomment-428806504
 
 
   @tillrohrmann This is very strange, why this PR change causes the yarn test 
to time out, and some applications cannot reach the RUNNING state.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Generate JobGraph in StandaloneJobClusterEntrypoint only once
> -
>
> Key: FLINK-10292
> URL: https://issues.apache.org/jira/browse/FLINK-10292
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2
>
>
> Currently the {{StandaloneJobClusterEntrypoint}} generates the {{JobGraph}} 
> from the given user code every time it starts/is restarted. This can be 
> problematic if the the {{JobGraph}} generation has side effects. Therefore, 
> it would be better to generate the {{JobGraph}} only once and store it in HA 
> storage instead from where to retrieve.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua opened a new pull request #6816: [FLINK-10527] Cleanup constant isNewMode in YarnTestBase

2018-10-10 Thread GitBox
yanghua opened a new pull request #6816: [FLINK-10527] Cleanup constant 
isNewMode in YarnTestBase
URL: https://github.com/apache/flink/pull/6816
 
 
   ## What is the purpose of the change
   
   *This pull request cleanups constant isNewMode in YarnTestBase*
   
   ## Brief change log
   
 - *Cleanup constant isNewMode in YarnTestBase*
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645890#comment-16645890
 ] 

ASF GitHub Bot commented on FLINK-10527:


yanghua opened a new pull request #6816: [FLINK-10527] Cleanup constant 
isNewMode in YarnTestBase
URL: https://github.com/apache/flink/pull/6816
 
 
   ## What is the purpose of the change
   
   *This pull request cleanups constant isNewMode in YarnTestBase*
   
   ## Brief change log
   
 - *Cleanup constant isNewMode in YarnTestBase*
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Cleanup constant isNewMode in YarnTestBase
> --
>
> Key: FLINK-10527
> URL: https://issues.apache.org/jira/browse/FLINK-10527
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> This seems to be a residual problem with FLINK-10396. It is set to true in 
> that PR. Currently it has three usage scenarios:
> 1. assert, caused an error
> {code:java}
> assumeTrue("The new mode does not start TMs upfront.", !isNewMode);
> {code}
> 2. if (!isNewMode) the logic in the block would not have invoked, the if 
> block can be removed
> 3. if (isNewMode) always been invoked, the if statement can be removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase

2018-10-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10527:
---
Labels: pull-request-available  (was: )

> Cleanup constant isNewMode in YarnTestBase
> --
>
> Key: FLINK-10527
> URL: https://issues.apache.org/jira/browse/FLINK-10527
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> This seems to be a residual problem with FLINK-10396. It is set to true in 
> that PR. Currently it has three usage scenarios:
> 1. assert, caused an error
> {code:java}
> assumeTrue("The new mode does not start TMs upfront.", !isNewMode);
> {code}
> 2. if (!isNewMode) the logic in the block would not have invoked, the if 
> block can be removed
> 3. if (isNewMode) always been invoked, the if statement can be removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9200) SQL E2E test failed on travis

2018-10-10 Thread Hequn Cheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng closed FLINK-9200.
--
Resolution: Duplicate

> SQL E2E test failed on travis
> -
>
> Key: FLINK-9200
> URL: https://issues.apache.org/jira/browse/FLINK-9200
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Critical
>
> https://travis-ci.org/zentol/flink-ci/builds/367994823



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9200) SQL E2E test failed on travis

2018-10-10 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645879#comment-16645879
 ] 

Hequn Cheng edited comment on FLINK-9200 at 10/11/18 3:02 AM:
--

This is duplicated with 
[FLINK-10220|https://issues.apache.org/jira/browse/FLINK-10220]. I will close 
this one and take a look of FLINK-10220.


was (Author: hequn8128):
Do we still have this problem? Should I close it? 

> SQL E2E test failed on travis
> -
>
> Key: FLINK-9200
> URL: https://issues.apache.org/jira/browse/FLINK-9200
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Critical
>
> https://travis-ci.org/zentol/flink-ci/builds/367994823



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10220) StreamSQL E2E test fails on travis

2018-10-10 Thread Hequn Cheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng reassigned FLINK-10220:
---

Assignee: Hequn Cheng

> StreamSQL E2E test fails on travis
> --
>
> Key: FLINK-10220
> URL: https://issues.apache.org/jira/browse/FLINK-10220
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL, Tests
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Hequn Cheng
>Priority: Critical
> Fix For: 1.7.0
>
>
> https://travis-ci.org/zentol/flink-ci/jobs/420972344
> {code}
> [FAIL] 'Streaming SQL end-to-end test' failed after 1 minutes and 49 seconds! 
> Test exited with exit code 0 but the logs contained errors, exceptions or 
> non-empty .out files
> 2018-08-27 07:34:36,311 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- window: 
> (TumblingGroupWindow('w$, 'rowtime, 2.millis)), select: ($SUM0(correct) 
> AS correct, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS 
> w$rowtime, proctime('w$) AS w$proctime) -> select: (correct, w$start AS 
> rowtime) -> to: Row -> Map -> Sink: Unnamed (1/1) 
> (97d055e4661ff3361a504626257d406d) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Exception occurred while processing valve output 
> watermark: 
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>   at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>   at 
> org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:65)
>   at 
> org.apache.flink.table.runtime.aggregate.IncrementalAggregateAllWindowFunction.apply(IncrementalAggregateAllWindowFunction.scala:62)
>   at 
> org.apache.flink.table.runtime.aggregate.IncrementalAggregateAllTimeWindowFunction.apply(IncrementalAggregateAllTimeWindowFunction.scala:65)
>   at 
> org.apache.flink.table.runtime.aggregate.IncrementalAggregateAllTimeWindowFunction.apply(IncrementalAggregateAllTimeWindowFunction.scala:37)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:46)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction.process(InternalSingleValueAllWindowFunction.java:34)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:746)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
>   ... 7 more

[jira] [Commented] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase

2018-10-10 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645881#comment-16645881
 ] 

vinoyang commented on FLINK-10527:
--

cc [~till.rohrmann] and [~Tison]

> Cleanup constant isNewMode in YarnTestBase
> --
>
> Key: FLINK-10527
> URL: https://issues.apache.org/jira/browse/FLINK-10527
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> This seems to be a residual problem with FLINK-10396. It is set to true in 
> that PR. Currently it has three usage scenarios:
> 1. assert, caused an error
> {code:java}
> assumeTrue("The new mode does not start TMs upfront.", !isNewMode);
> {code}
> 2. if (!isNewMode) the logic in the block would not have invoked, the if 
> block can be removed
> 3. if (isNewMode) always been invoked, the if statement can be removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9200) SQL E2E test failed on travis

2018-10-10 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645879#comment-16645879
 ] 

Hequn Cheng commented on FLINK-9200:


Do we still have this problem? Should I close it? 

> SQL E2E test failed on travis
> -
>
> Key: FLINK-9200
> URL: https://issues.apache.org/jira/browse/FLINK-9200
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Critical
>
> https://travis-ci.org/zentol/flink-ci/builds/367994823



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase

2018-10-10 Thread vinoyang (JIRA)
vinoyang created FLINK-10527:


 Summary: Cleanup constant isNewMode in YarnTestBase
 Key: FLINK-10527
 URL: https://issues.apache.org/jira/browse/FLINK-10527
 Project: Flink
  Issue Type: Bug
  Components: YARN
Reporter: vinoyang
Assignee: vinoyang


This seems to be a residual problem with FLINK-10396. It is set to true in that 
PR. Currently it has three usage scenarios:

1. assert, caused an error
{code:java}
assumeTrue("The new mode does not start TMs upfront.", !isNewMode);
{code}
2. if (!isNewMode) the logic in the block would not have invoked, the if block 
can be removed

3. if (isNewMode) always been invoked, the if statement can be removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-5077) testStreamTableSink fails unstable

2018-10-10 Thread Hequn Cheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-5077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng closed FLINK-5077.
--
Resolution: Won't Fix

> testStreamTableSink fails unstable
> --
>
> Key: FLINK-5077
> URL: https://issues.apache.org/jira/browse/FLINK-5077
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.1.4
>Reporter: Boris Osipov
>Priority: Major
>
> I've faced with several fails TableSinkITCase.testStreamTableSink test.
> {code}
> Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 16.938 sec 
> <<< FAILURE! - in org.apache.flink.api.scala.stream.TableSinkITCase
> testStreamTableSink(org.apache.flink.api.scala.stream.TableSinkITCase)  Time 
> elapsed: 10.534 sec  <<< FAILURE!
> java.lang.AssertionError: Different number of lines in expected and obtained 
> result. expected:<8> but was:<4>
> at org.junit.Assert.fail(Assert.java:88)
> at org.junit.Assert.failNotEquals(Assert.java:834)
> at org.junit.Assert.assertEquals(Assert.java:645)
> at 
> org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:316)
> at 
> org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:302)
> at 
> org.apache.flink.api.scala.stream.TableSinkITCase.testStreamTableSink(TableSinkITCase.scala:61)
> {code}
> I made small research. I added additional StreamITCase.StringSink
> {code}
> val results = input.toTable(tEnv, 'a, 'b, 'c)
>   .where('a < 5 || 'a > 17)
>   .select('c, 'b)
> results.writeToSink(new CsvTableSink(path))
> results.toDataStream[Row]
>   .addSink(new StreamITCase.StringSink)
> {code}
> and logging. I've ran test several times and I got following resuts in log on 
> fail:
> {noformat}
> -- Actual CsvTableSink:
> Comment#13,6
> Comment#14,6
> Comment#15,6
> Hello world, how are you?,3
> Hello world,2
> Hi,1
> -- Stream sink:
> Comment#12,6
> Comment#13,6
> Comment#14,6
> Comment#15,6
> Hello world, how are you?,3
> Hello world,2
> Hello,2
> Hi,1
> -- Expected result:
> Comment#12,6
> Comment#13,6
> Comment#14,6
> Comment#15,6
> Hello world, how are you?,3
> Hello world,2
> Hello,2
> Hi,1
> {noformat}
> Looks like writing to cvs works wrong.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5077) testStreamTableSink fails unstable

2018-10-10 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645872#comment-16645872
 ] 

Hequn Cheng commented on FLINK-5077:


I think we can close this issue now. I will close it.

> testStreamTableSink fails unstable
> --
>
> Key: FLINK-5077
> URL: https://issues.apache.org/jira/browse/FLINK-5077
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.1.4
>Reporter: Boris Osipov
>Priority: Major
>
> I've faced with several fails TableSinkITCase.testStreamTableSink test.
> {code}
> Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 16.938 sec 
> <<< FAILURE! - in org.apache.flink.api.scala.stream.TableSinkITCase
> testStreamTableSink(org.apache.flink.api.scala.stream.TableSinkITCase)  Time 
> elapsed: 10.534 sec  <<< FAILURE!
> java.lang.AssertionError: Different number of lines in expected and obtained 
> result. expected:<8> but was:<4>
> at org.junit.Assert.fail(Assert.java:88)
> at org.junit.Assert.failNotEquals(Assert.java:834)
> at org.junit.Assert.assertEquals(Assert.java:645)
> at 
> org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:316)
> at 
> org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:302)
> at 
> org.apache.flink.api.scala.stream.TableSinkITCase.testStreamTableSink(TableSinkITCase.scala:61)
> {code}
> I made small research. I added additional StreamITCase.StringSink
> {code}
> val results = input.toTable(tEnv, 'a, 'b, 'c)
>   .where('a < 5 || 'a > 17)
>   .select('c, 'b)
> results.writeToSink(new CsvTableSink(path))
> results.toDataStream[Row]
>   .addSink(new StreamITCase.StringSink)
> {code}
> and logging. I've ran test several times and I got following resuts in log on 
> fail:
> {noformat}
> -- Actual CsvTableSink:
> Comment#13,6
> Comment#14,6
> Comment#15,6
> Hello world, how are you?,3
> Hello world,2
> Hi,1
> -- Stream sink:
> Comment#12,6
> Comment#13,6
> Comment#14,6
> Comment#15,6
> Hello world, how are you?,3
> Hello world,2
> Hello,2
> Hi,1
> -- Expected result:
> Comment#12,6
> Comment#13,6
> Comment#14,6
> Comment#15,6
> Hello world, how are you?,3
> Hello world,2
> Hello,2
> Hi,1
> {noformat}
> Looks like writing to cvs works wrong.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645813#comment-16645813
 ] 

ASF GitHub Bot commented on FLINK-10156:


sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224288732
 
 

 ##
 File path: docs/dev/table/common.md
 ##
 @@ -540,22 +536,17 @@ result.insertInto("CsvSinkTable");
 // get a TableEnvironment
 val tableEnv = TableEnvironment.getTableEnvironment(env)
 
+// register the TableSink with a specific schema
+val fieldNames: Array[String] = Array("a", "b", "c")
+val fieldTypes: Array[TypeInformation] = Array(Types.INT, Types.STRING, 
Types.LONG)
+tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)
+
 // compute a result Table using Table API operators and/or SQL queries
 val result: Table = ...
 
 // create a TableSink
 val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
 
 Review comment:
   move this logic to before "registerTableSink".


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645811#comment-16645811
 ] 

ASF GitHub Bot commented on FLINK-10156:


sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224288252
 
 

 ##
 File path: docs/dev/table/common.md
 ##
 @@ -540,22 +536,17 @@ result.insertInto("CsvSinkTable");
 // get a TableEnvironment
 val tableEnv = TableEnvironment.getTableEnvironment(env)
 
+// register the TableSink with a specific schema
 
 Review comment:
   Add create `TableSink` instance logic .
   ```
   // create a TableSink
   val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645816#comment-16645816
 ] 

ASF GitHub Bot commented on FLINK-10156:


sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224289837
 
 

 ##
 File path: docs/dev/table/streaming.md
 ##
 @@ -516,7 +516,7 @@ Table result = ...
 TableSink sink = ...
 
 
 Review comment:
   Add registerTableSink logic ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645814#comment-16645814
 ] 

ASF GitHub Bot commented on FLINK-10156:


sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224289531
 
 

 ##
 File path: docs/dev/table/connect.md
 ##
 @@ -1108,8 +1127,15 @@ val sink: JDBCAppendTableSink = 
JDBCAppendTableSink.builder()
   .setParameterTypes(INT_TYPE_INFO)
   .build()
 
+tableEnv.registerTableSink(
+  "jdbcOutputTable",
+  sink,
+  // specify table schema
+  Array[String]("id"),
+  Array[TypeInformation[_]](Types.INT))
+
 val table: Table = ???
-table.writeToSink(sink)
+table.insertInto("jdbcTableSink")
 
 Review comment:
   `jdbcTableSink` -> `jdbcOutputTable`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645812#comment-16645812
 ] 

ASF GitHub Bot commented on FLINK-10156:


sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224289223
 
 

 ##
 File path: docs/dev/table/connect.md
 ##
 @@ -1047,30 +1047,42 @@ The sink only supports append-only streaming tables. 
It cannot be used to emit a
 
 {% highlight java %}
 
-Table table = ...
-
-table.writeToSink(
-  new CsvTableSink(
+CsvTableSink sink = new CsvTableSink(
 path,  // output path 
 "|",   // optional: delimit files by '|'
 1, // optional: write to a single file
-WriteMode.OVERWRITE)); // optional: override existing files
+WriteMode.OVERWRITE);  // optional: override existing files
+
+tableEnv.registerTableSink(
+  "csvOutputTable",
+  sink,
+  // specify table schema
+  new String[]{"f0", "f1"},
+  new TypeInformation[]{Types.STRING, Types.INT});
 
+Table table = ...
+table.insertInto("csvOutputTable");
 {% endhighlight %}
 
 
 
 {% highlight scala %}
 
-val table: Table = ???
-
-table.writeToSink(
-  new CsvTableSink(
+val sink: CsvTableSink = new CsvTableSink(
 path, // output path 
 fieldDelim = "|", // optional: delimit files by '|'
 numFiles = 1, // optional: write to a single file
-writeMode = WriteMode.OVERWRITE)) // optional: override existing files
+writeMode = WriteMode.OVERWRITE), // optional: override existing files
 
 Review comment:
   remove ","


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645815#comment-16645815
 ] 

ASF GitHub Bot commented on FLINK-10156:


sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224289776
 
 

 ##
 File path: docs/dev/table/streaming.md
 ##
 @@ -540,7 +540,7 @@ val result: Table = ???
 val sink: TableSink[Row] = ???
 
 
 Review comment:
   Add `registerTableSink` logic ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()

2018-10-10 Thread GitBox
sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224289531
 
 

 ##
 File path: docs/dev/table/connect.md
 ##
 @@ -1108,8 +1127,15 @@ val sink: JDBCAppendTableSink = 
JDBCAppendTableSink.builder()
   .setParameterTypes(INT_TYPE_INFO)
   .build()
 
+tableEnv.registerTableSink(
+  "jdbcOutputTable",
+  sink,
+  // specify table schema
+  Array[String]("id"),
+  Array[TypeInformation[_]](Types.INT))
+
 val table: Table = ???
-table.writeToSink(sink)
+table.insertInto("jdbcTableSink")
 
 Review comment:
   `jdbcTableSink` -> `jdbcOutputTable`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()

2018-10-10 Thread GitBox
sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224288252
 
 

 ##
 File path: docs/dev/table/common.md
 ##
 @@ -540,22 +536,17 @@ result.insertInto("CsvSinkTable");
 // get a TableEnvironment
 val tableEnv = TableEnvironment.getTableEnvironment(env)
 
+// register the TableSink with a specific schema
 
 Review comment:
   Add create `TableSink` instance logic .
   ```
   // create a TableSink
   val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()

2018-10-10 Thread GitBox
sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224288732
 
 

 ##
 File path: docs/dev/table/common.md
 ##
 @@ -540,22 +536,17 @@ result.insertInto("CsvSinkTable");
 // get a TableEnvironment
 val tableEnv = TableEnvironment.getTableEnvironment(env)
 
+// register the TableSink with a specific schema
+val fieldNames: Array[String] = Array("a", "b", "c")
+val fieldTypes: Array[TypeInformation] = Array(Types.INT, Types.STRING, 
Types.LONG)
+tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)
+
 // compute a result Table using Table API operators and/or SQL queries
 val result: Table = ...
 
 // create a TableSink
 val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
 
 Review comment:
   move this logic to before "registerTableSink".


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()

2018-10-10 Thread GitBox
sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224289837
 
 

 ##
 File path: docs/dev/table/streaming.md
 ##
 @@ -516,7 +516,7 @@ Table result = ...
 TableSink sink = ...
 
 
 Review comment:
   Add registerTableSink logic ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()

2018-10-10 Thread GitBox
sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224289223
 
 

 ##
 File path: docs/dev/table/connect.md
 ##
 @@ -1047,30 +1047,42 @@ The sink only supports append-only streaming tables. 
It cannot be used to emit a
 
 {% highlight java %}
 
-Table table = ...
-
-table.writeToSink(
-  new CsvTableSink(
+CsvTableSink sink = new CsvTableSink(
 path,  // output path 
 "|",   // optional: delimit files by '|'
 1, // optional: write to a single file
-WriteMode.OVERWRITE)); // optional: override existing files
+WriteMode.OVERWRITE);  // optional: override existing files
+
+tableEnv.registerTableSink(
+  "csvOutputTable",
+  sink,
+  // specify table schema
+  new String[]{"f0", "f1"},
+  new TypeInformation[]{Types.STRING, Types.INT});
 
+Table table = ...
+table.insertInto("csvOutputTable");
 {% endhighlight %}
 
 
 
 {% highlight scala %}
 
-val table: Table = ???
-
-table.writeToSink(
-  new CsvTableSink(
+val sink: CsvTableSink = new CsvTableSink(
 path, // output path 
 fieldDelim = "|", // optional: delimit files by '|'
 numFiles = 1, // optional: write to a single file
-writeMode = WriteMode.OVERWRITE)) // optional: override existing files
+writeMode = WriteMode.OVERWRITE), // optional: override existing files
 
 Review comment:
   remove ","


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()

2018-10-10 Thread GitBox
sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224289776
 
 

 ##
 File path: docs/dev/table/streaming.md
 ##
 @@ -540,7 +540,7 @@ val result: Table = ???
 val sink: TableSink[Row] = ???
 
 
 Review comment:
   Add `registerTableSink` logic ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645785#comment-16645785
 ] 

ASF GitHub Bot commented on FLINK-10156:


sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224282676
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
 ##
 @@ -307,11 +308,16 @@ class AggregateITCase extends StreamingWithStateTestBase 
{
 data.+=((4, 3L, "C"))
 data.+=((5, 3L, "C"))
 
+tEnv.registerTableSink(
+  "testSink",
+  new TestUpsertSink(Array("c"), false).configure(
+Array[String]("c", "bMax"), Array[TypeInformation[_]](Types.STRING, 
Types.LONG)))
+
 val t = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
 .groupBy('c)
-.select('c, 'b.max)
+.select('c, 'b.max as 'bMax)
 
 Review comment:
   Do we have to add alias ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645781#comment-16645781
 ] 

ASF GitHub Bot commented on FLINK-10156:


sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224276866
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
 ##
 @@ -34,11 +34,12 @@ class TableSinksValidationTest extends TableTestBase {
 val util = streamTestUtil()
 
 val t = util.addTable[(Int, Long, String)]("MyTable", 'id, 'num, 'text)
+util.tableEnv.registerTableSink("testSink",new TestAppendSink)
 
 Review comment:
   Add a space


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645784#comment-16645784
 ] 

ASF GitHub Bot commented on FLINK-10156:


sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224281005
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -944,7 +956,12 @@ class Table(
 * @param tableName Name of the registered [[TableSink]] to which the 
[[Table]] is written.
 */
   def insertInto(tableName: String): Unit = {
-tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
+this.logicalPlan match {
+  case _: LogicalTableFunctionCall =>
+throw new ValidationException("TableFunction can only be used in join 
and leftOuterJoin.")
+  case _ =>
+tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
 
 Review comment:
   Can we using "insertInto(tableName, this.tableEnv.queryConfig)", that share 
the validation logic ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645786#comment-16645786
 ] 

ASF GitHub Bot commented on FLINK-10156:


sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224281405
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -960,7 +977,12 @@ class Table(
 * @param conf The [[QueryConfig]] to use.
 */
   def insertInto(tableName: String, conf: QueryConfig): Unit = {
-tableEnv.insertInto(this, tableName, conf)
+this.logicalPlan match {
+  case _: LogicalTableFunctionCall =>
+throw new ValidationException("TableFunction can only be used in join 
and leftOuterJoin.")
 
 Review comment:
   new ValidationException(..) ->  ValidationException(...)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645789#comment-16645789
 ] 

ASF GitHub Bot commented on FLINK-10156:


sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224279958
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -891,7 +891,13 @@ class Table(
 *
 * @param sink The [[TableSink]] to which the [[Table]] is written.
 * @tparam T The data type that the [[TableSink]] expects.
+*
+* @deprecated Will be removed in a future release. Please register the 
TableSink and use
+* [[insertInto()]].
 
 Review comment:
   For the complete javadoc, we need to remove "[[", 
   [[insertInto()]] -> insertInto()
   
![image](https://user-images.githubusercontent.com/22488084/46772885-efc94400-cd2c-11e8-81e2-9c488fa20785.png)
   
![image](https://user-images.githubusercontent.com/22488084/46772899-01aae700-cd2d-11e8-96f3-842f65058d31.png)
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645783#comment-16645783
 ] 

ASF GitHub Bot commented on FLINK-10156:


sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224276816
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSinkValidationTest.scala
 ##
 @@ -35,10 +35,11 @@ class TableSinkValidationTest extends TableTestBase {
 val tEnv = TableEnvironment.getTableEnvironment(env)
 
 val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'id, 'num, 
'text)
+tEnv.registerTableSink("testSink",new TestAppendSink)
 
 Review comment:
   Add a space


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645787#comment-16645787
 ] 

ASF GitHub Bot commented on FLINK-10156:


sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224283829
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
 ##
 @@ -177,10 +195,16 @@ class TableSinkITCase extends AbstractTestBase {
   .assignAscendingTimestamps(_._1.toLong)
   .toTable(tEnv, 'id, 'num, 'text)
 
+tEnv.registerTableSink(
+  "retractSink",
+  new TestRetractSink().configure(
+Array[String]("len", "icnt", "nsum"),
+Array[TypeInformation[_]](Types.INT, Types.LONG, Types.LONG)))
+
 t.select('id, 'num, 'text.charLength() as 'len)
   .groupBy('len)
-  .select('len, 'id.count, 'num.sum)
-  .writeToSink(new TestRetractSink)
+  .select('len, 'id.count as 'icnt, 'num.sum as 'nsum)
 
 Review comment:
   In order to show that the column name of the source table can be 
inconsistent with the column name of the sink table, I recommend removing the 
alias. What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645782#comment-16645782
 ] 

ASF GitHub Bot commented on FLINK-10156:


sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224282913
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
 ##
 @@ -26,6 +26,7 @@ import org.apache.flink.table.runtime.utils.{StreamITCase, 
StreamTestData, Strea
 import org.junit.Assert._
 
 Review comment:
   Please remove useless [[TableException]] import at line 23.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645788#comment-16645788
 ] 

ASF GitHub Bot commented on FLINK-10156:


sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224276845
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSinkValidationTest.scala
 ##
 @@ -70,10 +72,11 @@ class TableSinkValidationTest extends TableTestBase {
 
 val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
 val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
+tEnv.registerTableSink("testSink",new TestAppendSink)
 
 Review comment:
   Add a space


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()

2018-10-10 Thread GitBox
sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224282676
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
 ##
 @@ -307,11 +308,16 @@ class AggregateITCase extends StreamingWithStateTestBase 
{
 data.+=((4, 3L, "C"))
 data.+=((5, 3L, "C"))
 
+tEnv.registerTableSink(
+  "testSink",
+  new TestUpsertSink(Array("c"), false).configure(
+Array[String]("c", "bMax"), Array[TypeInformation[_]](Types.STRING, 
Types.LONG)))
+
 val t = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
 .groupBy('c)
-.select('c, 'b.max)
+.select('c, 'b.max as 'bMax)
 
 Review comment:
   Do we have to add alias ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()

2018-10-10 Thread GitBox
sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224282913
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
 ##
 @@ -26,6 +26,7 @@ import org.apache.flink.table.runtime.utils.{StreamITCase, 
StreamTestData, Strea
 import org.junit.Assert._
 
 Review comment:
   Please remove useless [[TableException]] import at line 23.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()

2018-10-10 Thread GitBox
sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224281405
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -960,7 +977,12 @@ class Table(
 * @param conf The [[QueryConfig]] to use.
 */
   def insertInto(tableName: String, conf: QueryConfig): Unit = {
-tableEnv.insertInto(this, tableName, conf)
+this.logicalPlan match {
+  case _: LogicalTableFunctionCall =>
+throw new ValidationException("TableFunction can only be used in join 
and leftOuterJoin.")
 
 Review comment:
   new ValidationException(..) ->  ValidationException(...)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()

2018-10-10 Thread GitBox
sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224279958
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -891,7 +891,13 @@ class Table(
 *
 * @param sink The [[TableSink]] to which the [[Table]] is written.
 * @tparam T The data type that the [[TableSink]] expects.
+*
+* @deprecated Will be removed in a future release. Please register the 
TableSink and use
+* [[insertInto()]].
 
 Review comment:
   For the complete javadoc, we need to remove "[[", 
   [[insertInto()]] -> insertInto()
   
![image](https://user-images.githubusercontent.com/22488084/46772885-efc94400-cd2c-11e8-81e2-9c488fa20785.png)
   
![image](https://user-images.githubusercontent.com/22488084/46772899-01aae700-cd2d-11e8-96f3-842f65058d31.png)
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()

2018-10-10 Thread GitBox
sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224276816
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSinkValidationTest.scala
 ##
 @@ -35,10 +35,11 @@ class TableSinkValidationTest extends TableTestBase {
 val tEnv = TableEnvironment.getTableEnvironment(env)
 
 val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'id, 'num, 
'text)
+tEnv.registerTableSink("testSink",new TestAppendSink)
 
 Review comment:
   Add a space


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()

2018-10-10 Thread GitBox
sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224276866
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
 ##
 @@ -34,11 +34,12 @@ class TableSinksValidationTest extends TableTestBase {
 val util = streamTestUtil()
 
 val t = util.addTable[(Int, Long, String)]("MyTable", 'id, 'num, 'text)
+util.tableEnv.registerTableSink("testSink",new TestAppendSink)
 
 Review comment:
   Add a space


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()

2018-10-10 Thread GitBox
sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224283829
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
 ##
 @@ -177,10 +195,16 @@ class TableSinkITCase extends AbstractTestBase {
   .assignAscendingTimestamps(_._1.toLong)
   .toTable(tEnv, 'id, 'num, 'text)
 
+tEnv.registerTableSink(
+  "retractSink",
+  new TestRetractSink().configure(
+Array[String]("len", "icnt", "nsum"),
+Array[TypeInformation[_]](Types.INT, Types.LONG, Types.LONG)))
+
 t.select('id, 'num, 'text.charLength() as 'len)
   .groupBy('len)
-  .select('len, 'id.count, 'num.sum)
-  .writeToSink(new TestRetractSink)
+  .select('len, 'id.count as 'icnt, 'num.sum as 'nsum)
 
 Review comment:
   In order to show that the column name of the source table can be 
inconsistent with the column name of the sink table, I recommend removing the 
alias. What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()

2018-10-10 Thread GitBox
sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224281005
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -944,7 +956,12 @@ class Table(
 * @param tableName Name of the registered [[TableSink]] to which the 
[[Table]] is written.
 */
   def insertInto(tableName: String): Unit = {
-tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
+this.logicalPlan match {
+  case _: LogicalTableFunctionCall =>
+throw new ValidationException("TableFunction can only be used in join 
and leftOuterJoin.")
+  case _ =>
+tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
 
 Review comment:
   Can we using "insertInto(tableName, this.tableEnv.queryConfig)", that share 
the validation logic ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()

2018-10-10 Thread GitBox
sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224276845
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSinkValidationTest.scala
 ##
 @@ -70,10 +72,11 @@ class TableSinkValidationTest extends TableTestBase {
 
 val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
 val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 
'f, 'g, 'h)
+tEnv.registerTableSink("testSink",new TestAppendSink)
 
 Review comment:
   Add a space


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-10526) Hadoop FileSystem not initialized properly on Yarn

2018-10-10 Thread Yan Yan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yan Yan closed FLINK-10526.
---
Resolution: Duplicate

> Hadoop FileSystem not initialized properly on Yarn
> --
>
> Key: FLINK-10526
> URL: https://issues.apache.org/jira/browse/FLINK-10526
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.4.0
>Reporter: Yan Yan
>Priority: Blocker
>
> When we were bumping Flink from 1.3.2 to 1.4.0, we noticed the 
> HadoopFileSystem no longer reads flinkConfig. 
> This prevents users to pass in custom HDFS configuration (e.g. core-site.xml) 
> via _fs.hdfs.hadoopconf_.
> Specially it is due to the cached _HadoopFsFactory_ is initialized with an 
> dummy Configuration 
> ([[code|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java#L387]|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java#L387]),
>  which prevents future _flinkConfig_ getting passed in 
> ([[code|https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java#L84]|https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java#L84]).
> We have verified by calling _FileSystem.initialize(flinkConfig)_ explicitly 
> in _YarnApplicationMasterRunner.run_ solve the issue.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10526) Hadoop FileSystem not initialized properly on Yarn

2018-10-10 Thread Yan Yan (JIRA)
Yan Yan created FLINK-10526:
---

 Summary: Hadoop FileSystem not initialized properly on Yarn
 Key: FLINK-10526
 URL: https://issues.apache.org/jira/browse/FLINK-10526
 Project: Flink
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.4.0
Reporter: Yan Yan


When we were bumping Flink from 1.3.2 to 1.4.0, we noticed the HadoopFileSystem 
no longer reads flinkConfig. 

This prevents users to pass in custom HDFS configuration (e.g. core-site.xml) 
via _fs.hdfs.hadoopconf_.

Specially it is due to the cached _HadoopFsFactory_ is initialized with an 
dummy Configuration 
([[code|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java#L387]|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java#L387]),
 which prevents future _flinkConfig_ getting passed in 
([[code|https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java#L84]|https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactory.java#L84]).

We have verified by calling _FileSystem.initialize(flinkConfig)_ explicitly in 
_YarnApplicationMasterRunner.run_ solve the issue.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645678#comment-16645678
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224259148
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat
+   extends FileInputFormat
+   implements CheckpointableInputFormat> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   private final TypeInformation[] fieldTypes;
+
+   private final String[] fieldNames;
+
+   private boolean skipThisSplit = false;
+
+   private transient ParquetRecordReader parquetRecordReader;
+
+   private transient long recordsReadSinceLastSync;
+
+   private long lastSyncedBlock = -1L;
+
+   /**
+* Read parquet files with given result parquet schema.
+*
+* @param path The path of the file to read.
+* @param messageType schema of read result
+*/
+
+   protected ParquetInputFormat(Path path, MessageType messageType) {
+   super(path);
+   RowTypeInfo readType = (RowTypeInfo) 
ParquetSchemaConverter.fromParquetType(messageType);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   // read whole parquet file as one file split
+   this.unsplittable = true;
+   }
+
+   /**
+* Read parquet files with given result field names and types.
+*
+* @param path The path of the file to read.
+* @param fieldTypes field types of read result of fields
+* @param fieldNames field names to read, which can be subset of the 
parquet schema
+*/
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames) {
+   super(path);
+   this.fieldTypes = fieldTypes;
+   this.fieldNames = fieldNames;
+   // read whole parquet file as one file split
+   this.unsplittable = true;

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645670#comment-16645670
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224251082
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import 
org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that 
support start read from particular position.
+ */
+public class ParquetRecordReader {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport readSupport;
+
+   private RecordMaterializer recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
+   }
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema) {
+   this(readSupport, readSchema, FilterCompat.NOOP);
+   }
+
+   public void initialize(ParquetFileReader reader, Configuration 
configuration) {
+   this.reader = reader;
+   FileMetaData parquetFileMetadata = 
reader.getFooter().getFileMetaData();
+   this.fileSchema = parquetFileMetadata.getSchema();
+   Map fileMetadata = 
parquetFileMetadata.getKeyValueMetaData();
+   ReadSupport.ReadContext readContext = readSupport.init(new 
InitContext(
+   configuration, toSetMultiMap(fileMetadata), 
readSchema));
+
+   this.columnIOFactory = new 
ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+   this.readSchema = readContext.getRequestedSchema();
+   this.recordMaterializer = readSupport.prepareForRead(
+   configuration, fileMetadata, readSchema, readContext);
+   this.total = reader.getRecordCount();
+   reader.setRequestedSchema(readSchema);
+   }
+
+   private void checkRead() throws IOException {
+   if (current == totalCountLoadedSoFar) {
+   PageReadStore pages = 

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645674#comment-16645674
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224244074
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import 
org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that 
support start read from particular position.
+ */
+public class ParquetRecordReader {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport readSupport;
+
+   private RecordMaterializer recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
 
 Review comment:
   Flink has its own `org.apache.flink.util.Preconditions` class that could be 
used here


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from a Apache Parquet file. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645673#comment-16645673
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224262756
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import 
org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that 
support start read from particular position.
+ */
+public class ParquetRecordReader {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport readSupport;
+
+   private RecordMaterializer recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
+   }
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema) {
+   this(readSupport, readSchema, FilterCompat.NOOP);
+   }
+
+   public void initialize(ParquetFileReader reader, Configuration 
configuration) {
+   this.reader = reader;
+   FileMetaData parquetFileMetadata = 
reader.getFooter().getFileMetaData();
+   this.fileSchema = parquetFileMetadata.getSchema();
+   Map fileMetadata = 
parquetFileMetadata.getKeyValueMetaData();
+   ReadSupport.ReadContext readContext = readSupport.init(new 
InitContext(
+   configuration, toSetMultiMap(fileMetadata), 
readSchema));
+
+   this.columnIOFactory = new 
ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+   this.readSchema = readContext.getRequestedSchema();
+   this.recordMaterializer = readSupport.prepareForRead(
+   configuration, fileMetadata, readSchema, readContext);
+   this.total = reader.getRecordCount();
+   reader.setRequestedSchema(readSchema);
+   }
+
+   private void checkRead() throws IOException {
+   if (current == totalCountLoadedSoFar) {
+   PageReadStore pages = 

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645677#comment-16645677
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224265721
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import 
org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that 
support start read from particular position.
+ */
+public class ParquetRecordReader {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport readSupport;
+
+   private RecordMaterializer recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
+   }
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema) {
+   this(readSupport, readSchema, FilterCompat.NOOP);
+   }
+
+   public void initialize(ParquetFileReader reader, Configuration 
configuration) {
+   this.reader = reader;
+   FileMetaData parquetFileMetadata = 
reader.getFooter().getFileMetaData();
+   this.fileSchema = parquetFileMetadata.getSchema();
+   Map fileMetadata = 
parquetFileMetadata.getKeyValueMetaData();
+   ReadSupport.ReadContext readContext = readSupport.init(new 
InitContext(
+   configuration, toSetMultiMap(fileMetadata), 
readSchema));
+
+   this.columnIOFactory = new 
ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+   this.readSchema = readContext.getRequestedSchema();
+   this.recordMaterializer = readSupport.prepareForRead(
+   configuration, fileMetadata, readSchema, readContext);
+   this.total = reader.getRecordCount();
+   reader.setRequestedSchema(readSchema);
+   }
+
+   private void checkRead() throws IOException {
+   if (current == totalCountLoadedSoFar) {
+   PageReadStore pages = 

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645665#comment-16645665
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224259648
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat
+   extends FileInputFormat
+   implements CheckpointableInputFormat> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   private final TypeInformation[] fieldTypes;
+
+   private final String[] fieldNames;
+
+   private boolean skipThisSplit = false;
+
+   private transient ParquetRecordReader parquetRecordReader;
+
+   private transient long recordsReadSinceLastSync;
 
 Review comment:
   remove the counters and keep track of the read position in the 
`ParquetRecordReader`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from a Apache Parquet file. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645664#comment-16645664
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224255261
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import 
org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that 
support start read from particular position.
+ */
+public class ParquetRecordReader {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport readSupport;
+
+   private RecordMaterializer recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
+   }
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema) {
+   this(readSupport, readSchema, FilterCompat.NOOP);
+   }
+
+   public void initialize(ParquetFileReader reader, Configuration 
configuration) {
+   this.reader = reader;
+   FileMetaData parquetFileMetadata = 
reader.getFooter().getFileMetaData();
+   this.fileSchema = parquetFileMetadata.getSchema();
+   Map fileMetadata = 
parquetFileMetadata.getKeyValueMetaData();
+   ReadSupport.ReadContext readContext = readSupport.init(new 
InitContext(
+   configuration, toSetMultiMap(fileMetadata), 
readSchema));
+
+   this.columnIOFactory = new 
ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+   this.readSchema = readContext.getRequestedSchema();
+   this.recordMaterializer = readSupport.prepareForRead(
+   configuration, fileMetadata, readSchema, readContext);
+   this.total = reader.getRecordCount();
+   reader.setRequestedSchema(readSchema);
+   }
+
+   private void checkRead() throws IOException {
+   if (current == totalCountLoadedSoFar) {
+   PageReadStore pages = 

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645666#comment-16645666
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224247519
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import 
org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that 
support start read from particular position.
+ */
+public class ParquetRecordReader {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport readSupport;
+
+   private RecordMaterializer recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
+   }
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema) {
+   this(readSupport, readSchema, FilterCompat.NOOP);
+   }
+
+   public void initialize(ParquetFileReader reader, Configuration 
configuration) {
+   this.reader = reader;
+   FileMetaData parquetFileMetadata = 
reader.getFooter().getFileMetaData();
+   this.fileSchema = parquetFileMetadata.getSchema();
+   Map fileMetadata = 
parquetFileMetadata.getKeyValueMetaData();
+   ReadSupport.ReadContext readContext = readSupport.init(new 
InitContext(
+   configuration, toSetMultiMap(fileMetadata), 
readSchema));
+
+   this.columnIOFactory = new 
ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+   this.readSchema = readContext.getRequestedSchema();
+   this.recordMaterializer = readSupport.prepareForRead(
+   configuration, fileMetadata, readSchema, readContext);
+   this.total = reader.getRecordCount();
+   reader.setRequestedSchema(readSchema);
+   }
+
+   private void checkRead() throws IOException {
+   if (current == totalCountLoadedSoFar) {
+   PageReadStore pages = 

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645667#comment-16645667
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224232330
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat
+   extends FileInputFormat
+   implements CheckpointableInputFormat> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   private final TypeInformation[] fieldTypes;
+
+   private final String[] fieldNames;
+
+   private boolean skipThisSplit = false;
+
+   private transient ParquetRecordReader parquetRecordReader;
+
+   private transient long recordsReadSinceLastSync;
+
+   private long lastSyncedBlock = -1L;
+
+   /**
+* Read parquet files with given result parquet schema.
+*
+* @param path The path of the file to read.
+* @param messageType schema of read result
+*/
+
+   protected ParquetInputFormat(Path path, MessageType messageType) {
+   super(path);
+   RowTypeInfo readType = (RowTypeInfo) 
ParquetSchemaConverter.fromParquetType(messageType);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   // read whole parquet file as one file split
+   this.unsplittable = true;
+   }
+
+   /**
+* Read parquet files with given result field names and types.
+*
+* @param path The path of the file to read.
+* @param fieldTypes field types of read result of fields
+* @param fieldNames field names to read, which can be subset of the 
parquet schema
+*/
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames) {
+   super(path);
+   this.fieldTypes = fieldTypes;
+   this.fieldNames = fieldNames;
+   // read whole parquet file as one file split
+   this.unsplittable = true;

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645668#comment-16645668
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224265334
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import 
org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that 
support start read from particular position.
+ */
+public class ParquetRecordReader {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport readSupport;
+
+   private RecordMaterializer recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
+   }
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema) {
+   this(readSupport, readSchema, FilterCompat.NOOP);
+   }
+
+   public void initialize(ParquetFileReader reader, Configuration 
configuration) {
+   this.reader = reader;
+   FileMetaData parquetFileMetadata = 
reader.getFooter().getFileMetaData();
+   this.fileSchema = parquetFileMetadata.getSchema();
+   Map fileMetadata = 
parquetFileMetadata.getKeyValueMetaData();
+   ReadSupport.ReadContext readContext = readSupport.init(new 
InitContext(
+   configuration, toSetMultiMap(fileMetadata), 
readSchema));
+
+   this.columnIOFactory = new 
ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+   this.readSchema = readContext.getRequestedSchema();
+   this.recordMaterializer = readSupport.prepareForRead(
+   configuration, fileMetadata, readSchema, readContext);
+   this.total = reader.getRecordCount();
+   reader.setRequestedSchema(readSchema);
+   }
+
+   private void checkRead() throws IOException {
+   if (current == totalCountLoadedSoFar) {
+   PageReadStore pages = 

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645679#comment-16645679
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224264555
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import 
org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that 
support start read from particular position.
+ */
+public class ParquetRecordReader {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport readSupport;
+
+   private RecordMaterializer recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
+   }
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema) {
+   this(readSupport, readSchema, FilterCompat.NOOP);
+   }
+
+   public void initialize(ParquetFileReader reader, Configuration 
configuration) {
+   this.reader = reader;
+   FileMetaData parquetFileMetadata = 
reader.getFooter().getFileMetaData();
+   this.fileSchema = parquetFileMetadata.getSchema();
+   Map fileMetadata = 
parquetFileMetadata.getKeyValueMetaData();
+   ReadSupport.ReadContext readContext = readSupport.init(new 
InitContext(
+   configuration, toSetMultiMap(fileMetadata), 
readSchema));
+
+   this.columnIOFactory = new 
ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+   this.readSchema = readContext.getRequestedSchema();
+   this.recordMaterializer = readSupport.prepareForRead(
+   configuration, fileMetadata, readSchema, readContext);
+   this.total = reader.getRecordCount();
+   reader.setRequestedSchema(readSchema);
+   }
+
+   private void checkRead() throws IOException {
+   if (current == totalCountLoadedSoFar) {
+   PageReadStore pages = 

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645661#comment-16645661
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224252224
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import 
org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that 
support start read from particular position.
+ */
+public class ParquetRecordReader {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport readSupport;
+
+   private RecordMaterializer recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
+   }
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema) {
+   this(readSupport, readSchema, FilterCompat.NOOP);
+   }
+
+   public void initialize(ParquetFileReader reader, Configuration 
configuration) {
+   this.reader = reader;
+   FileMetaData parquetFileMetadata = 
reader.getFooter().getFileMetaData();
+   this.fileSchema = parquetFileMetadata.getSchema();
+   Map fileMetadata = 
parquetFileMetadata.getKeyValueMetaData();
+   ReadSupport.ReadContext readContext = readSupport.init(new 
InitContext(
+   configuration, toSetMultiMap(fileMetadata), 
readSchema));
+
+   this.columnIOFactory = new 
ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+   this.readSchema = readContext.getRequestedSchema();
+   this.recordMaterializer = readSupport.prepareForRead(
+   configuration, fileMetadata, readSchema, readContext);
+   this.total = reader.getRecordCount();
+   reader.setRequestedSchema(readSchema);
+   }
+
+   private void checkRead() throws IOException {
+   if (current == totalCountLoadedSoFar) {
+   PageReadStore pages = 

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645676#comment-16645676
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224262433
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import 
org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that 
support start read from particular position.
+ */
+public class ParquetRecordReader {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport readSupport;
+
+   private RecordMaterializer recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
+   }
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema) {
+   this(readSupport, readSchema, FilterCompat.NOOP);
+   }
+
+   public void initialize(ParquetFileReader reader, Configuration 
configuration) {
+   this.reader = reader;
+   FileMetaData parquetFileMetadata = 
reader.getFooter().getFileMetaData();
+   this.fileSchema = parquetFileMetadata.getSchema();
+   Map fileMetadata = 
parquetFileMetadata.getKeyValueMetaData();
+   ReadSupport.ReadContext readContext = readSupport.init(new 
InitContext(
+   configuration, toSetMultiMap(fileMetadata), 
readSchema));
+
+   this.columnIOFactory = new 
ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+   this.readSchema = readContext.getRequestedSchema();
+   this.recordMaterializer = readSupport.prepareForRead(
+   configuration, fileMetadata, readSchema, readContext);
+   this.total = reader.getRecordCount();
+   reader.setRequestedSchema(readSchema);
+   }
+
+   private void checkRead() throws IOException {
+   if (current == totalCountLoadedSoFar) {
+   PageReadStore pages = 

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645669#comment-16645669
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224246278
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import 
org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that 
support start read from particular position.
+ */
+public class ParquetRecordReader {
 
 Review comment:
   a few more comments would be good in this class.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from a Apache Parquet file. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645675#comment-16645675
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224249073
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import 
org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that 
support start read from particular position.
+ */
+public class ParquetRecordReader {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport readSupport;
+
+   private RecordMaterializer recordMaterializer;
 
 Review comment:
   Add comments for the fields


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from a Apache Parquet file. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645662#comment-16645662
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224250345
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import 
org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that 
support start read from particular position.
+ */
+public class ParquetRecordReader {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport readSupport;
+
+   private RecordMaterializer recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
+   }
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema) {
+   this(readSupport, readSchema, FilterCompat.NOOP);
+   }
+
+   public void initialize(ParquetFileReader reader, Configuration 
configuration) {
+   this.reader = reader;
+   FileMetaData parquetFileMetadata = 
reader.getFooter().getFileMetaData();
+   this.fileSchema = parquetFileMetadata.getSchema();
+   Map fileMetadata = 
parquetFileMetadata.getKeyValueMetaData();
+   ReadSupport.ReadContext readContext = readSupport.init(new 
InitContext(
+   configuration, toSetMultiMap(fileMetadata), 
readSchema));
+
+   this.columnIOFactory = new 
ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+   this.readSchema = readContext.getRequestedSchema();
+   this.recordMaterializer = readSupport.prepareForRead(
+   configuration, fileMetadata, readSchema, readContext);
+   this.total = reader.getRecordCount();
+   reader.setRequestedSchema(readSchema);
+   }
+
+   private void checkRead() throws IOException {
+   if (current == totalCountLoadedSoFar) {
+   PageReadStore pages = 

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645654#comment-16645654
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224229460
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat extends FileInputFormat 
implements
+   CheckpointableInputFormat> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames, boolean isStandard) {
+   super(path);
+   this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   this.unsplittable = true;
 
 Review comment:
   Addressing this in a follow-up issue is fine. However, reading the footer of 
each file during split generation might not be a good idea since this is a 
single-threaded operation. We could also split the file in any way (e.g., HDFS 
blocks) and identify the row groups that intersect with the split (given that 
the meta data includes the offsets and lengths of the groups).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645657#comment-16645657
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224246563
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import 
org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that 
support start read from particular position.
+ */
+public class ParquetRecordReader {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport readSupport;
+
+   private RecordMaterializer recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
+   }
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema) {
+   this(readSupport, readSchema, FilterCompat.NOOP);
+   }
+
+   public void initialize(ParquetFileReader reader, Configuration 
configuration) {
+   this.reader = reader;
+   FileMetaData parquetFileMetadata = 
reader.getFooter().getFileMetaData();
+   this.fileSchema = parquetFileMetadata.getSchema();
 
 Review comment:
   I don't quite follow how the different schemas are used and overwritten. Can 
you please add some comments?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> 

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645658#comment-16645658
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224255589
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import 
org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that 
support start read from particular position.
+ */
+public class ParquetRecordReader {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport readSupport;
+
+   private RecordMaterializer recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
+   }
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema) {
+   this(readSupport, readSchema, FilterCompat.NOOP);
+   }
+
+   public void initialize(ParquetFileReader reader, Configuration 
configuration) {
+   this.reader = reader;
+   FileMetaData parquetFileMetadata = 
reader.getFooter().getFileMetaData();
+   this.fileSchema = parquetFileMetadata.getSchema();
+   Map fileMetadata = 
parquetFileMetadata.getKeyValueMetaData();
+   ReadSupport.ReadContext readContext = readSupport.init(new 
InitContext(
+   configuration, toSetMultiMap(fileMetadata), 
readSchema));
+
+   this.columnIOFactory = new 
ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+   this.readSchema = readContext.getRequestedSchema();
+   this.recordMaterializer = readSupport.prepareForRead(
+   configuration, fileMetadata, readSchema, readContext);
+   this.total = reader.getRecordCount();
+   reader.setRequestedSchema(readSchema);
+   }
+
+   private void checkRead() throws IOException {
+   if (current == totalCountLoadedSoFar) {
+   PageReadStore pages = 

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645671#comment-16645671
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224248653
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat
+   extends FileInputFormat
+   implements CheckpointableInputFormat> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   private final TypeInformation[] fieldTypes;
+
+   private final String[] fieldNames;
+
+   private boolean skipThisSplit = false;
+
+   private transient ParquetRecordReader parquetRecordReader;
+
+   private transient long recordsReadSinceLastSync;
+
+   private long lastSyncedBlock = -1L;
+
+   /**
+* Read parquet files with given result parquet schema.
+*
+* @param path The path of the file to read.
+* @param messageType schema of read result
+*/
+
+   protected ParquetInputFormat(Path path, MessageType messageType) {
+   super(path);
+   RowTypeInfo readType = (RowTypeInfo) 
ParquetSchemaConverter.fromParquetType(messageType);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   // read whole parquet file as one file split
+   this.unsplittable = true;
+   }
+
+   /**
+* Read parquet files with given result field names and types.
+*
+* @param path The path of the file to read.
+* @param fieldTypes field types of read result of fields
+* @param fieldNames field names to read, which can be subset of the 
parquet schema
+*/
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames) {
+   super(path);
+   this.fieldTypes = fieldTypes;
+   this.fieldNames = fieldNames;
+   // read whole parquet file as one file split
+   this.unsplittable = true;

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645660#comment-16645660
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224235315
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat
+   extends FileInputFormat
+   implements CheckpointableInputFormat> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   private final TypeInformation[] fieldTypes;
+
+   private final String[] fieldNames;
+
+   private boolean skipThisSplit = false;
+
+   private transient ParquetRecordReader parquetRecordReader;
+
+   private transient long recordsReadSinceLastSync;
+
+   private long lastSyncedBlock = -1L;
+
+   /**
+* Read parquet files with given result parquet schema.
+*
+* @param path The path of the file to read.
+* @param messageType schema of read result
+*/
+
+   protected ParquetInputFormat(Path path, MessageType messageType) {
+   super(path);
+   RowTypeInfo readType = (RowTypeInfo) 
ParquetSchemaConverter.fromParquetType(messageType);
+   this.fieldTypes = readType.getFieldTypes();
 
 Review comment:
   This uses the message type to define which fields to read but not the 
overall schema of the files. In my earlier comments I was thinking of something 
like this:
   
   ```
   // define input format with full schema of files (in whatever Schema 
representation Parquet uses).
   ParquetRowInputFormat pif = new ParquetRowInputFormat(new Path(...), "name: 
String, age: Int, city: String, ..."); 
   // select which fields to read. This is used to build the readSchema.
   pif.selectFields("city", "name");
   ```
   
   The idea is similar to DDL in a DBMS. The input format is defined with all 
fields and later is selected which fields to read. This should be useful, 
because the full Parquet schema should be usually available and when selecting 
the field to read, we would not need to 

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645672#comment-16645672
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224252848
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import 
org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that 
support start read from particular position.
+ */
+public class ParquetRecordReader {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport readSupport;
+
+   private RecordMaterializer recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
+   }
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema) {
+   this(readSupport, readSchema, FilterCompat.NOOP);
+   }
+
+   public void initialize(ParquetFileReader reader, Configuration 
configuration) {
+   this.reader = reader;
+   FileMetaData parquetFileMetadata = 
reader.getFooter().getFileMetaData();
+   this.fileSchema = parquetFileMetadata.getSchema();
+   Map fileMetadata = 
parquetFileMetadata.getKeyValueMetaData();
+   ReadSupport.ReadContext readContext = readSupport.init(new 
InitContext(
+   configuration, toSetMultiMap(fileMetadata), 
readSchema));
+
+   this.columnIOFactory = new 
ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+   this.readSchema = readContext.getRequestedSchema();
+   this.recordMaterializer = readSupport.prepareForRead(
+   configuration, fileMetadata, readSchema, readContext);
+   this.total = reader.getRecordCount();
+   reader.setRequestedSchema(readSchema);
+   }
+
+   private void checkRead() throws IOException {
 
 Review comment:
   This method is only called once. Can we inline the logic?


[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645659#comment-16645659
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224259294
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat
+   extends FileInputFormat
+   implements CheckpointableInputFormat> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   private final TypeInformation[] fieldTypes;
+
+   private final String[] fieldNames;
+
+   private boolean skipThisSplit = false;
+
+   private transient ParquetRecordReader parquetRecordReader;
+
+   private transient long recordsReadSinceLastSync;
+
+   private long lastSyncedBlock = -1L;
+
+   /**
+* Read parquet files with given result parquet schema.
+*
+* @param path The path of the file to read.
+* @param messageType schema of read result
+*/
+
+   protected ParquetInputFormat(Path path, MessageType messageType) {
+   super(path);
+   RowTypeInfo readType = (RowTypeInfo) 
ParquetSchemaConverter.fromParquetType(messageType);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   // read whole parquet file as one file split
+   this.unsplittable = true;
+   }
+
+   /**
+* Read parquet files with given result field names and types.
+*
+* @param path The path of the file to read.
+* @param fieldTypes field types of read result of fields
+* @param fieldNames field names to read, which can be subset of the 
parquet schema
+*/
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames) {
+   super(path);
+   this.fieldTypes = fieldTypes;
+   this.fieldNames = fieldNames;
+   // read whole parquet file as one file split
+   this.unsplittable = true;

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645663#comment-16645663
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224263227
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import 
org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that 
support start read from particular position.
+ */
+public class ParquetRecordReader {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport readSupport;
+
+   private RecordMaterializer recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
+   }
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema) {
+   this(readSupport, readSchema, FilterCompat.NOOP);
+   }
+
+   public void initialize(ParquetFileReader reader, Configuration 
configuration) {
+   this.reader = reader;
+   FileMetaData parquetFileMetadata = 
reader.getFooter().getFileMetaData();
+   this.fileSchema = parquetFileMetadata.getSchema();
+   Map fileMetadata = 
parquetFileMetadata.getKeyValueMetaData();
+   ReadSupport.ReadContext readContext = readSupport.init(new 
InitContext(
+   configuration, toSetMultiMap(fileMetadata), 
readSchema));
+
+   this.columnIOFactory = new 
ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+   this.readSchema = readContext.getRequestedSchema();
+   this.recordMaterializer = readSupport.prepareForRead(
+   configuration, fileMetadata, readSchema, readContext);
+   this.total = reader.getRecordCount();
+   reader.setRequestedSchema(readSchema);
+   }
+
+   private void checkRead() throws IOException {
+   if (current == totalCountLoadedSoFar) {
+   PageReadStore pages = 

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645655#comment-16645655
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224230977
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat extends FileInputFormat 
implements
+   CheckpointableInputFormat> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames, boolean isStandard) {
+   super(path);
+   this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   this.unsplittable = true;
+   this.isStandard = isStandard;
+   }
+
+   @Override
+   public Tuple2 getCurrentState() {
+   return new Tuple2<>(this.lastSyncedBlock, 
this.recordsReadSinceLastSync);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
 
 Review comment:
   Yes, I saw that. I'm just worried what happens if we open a new split and a 
checkpoint is triggered before the first row is read. The chance is small (and 
might not even exist at all if split opening and reading of the first record 
are done in the same synchronized block), but it is also a very simple change 
to guarantee correctness here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go 

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645656#comment-16645656
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224232221
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat
+   extends FileInputFormat
+   implements CheckpointableInputFormat> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   private final TypeInformation[] fieldTypes;
+
+   private final String[] fieldNames;
+
+   private boolean skipThisSplit = false;
+
+   private transient ParquetRecordReader parquetRecordReader;
+
+   private transient long recordsReadSinceLastSync;
+
+   private long lastSyncedBlock = -1L;
+
+   /**
+* Read parquet files with given result parquet schema.
+*
+* @param path The path of the file to read.
+* @param messageType schema of read result
+*/
+
+   protected ParquetInputFormat(Path path, MessageType messageType) {
+   super(path);
+   RowTypeInfo readType = (RowTypeInfo) 
ParquetSchemaConverter.fromParquetType(messageType);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   // read whole parquet file as one file split
+   this.unsplittable = true;
+   }
+
+   /**
+* Read parquet files with given result field names and types.
+*
+* @param path The path of the file to read.
+* @param fieldTypes field types of read result of fields
+* @param fieldNames field names to read, which can be subset of the 
parquet schema
+*/
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames) {
+   super(path);
+   this.fieldTypes = fieldTypes;
+   this.fieldNames = fieldNames;
+   // read whole parquet file as one file split
+   this.unsplittable = true;

[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-10 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224262756
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import 
org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that 
support start read from particular position.
+ */
+public class ParquetRecordReader {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport readSupport;
+
+   private RecordMaterializer recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
+   }
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema) {
+   this(readSupport, readSchema, FilterCompat.NOOP);
+   }
+
+   public void initialize(ParquetFileReader reader, Configuration 
configuration) {
+   this.reader = reader;
+   FileMetaData parquetFileMetadata = 
reader.getFooter().getFileMetaData();
+   this.fileSchema = parquetFileMetadata.getSchema();
+   Map fileMetadata = 
parquetFileMetadata.getKeyValueMetaData();
+   ReadSupport.ReadContext readContext = readSupport.init(new 
InitContext(
+   configuration, toSetMultiMap(fileMetadata), 
readSchema));
+
+   this.columnIOFactory = new 
ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+   this.readSchema = readContext.getRequestedSchema();
+   this.recordMaterializer = readSupport.prepareForRead(
+   configuration, fileMetadata, readSchema, readContext);
+   this.total = reader.getRecordCount();
+   reader.setRequestedSchema(readSchema);
+   }
+
+   private void checkRead() throws IOException {
+   if (current == totalCountLoadedSoFar) {
+   PageReadStore pages = reader.readNextRowGroup();
+   recordReader = createRecordReader(pages);
+   totalCountLoadedSoFar += pages.getRowCount();
+   currentBlock++;
+   }
+   }
+
+   public 

[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-10 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224255261
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import 
org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that 
support start read from particular position.
+ */
+public class ParquetRecordReader {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport readSupport;
+
+   private RecordMaterializer recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
+   }
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema) {
+   this(readSupport, readSchema, FilterCompat.NOOP);
+   }
+
+   public void initialize(ParquetFileReader reader, Configuration 
configuration) {
+   this.reader = reader;
+   FileMetaData parquetFileMetadata = 
reader.getFooter().getFileMetaData();
+   this.fileSchema = parquetFileMetadata.getSchema();
+   Map fileMetadata = 
parquetFileMetadata.getKeyValueMetaData();
+   ReadSupport.ReadContext readContext = readSupport.init(new 
InitContext(
+   configuration, toSetMultiMap(fileMetadata), 
readSchema));
+
+   this.columnIOFactory = new 
ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+   this.readSchema = readContext.getRequestedSchema();
+   this.recordMaterializer = readSupport.prepareForRead(
+   configuration, fileMetadata, readSchema, readContext);
+   this.total = reader.getRecordCount();
+   reader.setRequestedSchema(readSchema);
+   }
+
+   private void checkRead() throws IOException {
+   if (current == totalCountLoadedSoFar) {
+   PageReadStore pages = reader.readNextRowGroup();
+   recordReader = createRecordReader(pages);
+   totalCountLoadedSoFar += pages.getRowCount();
+   currentBlock++;
+   }
+   }
+
+   public 

[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-10 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224230977
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat extends FileInputFormat 
implements
+   CheckpointableInputFormat> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   protected RowTypeInfo readType;
+
+   protected boolean isStandard;
+
+   protected final TypeInformation[] fieldTypes;
+
+   protected final String[] fieldNames;
+
+   protected transient ParquetRecordReader parquetRecordReader;
+
+   protected transient long recordsReadSinceLastSync;
+
+   protected long lastSyncedBlock = -1L;
+
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames, boolean isStandard) {
+   super(path);
+   this.readType = new RowTypeInfo(fieldTypes, fieldNames);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   this.unsplittable = true;
+   this.isStandard = isStandard;
+   }
+
+   @Override
+   public Tuple2 getCurrentState() {
+   return new Tuple2<>(this.lastSyncedBlock, 
this.recordsReadSinceLastSync);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
 
 Review comment:
   Yes, I saw that. I'm just worried what happens if we open a new split and a 
checkpoint is triggered before the first row is read. The chance is small (and 
might not even exist at all if split opening and reading of the first record 
are done in the same synchronized block), but it is also a very simple change 
to guarantee correctness here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-10 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224255589
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import 
org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that 
support start read from particular position.
+ */
+public class ParquetRecordReader {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport readSupport;
+
+   private RecordMaterializer recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
+   }
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema) {
+   this(readSupport, readSchema, FilterCompat.NOOP);
+   }
+
+   public void initialize(ParquetFileReader reader, Configuration 
configuration) {
+   this.reader = reader;
+   FileMetaData parquetFileMetadata = 
reader.getFooter().getFileMetaData();
+   this.fileSchema = parquetFileMetadata.getSchema();
+   Map fileMetadata = 
parquetFileMetadata.getKeyValueMetaData();
+   ReadSupport.ReadContext readContext = readSupport.init(new 
InitContext(
+   configuration, toSetMultiMap(fileMetadata), 
readSchema));
+
+   this.columnIOFactory = new 
ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+   this.readSchema = readContext.getRequestedSchema();
+   this.recordMaterializer = readSupport.prepareForRead(
+   configuration, fileMetadata, readSchema, readContext);
+   this.total = reader.getRecordCount();
+   reader.setRequestedSchema(readSchema);
+   }
+
+   private void checkRead() throws IOException {
+   if (current == totalCountLoadedSoFar) {
+   PageReadStore pages = reader.readNextRowGroup();
+   recordReader = createRecordReader(pages);
+   totalCountLoadedSoFar += pages.getRowCount();
+   currentBlock++;
+   }
+   }
+
+   public 

[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-10 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224259148
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat
+   extends FileInputFormat
+   implements CheckpointableInputFormat> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   private final TypeInformation[] fieldTypes;
+
+   private final String[] fieldNames;
+
+   private boolean skipThisSplit = false;
+
+   private transient ParquetRecordReader parquetRecordReader;
+
+   private transient long recordsReadSinceLastSync;
+
+   private long lastSyncedBlock = -1L;
+
+   /**
+* Read parquet files with given result parquet schema.
+*
+* @param path The path of the file to read.
+* @param messageType schema of read result
+*/
+
+   protected ParquetInputFormat(Path path, MessageType messageType) {
+   super(path);
+   RowTypeInfo readType = (RowTypeInfo) 
ParquetSchemaConverter.fromParquetType(messageType);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   // read whole parquet file as one file split
+   this.unsplittable = true;
+   }
+
+   /**
+* Read parquet files with given result field names and types.
+*
+* @param path The path of the file to read.
+* @param fieldTypes field types of read result of fields
+* @param fieldNames field names to read, which can be subset of the 
parquet schema
+*/
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames) {
+   super(path);
+   this.fieldTypes = fieldTypes;
+   this.fieldNames = fieldNames;
+   // read whole parquet file as one file split
+   this.unsplittable = true;
+   }
+
+   @Override
+   public Tuple2 getCurrentState() {
+   return new Tuple2<>(this.lastSyncedBlock, 
this.recordsReadSinceLastSync);
+   }
+
+   @Override
+   public void open(FileInputSplit split) 

[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-10 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224252224
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import 
org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that 
support start read from particular position.
+ */
+public class ParquetRecordReader {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport readSupport;
+
+   private RecordMaterializer recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
+   }
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema) {
+   this(readSupport, readSchema, FilterCompat.NOOP);
+   }
+
+   public void initialize(ParquetFileReader reader, Configuration 
configuration) {
+   this.reader = reader;
+   FileMetaData parquetFileMetadata = 
reader.getFooter().getFileMetaData();
+   this.fileSchema = parquetFileMetadata.getSchema();
+   Map fileMetadata = 
parquetFileMetadata.getKeyValueMetaData();
+   ReadSupport.ReadContext readContext = readSupport.init(new 
InitContext(
+   configuration, toSetMultiMap(fileMetadata), 
readSchema));
+
+   this.columnIOFactory = new 
ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+   this.readSchema = readContext.getRequestedSchema();
+   this.recordMaterializer = readSupport.prepareForRead(
+   configuration, fileMetadata, readSchema, readContext);
+   this.total = reader.getRecordCount();
+   reader.setRequestedSchema(readSchema);
+   }
+
+   private void checkRead() throws IOException {
+   if (current == totalCountLoadedSoFar) {
+   PageReadStore pages = reader.readNextRowGroup();
+   recordReader = createRecordReader(pages);
+   totalCountLoadedSoFar += pages.getRowCount();
+   currentBlock++;
+   }
+   }
+
+   public 

[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-10 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224249073
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import 
org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that 
support start read from particular position.
+ */
+public class ParquetRecordReader {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport readSupport;
+
+   private RecordMaterializer recordMaterializer;
 
 Review comment:
   Add comments for the fields


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-10 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224232330
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat
+   extends FileInputFormat
+   implements CheckpointableInputFormat> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   private final TypeInformation[] fieldTypes;
+
+   private final String[] fieldNames;
+
+   private boolean skipThisSplit = false;
+
+   private transient ParquetRecordReader parquetRecordReader;
+
+   private transient long recordsReadSinceLastSync;
+
+   private long lastSyncedBlock = -1L;
+
+   /**
+* Read parquet files with given result parquet schema.
+*
+* @param path The path of the file to read.
+* @param messageType schema of read result
+*/
+
+   protected ParquetInputFormat(Path path, MessageType messageType) {
+   super(path);
+   RowTypeInfo readType = (RowTypeInfo) 
ParquetSchemaConverter.fromParquetType(messageType);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   // read whole parquet file as one file split
+   this.unsplittable = true;
+   }
+
+   /**
+* Read parquet files with given result field names and types.
+*
+* @param path The path of the file to read.
+* @param fieldTypes field types of read result of fields
+* @param fieldNames field names to read, which can be subset of the 
parquet schema
+*/
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames) {
+   super(path);
+   this.fieldTypes = fieldTypes;
+   this.fieldNames = fieldNames;
+   // read whole parquet file as one file split
+   this.unsplittable = true;
+   }
+
+   @Override
+   public Tuple2 getCurrentState() {
+   return new Tuple2<>(this.lastSyncedBlock, 
this.recordsReadSinceLastSync);
+   }
+
+   @Override
+   public void open(FileInputSplit split) 

[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-10 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224265721
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import 
org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that 
support start read from particular position.
+ */
+public class ParquetRecordReader {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport readSupport;
+
+   private RecordMaterializer recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
+   }
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema) {
+   this(readSupport, readSchema, FilterCompat.NOOP);
+   }
+
+   public void initialize(ParquetFileReader reader, Configuration 
configuration) {
+   this.reader = reader;
+   FileMetaData parquetFileMetadata = 
reader.getFooter().getFileMetaData();
+   this.fileSchema = parquetFileMetadata.getSchema();
+   Map fileMetadata = 
parquetFileMetadata.getKeyValueMetaData();
+   ReadSupport.ReadContext readContext = readSupport.init(new 
InitContext(
+   configuration, toSetMultiMap(fileMetadata), 
readSchema));
+
+   this.columnIOFactory = new 
ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+   this.readSchema = readContext.getRequestedSchema();
+   this.recordMaterializer = readSupport.prepareForRead(
+   configuration, fileMetadata, readSchema, readContext);
+   this.total = reader.getRecordCount();
+   reader.setRequestedSchema(readSchema);
+   }
+
+   private void checkRead() throws IOException {
+   if (current == totalCountLoadedSoFar) {
+   PageReadStore pages = reader.readNextRowGroup();
+   recordReader = createRecordReader(pages);
+   totalCountLoadedSoFar += pages.getRowCount();
+   currentBlock++;
+   }
+   }
+
+   public 

[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-10 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224263227
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import 
org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that 
support start read from particular position.
+ */
+public class ParquetRecordReader {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport readSupport;
+
+   private RecordMaterializer recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
+   }
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema) {
+   this(readSupport, readSchema, FilterCompat.NOOP);
+   }
+
+   public void initialize(ParquetFileReader reader, Configuration 
configuration) {
+   this.reader = reader;
+   FileMetaData parquetFileMetadata = 
reader.getFooter().getFileMetaData();
+   this.fileSchema = parquetFileMetadata.getSchema();
+   Map fileMetadata = 
parquetFileMetadata.getKeyValueMetaData();
+   ReadSupport.ReadContext readContext = readSupport.init(new 
InitContext(
+   configuration, toSetMultiMap(fileMetadata), 
readSchema));
+
+   this.columnIOFactory = new 
ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+   this.readSchema = readContext.getRequestedSchema();
+   this.recordMaterializer = readSupport.prepareForRead(
+   configuration, fileMetadata, readSchema, readContext);
+   this.total = reader.getRecordCount();
+   reader.setRequestedSchema(readSchema);
+   }
+
+   private void checkRead() throws IOException {
+   if (current == totalCountLoadedSoFar) {
+   PageReadStore pages = reader.readNextRowGroup();
+   recordReader = createRecordReader(pages);
+   totalCountLoadedSoFar += pages.getRowCount();
+   currentBlock++;
+   }
+   }
+
+   public 

[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-10 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224264555
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import 
org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that 
support start read from particular position.
+ */
+public class ParquetRecordReader {
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRecordReader.class);
+
+   private ColumnIOFactory columnIOFactory;
+   private Filter filter;
+
+   private MessageType readSchema;
+   private MessageType fileSchema;
+   private ReadSupport readSupport;
+
+   private RecordMaterializer recordMaterializer;
+   private T currentValue;
+   private long total;
+   private long current = 0;
+   private int currentBlock = -1;
+   private ParquetFileReader reader;
+   private RecordReader recordReader;
+   private boolean strictTypeChecking = true;
+   private long totalCountLoadedSoFar = 0;
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema, Filter filter) {
+   this.readSupport = readSupport;
+   this.readSchema = readSchema;
+   this.filter = checkNotNull(filter, "filter");
+   }
+
+   public ParquetRecordReader(ReadSupport readSupport, MessageType 
readSchema) {
+   this(readSupport, readSchema, FilterCompat.NOOP);
+   }
+
+   public void initialize(ParquetFileReader reader, Configuration 
configuration) {
+   this.reader = reader;
+   FileMetaData parquetFileMetadata = 
reader.getFooter().getFileMetaData();
+   this.fileSchema = parquetFileMetadata.getSchema();
+   Map fileMetadata = 
parquetFileMetadata.getKeyValueMetaData();
+   ReadSupport.ReadContext readContext = readSupport.init(new 
InitContext(
+   configuration, toSetMultiMap(fileMetadata), 
readSchema));
+
+   this.columnIOFactory = new 
ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+   this.readSchema = readContext.getRequestedSchema();
+   this.recordMaterializer = readSupport.prepareForRead(
+   configuration, fileMetadata, readSchema, readContext);
+   this.total = reader.getRecordCount();
+   reader.setRequestedSchema(readSchema);
+   }
+
+   private void checkRead() throws IOException {
+   if (current == totalCountLoadedSoFar) {
+   PageReadStore pages = reader.readNextRowGroup();
+   recordReader = createRecordReader(pages);
+   totalCountLoadedSoFar += pages.getRowCount();
+   currentBlock++;
+   }
+   }
+
+   public 

[GitHub] fhueske commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-10-10 Thread GitBox
fhueske commented on a change in pull request #6483: 
[FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r224248653
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat
+   extends FileInputFormat
+   implements CheckpointableInputFormat> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetInputFormat.class);
+
+   private transient Counter recordConsumed;
+
+   private final TypeInformation[] fieldTypes;
+
+   private final String[] fieldNames;
+
+   private boolean skipThisSplit = false;
+
+   private transient ParquetRecordReader parquetRecordReader;
+
+   private transient long recordsReadSinceLastSync;
+
+   private long lastSyncedBlock = -1L;
+
+   /**
+* Read parquet files with given result parquet schema.
+*
+* @param path The path of the file to read.
+* @param messageType schema of read result
+*/
+
+   protected ParquetInputFormat(Path path, MessageType messageType) {
+   super(path);
+   RowTypeInfo readType = (RowTypeInfo) 
ParquetSchemaConverter.fromParquetType(messageType);
+   this.fieldTypes = readType.getFieldTypes();
+   this.fieldNames = readType.getFieldNames();
+   // read whole parquet file as one file split
+   this.unsplittable = true;
+   }
+
+   /**
+* Read parquet files with given result field names and types.
+*
+* @param path The path of the file to read.
+* @param fieldTypes field types of read result of fields
+* @param fieldNames field names to read, which can be subset of the 
parquet schema
+*/
+   protected ParquetInputFormat(Path path, TypeInformation[] fieldTypes, 
String[] fieldNames) {
+   super(path);
+   this.fieldTypes = fieldTypes;
+   this.fieldNames = fieldNames;
+   // read whole parquet file as one file split
+   this.unsplittable = true;
+   }
+
+   @Override
+   public Tuple2 getCurrentState() {
+   return new Tuple2<>(this.lastSyncedBlock, 
this.recordsReadSinceLastSync);
+   }
+
+   @Override
+   public void open(FileInputSplit split) 

  1   2   3   4   >