[jira] [Commented] (FLINK-10413) requestPartitionState messages overwhelms JM RPC main thread
[ https://issues.apache.org/jira/browse/FLINK-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626789#comment-16626789 ]

tison commented on FLINK-10413:
-------------------------------

This might be a duplicate of FLINK-10319

> requestPartitionState messages overwhelms JM RPC main thread
> ------------------------------------------------------------
>
> Key: FLINK-10413
> URL: https://issues.apache.org/jira/browse/FLINK-10413
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Affects Versions: 1.7.0
> Reporter: Zhu Zhu
> Assignee: vinoyang
> Priority: Major
>
> We tried to benchmark the job scheduling performance with a 2000x2000
> ALL-to-ALL streaming (EAGER) job. The input data is empty, so the tasks
> finish soon after starting.
> In this case we see slow RPC responses, and TM/RM heartbeats to the JM
> eventually time out.
> We find ~2,000,000 requestPartitionState messages triggered by
> triggerPartitionProducerStateCheck in a short time, which overwhelm the
> JM RPC main thread. This is because downstream tasks can be started
> earlier than upstream tasks in EAGER scheduling.
>
> We'd suggest removing the partition producer state check to avoid this
> issue. The task can just keep waiting for a while and retrying if the
> partition does not exist. There are two cases in which the partition
> does not exist:
> # the partition is not started yet
> # the partition has failed
> In case 1, the retry works. In case 2, a task failover will soon happen
> and cancel the downstream tasks as well.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
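The retry behaviour proposed in the issue above can be sketched as follows. This is a hypothetical illustration, not Flink's actual code: the `PartitionClient` interface, the method names, and the backoff numbers are all invented for the example.

```java
import java.time.Duration;

/**
 * Hypothetical sketch of the proposal in FLINK-10413: instead of asking the
 * JobManager for the producer's state, the consumer re-triggers the partition
 * request locally with a backoff. All names here are illustrative.
 */
public class PartitionRequestRetry {

    /** Minimal stand-in for the network client; returns true once the partition exists. */
    public interface PartitionClient {
        boolean requestPartition();
    }

    /** Returns true if the partition became available within the retry budget. */
    public static boolean requestWithRetry(PartitionClient client,
                                           Duration initialBackoff,
                                           int maxRetries) throws InterruptedException {
        long backoffMs = initialBackoff.toMillis();
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (client.requestPartition()) {
                return true; // producer is up and the partition exists
            }
            // Partition absent: either the producer has not started yet (a later
            // retry succeeds) or it has failed (a failover will cancel this task
            // anyway), so no JobManager round-trip is needed in either case.
            Thread.sleep(backoffMs);
            backoffMs = Math.min(backoffMs * 2, 10_000); // capped exponential backoff
        }
        return false;
    }
}
```

The point of the sketch is that both failure cases resolve without a single JM RPC: case 1 through the local retry, case 2 through the normal failover path.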
[jira] [Assigned] (FLINK-10413) requestPartitionState messages overwhelms JM RPC main thread
[ https://issues.apache.org/jira/browse/FLINK-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

vinoyang reassigned FLINK-10413:
--------------------------------

    Assignee: vinoyang

> requestPartitionState messages overwhelms JM RPC main thread
> ------------------------------------------------------------
>
> Key: FLINK-10413
> URL: https://issues.apache.org/jira/browse/FLINK-10413
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Affects Versions: 1.7.0
> Reporter: Zhu Zhu
> Assignee: vinoyang
> Priority: Major
>
> We tried to benchmark the job scheduling performance with a 2000x2000
> ALL-to-ALL streaming (EAGER) job. The input data is empty, so the tasks
> finish soon after starting.
> In this case we see slow RPC responses, and TM/RM heartbeats to the JM
> eventually time out.
> We find ~2,000,000 requestPartitionState messages triggered by
> triggerPartitionProducerStateCheck in a short time, which overwhelm the
> JM RPC main thread. This is because downstream tasks can be started
> earlier than upstream tasks in EAGER scheduling.
>
> We'd suggest removing the partition producer state check to avoid this
> issue. The task can just keep waiting for a while and retrying if the
> partition does not exist. There are two cases in which the partition
> does not exist:
> # the partition is not started yet
> # the partition has failed
> In case 1, the retry works. In case 2, a task failover will soon happen
> and cancel the downstream tasks as well.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-10412) toString field in AbstractID should be transient to avoid been serialized
[ https://issues.apache.org/jira/browse/FLINK-10412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626767#comment-16626767 ]

ASF GitHub Bot commented on FLINK-10412:
----------------------------------------

yanghua opened a new pull request #6755: [FLINK-10412] toString field in AbstractID should be transient to avoid been serialized
URL: https://github.com/apache/flink/pull/6755

## What is the purpose of the change

*This pull request marks the toString field in AbstractID as transient to avoid it being serialized.*

## Brief change log

- *Mark the toString field in AbstractID as transient so it is not serialized.*

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**)

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> toString field in AbstractID should be transient to avoid been serialized
> --------------------------------------------------------------------------
>
> Key: FLINK-10412
> URL: https://issues.apache.org/jira/browse/FLINK-10412
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Affects Versions: 1.7.0
> Reporter: Zhu Zhu
> Assignee: vinoyang
> Priority: Major
> Labels: deploy,deployment, pull-request-available, serialization
>
> The toString field in AbstractID is currently serialized, which makes
> RPC message bodies such as InputChannelDeploymentDescriptor and
> PartitionInfo more than 50% larger.
> It adds more pressure on JM memory, especially in large-scale job
> scheduling (1x1 ALL-to-ALL connection).

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
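A minimal sketch of what the proposed fix looks like. `CachedStringId` is a hypothetical stand-in for Flink's `AbstractID`, reduced to two 64-bit halves and a lazily cached string; the `transient` keyword on the cache is the whole point.

```java
import java.io.Serializable;

/**
 * Minimal sketch of the fix proposed in FLINK-10412: the lazily cached
 * string form of an ID is marked transient so Java serialization skips it.
 * This is an illustrative stand-in, not Flink's actual AbstractID.
 */
public class CachedStringId implements Serializable {
    private static final long serialVersionUID = 1L;

    private final long upperPart;
    private final long lowerPart;

    // transient: the cache is cheap to rebuild on the receiver side, so it
    // should not travel with every RPC message (per the issue it made the
    // payload more than 50% larger).
    private transient String toStringCache;

    public CachedStringId(long upper, long lower) {
        this.upperPart = upper;
        this.lowerPart = lower;
    }

    @Override
    public String toString() {
        if (toStringCache == null) {
            toStringCache = String.format("%016x%016x", upperPart, lowerPart);
        }
        return toStringCache;
    }
}
```

After deserialization the cache field is simply `null` and is rebuilt on the first `toString()` call, so the wire payload shrinks with no observable behaviour change.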
[jira] [Updated] (FLINK-10412) toString field in AbstractID should be transient to avoid been serialized
[ https://issues.apache.org/jira/browse/FLINK-10412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-10412:
-----------------------------------
    Labels: deploy,deployment pull-request-available serialization  (was: deploy,deployment serialization)

> toString field in AbstractID should be transient to avoid been serialized
> --------------------------------------------------------------------------
>
> Key: FLINK-10412
> URL: https://issues.apache.org/jira/browse/FLINK-10412
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Affects Versions: 1.7.0
> Reporter: Zhu Zhu
> Assignee: vinoyang
> Priority: Major
> Labels: deploy,deployment, pull-request-available, serialization
>
> The toString field in AbstractID is currently serialized, which makes
> RPC message bodies such as InputChannelDeploymentDescriptor and
> PartitionInfo more than 50% larger.
> It adds more pressure on JM memory, especially in large-scale job
> scheduling (1x1 ALL-to-ALL connection).

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-10319) Too many requestPartitionState would crash JM
[ https://issues.apache.org/jira/browse/FLINK-10319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626745#comment-16626745 ]

ASF GitHub Bot commented on FLINK-10319:
----------------------------------------

TisonKun removed a comment on issue #6680: [FLINK-10319] [runtime] Too many requestPartitionState would crash JM
URL: https://github.com/apache/flink/pull/6680#issuecomment-421222747

cc @tillrohrmann @GJL

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Too many requestPartitionState would crash JM
> ---------------------------------------------
>
> Key: FLINK-10319
> URL: https://issues.apache.org/jira/browse/FLINK-10319
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.7.0
> Reporter: tison
> Assignee: tison
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> Do not requestPartitionState from the JM on partition request failure,
> which may generate too many RPC requests and block the JM.
> We gain little benefit from checking what state the producer is in,
> while on the other hand the check can crash the JM with too many RPC
> requests. A task can always retriggerPartitionRequest from its
> InputGate; it will fail if the producer is gone and succeed if the
> producer is alive. Either way, there is no need to ask the JM for help.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-10319) Too many requestPartitionState would crash JM
[ https://issues.apache.org/jira/browse/FLINK-10319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626724#comment-16626724 ]

ASF GitHub Bot commented on FLINK-10319:
----------------------------------------

TisonKun removed a comment on issue #6680: [FLINK-10319] [runtime] Too many requestPartitionState would crash JM
URL: https://github.com/apache/flink/pull/6680#issuecomment-422276930

cc @tillrohrmann @GJL @StefanRRichter

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Too many requestPartitionState would crash JM
> ---------------------------------------------
>
> Key: FLINK-10319
> URL: https://issues.apache.org/jira/browse/FLINK-10319
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.7.0
> Reporter: tison
> Assignee: tison
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> Do not requestPartitionState from the JM on partition request failure,
> which may generate too many RPC requests and block the JM.
> We gain little benefit from checking what state the producer is in,
> while on the other hand the check can crash the JM with too many RPC
> requests. A task can always retriggerPartitionRequest from its
> InputGate; it will fail if the producer is gone and succeed if the
> producer is alive. Either way, there is no need to ask the JM for help.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-10319) Too many requestPartitionState would crash JM
[ https://issues.apache.org/jira/browse/FLINK-10319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626721#comment-16626721 ]

ASF GitHub Bot commented on FLINK-10319:
----------------------------------------

TisonKun removed a comment on issue #6680: [FLINK-10319] [runtime] Too many requestPartitionState would crash JM
URL: https://github.com/apache/flink/pull/6680#issuecomment-421882434

cc @StephanEwen @twalthr

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Too many requestPartitionState would crash JM
> ---------------------------------------------
>
> Key: FLINK-10319
> URL: https://issues.apache.org/jira/browse/FLINK-10319
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.7.0
> Reporter: tison
> Assignee: tison
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> Do not requestPartitionState from the JM on partition request failure,
> which may generate too many RPC requests and block the JM.
> We gain little benefit from checking what state the producer is in,
> while on the other hand the check can crash the JM with too many RPC
> requests. A task can always retriggerPartitionRequest from its
> InputGate; it will fail if the producer is gone and succeed if the
> producer is alive. Either way, there is no need to ask the JM for help.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (FLINK-10413) requestPartitionState messages overwhelms JM RPC main thread
[ https://issues.apache.org/jira/browse/FLINK-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhu Zhu updated FLINK-10413:
----------------------------
    Description: 
We tried to benchmark the job scheduling performance with a 2000x2000 ALL-to-ALL streaming (EAGER) job. The input data is empty, so the tasks finish soon after starting.
In this case we see slow RPC responses, and TM/RM heartbeats to the JM eventually time out.
We find ~2,000,000 requestPartitionState messages triggered by triggerPartitionProducerStateCheck in a short time, which overwhelm the JM RPC main thread. This is because downstream tasks can be started earlier than upstream tasks in EAGER scheduling.
We'd suggest removing the partition producer state check to avoid this issue. The task can just keep waiting for a while and retrying if the partition does not exist. There are two cases in which the partition does not exist:
# the partition is not started yet
# the partition has failed
In case 1, the retry works. In case 2, a task failover will soon happen and cancel the downstream tasks as well.

  was:
We tried to benchmark the job scheduling performance with a 2000x2000 ALL-to-ALL streaming (EAGER) job. The input data is empty, so the tasks finish soon after starting.
In this case we see slow RPC responses, and TM/RM heartbeats to the JM eventually time out.
We find ~2,000,000 requestPartitionState messages triggered by triggerPartitionProducerStateCheck in a short time, which overwhelm the JM RPC main thread. This is because downstream tasks can be started earlier than upstream tasks in EAGER scheduling.
We'd suggest removing the partition producer state check to avoid this issue. The task can just keep waiting for a while and retrying if the partition does not exist. There are two cases in which the partition does not exist:
# the partition is not started yet
# the partition has failed
In case 1, the retry works. In case 2, a task failover will soon happen and cancel the downstream tasks as well.

> requestPartitionState messages overwhelms JM RPC main thread
> ------------------------------------------------------------
>
> Key: FLINK-10413
> URL: https://issues.apache.org/jira/browse/FLINK-10413
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Affects Versions: 1.7.0
> Reporter: Zhu Zhu
> Priority: Major
>
> We tried to benchmark the job scheduling performance with a 2000x2000
> ALL-to-ALL streaming (EAGER) job. The input data is empty, so the tasks
> finish soon after starting.
> In this case we see slow RPC responses, and TM/RM heartbeats to the JM
> eventually time out.
> We find ~2,000,000 requestPartitionState messages triggered by
> triggerPartitionProducerStateCheck in a short time, which overwhelm the
> JM RPC main thread. This is because downstream tasks can be started
> earlier than upstream tasks in EAGER scheduling.
>
> We'd suggest removing the partition producer state check to avoid this
> issue. The task can just keep waiting for a while and retrying if the
> partition does not exist. There are two cases in which the partition
> does not exist:
> # the partition is not started yet
> # the partition has failed
> In case 1, the retry works. In case 2, a task failover will soon happen
> and cancel the downstream tasks as well.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (FLINK-10413) requestPartitionState messages overwhelms JM RPC main thread
Zhu Zhu created FLINK-10413:
----------------------------

             Summary: requestPartitionState messages overwhelms JM RPC main thread
                 Key: FLINK-10413
                 URL: https://issues.apache.org/jira/browse/FLINK-10413
             Project: Flink
          Issue Type: Bug
          Components: Distributed Coordination
    Affects Versions: 1.7.0
            Reporter: Zhu Zhu

We tried to benchmark the job scheduling performance with a 2000x2000 ALL-to-ALL streaming (EAGER) job. The input data is empty, so the tasks finish soon after starting.
In this case we see slow RPC responses, and TM/RM heartbeats to the JM eventually time out.
We find ~2,000,000 requestPartitionState messages triggered by triggerPartitionProducerStateCheck in a short time, which overwhelm the JM RPC main thread. This is because downstream tasks can be started earlier than upstream tasks in EAGER scheduling.
We'd suggest removing the partition producer state check to avoid this issue. The task can just keep waiting for a while and retrying if the partition does not exist. There are two cases in which the partition does not exist:
# the partition is not started yet
# the partition has failed
In case 1, the retry works. In case 2, a task failover will soon happen and cancel the downstream tasks as well.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-10412) toString field in AbstractID should be transient to avoid been serialized
[ https://issues.apache.org/jira/browse/FLINK-10412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626712#comment-16626712 ]

vinoyang commented on FLINK-10412:
----------------------------------

[~zhuzh] Good catch! Will fix it soon.

> toString field in AbstractID should be transient to avoid been serialized
> --------------------------------------------------------------------------
>
> Key: FLINK-10412
> URL: https://issues.apache.org/jira/browse/FLINK-10412
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Affects Versions: 1.7.0
> Reporter: Zhu Zhu
> Assignee: vinoyang
> Priority: Major
> Labels: deploy,deployment, serialization
>
> The toString field in AbstractID is currently serialized, which makes
> RPC message bodies such as InputChannelDeploymentDescriptor and
> PartitionInfo more than 50% larger.
> It adds more pressure on JM memory, especially in large-scale job
> scheduling (1x1 ALL-to-ALL connection).

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Assigned] (FLINK-10412) toString field in AbstractID should be transient to avoid been serialized
[ https://issues.apache.org/jira/browse/FLINK-10412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

vinoyang reassigned FLINK-10412:
--------------------------------

    Assignee: vinoyang

> toString field in AbstractID should be transient to avoid been serialized
> --------------------------------------------------------------------------
>
> Key: FLINK-10412
> URL: https://issues.apache.org/jira/browse/FLINK-10412
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Affects Versions: 1.7.0
> Reporter: Zhu Zhu
> Assignee: vinoyang
> Priority: Major
> Labels: deploy,deployment, serialization
>
> The toString field in AbstractID is currently serialized, which makes
> RPC message bodies such as InputChannelDeploymentDescriptor and
> PartitionInfo more than 50% larger.
> It adds more pressure on JM memory, especially in large-scale job
> scheduling (1x1 ALL-to-ALL connection).

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (FLINK-10412) toString field in AbstractID should be transient to avoid been serialized
[ https://issues.apache.org/jira/browse/FLINK-10412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhu Zhu updated FLINK-10412:
----------------------------
    Description: 
The toString field in AbstractID is currently serialized, which makes RPC message bodies such as InputChannelDeploymentDescriptor and PartitionInfo more than 50% larger.
It adds more pressure on JM memory, especially in large-scale job scheduling (1x1 ALL-to-ALL connection).

  was:The toString field in A

> toString field in AbstractID should be transient to avoid been serialized
> --------------------------------------------------------------------------
>
> Key: FLINK-10412
> URL: https://issues.apache.org/jira/browse/FLINK-10412
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Affects Versions: 1.7.0
> Reporter: Zhu Zhu
> Priority: Major
> Labels: deploy,deployment, serialization
>
> The toString field in AbstractID is currently serialized, which makes
> RPC message bodies such as InputChannelDeploymentDescriptor and
> PartitionInfo more than 50% larger.
> It adds more pressure on JM memory, especially in large-scale job
> scheduling (1x1 ALL-to-ALL connection).

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (FLINK-10412) toString field in AbstractID should be transient to avoid been serialized
[ https://issues.apache.org/jira/browse/FLINK-10412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu updated FLINK-10412: Description: The toString field in A > toString field in AbstractID should be transient to avoid been serialized > - > > Key: FLINK-10412 > URL: https://issues.apache.org/jira/browse/FLINK-10412 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Zhu Zhu >Priority: Major > Labels: deploy,deployment, serialization > > The toString field in A -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10412) toString field in AbstractID should be transient to avoid been serialized
[ https://issues.apache.org/jira/browse/FLINK-10412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu updated FLINK-10412: Labels: deploy,deployment serialization (was: ) > toString field in AbstractID should be transient to avoid been serialized > - > > Key: FLINK-10412 > URL: https://issues.apache.org/jira/browse/FLINK-10412 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Zhu Zhu >Priority: Major > Labels: deploy,deployment, serialization > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10412) toString field in AbstractID should be transient to avoid been serialized
[ https://issues.apache.org/jira/browse/FLINK-10412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu updated FLINK-10412: Component/s: Distributed Coordination > toString field in AbstractID should be transient to avoid been serialized > - > > Key: FLINK-10412 > URL: https://issues.apache.org/jira/browse/FLINK-10412 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Zhu Zhu >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10412) toString field in AbstractID should be transient to avoid been serialized
[ https://issues.apache.org/jira/browse/FLINK-10412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu updated FLINK-10412: Affects Version/s: 1.7.0 > toString field in AbstractID should be transient to avoid been serialized > - > > Key: FLINK-10412 > URL: https://issues.apache.org/jira/browse/FLINK-10412 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.7.0 >Reporter: Zhu Zhu >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10412) toString field in AbstractID should be transient to avoid been serialized
Zhu Zhu created FLINK-10412: --- Summary: toString field in AbstractID should be transient to avoid been serialized Key: FLINK-10412 URL: https://issues.apache.org/jira/browse/FLINK-10412 Project: Flink Issue Type: Bug Reporter: Zhu Zhu -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626693#comment-16626693 ]

ASF GitHub Bot commented on FLINK-10134:
----------------------------------------

XuQianJin-Stars edited a comment on issue #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#issuecomment-424188954

> I think we need to revisit the way this fix works. At this point, it probably makes sense to close the PR and go back to a design discussion first. I would suggest to have that design discussion on the JIRA issue first.
>
> (1) Where should the BOM be read? Beginning of the file only? Then it should most likely be part of split generation.
>
> (2) Any design cannot have additional logic in the method that is called per record, otherwise it will have too much performance impact.

@StephanEwen Thank you for your suggestion. I will close the PR first, and then we can discuss how to fix this bug.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> UTF-16 support for TextInputFormat
> ----------------------------------
>
> Key: FLINK-10134
> URL: https://issues.apache.org/jira/browse/FLINK-10134
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.4.2
> Reporter: David Dreyfus
> Priority: Blocker
> Labels: pull-request-available
>
> It does not appear that Flink supports a charset encoding of "UTF-16". In
> particular, it doesn't appear that Flink consumes the Byte Order Mark
> (BOM) to establish whether a UTF-16 file is UTF-16LE or UTF-16BE.
>
> TextInputFormat.setCharset("UTF-16") calls
> DelimitedInputFormat.setCharset(), which sets TextInputFormat.charsetName
> and then modifies the previously set delimiterString to construct the
> proper byte string encoding of the delimiter. This same charsetName is
> also used in TextInputFormat.readRecord() to interpret the bytes read
> from the file.
>
> There are two problems that this implementation would seem to have when
> using UTF-16:
> # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java
> will return a big-endian byte sequence including the Byte Order Mark
> (BOM). The actual text file will not contain a BOM at each line ending,
> so the delimiter will never be read. Moreover, if the actual byte
> encoding of the file is little endian, the bytes will be interpreted
> incorrectly.
> # TextInputFormat.readRecord() will not see a BOM each time it decodes a
> byte sequence with the String(bytes, offset, numBytes, charset) call.
> Therefore, it will assume big endian, which may not always be correct. [1]
> [https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95]
>
> While there are likely many solutions, I would think that all of them
> would have to start by reading the BOM from the file when a split is
> opened, then using that BOM to pick a byte-order-specific encoding when
> the caller doesn't specify one, and to overwrite the caller's
> specification if the BOM conflicts with it. That is, if the BOM
> indicates little endian and the caller indicates UTF-16BE, Flink should
> rewrite the charsetName as UTF-16LE.
> I hope this makes sense and that I haven't been testing incorrectly or
> misreading the code.
>
> I've verified the problem on version 1.4.2. I believe the problem exists
> on all versions.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
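The BOM-based resolution described in the issue above (the BOM overrides a conflicting caller charset, e.g. the bytes FF FE force UTF-16LE) can be sketched as follows. The class and method names are invented for illustration; this is not Flink's API.

```java
/**
 * Sketch of the BOM-probing step discussed in FLINK-10134: inspect the first
 * bytes of a file and map a UTF-16 BOM to a concrete charset name, letting
 * the BOM win over the caller's requested charset. Names are illustrative.
 */
public final class Utf16BomDetector {

    private Utf16BomDetector() {}

    /**
     * Returns "UTF-16BE" or "UTF-16LE" when a UTF-16 BOM is found at the
     * start of {@code head}, otherwise the caller-requested charset unchanged.
     */
    public static String resolveCharset(byte[] head, String requested) {
        if (head.length >= 2) {
            int b0 = head[0] & 0xFF;
            int b1 = head[1] & 0xFF;
            if (b0 == 0xFE && b1 == 0xFF) {
                return "UTF-16BE"; // BOM FE FF => big endian
            }
            if (b0 == 0xFF && b1 == 0xFE) {
                return "UTF-16LE"; // BOM FF FE => little endian
            }
        }
        return requested; // no BOM: trust the caller's charset
    }
}
```

Per the review comments quoted above, a check like this belongs in split generation (once per file), not in the per-record read path.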
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626690#comment-16626690 ]

ASF GitHub Bot commented on FLINK-10134:
----------------------------------------

XuQianJin-Stars closed pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index c1ef344175b..8eb43424264 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -62,6 +62,9 @@
 	// Charset is not serializable
 	private transient Charset charset;
 
+	/** The charset of bom in the file to process. */
+	private String bomCharsetName;
+
 	/**
 	 * The default read buffer size = 1MB.
 	 */
@@ -221,6 +224,11 @@ public void setCharset(String charset) {
 		}
 	}
 
+	@PublicEvolving
+	public String getBomCharsetName() {
+		return this.bomCharsetName;
+	}
+
 	public byte[] getDelimiter() {
 		return delimiter;
 	}
@@ -341,7 +349,7 @@ public void configure(Configuration parameters) {
 	@Override
 	public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
-		final FileBaseStatistics cachedFileStats = cachedStats instanceof FileBaseStatistics ?
+		final FileBaseStatistics cachedFileStats = cachedStats instanceof FileInputFormat.FileBaseStatistics ?
 			(FileBaseStatistics) cachedStats : null;
 
 		// store properties
@@ -408,7 +416,9 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
 		while (samplesTaken < numSamples && fileNum < allFiles.size()) {
 			// make a split for the sample and use it to read a record
 			FileStatus file = allFiles.get(fileNum);
-			FileInputSplit split = new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null);
+			String bomCharsetName = getBomCharset(file);
+
+			FileInputSplit split = new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null, bomCharsetName);
 
 			// we open the split, read one line, and take its length
 			try {
@@ -467,6 +477,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
 	 */
 	@Override
 	public void open(FileInputSplit split) throws IOException {
+		this.bomCharsetName = split.getBomCharsetName();
 		super.open(split);
 		initBuffers();
 
@@ -736,7 +747,7 @@ public void reopen(FileInputSplit split, Long state) throws IOException {
 		Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
 		Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
 		Preconditions.checkArgument(state == -1 || state >= split.getStart(),
-			" Illegal offset "+ state +", smaller than the splits start=" + split.getStart());
+			" Illegal offset " + state + ", smaller than the splits start=" + split.getStart());
 
 		try {
 			this.open(split);
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 14cf647cd24..c58344fba6c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -34,7 +34,7 @@
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-
+import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,20 +55,20 @@
 * {@link #nextRecord(Object)} and {@link #reachedEnd()} methods need to be implemented.
 * Additionally, one may override {@link #open(FileInputSplit)} and {@link #close()} to
 * change the life cycle behavior.
- *
+ *
 * After the {@link
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626689#comment-16626689 ] ASF GitHub Bot commented on FLINK-10134: XuQianJin-Stars commented on issue #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed URL: https://github.com/apache/flink/pull/6710#issuecomment-424188954 > I think we need to revisit the way this fix works. At this point, it probably makes sense to close the PR and go back to a design discussion first. I would suggest to have that design discussion on the JIRA issue first. > > (1) Where should the BOM be read? Beginning of the file only? Then it should most likely be part of split generation. > > (2) Any design cannot have additional logic in the method that is called per record, otherwise it will have too much performance impact. @StephanEwen Thank you for your suggestion. I will close the PR first; then we can discuss how to fix this bug. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > UTF-16 support for TextInputFormat > -- > > Key: FLINK-10134 > URL: https://issues.apache.org/jira/browse/FLINK-10134 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.4.2 >Reporter: David Dreyfus >Priority: Blocker > Labels: pull-request-available > > It does not appear that Flink supports a charset encoding of "UTF-16". In > particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM) > to establish whether a UTF-16 file is UTF-16LE or UTF-16BE. > > TextInputFormat.setCharset("UTF-16") calls DelimitedInputFormat.setCharset(), > which sets TextInputFormat.charsetName and then modifies the previously set > delimiterString to construct the proper byte string encoding of the > delimiter. 
This same charsetName is also used in TextInputFormat.readRecord() > to interpret the bytes read from the file. > > There are two problems that this implementation would seem to have when using > UTF-16. > # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java will > return a Big Endian byte sequence including the Byte Order Mark (BOM). The > actual text file will not contain a BOM at each line ending, so the delimiter > will never be read. Moreover, if the actual byte encoding of the file is > Little Endian, the bytes will be interpreted incorrectly. > # TextInputFormat.readRecord() will not see a BOM each time it decodes a > byte sequence with the String(bytes, offset, numBytes, charset) call. > Therefore, it will assume Big Endian, which may not always be correct. [1] > [https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95] > > While there are likely many solutions, I would think that all of them would > have to start by reading the BOM from the file when a Split is opened and > then using that BOM to modify the specified encoding to a BOM specific one > when the caller doesn't specify one, and to overwrite the caller's > specification if the BOM is in conflict with the caller's specification. That > is, if the BOM indicates Little Endian and the caller indicates UTF-16BE, > Flink should rewrite the charsetName as UTF-16LE. > I hope this makes sense and that I haven't been testing incorrectly or > misreading the code. > > I've verified the problem on version 1.4.2. I believe the problem exists on > all versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
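The fix direction sketched above — read the BOM once (e.g. during split generation) and derive the effective charset from it, overriding the caller's choice when they conflict — can be illustrated with a minimal sketch. Note that `BomDetector` and `detectCharset` are hypothetical names for illustration, not Flink API:

```java
/**
 * Minimal sketch of BOM-based charset detection for a file's leading bytes.
 * The class and method names are illustrative, not part of Flink.
 */
public class BomDetector {

    /** Returns the charset indicated by a leading BOM, or the caller's charset if no BOM is present. */
    public static String detectCharset(byte[] head, String callerCharset) {
        if (head.length >= 2 && (head[0] & 0xFF) == 0xFE && (head[1] & 0xFF) == 0xFF) {
            return "UTF-16BE"; // FE FF: big-endian BOM overrides the caller's choice
        }
        if (head.length >= 2 && (head[0] & 0xFF) == 0xFF && (head[1] & 0xFF) == 0xFE) {
            return "UTF-16LE"; // FF FE: little-endian BOM overrides the caller's choice
        }
        return callerCharset; // no BOM: keep whatever the caller specified
    }

    public static void main(String[] args) {
        // A caller asking for UTF-16BE on a little-endian file gets corrected to UTF-16LE.
        byte[] littleEndianHead = {(byte) 0xFF, (byte) 0xFE, 0x41, 0x00};
        System.out.println(detectCharset(littleEndianHead, "UTF-16BE")); // prints UTF-16LE
    }
}
```

Doing this lookup once per split, rather than per record, also respects Stephan's second point: no per-record logic is added on the hot path.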
[GitHub] XuQianJin-Stars commented on issue #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
XuQianJin-Stars commented on issue #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed URL: https://github.com/apache/flink/pull/6710#issuecomment-424188954 > I think we need to revisit the way this fix works. At this point, it probably makes sense to close the PR and go back to a design discussion first. I would suggest to have that design discussion on the JIRA issue first. > > (1) Where should the BOM be read? Beginning of the file only? Then it should most likely be part of split generation. > > (2) Any design cannot have additional logic in the method that is called per record, otherwise it will have too much performance impact. @StephanEwen Thank you for your suggestion. I will close the PR first; then we can discuss how to fix this bug. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] XuQianJin-Stars closed pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
XuQianJin-Stars closed pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed URL: https://github.com/apache/flink/pull/6710 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java index c1ef344175b..8eb43424264 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java @@ -62,6 +62,9 @@ // Charset is not serializable private transient Charset charset; + /** The charset of bom in the file to process. */ + private String bomCharsetName; + /** * The default read buffer size = 1MB. */ @@ -221,6 +224,11 @@ public void setCharset(String charset) { } } + @PublicEvolving + public String getBomCharsetName() { + return this.bomCharsetName; + } + public byte[] getDelimiter() { return delimiter; } @@ -341,7 +349,7 @@ public void configure(Configuration parameters) { @Override public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException { - final FileBaseStatistics cachedFileStats = cachedStats instanceof FileBaseStatistics ? + final FileBaseStatistics cachedFileStats = cachedStats instanceof FileInputFormat.FileBaseStatistics ? 
(FileBaseStatistics) cachedStats : null; // store properties @@ -408,7 +416,9 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc while (samplesTaken < numSamples && fileNum < allFiles.size()) { // make a split for the sample and use it to read a record FileStatus file = allFiles.get(fileNum); - FileInputSplit split = new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null); + String bomCharsetName = getBomCharset(file); + + FileInputSplit split = new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null, bomCharsetName); // we open the split, read one line, and take its length try { @@ -467,6 +477,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc */ @Override public void open(FileInputSplit split) throws IOException { + this.bomCharsetName = split.getBomCharsetName(); super.open(split); initBuffers(); @@ -736,7 +747,7 @@ public void reopen(FileInputSplit split, Long state) throws IOException { Preconditions.checkNotNull(split, "reopen() cannot be called on a null split."); Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state."); Preconditions.checkArgument(state == -1 || state >= split.getStart(), - " Illegal offset "+ state +", smaller than the splits start=" + split.getStart()); + " Illegal offset " + state + ", smaller than the splits start=" + split.getStart()); try { this.open(split); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java index 14cf647cd24..c58344fba6c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java @@ -34,7 +34,7 @@ import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; - +import 
org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,20 +55,20 @@ * {@link #nextRecord(Object)} and {@link #reachedEnd()} methods need to be implemented. * Additionally, one may override {@link #open(FileInputSplit)} and {@link #close()} to * change the life cycle behavior. - * + * * After the {@link #open(FileInputSplit)} method completed, the file input data is available * from the {@link #stream} field. */ @Public public abstract class FileInputFormat extends RichInputFormat { - + //
[GitHub] yanghua commented on issue #6738: [FLINK-10126] There should be a Scala DataSource
yanghua commented on issue #6738: [FLINK-10126] There should be a Scala DataSource URL: https://github.com/apache/flink/pull/6738#issuecomment-424184780 hi @StephanEwen, before providing the API directly, I evaluated implementing a peer `DataSource` in the flink-scala module. However, it would break the existing source API. I found that `ExecutionEnvironment.scala` exposes some `readXXX` APIs that return a `DataSet.scala`. Inside these APIs, we get a `DataSource` (flink-java) and then call `wrap(source)` to convert it to a `DataSet.scala`. Considering that there are only two or three APIs exposed in `DataSource.java` (one of which, `withParameters`, has already been exposed through the `DataSet`), I chose to expose the `DataSource` API through `DataSet.scala` with minimal changes and without breaking the current API. In fact, in this JIRA's description, the reporter is more concerned with getting the APIs provided by `DataSource.java`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10126) There should be a Scala DataSource
[ https://issues.apache.org/jira/browse/FLINK-10126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626676#comment-16626676 ] ASF GitHub Bot commented on FLINK-10126: yanghua commented on issue #6738: [FLINK-10126] There should be a Scala DataSource URL: https://github.com/apache/flink/pull/6738#issuecomment-424184780 hi @StephanEwen, before providing the API directly, I evaluated implementing a peer `DataSource` in the flink-scala module. However, it would break the existing source API. I found that `ExecutionEnvironment.scala` exposes some `readXXX` APIs that return a `DataSet.scala`. Inside these APIs, we get a `DataSource` (flink-java) and then call `wrap(source)` to convert it to a `DataSet.scala`. Considering that there are only two or three APIs exposed in `DataSource.java` (one of which, `withParameters`, has already been exposed through the `DataSet`), I chose to expose the `DataSource` API through `DataSet.scala` with minimal changes and without breaking the current API. In fact, in this JIRA's description, the reporter is more concerned with getting the APIs provided by `DataSource.java`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > There should be a Scala DataSource > -- > > Key: FLINK-10126 > URL: https://issues.apache.org/jira/browse/FLINK-10126 > Project: Flink > Issue Type: Improvement >Reporter: Alexis Sarda-Espinosa >Assignee: vinoyang >Priority: Minor > Labels: datasource, pull-request-available, scala > > In Java, an ExecutionEnvironment's createInput method returns a DataSource, > whereas the Scala version returns a DataSet. There is no Scala DataSource > wrapper, and the Scala DataSet does not provide the Java DataSource methods, > such as getSplitDataProperties. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10065) InstantiationUtil.deserializeObject(InputStream in, ClassLoader cl, boolean isFailureTolerant) will close the inputStream
[ https://issues.apache.org/jira/browse/FLINK-10065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626673#comment-16626673 ] ASF GitHub Bot commented on FLINK-10065: klion26 commented on issue #6498: [FLINK-10065] InstantiationUtil.deserializeObject(InputStream in, ClassLoader cl, boolean isFailureTolerant) will close the inputStream URL: https://github.com/apache/flink/pull/6498#issuecomment-424183722 @tillrohrmann, @StefanRRichter, @tzulitai thank you all for replying, is there anything I need to do before continuing? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > InstantiationUtil.deserializeObject(InputStream in, ClassLoader cl, boolean > isFailureTolerant) will close the inputStream > - > > Key: FLINK-10065 > URL: https://issues.apache.org/jira/browse/FLINK-10065 > Project: Flink > Issue Type: Bug >Reporter: Congxian Qiu >Assignee: Congxian Qiu >Priority: Major > Labels: pull-request-available > > Now, the implementation of InstantiationUtil.deserializeObject(InputStream in, ClassLoader cl, boolean isFailureTolerant) is
> {code:java}
> @SuppressWarnings("unchecked")
> public static <T> T deserializeObject(InputStream in, ClassLoader cl, boolean isFailureTolerant)
>         throws IOException, ClassNotFoundException {
>
>     final ClassLoader old = Thread.currentThread().getContextClassLoader();
>     // not using resource try to avoid AutoClosable's close() on the given stream
>     try (ObjectInputStream oois = isFailureTolerant
>             ? new InstantiationUtil.FailureTolerantObjectInputStream(in, cl)
>             : new InstantiationUtil.ClassLoaderObjectInputStream(in, cl)) {
>         Thread.currentThread().setContextClassLoader(cl);
>         return (T) oois.readObject();
>     }
>     finally {
>         Thread.currentThread().setContextClassLoader(old);
>     }
> }
> {code}
> InputStream is closeable, so the parameter stream will be closed after calling this method. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
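A common general-purpose way to keep try-with-resources from closing a caller-supplied stream is to hand the `ObjectInputStream` a wrapper whose `close()` is a no-op. This is only a sketch of that technique, not necessarily the fix chosen for Flink; `NonClosingInputStream` is a hypothetical name:

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Sketch: a wrapper whose close() is a no-op, so the caller's stream survives try-with-resources. */
public class NonClosingInputStream extends FilterInputStream {

    public NonClosingInputStream(InputStream delegate) {
        super(delegate);
    }

    @Override
    public void close() {
        // Deliberately do nothing: the caller stays responsible for closing the underlying stream.
    }

    public static void main(String[] args) throws IOException {
        ByteArrayInputStream original = new ByteArrayInputStream(new byte[] {1, 2, 3});
        try (InputStream guarded = new NonClosingInputStream(original)) {
            guarded.read(); // consume one byte through the wrapper
        }
        // try-with-resources closed only the wrapper; the original stream is still readable
        System.out.println(original.read()); // prints 2
    }
}
```

With such a wrapper, `deserializeObject` could keep its try-with-resources block while leaving the caller's `InputStream` open.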
[GitHub] klion26 commented on issue #6498: [FLINK-10065] InstantiationUtil.deserializeObject(InputStream in, ClassLoader cl, boolean isFailureTolerant) will close the inputStream
klion26 commented on issue #6498: [FLINK-10065] InstantiationUtil.deserializeObject(InputStream in, ClassLoader cl, boolean isFailureTolerant) will close the inputStream URL: https://github.com/apache/flink/pull/6498#issuecomment-424183722 @tillrohrmann , @StefanRRichter , @tzulitai thank you all for replying, is there anything I need to do before continuing? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10399) Refractor ParameterTool#fromArgs
[ https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626599#comment-16626599 ] ASF GitHub Bot commented on FLINK-10399: TisonKun commented on issue #6737: [FLINK-10399] Refractor ParameterTool#fromArgs URL: https://github.com/apache/flink/pull/6737#issuecomment-424165441 @StephanEwen Thanks for your review! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Refractor ParameterTool#fromArgs > > > Key: FLINK-10399 > URL: https://issues.apache.org/jira/browse/FLINK-10399 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.7.0 >Reporter: tison >Assignee: tison >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > {{ParameterTool#fromArgs}} uses a convoluted implementation that Flink developers would > struggle to parse quickly. > The main problem is that, when parsing args, we always try to consume a key-value > pair, but the implementation iterates with a {{for}} loop, thus introducing awkward > flags, mutable variables, and branches. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] TisonKun commented on issue #6737: [FLINK-10399] Refractor ParameterTool#fromArgs
TisonKun commented on issue #6737: [FLINK-10399] Refractor ParameterTool#fromArgs URL: https://github.com/apache/flink/pull/6737#issuecomment-424165441 @StephanEwen Thanks for your review! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
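The shape of the refactoring discussed in FLINK-10399 — consuming args pair-wise with a while loop instead of a for loop with flag variables — can be sketched as follows. This is a simplified illustration, not the actual ParameterTool code; the `NO_VALUE` placeholder and `ArgsSketch` name are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified sketch of pair-wise argument parsing; not the actual ParameterTool implementation. */
public class ArgsSketch {

    static final String NO_VALUE = "__NO_VALUE_KEY"; // hypothetical placeholder for value-less keys

    public static Map<String, String> fromArgs(String[] args) {
        Map<String, String> result = new HashMap<>();
        int i = 0;
        while (i < args.length) {
            String raw = args[i++];
            if (!raw.startsWith("-")) {
                throw new IllegalArgumentException("Expected a key starting with '-' but got: " + raw);
            }
            String key = raw.startsWith("--") ? raw.substring(2) : raw.substring(1);
            // the next token is this key's value unless it is itself a key (or args are exhausted)
            if (i < args.length && !args[i].startsWith("-")) {
                result.put(key, args[i++]);
            } else {
                result.put(key, NO_VALUE);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(fromArgs(new String[] {"--input", "/tmp/in", "-verbose"}));
    }
}
```

Each loop iteration consumes one complete key (and optionally its value), so no state needs to be carried between iterations — which is the point of the refactoring. Edge cases such as negative-number values are deliberately ignored here.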
[jira] [Commented] (FLINK-10289) Classify Exceptions to different category for apply different failover strategy
[ https://issues.apache.org/jira/browse/FLINK-10289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626469#comment-16626469 ] ASF GitHub Bot commented on FLINK-10289: isunjin commented on issue #6739: [FLINK-10289] [JobManager] Classify Exceptions to different category for apply different failover strategy URL: https://github.com/apache/flink/pull/6739#issuecomment-424127134 > I really like this idea in principle. > > Instead of attaching an enum classifier, could we solve this by exception inheritance? Having exceptions that represent the categories and then making the specific exceptions subclasses of the category exceptions? Thanks Stephan, using exception inheritance is certainly a good way; however: - an enum explicitly tells that there are four types of exceptions, while exception classes are more scattered. - Java doesn't support multiple inheritance, which would make subclassing a little harder, but we can also use a Java interface or Java annotation. - There is a high-level class that needs to consume this classification logic, so the ThrowableClassifier here simply holds that logic; e.g., if we use an annotation, we need logic to extract the annotation. So I would propose mixing those ideas together: keep the classification logic and use a Java annotation/interface. @StephanEwen let me know what you think. Jin This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Classify Exceptions to different category for apply different failover > strategy > --- > > Key: FLINK-10289 > URL: https://issues.apache.org/jira/browse/FLINK-10289 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: JIN SUN >Assignee: JIN SUN >Priority: Major > Labels: pull-request-available > > We need to classify exceptions and treat them with different strategies. To > do this, we propose to introduce the following Throwable Types and the > corresponding exceptions: > * NonRecoverable > ** We shouldn't retry if an exception was classified as NonRecoverable. > ** For example, NoResourceAvailableException is a NonRecoverable exception. > ** Introduce a new exception UserCodeException to wrap all exceptions that > are thrown from user code. > * PartitionDataMissingError > ** In certain scenarios producer data was transferred in blocking mode or > saved in a persistent store. If the partition is missing, we need to > revoke/rerun the producer task to regenerate the data. > ** Introduce a new exception PartitionDataMissingException to wrap all those > kinds of issues. > * EnvironmentError > ** This happens due to hardware or software issues related to specific > environments. The assumption is that a task will succeed if we run it in a > different environment, and other tasks run in this bad environment will very > likely fail. If multiple tasks fail on the same machine due to > EnvironmentError, we should consider adding the bad machine to a blacklist > and avoid scheduling tasks on it. > ** Introduce a new exception EnvironmentException to wrap all those kinds of > issues. > * Recoverable > ** We assume other issues are recoverable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] isunjin commented on issue #6739: [FLINK-10289] [JobManager] Classify Exceptions to different category for apply different failover strategy
isunjin commented on issue #6739: [FLINK-10289] [JobManager] Classify Exceptions to different category for apply different failover strategy URL: https://github.com/apache/flink/pull/6739#issuecomment-424127134 > I really like this idea in principle. > > Instead of attaching an enum classifier, could we solve this by exception inheritance? Having exceptions that represent the categories and then making the specific exceptions subclasses of the category exceptions? Thanks Stephan, using exception inheritance is certainly a good way; however: - an enum explicitly tells that there are four types of exceptions, while exception classes are more scattered. - Java doesn't support multiple inheritance, which would make subclassing a little harder, but we can also use a Java interface or Java annotation. - There is a high-level class that needs to consume this classification logic, so the ThrowableClassifier here simply holds that logic; e.g., if we use an annotation, we need logic to extract the annotation. So I would propose mixing those ideas together: keep the classification logic and use a Java annotation/interface. @StephanEwen let me know what you think. Jin This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
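The annotation-plus-classifier idea isunjin describes — exceptions declare their category via a runtime annotation, and a single classifier extracts it — could be sketched as follows. All names here are illustrative assumptions, not necessarily the interfaces Flink ended up with:

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

/** Sketch of annotation-driven failure classification; all names here are illustrative. */
public class ClassifierSketch {

    /** The four categories from the issue description. */
    public enum ThrowableType { NON_RECOVERABLE, PARTITION_DATA_MISSING, ENVIRONMENT_ERROR, RECOVERABLE }

    /** Exceptions declare their category with an annotation instead of an inheritance hierarchy. */
    @Retention(RetentionPolicy.RUNTIME)
    public @interface ThrowableCategory {
        ThrowableType value();
    }

    @ThrowableCategory(ThrowableType.PARTITION_DATA_MISSING)
    public static class PartitionDataMissingException extends Exception {}

    /** The classifier centralizes extraction; unannotated throwables default to RECOVERABLE. */
    public static ThrowableType classify(Throwable t) {
        ThrowableCategory category = t.getClass().getAnnotation(ThrowableCategory.class);
        return category == null ? ThrowableType.RECOVERABLE : category.value();
    }

    public static void main(String[] args) {
        System.out.println(classify(new PartitionDataMissingException())); // prints PARTITION_DATA_MISSING
    }
}
```

This keeps the closed set of categories in one enum (isunjin's first point) while avoiding a parallel inheritance hierarchy, and the annotation-extraction logic lives in exactly one place, the classifier.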
[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo
[ https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626449#comment-16626449 ] ASF GitHub Bot commented on FLINK-9126: --- bmeriaux commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219984964 ## File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ## @@ -470,6 +472,34 @@ public void testCassandraTableSink() throws Exception { Assert.assertTrue("The input data was not completely written to Cassandra", input.isEmpty()); } + @Test + public void testCassandraBatchPojoFormat() throws Exception { + + OutputFormat> sink = new CassandraTupleOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder); + sink.configure(new Configuration()); + sink.open(0, 1); + + for (Tuple3 value : collection) { + sink.writeRecord(value); + } + + sink.close(); + + InputFormat source = new CassandraPojoInputFormat<>(injectTableName(SELECT_DATA_QUERY), builder, CustomCassandraAnnotatedPojo.class); + source.configure(new Configuration()); + source.open(null); + + List result = new ArrayList<>(); + + while (!source.reachedEnd()) { + CustomCassandraAnnotatedPojo temp = source.nextRecord(new CustomCassandraAnnotatedPojo()); + result.add(temp); + } + + source.close(); + Assert.assertEquals(20, result.size()); Review comment: To easily save a list of CustomCassandraAnnotatedPojo, I used the CassandraPojoSink. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Creation of the CassandraPojoInputFormat class to output data into a Custom > Cassandra Annotated Pojo > > > Key: FLINK-9126 > URL: https://issues.apache.org/jira/browse/FLINK-9126 > Project: Flink > Issue Type: New Feature > Components: DataSet API >Affects Versions: 1.5.0, 1.4.2 >Reporter: Jeffrey Carter >Assignee: Jeffrey Carter >Priority: Minor > Labels: InputFormat, cassandra, features, pull-request-available > Fix For: 1.7.0 > > Attachments: CassandraPojoInputFormatText.rtf > > Original Estimate: 24h > Remaining Estimate: 24h > > Currently the DataSet API only has the ability to output data received from > Cassandra as a source as a Tuple. This would allow the data to be > output as a custom POJO that the user has created that has been annotated > using the Datastax API. This would remove the need of very long Tuples to be > created by the DataSet and then mapped to the custom POJO. > > -The changes to the CassandraInputFormat object would be minimal, but would > require importing the Datastax API into the class-. Another option is to make > a similar, but slightly different class called CassandraPojoInputFormat. > I have already gotten code for this working in my own project, but want other > thoughts as to the best way this should go about being implemented. > > //Example of its use in main > CassandraPojoInputFormat cassandraInputFormat = new > CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, > CustomCassandraPojo.class); > cassandraInputFormat.configure(null); > cassandraInputFormat.open(null); > DataSet outputTestSet = > exEnv.createInput(cassandraInputFormat, TypeInformation.of(new > TypeHint(){})); > > //The class that I currently have set up > [^CassandraPojoInputFormatText.rtf] > > I will make another Jira issue for the output version next if this is approved -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] bmeriaux commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
bmeriaux commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219984964 ## File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ## @@ -470,6 +472,34 @@ public void testCassandraTableSink() throws Exception { Assert.assertTrue("The input data was not completely written to Cassandra", input.isEmpty()); } + @Test + public void testCassandraBatchPojoFormat() throws Exception { + + OutputFormat> sink = new CassandraTupleOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder); + sink.configure(new Configuration()); + sink.open(0, 1); + + for (Tuple3 value : collection) { + sink.writeRecord(value); + } + + sink.close(); + + InputFormat source = new CassandraPojoInputFormat<>(injectTableName(SELECT_DATA_QUERY), builder, CustomCassandraAnnotatedPojo.class); + source.configure(new Configuration()); + source.open(null); + + List result = new ArrayList<>(); + + while (!source.reachedEnd()) { + CustomCassandraAnnotatedPojo temp = source.nextRecord(new CustomCassandraAnnotatedPojo()); + result.add(temp); + } + + source.close(); + Assert.assertEquals(20, result.size()); Review comment: To easily save a list of CustomCassandraAnnotatedPojo, I used the CassandraPojoSink. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo
[ https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626435#comment-16626435 ] ASF GitHub Bot commented on FLINK-9126: --- bmeriaux commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219980389 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java ## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.cassandra; + +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.apache.flink.streaming.connectors.cassandra.MapperOptions; +import org.apache.flink.util.Preconditions; + +import com.datastax.driver.mapping.Mapper; +import com.datastax.driver.mapping.MappingManager; +import com.datastax.driver.mapping.Result; + +import java.io.IOException; + +/** + * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object. 
+ * + * @param &lt;OUT&gt; type of inputClass + */ +public class CassandraPojoInputFormat&lt;OUT&gt; extends CassandraInputFormatBase&lt;OUT&gt; { + + private static final long serialVersionUID = 1992091320180905115L; + private final MapperOptions mapperOptions; + + private transient Result&lt;OUT&gt; resultSet; + private Class&lt;OUT&gt; inputClass; + + public CassandraPojoInputFormat(String query, ClusterBuilder builder, Class&lt;OUT&gt; inputClass) { + this(query, builder, inputClass, null); + } + + public CassandraPojoInputFormat(String query, ClusterBuilder builder, Class&lt;OUT&gt; inputClass, MapperOptions mapperOptions) { + super(query, builder); + this.mapperOptions = mapperOptions; + Preconditions.checkArgument(inputClass != null, "InputClass cannot be null"); Review comment: like the previous one, I think we should stick with checkArgument to keep an IllegalArgumentException This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Creation of the CassandraPojoInputFormat class to output data into a Custom > Cassandra Annotated Pojo > > > Key: FLINK-9126 > URL: https://issues.apache.org/jira/browse/FLINK-9126 > Project: Flink > Issue Type: New Feature > Components: DataSet API >Affects Versions: 1.5.0, 1.4.2 >Reporter: Jeffrey Carter >Assignee: Jeffrey Carter >Priority: Minor > Labels: InputFormat, cassandra, features, pull-request-available > Fix For: 1.7.0 > > Attachments: CassandraPojoInputFormatText.rtf > > Original Estimate: 24h > Remaining Estimate: 24h > > Currently the DataSet API only has the ability to output data received from > Cassandra as a source as a Tuple. This change would allow the data to be > output as a custom POJO that the user has created and annotated > using the Datastax API. This would remove the need for very long Tuples to be > created by the DataSet and then mapped to the custom POJO. 
> > -The changes to the CassandraInputFormat object would be minimal, but would > require importing the Datastax API into the class-. Another option is to make > a similar, but slightly different class called CassandraPojoInputFormat. > I have already gotten code for this working in my own project, but want other > thoughts as to the best way this should go about being implemented. > > //Example of its use in main > CassandraPojoInputFormat cassandraInputFormat = new > CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, >
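The checkArgument-vs-null-check thread above hinges on which exception type surfaces to the caller. A minimal, self-contained sketch of the distinction — the `checkArgument` helper below only imitates Flink's `Preconditions` and is not the real class:

```java
import java.util.Objects;

public class PreconditionDemo {

    // Imitates Flink's Preconditions.checkArgument: signals a bad argument with
    // an IllegalArgumentException, regardless of whether the bad value is null.
    public static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    public static void main(String[] args) {
        Object inputClass = null;
        try {
            checkArgument(inputClass != null, "InputClass cannot be null");
        } catch (IllegalArgumentException e) {
            System.out.println("checkArgument threw " + e.getClass().getSimpleName());
        }
        try {
            // The alternative under discussion: a dedicated null check, which
            // reports the same problem as a NullPointerException instead.
            Objects.requireNonNull(inputClass, "InputClass cannot be null");
        } catch (NullPointerException e) {
            System.out.println("requireNonNull threw " + e.getClass().getSimpleName());
        }
    }
}
```

Keeping `checkArgument` makes the connector report every invalid constructor argument, null or otherwise, uniformly as `IllegalArgumentException` — which is the reviewer's point.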
[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo
[ https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626308#comment-16626308 ] ASF GitHub Bot commented on FLINK-9126: --- bmeriaux commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219957391 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.batch.connectors.cassandra; + +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.NonParallelInput; +import org.apache.flink.api.common.io.RichInputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava18.com.google.common.base.Strings; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object. + * @param &lt;OUT&gt; type of inputClass + */ +public abstract class CassandraInputFormatBase&lt;OUT&gt; extends RichInputFormat&lt;OUT, InputSplit&gt; implements NonParallelInput { + private static final Logger LOG = LoggerFactory.getLogger(CassandraInputFormatBase.class); + + protected final String query; + protected final ClusterBuilder builder; + + protected transient Cluster cluster; + protected transient Session session; + + public CassandraInputFormatBase(String query, ClusterBuilder builder){ + Preconditions.checkArgument(!Strings.isNullOrEmpty(query), "Query cannot be null or empty"); + Preconditions.checkArgument(builder != null, "Builder cannot be null"); Review comment: it will throw a NullPointerException instead of an IllegalArgumentException; like other classes of the connector, I think we should keep checkArgument This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Creation of the CassandraPojoInputFormat class to output data into a Custom > Cassandra Annotated Pojo > > > Key: FLINK-9126 > URL: https://issues.apache.org/jira/browse/FLINK-9126 > Project: Flink > Issue Type: New Feature > Components: DataSet API >Affects Versions: 1.5.0, 1.4.2 >Reporter: Jeffrey Carter >Assignee: Jeffrey Carter >Priority: Minor > Labels: InputFormat, cassandra, features, pull-request-available > Fix For: 1.7.0 > > Attachments: CassandraPojoInputFormatText.rtf > > Original Estimate: 24h > Remaining Estimate: 24h > > Currently the DataSet API only has the ability to output data received from > Cassandra as a source in as a Tuple. This would be allow the data to be > output as a custom POJO that the user has created that has been annotated > using Datastax API. This would remove the need of very long Tuples to be > created by the DataSet and then mapped to the custom POJO. > > -The changes to the CassandraInputFormat object would be minimal, but would > require importing the Datastax API into the class-. Another option is to make > a similar, but slightly different class
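The base class under review follows a common Flink InputFormat pattern: eagerly validated, serializable configuration fields plus transient runtime handles that are re-created on each worker. A stripped-down sketch of just that pattern — the class and field names here are illustrative, not Flink's actual API:

```java
import java.io.Serializable;

// Illustrative sketch only: config (the query) is validated in the constructor
// and travels with the serialized job; the session handle is transient, so it
// is never serialized and must be opened again on each worker.
public abstract class InputFormatBaseSketch<OUT> implements Serializable {

    protected final String query;               // shipped with the job graph
    protected transient AutoCloseable session;  // opened per worker in open()

    public InputFormatBaseSketch(String query) {
        if (query == null || query.isEmpty()) {
            throw new IllegalArgumentException("Query cannot be null or empty");
        }
        this.query = query;
    }
}
```

Failing fast in the constructor means a bad query is reported on the client, before the job is ever submitted.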
[jira] [Commented] (FLINK-8819) Rework travis script to use build stages
[ https://issues.apache.org/jira/browse/FLINK-8819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626305#comment-16626305 ] ASF GitHub Bot commented on FLINK-8819: --- GJL commented on a change in pull request #6642: [FLINK-8819][travis] Rework travis script to use stages URL: https://github.com/apache/flink/pull/6642#discussion_r219956271 ## File path: tools/travis_controller.sh ## @@ -0,0 +1,211 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +CACHE_DIR="$HOME/flink_cache" +CACHE_BUILD_DIR="$CACHE_DIR/$TRAVIS_BUILD_NUMBER" +CACHE_FLINK_DIR="$CACHE_BUILD_DIR/flink" + +HERE="`dirname \"$0\"`"# relative +HERE="`( cd \"$HERE\" && pwd )`" # absolutized and normalized +if [ -z "$HERE" ] ; then + # error; for some reason, the path is not accessible + # to the script (e.g. 
permissions re-evaled after suid) + exit 1 # fail +fi + +source "${HERE}/travis/fold.sh" +source "${HERE}/travis/stage.sh" +source "${HERE}/travis/shade.sh" + +function deleteOldCaches() { + while read CACHE_DIR; do + local old_number="${CACHE_DIR##*/}" + if [ "$old_number" -lt "$TRAVIS_BUILD_NUMBER" ]; then + echo "Deleting old cache $CACHE_DIR" + rm -rf "$CACHE_DIR" + fi + done +} + +# delete leftover caches from previous builds +find "$CACHE_DIR" -mindepth 1 -maxdepth 1 | grep -v "$TRAVIS_BUILD_NUMBER" | deleteOldCaches + +function getCurrentStage() { + STAGE_NUMBER=$(echo "$TRAVIS_JOB_NUMBER" | cut -d'.' -f 2) + case $STAGE_NUMBER in + (1) + echo "$STAGE_COMPILE" + ;; + (2) + echo "$STAGE_COMPILE" + ;; + (3) + echo "$STAGE_CORE" + ;; + (4) + echo "$STAGE_LIBRARIES" + ;; + (5) + echo "$STAGE_CONNECTORS" + ;; + (6) + echo "$STAGE_TESTS" + ;; + (7) + echo "$STAGE_MISC" + ;; + (8) + echo "$STAGE_CORE" + ;; + (9) + echo "$STAGE_LIBRARIES" + ;; + (10) + echo "$STAGE_CONNECTORS" + ;; + (11) + echo "$STAGE_TESTS" + ;; + (12) + echo "$STAGE_MISC" + ;; + (13) + echo "$STAGE_CLEANUP" + ;; + (14) + echo "$STAGE_CLEANUP" + ;; + (*) + echo "Invalid stage detected ($STAGE_NUMBER)" + return 1 + ;; + esac + + return 0 +} + +STAGE=$(getCurrentStage) +if [ $? != 0 ]; then + echo "Could not determine current stage." + exit 1 +fi +echo "Current stage: \"$STAGE\"" + +EXIT_CODE=0 + +# Run actual compile steps +if [ $STAGE == "$STAGE_COMPILE" ]; then + MVN="mvn clean install -nsu -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dmaven.javadoc.skip=true -B -DskipTests $PROFILE" + $MVN + EXIT_CODE=$? + +if [ $EXIT_CODE == 0 ]; then +printf "\n\n==\n" +printf "Checking dependency convergence\n" +printf "==\n" + +./tools/check_dependency_convergence.sh +EXIT_CODE=$? +else +printf
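The getCurrentStage case statement above maps a Travis job index (the part of TRAVIS_JOB_NUMBER after the dot) to a build stage. The same mapping, transcribed to Java for illustration, with the stage names taken from the script's variables:

```java
public class StageMapper {

    // Mirrors the case statement in travis_controller.sh: jobs 1-2 compile,
    // jobs 3-7 run one test pass, jobs 8-12 repeat the same stages (presumably
    // for a second build profile), and jobs 13-14 clean up.
    public static String stageFor(int jobIndex) {
        switch (jobIndex) {
            case 1: case 2:   return "STAGE_COMPILE";
            case 3: case 8:   return "STAGE_CORE";
            case 4: case 9:   return "STAGE_LIBRARIES";
            case 5: case 10:  return "STAGE_CONNECTORS";
            case 6: case 11:  return "STAGE_TESTS";
            case 7: case 12:  return "STAGE_MISC";
            case 13: case 14: return "STAGE_CLEANUP";
            default:
                throw new IllegalArgumentException(
                        "Invalid stage detected (" + jobIndex + ")");
        }
    }
}
```

Encoding the mapping positionally like this is fragile — reordering jobs in `.travis.yml` silently reassigns stages — which is part of why the script guards the default case with a hard failure.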
[jira] [Commented] (FLINK-10399) Refractor ParameterTool#fromArgs
[ https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626304#comment-16626304 ] ASF GitHub Bot commented on FLINK-10399: StephanEwen commented on issue #6737: [FLINK-10399] Refractor ParameterTool#fromArgs URL: https://github.com/apache/flink/pull/6737#issuecomment-424092282 **Consensus that the contribution should go into Flink** - +1 from my side - tests show the behavior is unchanged, so low risk - cleanup of bad code is desirable and low risk **Does not need specific attention** - No specific committer attention needed **Contribution description is fine** This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Refractor ParameterTool#fromArgs > > > Key: FLINK-10399 > URL: https://issues.apache.org/jira/browse/FLINK-10399 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.7.0 >Reporter: tison >Assignee: tison >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > {{ParameterTool#fromArgs}} uses a convoluted implementation that Flink developers > struggle to read quickly. > The main problem is that, when parsing args, we always want to consume a key-value > pair, but the implementation iterates with a {{for}} loop, thus introducing awkward > flags, mutable variables, and branches. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
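The complaint in the issue description — a `for` loop with cross-iteration flags where the natural shape is "consume one key-value pair per step" — can be sketched with an explicit cursor. This is an illustrative simplification, not Flink's actual `ParameterTool` code; the `NO_VALUE` placeholder is an assumption for bare flags:

```java
import java.util.HashMap;
import java.util.Map;

public class ArgsSketch {

    public static final String NO_VALUE = "__NO_VALUE_KEY";  // assumed placeholder

    // Each loop iteration consumes exactly one key (and, if present, its value),
    // so no cross-iteration flags or half-parsed state are needed.
    public static Map<String, String> fromArgs(String[] args) {
        Map<String, String> result = new HashMap<>();
        int i = 0;
        while (i < args.length) {
            String key = args[i].replaceFirst("^--?", "");  // strip "-" or "--"
            if (i + 1 < args.length && !args[i + 1].startsWith("-")) {
                result.put(key, args[i + 1]);  // key followed by a value
                i += 2;
            } else {
                result.put(key, NO_VALUE);     // bare flag such as --verbose
                i += 1;
            }
        }
        return result;
    }
}
```

The cursor advances by one or two per iteration depending on what was consumed, which makes the "pair at a time" intent explicit. (A real implementation would also need to handle values that legitimately begin with `-`, e.g. negative numbers.)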
[jira] [Commented] (FLINK-8819) Rework travis script to use build stages
[ https://issues.apache.org/jira/browse/FLINK-8819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626299#comment-16626299 ] ASF GitHub Bot commented on FLINK-8819:
---
GJL commented on a change in pull request #6642: [FLINK-8819][travis] Rework travis script to use stages
URL: https://github.com/apache/flink/pull/6642#discussion_r219955526

## File path: tools/travis_controller.sh ##
@@ -0,0 +1,211 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+CACHE_DIR="$HOME/flink_cache"
+CACHE_BUILD_DIR="$CACHE_DIR/$TRAVIS_BUILD_NUMBER"
+CACHE_FLINK_DIR="$CACHE_BUILD_DIR/flink"
+
+HERE="`dirname \"$0\"`" # relative
+HERE="`( cd \"$HERE\" && pwd )`" # absolutized and normalized
+if [ -z "$HERE" ] ; then
+  # error; for some reason, the path is not accessible
+  # to the script (e.g. permissions re-evaled after suid)
+  exit 1 # fail
+fi
+
+source "${HERE}/travis/fold.sh"
+source "${HERE}/travis/stage.sh"
+source "${HERE}/travis/shade.sh"
+
+function deleteOldCaches() {
+  while read CACHE_DIR; do
+    local old_number="${CACHE_DIR##*/}"
+    if [ "$old_number" -lt "$TRAVIS_BUILD_NUMBER" ]; then
+      echo "Deleting old cache $CACHE_DIR"
+      rm -rf "$CACHE_DIR"
+    fi
+  done
+}
+
+# delete leftover caches from previous builds
+find "$CACHE_DIR" -mindepth 1 -maxdepth 1 | grep -v "$TRAVIS_BUILD_NUMBER" | deleteOldCaches
+
+function getCurrentStage() {
+  STAGE_NUMBER=$(echo "$TRAVIS_JOB_NUMBER" | cut -d'.' -f 2)
+  case $STAGE_NUMBER in
+    (1)

Review comment: ok

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Rework travis script to use build stages
>
> Key: FLINK-8819
> URL: https://issues.apache.org/jira/browse/FLINK-8819
> Project: Flink
> Issue Type: Sub-task
> Components: Build System, Travis
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Trivial
> Labels: pull-request-available
>
> This issue is for tracking efforts to rework our Travis scripts to use [stages|https://docs.travis-ci.com/user/build-stages/].
> This feature allows us to define a sequence of jobs that are run one after another. This implies that we can define dependencies between jobs, in contrast to our existing jobs that have to be self-contained.
> As an example, we could have a compile stage, and a test stage with multiple jobs.
> The main benefit here is that we no longer have to compile modules multiple times, which would reduce our build times.
> The major issue here however is that there is no _proper_ support for passing build artifacts from one stage to the next. According to this [issue|https://github.com/travis-ci/beta-features/issues/28] it is on their to-do list, however.
> In the meantime we could manually transfer the artifacts between stages by either using the Travis cache or some other external storage. The cache solution would work by setting up a cached directory (just like the mvn cache) and creating build-scoped directories within, containing the artifacts (I have a prototype that works like this).
> The major concern here is that of cleaning up the cache/storage.
> We can clean things up if
> * our script fails
> * the last stage succeeds.
> We can *not* clean things up if
> * the build is canceled
> * travis fails the build due to a timeout or similar
> as apparently there is [no way to run a script at the end of a build|https://github.com/travis-ci/travis-ci/issues/4221].
> Thus we would either have to periodically clear the cache, or encode more information into the cached files that would allow _other_ builds to clean up stale data (for example the build number or date).
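The `getCurrentStage` helper that the diff above cuts off dispatches on the job index within a build (the part of `TRAVIS_JOB_NUMBER` after the dot). A minimal standalone sketch of that pattern follows; the stage names are hypothetical placeholders, not the actual values from the pull request:

```shell
#!/usr/bin/env bash
# Sketch: derive a stage name from a Travis job number such as "1234.3".
# TRAVIS_JOB_NUMBER has the form "<build>.<job>"; the job index after the
# dot selects the stage. Stage names below are assumptions.
get_current_stage() {
    local stage_number
    stage_number=$(echo "$1" | cut -d'.' -f 2)
    case $stage_number in
        (1) echo "compile" ;;
        (2) echo "test" ;;
        (*) echo "unknown" ;;
    esac
}

get_current_stage "1234.1"   # prints "compile"
get_current_stage "1234.2"   # prints "test"
```

Keying the dispatch on the job index is what lets a single controller script serve every job in the build matrix.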
[jira] [Commented] (FLINK-8819) Rework travis script to use build stages
[ https://issues.apache.org/jira/browse/FLINK-8819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626300#comment-16626300 ] ASF GitHub Bot commented on FLINK-8819:
---
GJL commented on a change in pull request #6642: [FLINK-8819][travis] Rework travis script to use stages
URL: https://github.com/apache/flink/pull/6642#discussion_r219955579

## File path: tools/travis_mvn_watchdog.sh ##
@@ -160,10 +55,11 @@ esac
 # -nsu option forbids downloading snapshot artifacts. The only snapshot artifacts we depend are from
 # Flink, which however should all be built locally. see FLINK-7230
 MVN_LOGGING_OPTIONS="-Dlog.dir=${ARTIFACTS_DIR} -Dlog4j.configuration=file://$LOG4J_PROPERTIES -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"
-MVN_COMMON_OPTIONS="-nsu -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -B $MVN_LOGGING_OPTIONS"
-MVN_COMPILE_OPTIONS="$MVN_COMPILE_OPTIONS -DskipTests"
+MVN_COMMON_OPTIONS="-nsu -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dfast -B $MVN_LOGGING_OPTIONS"
+MVN_COMPILE_OPTIONS="-DskipTests"
+MVN_TEST_OPTIONS="$MVN_LOGGING_OPTIONS"
 
-MVN_COMPILE="mvn $MVN_COMMON_OPTIONS $MVN_COMPILE_OPTIONS $PROFILE $MVN_COMPILE_MODULES clean install"
+MVN_COMPILE="mvn $MVN_COMMON_OPTIONS $MVN_COMPILE_OPTIONS $PROFILE $MVN_COMPILE_MODULES install"
 MVN_TEST="mvn $MVN_COMMON_OPTIONS $MVN_TEST_OPTIONS $PROFILE $MVN_TEST_MODULES verify"
 
 MVN_PID="${ARTIFACTS_DIR}/watchdog.mvn.pid"

Review comment: ok

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
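The cache-based artifact handover and cleanup discussed in the FLINK-8819 description could be sketched roughly as follows. The directory layout and the `store`/`restore` helper names are assumptions for illustration, not the actual prototype from the issue:

```shell
#!/usr/bin/env bash
# Sketch of passing build artifacts between Travis stages through a cached
# directory keyed by build number, plus cleanup of entries left over from
# older (possibly canceled or timed-out) builds. Paths are hypothetical.

CACHE_DIR="${CACHE_DIR:-$HOME/flink_cache}"
TRAVIS_BUILD_NUMBER="${TRAVIS_BUILD_NUMBER:-100}"
CACHE_BUILD_DIR="$CACHE_DIR/$TRAVIS_BUILD_NUMBER"

# Compile stage: copy the build output into this build's cache entry.
store_artifacts() {
    mkdir -p "$CACHE_BUILD_DIR"
    cp -r "$1" "$CACHE_BUILD_DIR/flink"
}

# Later stage: copy the cached build output back into the workspace.
restore_artifacts() {
    cp -r "$CACHE_BUILD_DIR/flink" "$1"
}

# Delete cache entries whose encoded build number is older than ours; this
# is how newer builds can clean up after builds that never ran a cleanup.
delete_old_caches() {
    local dir old_number
    for dir in "$CACHE_DIR"/*/; do
        [ -d "$dir" ] || continue
        old_number=$(basename "$dir")
        if [ "$old_number" -lt "$TRAVIS_BUILD_NUMBER" ] 2>/dev/null; then
            echo "Deleting old cache $dir"
            rm -rf "$dir"
        fi
    done
}
```

Encoding the build number in the directory name is what makes the cleanup safe: any later build can identify and remove stale entries without coordination.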
[jira] [Commented] (FLINK-10126) There should be a Scala DataSource
[ https://issues.apache.org/jira/browse/FLINK-10126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626292#comment-16626292 ] ASF GitHub Bot commented on FLINK-10126:
---
StephanEwen commented on issue #6738: [FLINK-10126] There should be a Scala DataSource
URL: https://github.com/apache/flink/pull/6738#issuecomment-424090397

This fix seems to be a workaround for the real issue that the Scala API misses a DataSource type. I would suggest seeing if that can be introduced without breaking the API.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> There should be a Scala DataSource
>
> Key: FLINK-10126
> URL: https://issues.apache.org/jira/browse/FLINK-10126
> Project: Flink
> Issue Type: Improvement
> Reporter: Alexis Sarda-Espinosa
> Assignee: vinoyang
> Priority: Minor
> Labels: datasource, pull-request-available, scala
>
> In Java, an ExecutionEnvironment's createInput method returns a DataSource, whereas the Scala version returns a DataSet. There is no Scala DataSource wrapper, and the Scala DataSet does not provide the Java DataSource methods, such as getSplitDataProperties.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo
[ https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626240#comment-16626240 ] ASF GitHub Bot commented on FLINK-9126:
---
azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219941080

## File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java ##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra.example;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat;
+import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
+import org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.mapping.Mapper;
+
+import java.util.ArrayList;
+
+/**
+ * This is an example showing the to use the {@link CassandraPojoInputFormat}-/CassandraOutputFormats in the Batch API.

Review comment: does `-/CassandraOutputFormat` mean something?

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
> Issue Type: New Feature
> Components: DataSet API
> Affects Versions: 1.5.0, 1.4.2
> Reporter: Jeffrey Carter
> Assignee: Jeffrey Carter
> Priority: Minor
> Labels: InputFormat, cassandra, features, pull-request-available
> Fix For: 1.7.0
>
> Attachments: CassandraPojoInputFormatText.rtf
>
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> Currently the DataSet API only has the ability to output data received from Cassandra as a source as a Tuple. This would allow the data to be output as a custom POJO that the user has created and annotated using the Datastax API. This would remove the need for very long Tuples to be created by the DataSet and then mapped to the custom POJO.
>
> -The changes to the CassandraInputFormat object would be minimal, but would require importing the Datastax API into the class-. Another option is to make a similar, but slightly different class called CassandraPojoInputFormat.
> I have already gotten code for this working in my own project, but want other thoughts as to the best way this should go about being implemented.
>
> //Example of its use in main
> CassandraPojoInputFormat<CustomCassandraPojo> cassandraInputFormat = new CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, CustomCassandraPojo.class);
> cassandraInputFormat.configure(null);
> cassandraInputFormat.open(null);
> DataSet<CustomCassandraPojo> outputTestSet = exEnv.createInput(cassandraInputFormat, TypeInformation.of(new TypeHint<CustomCassandraPojo>(){}));
>
> //The class that I currently have set up
> [^CassandraPojoInputFormatText.rtf]
>
> Will make another Jira Issue for the Output version next if this is approved

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo
[ https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626239#comment-16626239 ] ASF GitHub Bot commented on FLINK-9126:
---
azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219940842

## File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ##
@@ -470,6 +472,34 @@ public void testCassandraTableSink() throws Exception {
 		Assert.assertTrue("The input data was not completely written to Cassandra", input.isEmpty());
 	}
 
+	@Test
+	public void testCassandraBatchPojoFormat() throws Exception {
+
+		OutputFormat<Tuple3<String, Integer, Integer>> sink = new CassandraTupleOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder);
+		sink.configure(new Configuration());
+		sink.open(0, 1);
+
+		for (Tuple3<String, Integer, Integer> value : collection) {
+			sink.writeRecord(value);
+		}
+
+		sink.close();
+
+		InputFormat<CustomCassandraAnnotatedPojo, InputSplit> source = new CassandraPojoInputFormat<>(injectTableName(SELECT_DATA_QUERY), builder, CustomCassandraAnnotatedPojo.class);
+		source.configure(new Configuration());
+		source.open(null);
+
+		List<CustomCassandraAnnotatedPojo> result = new ArrayList<>();
+
+		while (!source.reachedEnd()) {
+			CustomCassandraAnnotatedPojo temp = source.nextRecord(new CustomCassandraAnnotatedPojo());
+			result.add(temp);
+		}
+
+		source.close();
+		Assert.assertEquals(20, result.size());

Review comment: Can we also convert `collection` of tuples into list of `CustomCassandraAnnotatedPojo` and check the contents as well, additionally to size?

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo
[ https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626244#comment-16626244 ] ASF GitHub Bot commented on FLINK-9126:
---
azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219932299

## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java ##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Strings;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object.

Review comment: Comment: Base class for {@link RichInputFormat} to read data from Apache Cassandra.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo
[ https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626237#comment-16626237 ] ASF GitHub Bot commented on FLINK-9126:
---
azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219943237

## File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java ##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra.example;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat;
+import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
+import org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.mapping.Mapper;
+
+import java.util.ArrayList;
+
+/**
+ * This is an example showing the to use the {@link CassandraPojoInputFormat}-/CassandraOutputFormats in the Batch API.
+ *
+ * The example assumes that a table exists in a local cassandra database, according to the following queries:
+ * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': ‘1’};
+ * CREATE TABLE IF NOT EXISTS test.batches (number int, strings text, PRIMARY KEY(number, strings));
+ */
+public class BatchPojoExample {
+	private static final String INSERT_QUERY = "INSERT INTO test.batches (number, strings) VALUES (?,?);";
+	private static final String SELECT_QUERY = "SELECT number, strings FROM test.batches;";
+
+	/*
+	 * table script: "CREATE TABLE test.batches (number int, strings text, PRIMARY KEY(number, strings));"
+	 */
+	public static void main(String[] args) throws Exception {
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+
+		ArrayList<Tuple2<Integer, String>> collection = new ArrayList<>(20);
+		for (int i = 0; i < 20; i++) {
+			collection.add(new Tuple2<>(i, "string " + i));
+		}
+
+		DataSet<Tuple2<Integer, String>> dataSet = env.fromCollection(collection);
+
+		dataSet.output(new CassandraTupleOutputFormat<Tuple2<Integer, String>>(INSERT_QUERY, new ClusterBuilder() {

Review comment: we can remove `Tuple2` and use empty `<>` (two places)

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo
[ https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626236#comment-16626236 ] ASF GitHub Bot commented on FLINK-9126: --- azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219935777 ## File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/CustomCassandraAnnotatedPojo.java ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.cassandra; + +import com.datastax.driver.mapping.annotations.Column; +import com.datastax.driver.mapping.annotations.Table; + +/** + * Example of Cassandra Annotated POJO class for use with CassandraInputFormatter. Review comment: In doc comment: `CassandraInputFormatter` -> `{@link CassandraPojoInputFormat}` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Creation of the CassandraPojoInputFormat class to output data into a Custom > Cassandra Annotated Pojo > > > Key: FLINK-9126 > URL: https://issues.apache.org/jira/browse/FLINK-9126 > Project: Flink > Issue Type: New Feature > Components: DataSet API >Affects Versions: 1.5.0, 1.4.2 >Reporter: Jeffrey Carter >Assignee: Jeffrey Carter >Priority: Minor > Labels: InputFormat, cassandra, features, pull-request-available > Fix For: 1.7.0 > > Attachments: CassandraPojoInputFormatText.rtf > > Original Estimate: 24h > Remaining Estimate: 24h > > Currently the DataSet API only has the ability to output data received from > Cassandra as a source as a Tuple. This change would allow the data to be > output as a custom POJO that the user has created and annotated > using the Datastax API. This would remove the need for very long Tuples to be > created by the DataSet and then mapped to the custom POJO. > > -The changes to the CassandraInputFormat object would be minimal, but would > require importing the Datastax API into the class-. Another option is to make > a similar, but slightly different class called CassandraPojoInputFormat. > I have already gotten code for this working in my own project, but want other > thoughts as to the best way this should go about being implemented. > > //Example of its use in main > CassandraPojoInputFormat<CustomCassandraPojo> cassandraInputFormat = new > CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, > CustomCassandraPojo.class); > cassandraInputFormat.configure(null); > cassandraInputFormat.open(null); > DataSet<CustomCassandraPojo> outputTestSet = > exEnv.createInput(cassandraInputFormat, TypeInformation.of(new > TypeHint<CustomCassandraPojo>(){})); > > //The class that I currently have set up > [^CassandraPojoInputFormatText.rtf] > > Will make another JIRA issue for the Output version next if this is approved -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo
[ https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626241#comment-16626241 ] ASF GitHub Bot commented on FLINK-9126: --- azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219942318 ## File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.batch.connectors.cassandra.example; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat; +import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat; +import org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.mapping.Mapper; + +import java.util.ArrayList; + +/** + * This is an example showing the to use the {@link CassandraPojoInputFormat}-/CassandraOutputFormats in the Batch API. + * + * The example assumes that a table exists in a local cassandra database, according to the following queries: + * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': ‘1’}; Review comment: I think there is still some mismatch between queries, doc comments, tuples in `dataSet ` and fields of `CustomCassandraAnnotatedPojo`. `testCassandraBatchPojoFormat` looks better from this side. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo
[ https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626245#comment-16626245 ] ASF GitHub Bot commented on FLINK-9126: --- azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219934055 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java ## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.cassandra; + +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.apache.flink.streaming.connectors.cassandra.MapperOptions; +import org.apache.flink.util.Preconditions; + +import com.datastax.driver.mapping.Mapper; +import com.datastax.driver.mapping.MappingManager; +import com.datastax.driver.mapping.Result; + +import java.io.IOException; + +/** + * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object. 
+ * + * @param <OUT> type of inputClass + */ +public class CassandraPojoInputFormat<OUT> extends CassandraInputFormatBase<OUT> { + + private static final long serialVersionUID = 1992091320180905115L; + private final MapperOptions mapperOptions; + + private transient Result<OUT> resultSet; + private Class<OUT> inputClass; Review comment: can be `final`
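The "can be `final`" suggestion above applies because `inputClass` is assigned exactly once, in the constructor. A minimal sketch of the pattern, using a hypothetical stand-in class rather than the real Flink `CassandraPojoInputFormat`:

```java
import java.io.Serializable;

/**
 * Hypothetical stand-in illustrating the review suggestion: a field that is
 * assigned exactly once, in the constructor, can (and should) be declared final.
 */
class PojoInputFormatSketch<OUT> implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String query;
    // 'final' documents that the target class never changes after construction
    // and lets the compiler reject any accidental reassignment.
    private final Class<OUT> inputClass;

    PojoInputFormatSketch(String query, Class<OUT> inputClass) {
        this.query = query;
        this.inputClass = inputClass;
    }

    Class<OUT> getInputClass() {
        return inputClass;
    }

    String getQuery() {
        return query;
    }
}
```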
[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo
[ https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626232#comment-16626232 ] ASF GitHub Bot commented on FLINK-9126: --- azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219929097 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java ## @@ -44,38 +31,18 @@ * * @param <OUT> type of Tuple */ -public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput { - private static final Logger LOG = LoggerFactory.getLogger(CassandraInputFormat.class); +public class CassandraInputFormat<OUT extends Tuple> extends CassandraInputFormatBase<OUT> { Review comment: please add `serialVersionUID` to `CassandraInputFormat` and `CassandraInputFormatBase`
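The `serialVersionUID` request above matters because Flink ships input formats to task managers via Java serialization; without an explicit ID the JVM derives one from the class layout, so any later refactoring silently breaks compatibility with previously serialized instances. A self-contained illustration of the pattern (the class name here is invented for the example):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Invented example class; pins its serialized form with an explicit serialVersionUID. */
class QueryHolder implements Serializable {
    // With an explicit ID, adding methods or reordering members later
    // does not invalidate already-serialized instances.
    private static final long serialVersionUID = 1L;
    final String query;

    QueryHolder(String query) {
        this.query = query;
    }

    /** Serialize and deserialize the holder, as Flink would when shipping it to a task. */
    static QueryHolder roundTrip(QueryHolder in) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(in);
        }
        try (ObjectInputStream oin = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (QueryHolder) oin.readObject();
        }
    }
}
```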
[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo
[ https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626234#comment-16626234 ] ASF GitHub Bot commented on FLINK-9126: --- azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219929908 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.batch.connectors.cassandra; + +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.NonParallelInput; +import org.apache.flink.api.common.io.RichInputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava18.com.google.common.base.Strings; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object. + * @param type of inputClass + */ +public abstract class CassandraInputFormatBase extends RichInputFormat implements NonParallelInput { + private static final Logger LOG = LoggerFactory.getLogger(CassandraInputFormatBase.class); + + protected final String query; + protected final ClusterBuilder builder; + + protected transient Cluster cluster; + protected transient Session session; + + public CassandraInputFormatBase(String query, ClusterBuilder builder){ + Preconditions.checkArgument(!Strings.isNullOrEmpty(query), "Query cannot be null or empty"); + Preconditions.checkArgument(builder != null, "Builder cannot be null"); Review comment: I think we can use `checkNotNull`: ``` Preconditions.checkNotNull(builder, "Builder cannot be null"); ``` for readability This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
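The reviewer's `checkNotNull` point is about intent: `checkArgument(x != null, ...)` throws `IllegalArgumentException`, whereas `checkNotNull(x, ...)` throws `NullPointerException` and reads unambiguously as a null check. Guava is not needed to show the idea; the JDK's `Objects.requireNonNull` behaves like Guava's `checkNotNull`. This sketch uses the JDK analogue and an invented class, not Flink's actual code:

```java
import java.util.Objects;

/** Invented example: validating constructor arguments the way the review suggests. */
class InputFormatArgs {
    final String query;
    final Object builder;

    InputFormatArgs(String query, Object builder) {
        // An empty-or-null query is an argument problem -> IllegalArgumentException.
        if (query == null || query.isEmpty()) {
            throw new IllegalArgumentException("Query cannot be null or empty");
        }
        this.query = query;
        // A null builder is a plain null check -> NullPointerException with a clear message,
        // the same behavior Guava's Preconditions.checkNotNull provides.
        this.builder = Objects.requireNonNull(builder, "Builder cannot be null");
    }
}
```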
[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo
[ https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626238#comment-16626238 ] ASF GitHub Bot commented on FLINK-9126: --- azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219930899 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.batch.connectors.cassandra; + +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.NonParallelInput; +import org.apache.flink.api.common.io.RichInputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava18.com.google.common.base.Strings; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object. + * @param type of inputClass + */ +public abstract class CassandraInputFormatBase extends RichInputFormat implements NonParallelInput { + private static final Logger LOG = LoggerFactory.getLogger(CassandraInputFormatBase.class); + + protected final String query; + protected final ClusterBuilder builder; + + protected transient Cluster cluster; + protected transient Session session; + + public CassandraInputFormatBase(String query, ClusterBuilder builder){ + Preconditions.checkArgument(!Strings.isNullOrEmpty(query), "Query cannot be null or empty"); + Preconditions.checkArgument(builder != null, "Builder cannot be null"); + + this.query = query; + this.builder = builder; + } + + @Override + public void configure(Configuration parameters) { + this.cluster = builder.getCluster(); + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { Review comment: We can remove exception declarations from all methods (also in other touched classes) where they 
are not actually thrown.
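Dropping unthrown checked exceptions from signatures also unburdens callers, which would otherwise have to catch or re-declare an exception that can never occur. A before/after sketch with invented method names:

```java
import java.io.IOException;

class StatisticsExample {
    // Before: 'throws IOException' forces every caller to handle an
    // exception this body can never produce.
    Object getStatisticsDeclared(Object cached) throws IOException {
        return cached; // nothing here performs I/O
    }

    // After: same body, honest signature; callers need no try/catch.
    Object getStatistics(Object cached) {
        return cached;
    }
}
```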
[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo
[ https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626235#comment-16626235 ] ASF GitHub Bot commented on FLINK-9126: --- azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219942450 ## File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.batch.connectors.cassandra.example; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat; +import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat; +import org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.mapping.Mapper; + +import java.util.ArrayList; + +/** + * This is an example showing the to use the {@link CassandraPojoInputFormat}-/CassandraOutputFormats in the Batch API. + * + * The example assumes that a table exists in a local cassandra database, according to the following queries: + * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': ‘1’}; + * CREATE TABLE IF NOT EXISTS test.batches (number int, strings text, PRIMARY KEY(number, strings)); + */ +public class BatchPojoExample { + private static final String INSERT_QUERY = "INSERT INTO test.batches (number, strings) VALUES (?,?);"; + private static final String SELECT_QUERY = "SELECT number, strings FROM test.batches;"; + + /* +* table script: "CREATE TABLE test.batches (number int, strings text, PRIMARY KEY(number, strings));" +*/ + public static void main(String[] args) throws Exception { + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + ArrayList> collection = new ArrayList<>(20); + for (int i = 0; i < 20; i++) { + collection.add(new Tuple2<>(i, "string " + i)); + } + + DataSet> dataSet = env.fromCollection(collection); + + dataSet.output(new CassandraTupleOutputFormat>(INSERT_QUERY, new ClusterBuilder() { + @Override + protected Cluster 
buildCluster(Cluster.Builder builder) { + return builder.addContactPoints("127.0.0.1").build(); + } + })); + Review comment: I think `env.execute("Write");` should be here, not at the end.
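The placement of `env.execute("Write")` matters because Flink builds plans lazily: `output(...)` only registers a sink, and nothing touches Cassandra until `execute()` runs the plan, so the write job must run before a second phase reads the table back. A toy model of that deferred-execution behavior (not Flink's API; all names here are invented):

```java
import java.util.ArrayList;
import java.util.List;

/** Toy deferred-execution environment: sinks are recorded, nothing runs until execute(). */
class LazyEnv {
    private final List<Runnable> pendingSinks = new ArrayList<>();
    final List<String> table = new ArrayList<>(); // stands in for the Cassandra table

    void output(String record) {
        // Like DataSet.output(...): only registers work, performs nothing yet.
        pendingSinks.add(() -> table.add(record));
    }

    void execute(String jobName) {
        // Like env.execute(...): runs everything registered so far.
        pendingSinks.forEach(Runnable::run);
        pendingSinks.clear();
    }
}
```

Reading `table` before `execute("Write")` sees nothing, which is exactly why the reviewer wants the write-phase `execute` to precede the read phase rather than sit at the end of `main`.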
[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo
[ https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626233#comment-16626233 ] ASF GitHub Bot commented on FLINK-9126: --- azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219934197 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java ## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.cassandra; + +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.apache.flink.streaming.connectors.cassandra.MapperOptions; +import org.apache.flink.util.Preconditions; + +import com.datastax.driver.mapping.Mapper; +import com.datastax.driver.mapping.MappingManager; +import com.datastax.driver.mapping.Result; + +import java.io.IOException; + +/** + * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object. 
+ * + * @param type of inputClass + */ +public class CassandraPojoInputFormat extends CassandraInputFormatBase { + + private static final long serialVersionUID = 1992091320180905115L; + private final MapperOptions mapperOptions; + + private transient Result resultSet; + private Class inputClass; + + public CassandraPojoInputFormat(String query, ClusterBuilder builder, Class inputClass) { + this(query, builder, inputClass, null); + } + + public CassandraPojoInputFormat(String query, ClusterBuilder builder, Class inputClass, MapperOptions mapperOptions) { + super(query, builder); + this.mapperOptions = mapperOptions; + Preconditions.checkArgument(inputClass != null, "InputClass cannot be null"); Review comment: `checkNotNull` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Creation of the CassandraPojoInputFormat class to output data into a Custom > Cassandra Annotated Pojo > > > Key: FLINK-9126 > URL: https://issues.apache.org/jira/browse/FLINK-9126 > Project: Flink > Issue Type: New Feature > Components: DataSet API >Affects Versions: 1.5.0, 1.4.2 >Reporter: Jeffrey Carter >Assignee: Jeffrey Carter >Priority: Minor > Labels: InputFormat, cassandra, features, pull-request-available > Fix For: 1.7.0 > > Attachments: CassandraPojoInputFormatText.rtf > > Original Estimate: 24h > Remaining Estimate: 24h > > Currently the DataSet API only has the ability to output data received from > Cassandra as a source in as a Tuple. This would be allow the data to be > output as a custom POJO that the user has created that has been annotated > using Datastax API. This would remove the need of very long Tuples to be > created by the DataSet and then mapped to the custom POJO. 
> > -The changes to the CassandraInputFormat object would be minimal, but would > require importing the Datastax API into the class-. Another option is to make > a similar, but slightly different class called CassandraPojoInputFormat. > I have already gotten code for this working in my own project, but would like other > thoughts on the best way to implement this. > > //Example of its use in main > CassandraPojoInputFormat&lt;CustomCassandraPojo&gt; cassandraInputFormat = new > CassandraPojoInputFormat&lt;&gt;(queryToRun, defaultClusterBuilder, > CustomCassandraPojo.class); > cassandraInputFormat.configure(null); >
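A note on the `checkNotNull` suggestion above: unlike `checkArgument`, Guava-style `checkNotNull` throws `NullPointerException` and returns its argument, so the null check can sit inline in an assignment. The sketch below uses hypothetical stand-in helpers with the same semantics, not Flink's actual `Preconditions` class:

```java
// Stand-in sketch of the two precondition flavors discussed in the review.
// checkNotNull throws NullPointerException and returns its argument;
// checkArgument throws IllegalArgumentException on a false condition.
public class PreconditionsSketch {

    static <T> T checkNotNull(T reference, String message) {
        if (reference == null) {
            throw new NullPointerException(message);
        }
        return reference;
    }

    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    // Mirrors the constructor pattern under review: with checkNotNull,
    // the null check can double as the assignment itself.
    static Class<?> acceptInputClass(Class<?> inputClass) {
        return checkNotNull(inputClass, "InputClass cannot be null");
    }
}
```

Flink's own `Preconditions.checkNotNull` follows the same convention, which is why the reviewer prefers it over `checkArgument(inputClass != null, ...)` for a plain null check.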
[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo
[ https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626246#comment-16626246 ] ASF GitHub Bot commented on FLINK-9126: --- azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219943693 ## File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.batch.connectors.cassandra.example; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat; +import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat; +import org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.mapping.Mapper; + +import java.util.ArrayList; + +/** + * This is an example showing how to use the {@link CassandraPojoInputFormat}/CassandraOutputFormats in the Batch API. + * + * The example assumes that a table exists in a local Cassandra database, according to the following queries: + * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}; + * CREATE TABLE IF NOT EXISTS test.batches (number int, strings text, PRIMARY KEY(number, strings)); + */ +public class BatchPojoExample { + private static final String INSERT_QUERY = "INSERT INTO test.batches (number, strings) VALUES (?,?);"; + private static final String SELECT_QUERY = "SELECT number, strings FROM test.batches;"; + + /* +* table script: "CREATE TABLE test.batches (number int, strings text, PRIMARY KEY(number, strings));" +*/ + public static void main(String[] args) throws Exception { + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + ArrayList&lt;Tuple2&lt;Integer, String&gt;&gt; collection = new ArrayList&lt;&gt;(20); + for (int i = 0; i &lt; 20; i++) { + collection.add(new Tuple2&lt;&gt;(i, "string " + i)); + } + + DataSet&lt;Tuple2&lt;Integer, String&gt;&gt; dataSet = env.fromCollection(collection); + + dataSet.output(new CassandraTupleOutputFormat&lt;Tuple2&lt;Integer, String&gt;&gt;(INSERT_QUERY, new ClusterBuilder() { + @Override + protected Cluster 
buildCluster(Cluster.Builder builder) { + return builder.addContactPoints("127.0.0.1").build(); + } + })); + + /* +* This is for the purpose of showing an example of creating a DataSet using CassandraPojoInputFormat. +*/ + DataSet&lt;CustomCassandraAnnotatedPojo&gt; inputDS = env + .createInput(new CassandraPojoInputFormat&lt;&gt;(SELECT_QUERY, new ClusterBuilder() { Review comment: `ClusterBuilder` should also have a `serialVersionUID`. It is also duplicated in two places; we can create a variable to define it once. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
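The `serialVersionUID`/deduplication comment above can be illustrated with a self-contained sketch: an anonymous serializable object declared once in a constant, with an explicit `serialVersionUID`. `ContactPoints` is a hypothetical stand-in interface, not Flink's `ClusterBuilder`:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch of the reviewer's point: declare the anonymous serializable builder
// once in a variable (instead of duplicating it at both use sites) and give
// it an explicit serialVersionUID.
public class BuilderSketch {

    interface ContactPoints extends Serializable {
        String endpoint();
    }

    // Defined once, reused wherever the example needs a builder.
    static final ContactPoints LOCAL_CLUSTER = new ContactPoints() {
        private static final long serialVersionUID = 1L;

        @Override
        public String endpoint() {
            return "127.0.0.1";
        }
    };

    // Java-serializes the builder, as Flink does when shipping it to tasks.
    static int serializedSize(ContactPoints contactPoints) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(contactPoints);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return bytes.toByteArray().length;
    }
}
```

Without an explicit `serialVersionUID`, the JVM computes one from the class structure, so an unrelated edit to the anonymous class can break deserialization compatibility.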
[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo
[ https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626242#comment-16626242 ] ASF GitHub Bot commented on FLINK-9126: --- azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219931705 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.batch.connectors.cassandra; + +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.NonParallelInput; +import org.apache.flink.api.common.io.RichInputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava18.com.google.common.base.Strings; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object. + * @param &lt;OUT&gt; type of inputClass + */ +public abstract class CassandraInputFormatBase&lt;OUT&gt; extends RichInputFormat&lt;OUT, InputSplit&gt; implements NonParallelInput { + private static final Logger LOG = LoggerFactory.getLogger(CassandraInputFormatBase.class); + + protected final String query; + protected final ClusterBuilder builder; + + protected transient Cluster cluster; + protected transient Session session; + + public CassandraInputFormatBase(String query, ClusterBuilder builder){ + Preconditions.checkArgument(!Strings.isNullOrEmpty(query), "Query cannot be null or empty"); + Preconditions.checkArgument(builder != null, "Builder cannot be null"); + + this.query = query; + this.builder = builder; + } + + @Override + public void configure(Configuration parameters) { + this.cluster = builder.getCluster(); + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { + return cachedStatistics; + } + + @Override + public InputSplit[] createInputSplits(int minNumSplits) throws 
IOException { + GenericInputSplit[] split = {new GenericInputSplit(0, 1)}; + return split; Review comment: `split` local variable is redundant, we can just return `{new GenericInputSplit(0, 1)}` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
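One wrinkle in the review shorthand above: Java only allows the bare `{...}` array initializer in a declaration, so returning the array directly needs the explicit `new GenericInputSplit[]{...}` creation syntax. A self-contained sketch, with a minimal stand-in for Flink's `GenericInputSplit`:

```java
// Sketch of the suggested simplification of createInputSplits.
// GenericInputSplit below is a bare stand-in for org.apache.flink.core.io.GenericInputSplit.
public class SplitSketch {

    static class GenericInputSplit {
        final int splitNumber;
        final int totalNumberOfSplits;

        GenericInputSplit(int splitNumber, int totalNumberOfSplits) {
            this.splitNumber = splitNumber;
            this.totalNumberOfSplits = totalNumberOfSplits;
        }
    }

    // Non-parallel input: always a single split covering everything,
    // returned directly without the redundant local variable.
    static GenericInputSplit[] createInputSplits(int minNumSplits) {
        return new GenericInputSplit[] {new GenericInputSplit(0, 1)};
    }
}
```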
[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo
[ https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626243#comment-16626243 ] ASF GitHub Bot commented on FLINK-9126: --- azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219930598 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.batch.connectors.cassandra; + +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.NonParallelInput; +import org.apache.flink.api.common.io.RichInputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava18.com.google.common.base.Strings; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object. + * @param &lt;OUT&gt; type of inputClass + */ +public abstract class CassandraInputFormatBase&lt;OUT&gt; extends RichInputFormat&lt;OUT, InputSplit&gt; implements NonParallelInput { Review comment: I think the whole class, constructor and fields can be package private This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Creation of the CassandraPojoInputFormat class to output data into a Custom > Cassandra Annotated Pojo > > > Key: FLINK-9126 > URL: https://issues.apache.org/jira/browse/FLINK-9126 > Project: Flink > Issue Type: New Feature > Components: DataSet API >Affects Versions: 1.5.0, 1.4.2 >Reporter: Jeffrey Carter >Assignee: Jeffrey Carter >Priority: Minor > Labels: InputFormat, cassandra, features, pull-request-available > Fix For: 1.7.0 > > Attachments: CassandraPojoInputFormatText.rtf > > Original Estimate: 24h > Remaining Estimate: 24h > > Currently the DataSet API only has the ability to output data received from > Cassandra as a source as a Tuple. This would allow the data to be > output as a custom POJO that the user has created and annotated > using the Datastax API. This would remove the need for very long Tuples to be > created by the DataSet and then mapped to the custom POJO. > > -The changes to the CassandraInputFormat object would be minimal, but would > require importing the Datastax API into the class-. Another option is to make > a similar, but slightly different class called CassandraPojoInputFormat. > I have already gotten code for this working in my own project, but would like other > thoughts on the best way to implement this. > > //Example of its use in main > CassandraPojoInputFormat&lt;CustomCassandraPojo&gt; cassandraInputFormat = new > CassandraPojoInputFormat&lt;&gt;(queryToRun, defaultClusterBuilder, > CustomCassandraPojo.class); > cassandraInputFormat.configure(null); > cassandraInputFormat.open(null); > DataSet&lt;CustomCassandraPojo&gt; outputTestSet = > exEnv.createInput(cassandraInputFormat, TypeInformation.of(new > TypeHint&lt;CustomCassandraPojo&gt;(){})); > > //The class that I currently have set up >
[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219942450 ## File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.batch.connectors.cassandra.example; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat; +import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat; +import org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.mapping.Mapper; + +import java.util.ArrayList; + +/** + * This is an example showing how to use the {@link CassandraPojoInputFormat}/CassandraOutputFormats in the Batch API. + * + * The example assumes that a table exists in a local Cassandra database, according to the following queries: + * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}; + * CREATE TABLE IF NOT EXISTS test.batches (number int, strings text, PRIMARY KEY(number, strings)); + */ +public class BatchPojoExample { + private static final String INSERT_QUERY = "INSERT INTO test.batches (number, strings) VALUES (?,?);"; + private static final String SELECT_QUERY = "SELECT number, strings FROM test.batches;"; + + /* +* table script: "CREATE TABLE test.batches (number int, strings text, PRIMARY KEY(number, strings));" +*/ + public static void main(String[] args) throws Exception { + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + ArrayList&lt;Tuple2&lt;Integer, String&gt;&gt; collection = new ArrayList&lt;&gt;(20); + for (int i = 0; i &lt; 20; i++) { + collection.add(new Tuple2&lt;&gt;(i, "string " + i)); + } + + DataSet&lt;Tuple2&lt;Integer, String&gt;&gt; dataSet = env.fromCollection(collection); + + dataSet.output(new CassandraTupleOutputFormat&lt;Tuple2&lt;Integer, String&gt;&gt;(INSERT_QUERY, new ClusterBuilder() { + @Override + protected Cluster 
buildCluster(Cluster.Builder builder) { + return builder.addContactPoints("127.0.0.1").build(); + } + })); + Review comment: I think `env.execute("Write");` should be here, not at the end. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219935777 ## File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/CustomCassandraAnnotatedPojo.java ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.cassandra; + +import com.datastax.driver.mapping.annotations.Column; +import com.datastax.driver.mapping.annotations.Table; + +/** + * Example of Cassandra Annotated POJO class for use with CassandraInputFormatter. Review comment: In doc comment: `CassandraInputFormatter` -> `{@link CassandraPojoInputFormat}` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219943237 ## File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.batch.connectors.cassandra.example; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat; +import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat; +import org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.mapping.Mapper; + +import java.util.ArrayList; + +/** + * This is an example showing the to use the {@link CassandraPojoInputFormat}-/CassandraOutputFormats in the Batch API. + * + * The example assumes that a table exists in a local cassandra database, according to the following queries: + * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': ‘1’}; + * CREATE TABLE IF NOT EXISTS test.batches (number int, strings text, PRIMARY KEY(number, strings)); + */ +public class BatchPojoExample { + private static final String INSERT_QUERY = "INSERT INTO test.batches (number, strings) VALUES (?,?);"; + private static final String SELECT_QUERY = "SELECT number, strings FROM test.batches;"; + + /* +* table script: "CREATE TABLE test.batches (number int, strings text, PRIMARY KEY(number, strings));" +*/ + public static void main(String[] args) throws Exception { + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + ArrayList<Tuple2<Integer, String>> collection = new ArrayList<>(20); + for (int i = 0; i < 20; i++) { + collection.add(new Tuple2<>(i, "string " + i)); + } + + DataSet<Tuple2<Integer, String>> dataSet = env.fromCollection(collection); + + dataSet.output(new CassandraTupleOutputFormat<Tuple2<Integer, String>>(INSERT_QUERY, new ClusterBuilder() { Review comment: we can remove `Tuple2` and use
empty `<>` (two places) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
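The diamond-operator suggestion above can be illustrated with a small self-contained sketch (hypothetical names; `SimpleEntry` stands in for Flink's `Tuple2` so the snippet compiles on its own): since Java 7 the compiler infers the generic arguments from context, so the explicit type parameters on the right-hand side of an assignment or on a constructor call can be shortened to `<>`.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;

// Sketch of the review suggestion: replace explicit type arguments on
// constructor calls with the diamond operator <> where the compiler can
// infer them from the declared target type.
public class DiamondExample {

    static List<SimpleEntry<Integer, String>> buildCollection() {
        // verbose form: new ArrayList<SimpleEntry<Integer, String>>(20)
        List<SimpleEntry<Integer, String>> collection = new ArrayList<>(20); // diamond
        for (int i = 0; i < 20; i++) {
            collection.add(new SimpleEntry<>(i, "string " + i)); // diamond again
        }
        return collection;
    }

    public static void main(String[] args) {
        System.out.println(buildCollection().size());
    }
}
```

The behavior is identical either way; the diamond form simply removes the repetition of the type arguments.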
[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219942318 ## File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.batch.connectors.cassandra.example; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat; +import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat; +import org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.mapping.Mapper; + +import java.util.ArrayList; + +/** + * This is an example showing the to use the {@link CassandraPojoInputFormat}-/CassandraOutputFormats in the Batch API. + * + * The example assumes that a table exists in a local cassandra database, according to the following queries: + * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': ‘1’}; Review comment: I think there is still some mismatch between queries, doc comments, tuples in `dataSet ` and fields of `CustomCassandraAnnotatedPojo`. `testCassandraBatchPojoFormat` looks better from this side. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219930899 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.batch.connectors.cassandra; + +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.NonParallelInput; +import org.apache.flink.api.common.io.RichInputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava18.com.google.common.base.Strings; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object. + * @param <OUT> type of inputClass + */ +public abstract class CassandraInputFormatBase<OUT> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput { + private static final Logger LOG = LoggerFactory.getLogger(CassandraInputFormatBase.class); + + protected final String query; + protected final ClusterBuilder builder; + + protected transient Cluster cluster; + protected transient Session session; + + public CassandraInputFormatBase(String query, ClusterBuilder builder){ + Preconditions.checkArgument(!Strings.isNullOrEmpty(query), "Query cannot be null or empty"); + Preconditions.checkArgument(builder != null, "Builder cannot be null"); + + this.query = query; + this.builder = builder; + } + + @Override + public void configure(Configuration parameters) { + this.cluster = builder.getCluster(); + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { Review comment: We can remove exception declarations from all methods (also in other touched classes) where they
are not actually thrown. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
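The point about unused exception declarations can be sketched with hypothetical names: a method that declares `throws IOException` but whose body cannot actually throw it forces every caller into a needless `try/catch` or its own `throws` clause; dropping the declaration simplifies all call sites.

```java
// Minimal illustration of the review point (names are illustrative, not the
// actual Flink code): if the body only returns a cached value, the checked
// exception declaration serves no purpose.
public class ThrowsCleanupExample {

    // before: static String getStatistics(String cached) throws IOException { return cached; }
    // after, with the never-thrown checked exception removed:
    static String getStatistics(String cached) {
        return cached;
    }

    public static void main(String[] args) {
        // the call site no longer needs any exception handling
        System.out.println(getStatistics("cached-stats"));
    }
}
```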
[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219934197 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java ## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.cassandra; + +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.apache.flink.streaming.connectors.cassandra.MapperOptions; +import org.apache.flink.util.Preconditions; + +import com.datastax.driver.mapping.Mapper; +import com.datastax.driver.mapping.MappingManager; +import com.datastax.driver.mapping.Result; + +import java.io.IOException; + +/** + * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object. 
+ * + * @param <OUT> type of inputClass + */ +public class CassandraPojoInputFormat<OUT> extends CassandraInputFormatBase<OUT> { + + private static final long serialVersionUID = 1992091320180905115L; + private final MapperOptions mapperOptions; + + private transient Result<OUT> resultSet; + private Class<OUT> inputClass; + + public CassandraPojoInputFormat(String query, ClusterBuilder builder, Class<OUT> inputClass) { + this(query, builder, inputClass, null); + } + + public CassandraPojoInputFormat(String query, ClusterBuilder builder, Class<OUT> inputClass, MapperOptions mapperOptions) { + super(query, builder); + this.mapperOptions = mapperOptions; + Preconditions.checkArgument(inputClass != null, "InputClass cannot be null"); Review comment: `checkNotNull` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219934055 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java ## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.cassandra; + +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.apache.flink.streaming.connectors.cassandra.MapperOptions; +import org.apache.flink.util.Preconditions; + +import com.datastax.driver.mapping.Mapper; +import com.datastax.driver.mapping.MappingManager; +import com.datastax.driver.mapping.Result; + +import java.io.IOException; + +/** + * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object. 
+ * + * @param <OUT> type of inputClass + */ +public class CassandraPojoInputFormat<OUT> extends CassandraInputFormatBase<OUT> { + + private static final long serialVersionUID = 1992091320180905115L; + private final MapperOptions mapperOptions; + + private transient Result<OUT> resultSet; + private Class<OUT> inputClass; Review comment: can be `final` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219940842 ## File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ## @@ -470,6 +472,34 @@ public void testCassandraTableSink() throws Exception { Assert.assertTrue("The input data was not completely written to Cassandra", input.isEmpty()); } + @Test + public void testCassandraBatchPojoFormat() throws Exception { + + OutputFormat<Tuple3<String, Integer, Integer>> sink = new CassandraTupleOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder); + sink.configure(new Configuration()); + sink.open(0, 1); + + for (Tuple3<String, Integer, Integer> value : collection) { + sink.writeRecord(value); + } + + sink.close(); + + InputFormat<CustomCassandraAnnotatedPojo, InputSplit> source = new CassandraPojoInputFormat<>(injectTableName(SELECT_DATA_QUERY), builder, CustomCassandraAnnotatedPojo.class); + source.configure(new Configuration()); + source.open(null); + + List<CustomCassandraAnnotatedPojo> result = new ArrayList<>(); + + while (!source.reachedEnd()) { + CustomCassandraAnnotatedPojo temp = source.nextRecord(new CustomCassandraAnnotatedPojo()); + result.add(temp); + } + + source.close(); + Assert.assertEquals(20, result.size()); Review comment: Can we also convert `collection` of tuples into list of `CustomCassandraAnnotatedPojo` and check the contents as well, additionally to size? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
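The reviewer's suggestion of asserting on contents rather than only on size can be sketched as follows; `SimplePojo` and its fields are illustrative stand-ins, not the actual `CustomCassandraAnnotatedPojo`. The key point is that `List.equals` performs an element-wise comparison, which is strictly stronger than comparing sizes.

```java
import java.util.ArrayList;
import java.util.List;

// Hedged sketch: map the written records into the POJO type that was read
// back, then compare full list contents instead of only the size.
public class ContentAssertionExample {

    static final class SimplePojo {
        final String id;
        final int counter;
        SimplePojo(String id, int counter) { this.id = id; this.counter = counter; }
        @Override public boolean equals(Object o) {
            if (!(o instanceof SimplePojo)) { return false; }
            SimplePojo p = (SimplePojo) o;
            return counter == p.counter && id.equals(p.id);
        }
        @Override public int hashCode() { return 31 * id.hashCode() + counter; }
    }

    static boolean sameContents(List<SimplePojo> expected, List<SimplePojo> actual) {
        // List.equals checks size *and* element-wise equality (needs equals on the POJO)
        return expected.equals(actual);
    }

    public static void main(String[] args) {
        List<SimplePojo> expected = new ArrayList<>();
        List<SimplePojo> actual = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            expected.add(new SimplePojo("id-" + i, i));
            actual.add(new SimplePojo("id-" + i, i));
        }
        System.out.println(sameContents(expected, actual));
    }
}
```

Note that this only works if the POJO implements `equals` (and `hashCode`) over the compared fields.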
[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219929908 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.batch.connectors.cassandra; + +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.NonParallelInput; +import org.apache.flink.api.common.io.RichInputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava18.com.google.common.base.Strings; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object. + * @param <OUT> type of inputClass + */ +public abstract class CassandraInputFormatBase<OUT> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput { + private static final Logger LOG = LoggerFactory.getLogger(CassandraInputFormatBase.class); + + protected final String query; + protected final ClusterBuilder builder; + + protected transient Cluster cluster; + protected transient Session session; + + public CassandraInputFormatBase(String query, ClusterBuilder builder){ + Preconditions.checkArgument(!Strings.isNullOrEmpty(query), "Query cannot be null or empty"); + Preconditions.checkArgument(builder != null, "Builder cannot be null"); Review comment: I think we can use `checkNotNull`: ``` Preconditions.checkNotNull(builder, "Builder cannot be null"); ``` for readability This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
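The `checkNotNull` suggestion can be demonstrated without a Flink dependency: the JDK's `Objects.requireNonNull` behaves like Flink's `Preconditions.checkNotNull` in that it names the intent directly, returns the checked reference (so the check and the assignment collapse into one line), and throws `NullPointerException` rather than the `IllegalArgumentException` produced by `checkArgument(x != null, ...)`. The class and field names below are illustrative.

```java
import java.util.Objects;

// Sketch of the review point: prefer a dedicated null check over a generic
// boolean argument check when null-ness is what is being validated.
public class CheckNotNullExample {
    final String builderName;

    CheckNotNullExample(String builderName) {
        // before: Preconditions.checkArgument(builderName != null, "Builder cannot be null");
        // after, with the check returning the reference it validated:
        this.builderName = Objects.requireNonNull(builderName, "Builder cannot be null");
    }

    public static void main(String[] args) {
        System.out.println(new CheckNotNullExample("cluster-builder").builderName);
    }
}
```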
[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219932299 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.batch.connectors.cassandra; + +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.NonParallelInput; +import org.apache.flink.api.common.io.RichInputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava18.com.google.common.base.Strings; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object. Review comment: Comment: Base class for {@link RichInputFormat} to read data from Apache Cassandra. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo URL: https://github.com/apache/flink/pull/6735#discussion_r219929097 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java ## @@ -44,38 +31,18 @@ * * @param <OUT> type of Tuple */ -public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput { - private static final Logger LOG = LoggerFactory.getLogger(CassandraInputFormat.class); +public class CassandraInputFormat<OUT extends Tuple> extends CassandraInputFormatBase<OUT> { Review comment: please add `serialVersionUID` to `CassandraInputFormat` and `CassandraInputFormatBase` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
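The motivation for the `serialVersionUID` request can be shown in a self-contained round-trip (illustrative names, not the actual input-format classes): without an explicit `serialVersionUID`, the JVM derives one from the class structure, so any later refactoring of a `Serializable` class can make previously serialized instances unreadable. Pinning it keeps the serialized form stable across compatible changes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch: a Serializable class with an explicitly pinned serialVersionUID,
// exercised by serializing and deserializing an instance in memory.
public class SerialVersionExample implements Serializable {
    private static final long serialVersionUID = 1L; // pinned explicitly
    final String query;

    SerialVersionExample(String query) { this.query = query; }

    static SerialVersionExample roundTrip(SerialVersionExample in) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(in);
        }
        try (ObjectInputStream oin =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (SerialVersionExample) oin.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip(new SerialVersionExample("SELECT 1")).query);
    }
}
```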
[jira] [Commented] (FLINK-10292) Generate JobGraph in StandaloneJobClusterEntrypoint only once
[ https://issues.apache.org/jira/browse/FLINK-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626190#comment-16626190 ] Ufuk Celebi commented on FLINK-10292: - I understand that non-determinism may be an issue when generating the {{JobGraph}}, but do we have some data about how common that is for applications? Would it be possible to keep a fixed JobGraph in the image instead of persisting one in the {{SubmittedJobGraphStore}}? I like our current approach, because it keeps the source of truth for the job in the image instead of the {{SubmittedJobGraphStore}}. I'm wondering about the following scenario: * A user creates a job cluster with high availability enabled (cluster ID for the logical application, e.g. myapp) ** This will persist the job with a fixed ID (after FLINK-10291) on first submission * The user kills the application *without* cancelling ** This will leave all data in the high availability store(s) such as job graphs or checkpoints * The user updates the image with a modified application and keeps the high availability configuration (e.g. cluster ID stays myapp) ** This will result in the job in the image to be ignored since we already have a job graph with the same (fixed) ID I think in such a scenario it can be desirable to still have the checkpoints available, but it might be problematic if the job graph is recovered from the {{SubmittedJobGraphStore}} instead of using the job that is part of the image. What do you think about this scenario? Is it the responsibility of the user to handle this? If so, I think that the approach outlined in this ticket makes sense. If not, we may want to consider alternatives or ignore potential non-determinism. 
> Generate JobGraph in StandaloneJobClusterEntrypoint only once > - > > Key: FLINK-10292 > URL: https://issues.apache.org/jira/browse/FLINK-10292 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Major > Fix For: 1.7.0, 1.6.2 > > > Currently the {{StandaloneJobClusterEntrypoint}} generates the {{JobGraph}} > from the given user code every time it starts/is restarted. This can be > problematic if the the {{JobGraph}} generation has side effects. Therefore, > it would be better to generate the {{JobGraph}} only once and store it in HA > storage instead from where to retrieve. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10209) Exclude jdk.tools dependency from hadoop when running with java 9
[ https://issues.apache.org/jira/browse/FLINK-10209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626142#comment-16626142 ] ASF GitHub Bot commented on FLINK-10209: StephanEwen commented on issue #6663: [FLINK-10209][build] Exclude jdk.tools dependency from hadoop URL: https://github.com/apache/flink/pull/6663#issuecomment-424054104 This context may make my line of thinking easier to understand: The hadoop-shaded module is a convenience artifact, one where we discussed previously to phase it out eventually. It is used (1) to compile against (for HDFS / YARN / Kerberos code) and (2) to add as a jar to the lib folder. - Concerning a general exclusion for the jdk.tools dependency: Since we don't compile Hadoop itself and we don't redistribute that dependency (it is a system dependency) I cannot see how a general exclusion would be a problem. It simplifies the build files, which is something really good. - We should encourage use of HADOOP_CLASSPATH rather than use of our Hadoop fat jar anyways. That reduces the value of the second use of the hadoop-shaded project, the packaging into the dist lib folder. If we purely go for the HADOOP_CLASSPATH variant, we could remove that project altogether and simply have a provided or optional Hadoop dependency. - The fat hadoop jar is used for client side functionality only, and since version 2, Hadoop claims to have a stable setup (HDFS protocol, Kerberos config, etc.), so we don't need each major/minor version, but one of every major version should work. We should not need the vendor specific versions either. And, there is still the HADOOP_CLASSPATH workaround in case any of the vendor-specific versions has a compatibility problem after all. - Concerning moving Hadoop to flink-shaded: We don't have to find a setup that converges across Hadoop versions, that is exactly the point. We pick some Hadoop versions for which we want to build convenience jars and converge these manually or by shading.
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Exclude jdk.tools dependency from hadoop when running with java 9 > - > > Key: FLINK-10209 > URL: https://issues.apache.org/jira/browse/FLINK-10209 > Project: Flink > Issue Type: Sub-task > Components: Build System >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > {{hadoop-common}} has a {{jdk.tools}} dependency which cannot be resolved on > java 9. At least for compiling we have to exclude this dependency. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] StephanEwen commented on issue #6663: [FLINK-10209][build] Exclude jdk.tools dependency from hadoop
StephanEwen commented on issue #6663: [FLINK-10209][build] Exclude jdk.tools dependency from hadoop URL: https://github.com/apache/flink/pull/6663#issuecomment-424054104 This context may make my line of thinking easier to understand: The hadoop-shaded module is a convenience artifact, one where we discussed previously to phase it out eventually. It is used (1) to compile against (for HDFS / YARN / Kerberos code) and (2) to add as a jar to the lib folder. - Concerning a general exclusion for the jdk.tools dependency: Since we don't compile Hadoop itself and we don't redistribute that dependency (it is a system dependency) I cannot see how a general exclusion would be a problem. It simplifies the build files, which is something really good. - We should encourage use of HADOOP_CLASSPATH rather than use of our Hadoop fat jar anyways. That reduces the value of the second use of the hadoop-shaded project, the packaging into the dist lib folder. If we purely go for the HADOOP_CLASSPATH variant, we could remove that project altogether and simply have a provided or optional Hadoop dependency. - The fat hadoop jar is used for client side functionality only, and since version 2, Hadoop claims to have a stable setup (HDFS protocol, Kerberos config, etc.), so we don't need each major/minor version, but one of every major version should work. We should not need the vendor specific versions either. And, there is still the HADOOP_CLASSPATH workaround in case any of the vendor-specific versions has a compatibility problem after all. - Concerning moving Hadoop to flink-shaded: We don't have to find a setup that converges across Hadoop versions, that is exactly the point. We pick some Hadoop versions for which we want to build convenience jars and converge these manually or by shading. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results
[ https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626107#comment-16626107 ] ASF GitHub Bot commented on FLINK-10295: azagrebin commented on issue #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API URL: https://github.com/apache/flink/pull/6754#issuecomment-424045988 cc @zentol This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Tokenisation of Program Args resulting in unexpected results > > > Key: FLINK-10295 > URL: https://issues.apache.org/jira/browse/FLINK-10295 > Project: Flink > Issue Type: Bug > Components: REST, Webfrontend >Affects Versions: 1.5.0, 1.6.0, 1.7.0 >Reporter: Gaurav Singhania >Assignee: Andrey Zagrebin >Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.0 > > Attachments: sample_request.txt > > > We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes > all the details to run the job as program args against a jarid, including sql > query and kafka details. In version 1.5 the program args are tokenised; as a > result, single quote (') and double quote (") are stripped from the arguments. > This results in malformed args. > Attached a sample request for reference. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results
[ https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626106#comment-16626106 ] ASF GitHub Bot commented on FLINK-10295: azagrebin opened a new pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API URL: https://github.com/apache/flink/pull/6754
## What is the purpose of the change
This PR adds one more option for specifying jar program arguments in REST API calls (run and get plan). Users can specify arguments as one string or, now additionally, as a list of separate strings.
## Brief change log
- add JarRequestBody as a base for JarRunRequestBody and JarPlanRequestBody
- add programArgsList to JarRequestBody
- add JarHandlerUtils.getProgramArgs to resolve program args from programArgs and programArgsList from the query path or JSON body in JarRunHandler or JarPlanHandler
- users can use programArgs or programArgsList but not both; a test is added for this
## Verifying this change
Unit tests
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (docs)
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
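For illustration, a run request using the `programArgsList` field from the PR above might look like this (the field name comes from the PR description; the entry class and argument values are made up):

```json
{
  "entryClass": "org.example.StreamingJob",
  "parallelism": 4,
  "programArgsList": ["--query", "SELECT name FROM users WHERE name = 'O''Brien'", "--topic", "events"]
}
```

Passing each argument as its own list element sidesteps the tokenisation that strips quotes from a single `programArgs` string; per the PR, `programArgs` and `programArgsList` are mutually exclusive.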
[jira] [Updated] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results
[ https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10295: --- Labels: pull-request-available (was: ) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10251) Handle oversized response messages in AkkaRpcActor
[ https://issues.apache.org/jira/browse/FLINK-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] tison reassigned FLINK-10251: - Assignee: (was: tison) > Handle oversized response messages in AkkaRpcActor > -- > > Key: FLINK-10251 > URL: https://issues.apache.org/jira/browse/FLINK-10251 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Priority: Major > Fix For: 1.7.0, 1.6.2, 1.5.5 > > > The {{AkkaRpcActor}} should check whether an RPC response which is sent to a > remote sender does not exceed the maximum framesize of the underlying > {{ActorSystem}}. If this is the case we should fail fast instead. We can > achieve this by serializing the response and sending the serialized byte > array. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
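The fail-fast idea from FLINK-10251 above can be sketched in plain Java: serialize the response, compare the payload size against the maximum frame size, and throw before attempting to send. This is an illustrative sketch only — the class and method names are made up, and the real AkkaRpcActor would use Akka's serialization and frame-size configuration rather than plain java.io serialization:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hedged sketch of the proposed oversized-response check (not Flink's actual code).
public class FrameSizeCheck {

    // Serialize a response into a byte array via plain Java serialization.
    static byte[] serialize(Serializable response) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(response);
        }
        return bos.toByteArray();
    }

    // Return the serialized payload, or fail fast if it would exceed the frame size.
    static byte[] checkedResponse(Serializable response, int maxFrameSize) throws IOException {
        byte[] payload = serialize(response);
        if (payload.length > maxFrameSize) {
            throw new IOException("RPC response of " + payload.length
                + " bytes exceeds maximum frame size of " + maxFrameSize + " bytes");
        }
        return payload;
    }

    public static void main(String[] args) throws IOException {
        // A small response passes the check.
        byte[] small = checkedResponse("ok", 1 << 20);
        System.out.println(small.length > 0);
        // A ~4 MB payload against a 1 KB limit fails fast on the sender side.
        try {
            checkedResponse(new int[1 << 20], 1024);
            System.out.println(false);
        } catch (IOException expected) {
            System.out.println(true);
        }
    }
}
```

Failing on the sender side turns a silently dropped oversized message on the transport into an immediate, descriptive exception.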
[jira] [Commented] (FLINK-1960) Add comments and docs for withForwardedFields and related operators
[ https://issues.apache.org/jira/browse/FLINK-1960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626069#comment-16626069 ] ASF GitHub Bot commented on FLINK-1960: --- dmpalyvos opened a new pull request #6753: [FLINK-1960] [Documentation] Add docs for withForwardedFields and related operators in Scala API URL: https://github.com/apache/flink/pull/6753 ## What is the purpose of the change The withForwardedFields and related operators have no docs in the Scala API. This pull request adds documentation (javadoc) for them. ## Brief change log *Added documentation for the `withForwardedFields*` Scala functions based on the javadoc of the related java functions*: - *Added documentation to withForwardedFields* - *Added documentation to withForwardedFieldsFirst* - *Added documentation to withForwardedFieldsSecond* ## Verifying this change This change is a trivial documentation update without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? - This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add comments and docs for withForwardedFields and related operators > --- > > Key: FLINK-1960 > URL: https://issues.apache.org/jira/browse/FLINK-1960 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Theodore Vasiloudis >Assignee: hzhuangzhenxi >Priority: Minor > Labels: documentation, pull-request-available, starter > > The withForwardedFields and related operators have no docs for the Scala API. > It would be useful to have code comments and example usage in the docs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-1960) Add comments and docs for withForwardedFields and related operators
[ https://issues.apache.org/jira/browse/FLINK-1960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-1960: -- Labels: documentation pull-request-available starter (was: documentation starter) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10396) Remove codebase switch from MiniClusterResource
[ https://issues.apache.org/jira/browse/FLINK-10396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626038#comment-16626038 ] ASF GitHub Bot commented on FLINK-10396: zentol commented on a change in pull request #6748: [FLINK-10396] Remove CodebaseType URL: https://github.com/apache/flink/pull/6748#discussion_r219883500
## File path: flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -322,32 +320,20 @@ object ScalaShellITCase {
   @BeforeClass
   def beforeAll(): Unit = {
-    val isNew = TestBaseUtils.isNewCodebase()
-    if (isNew) {
-      configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
-      // set to different than default so not to interfere with ScalaShellLocalStartupITCase
-      configuration.setInteger(RestOptions.PORT, 8082)
-      val miniConfig = new MiniClusterConfiguration.Builder()
-        .setConfiguration(configuration)
-        .setNumSlotsPerTaskManager(parallelism)
-        .build()
-
-      val miniCluster = new MiniCluster(miniConfig)
-      miniCluster.start()
-      port = miniCluster.getRestAddress.getPort
-      hostname = miniCluster.getRestAddress.getHost
-
-      cluster = Some(Left(miniCluster))
-    } else {
-      configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE)
-      configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
-      val standaloneCluster = new StandaloneMiniCluster(configuration)
-
-      hostname = standaloneCluster.getHostname
-      port = standaloneCluster.getPort
-
-      cluster = Some(Right(standaloneCluster))
-    }
+    configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
+    // set to different than default so not to interfere with ScalaShellLocalStartupITCase
+    configuration.setInteger(RestOptions.PORT, 8082)
+    val miniConfig = new MiniClusterConfiguration.Builder()
+      .setConfiguration(configuration)
+      .setNumSlotsPerTaskManager(parallelism)
+      .build()
+
+    val miniCluster = new MiniCluster(miniConfig)
+    miniCluster.start()
+    port = miniCluster.getRestAddress.getPort
+    hostname = miniCluster.getRestAddress.getHost
+
+    cluster = Some(Left(miniCluster))
Review comment: do we still need the `Either` here? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove codebase switch from MiniClusterResource > --- > > Key: FLINK-10396 > URL: https://issues.apache.org/jira/browse/FLINK-10396 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > Attachments: > 0001-FLINK-10396-Remove-codebase-switch-in-UT-IT-tests.patch > > > Remove the legacy codebase switch from {{MiniClusterResource}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10397) Remove CoreOptions#MODE
[ https://issues.apache.org/jira/browse/FLINK-10397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626034#comment-16626034 ] ASF GitHub Bot commented on FLINK-10397: tillrohrmann opened a new pull request #6752: [FLINK-10397] Remove CoreOptions#MODE URL: https://github.com/apache/flink/pull/6752 ## What is the purpose of the change This PR is based on #6748 and #6751 and is part of the [removal of Flink's legacy mode](https://issues.apache.org/jira/browse/FLINK-10392). Removes the CoreOptions#MODE option used to switch between the new and legacy mode. ## Brief change log - Remove `CoreOptions#MODE`, `CoreOptions#NEW` and `CoreOptions#LEGACY` - Set the mode where it was switchable to new (e.g. `isNewMode = true` in `YarnTestBase`) ## Verifying this change - Not explicitly tested ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove CoreOptions#MODE > --- > > Key: FLINK-10397 > URL: https://issues.apache.org/jira/browse/FLINK-10397 > Project: Flink > Issue Type: Sub-task > Components: Configuration >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Remove the {{CoreOptions#MODE}} since it is no longer needed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10397) Remove CoreOptions#MODE
[ https://issues.apache.org/jira/browse/FLINK-10397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10397: --- Labels: pull-request-available (was: ) -- This message was sent by Atlassian JIRA (v7.6.3#76005)