[jira] [Commented] (FLINK-10413) requestPartitionState messages overwhelms JM RPC main thread

2018-09-24 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626789#comment-16626789
 ] 

tison commented on FLINK-10413:
---

This might be a duplicate of FLINK-10319

> requestPartitionState messages overwhelms JM RPC main thread
> 
>
> Key: FLINK-10413
> URL: https://issues.apache.org/jira/browse/FLINK-10413
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Zhu Zhu
>Assignee: vinoyang
>Priority: Major
>
> We tried to benchmark job scheduling performance with a 2000x2000 
> ALL-to-ALL streaming (EAGER) job. The input data is empty, so the tasks 
> finish soon after they start.
> In this case we see slow RPC responses, and TM/RM heartbeats to the JM 
> eventually time out.
> We found ~2,000,000 requestPartitionState messages triggered by 
> triggerPartitionProducerStateCheck in a short time, which overwhelmed the JM 
> RPC main thread. This happens because downstream tasks can be started 
> earlier than upstream tasks in EAGER scheduling.
>  
> We'd suggest dropping the partition producer state check to avoid this 
> issue. The task can simply keep waiting for a while and retrying if the 
> partition does not exist. There are two cases in which the partition does 
> not exist:
>  # the partition is not started yet
>  # the partition has failed
> In case 1, retrying works. In case 2, a task failover will soon happen and 
> cancel the downstream tasks as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10413) requestPartitionState messages overwhelms JM RPC main thread

2018-09-24 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-10413:


Assignee: vinoyang

> requestPartitionState messages overwhelms JM RPC main thread
> 
>
> Key: FLINK-10413
> URL: https://issues.apache.org/jira/browse/FLINK-10413
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Zhu Zhu
>Assignee: vinoyang
>Priority: Major
>
> We tried to benchmark job scheduling performance with a 2000x2000 
> ALL-to-ALL streaming (EAGER) job. The input data is empty, so the tasks 
> finish soon after they start.
> In this case we see slow RPC responses, and TM/RM heartbeats to the JM 
> eventually time out.
> We found ~2,000,000 requestPartitionState messages triggered by 
> triggerPartitionProducerStateCheck in a short time, which overwhelmed the JM 
> RPC main thread. This happens because downstream tasks can be started 
> earlier than upstream tasks in EAGER scheduling.
>  
> We'd suggest dropping the partition producer state check to avoid this 
> issue. The task can simply keep waiting for a while and retrying if the 
> partition does not exist. There are two cases in which the partition does 
> not exist:
>  # the partition is not started yet
>  # the partition has failed
> In case 1, retrying works. In case 2, a task failover will soon happen and 
> cancel the downstream tasks as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10412) toString field in AbstractID should be transient to avoid been serialized

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626767#comment-16626767
 ] 

ASF GitHub Bot commented on FLINK-10412:


yanghua opened a new pull request #6755: [FLINK-10412] toString field in 
AbstractID should be transient to avoid been serialized
URL: https://github.com/apache/flink/pull/6755
 
 
   ## What is the purpose of the change
   
   *This pull request marks the toString field in AbstractID as transient so 
that it is not serialized.*
   
   
   ## Brief change log
   
 - *Mark the toString field in AbstractID as transient so that it is not serialized*
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> toString field in AbstractID should be transient to avoid been serialized
> -
>
> Key: FLINK-10412
> URL: https://issues.apache.org/jira/browse/FLINK-10412
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Zhu Zhu
>Assignee: vinoyang
>Priority: Major
>  Labels: deploy,deployment, pull-request-available, serialization
>
> The toString field in AbstractID is currently serialized, which makes RPC 
> message bodies such as InputChannelDeploymentDescriptor and PartitionInfo 
> larger (50%+).
> It adds more pressure on JM memory, especially in large-scale job scheduling 
> (1x1 ALL-to-ALL connection).
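
A minimal sketch of the idea behind the proposed fix (field and method names 
below are illustrative, not the exact AbstractID code): the cached string is 
derived entirely from the ID's two long fields, so it can be declared 
transient and lazily rebuilt after deserialization.

{code:java}
import java.io.Serializable;

class AbstractIdSketch implements Serializable {
    private final long lowerPart;
    private final long upperPart;

    // transient: skipped by Java serialization, shrinking RPC message bodies
    private transient String toStringCache;

    AbstractIdSketch(long lowerPart, long upperPart) {
        this.lowerPart = lowerPart;
        this.upperPart = upperPart;
    }

    @Override
    public String toString() {
        // the cache is null after deserialization and is rebuilt on first use
        if (toStringCache == null) {
            toStringCache = String.format("%016x%016x", upperPart, lowerPart);
        }
        return toStringCache;
    }
}
{code}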



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10412) toString field in AbstractID should be transient to avoid been serialized

2018-09-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10412:
---
Labels: deploy,deployment pull-request-available serialization  (was: 
deploy,deployment serialization)

> toString field in AbstractID should be transient to avoid been serialized
> -
>
> Key: FLINK-10412
> URL: https://issues.apache.org/jira/browse/FLINK-10412
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Zhu Zhu
>Assignee: vinoyang
>Priority: Major
>  Labels: deploy,deployment, pull-request-available, serialization
>
> The toString field in AbstractID is currently serialized, which makes RPC 
> message bodies such as InputChannelDeploymentDescriptor and PartitionInfo 
> larger (50%+).
> It adds more pressure on JM memory, especially in large-scale job scheduling 
> (1x1 ALL-to-ALL connection).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua opened a new pull request #6755: [FLINK-10412] toString field in AbstractID should be transient to avoid been serialized

2018-09-24 Thread GitBox
yanghua opened a new pull request #6755: [FLINK-10412] toString field in 
AbstractID should be transient to avoid been serialized
URL: https://github.com/apache/flink/pull/6755
 
 
   ## What is the purpose of the change
   
   *This pull request marks the toString field in AbstractID as transient so 
that it is not serialized.*
   
   
   ## Brief change log
   
 - *Mark the toString field in AbstractID as transient so that it is not serialized*
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10319) Too many requestPartitionState would crash JM

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626745#comment-16626745
 ] 

ASF GitHub Bot commented on FLINK-10319:


TisonKun removed a comment on issue #6680: [FLINK-10319] [runtime] Too many 
requestPartitionState would crash JM
URL: https://github.com/apache/flink/pull/6680#issuecomment-421222747
 
 
   cc @tillrohrmann @GJL 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Too many requestPartitionState would crash JM
> -
>
> Key: FLINK-10319
> URL: https://issues.apache.org/jira/browse/FLINK-10319
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Do not requestPartitionState from the JM on partition request failure; doing 
> so may generate too many RPC requests and block the JM.
> We gain little benefit from checking which state the producer is in, while 
> on the other hand the check can crash the JM with too many RPC requests. A 
> task can always retriggerPartitionRequest from its InputGate; the retry 
> fails if the producer is gone and succeeds if the producer is alive. Either 
> way, there is no need to ask the JM for help.
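
A self-contained sketch of the retriggering described above (the types and 
scheduling below are stand-ins; Flink's actual InputGate logic differs):

{code:java}
import java.util.Timer;
import java.util.TimerTask;

final class RetriggerSketch {
    private static final Timer TIMER = new Timer("partition-retrigger", true);

    /** Re-issues the partition request with capped exponential backoff. */
    static void retrigger(Runnable partitionRequest, Runnable onGiveUp,
            int attempt, int maxAttempts) {
        if (attempt >= maxAttempts) {
            // the producer is most likely gone for good: fail the task locally,
            // without ever issuing a requestPartitionState RPC to the JM
            onGiveUp.run();
            return;
        }
        long delayMs = Math.min(10_000L, 100L << attempt);
        TIMER.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    partitionRequest.run(); // succeeds once the producer is alive
                } catch (RuntimeException e) {
                    retrigger(partitionRequest, onGiveUp, attempt + 1, maxAttempts);
                }
            }
        }, delayMs);
    }
}
{code}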



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun removed a comment on issue #6680: [FLINK-10319] [runtime] Too many requestPartitionState would crash JM

2018-09-24 Thread GitBox
TisonKun removed a comment on issue #6680: [FLINK-10319] [runtime] Too many 
requestPartitionState would crash JM
URL: https://github.com/apache/flink/pull/6680#issuecomment-421222747
 
 
   cc @tillrohrmann @GJL 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10319) Too many requestPartitionState would crash JM

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626724#comment-16626724
 ] 

ASF GitHub Bot commented on FLINK-10319:


TisonKun removed a comment on issue #6680: [FLINK-10319] [runtime] Too many 
requestPartitionState would crash JM
URL: https://github.com/apache/flink/pull/6680#issuecomment-422276930
 
 
   cc @tillrohrmann @GJL @StefanRRichter 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Too many requestPartitionState would crash JM
> -
>
> Key: FLINK-10319
> URL: https://issues.apache.org/jira/browse/FLINK-10319
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Do not requestPartitionState from the JM on partition request failure; doing 
> so may generate too many RPC requests and block the JM.
> We gain little benefit from checking which state the producer is in, while 
> on the other hand the check can crash the JM with too many RPC requests. A 
> task can always retriggerPartitionRequest from its InputGate; the retry 
> fails if the producer is gone and succeeds if the producer is alive. Either 
> way, there is no need to ask the JM for help.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10319) Too many requestPartitionState would crash JM

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626721#comment-16626721
 ] 

ASF GitHub Bot commented on FLINK-10319:


TisonKun removed a comment on issue #6680: [FLINK-10319] [runtime] Too many 
requestPartitionState would crash JM
URL: https://github.com/apache/flink/pull/6680#issuecomment-421882434
 
 
   cc @StephanEwen @twalthr 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Too many requestPartitionState would crash JM
> -
>
> Key: FLINK-10319
> URL: https://issues.apache.org/jira/browse/FLINK-10319
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Do not requestPartitionState from the JM on partition request failure; doing 
> so may generate too many RPC requests and block the JM.
> We gain little benefit from checking which state the producer is in, while 
> on the other hand the check can crash the JM with too many RPC requests. A 
> task can always retriggerPartitionRequest from its InputGate; the retry 
> fails if the producer is gone and succeeds if the producer is alive. Either 
> way, there is no need to ask the JM for help.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun removed a comment on issue #6680: [FLINK-10319] [runtime] Too many requestPartitionState would crash JM

2018-09-24 Thread GitBox
TisonKun removed a comment on issue #6680: [FLINK-10319] [runtime] Too many 
requestPartitionState would crash JM
URL: https://github.com/apache/flink/pull/6680#issuecomment-422276930
 
 
   cc @tillrohrmann @GJL @StefanRRichter 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] TisonKun removed a comment on issue #6680: [FLINK-10319] [runtime] Too many requestPartitionState would crash JM

2018-09-24 Thread GitBox
TisonKun removed a comment on issue #6680: [FLINK-10319] [runtime] Too many 
requestPartitionState would crash JM
URL: https://github.com/apache/flink/pull/6680#issuecomment-421882434
 
 
   cc @StephanEwen @twalthr 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10413) requestPartitionState messages overwhelms JM RPC main thread

2018-09-24 Thread Zhu Zhu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-10413:

Description: 
We tried to benchmark job scheduling performance with a 2000x2000 ALL-to-ALL 
streaming (EAGER) job. The input data is empty, so the tasks finish soon after 
they start.

In this case we see slow RPC responses, and TM/RM heartbeats to the JM 
eventually time out.

We found ~2,000,000 requestPartitionState messages triggered by 
triggerPartitionProducerStateCheck in a short time, which overwhelmed the JM 
RPC main thread. This happens because downstream tasks can be started earlier 
than upstream tasks in EAGER scheduling.

We'd suggest dropping the partition producer state check to avoid this issue. 
The task can simply keep waiting for a while and retrying if the partition 
does not exist. There are two cases in which the partition does not exist:
 # the partition is not started yet
 # the partition has failed

In case 1, retrying works. In case 2, a task failover will soon happen and 
cancel the downstream tasks as well.

  was:
We tried to benchmark job scheduling performance with a 2000x2000 ALL-to-ALL 
streaming (EAGER) job. The input data is empty, so the tasks finish soon after 
they start.

In this case we see slow RPC responses, and TM/RM heartbeats to the JM 
eventually time out.

We found ~2,000,000 requestPartitionState messages triggered by 
triggerPartitionProducerStateCheck in a short time, which overwhelmed the JM 
RPC main thread. This happens because downstream tasks can be started earlier 
than upstream tasks in EAGER scheduling.

We'd suggest dropping the partition producer state check to avoid this issue. 
The task can simply keep waiting for a while and retrying if the partition 
does not exist. There are two cases in which the partition does not exist:
 # the partition is not started yet
 # the partition has failed

In case 1, retrying works. In case 2, a task failover will soon happen and 
cancel the downstream tasks as well.


> requestPartitionState messages overwhelms JM RPC main thread
> 
>
> Key: FLINK-10413
> URL: https://issues.apache.org/jira/browse/FLINK-10413
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Zhu Zhu
>Priority: Major
>
> We tried to benchmark job scheduling performance with a 2000x2000 
> ALL-to-ALL streaming (EAGER) job. The input data is empty, so the tasks 
> finish soon after they start.
> In this case we see slow RPC responses, and TM/RM heartbeats to the JM 
> eventually time out.
> We found ~2,000,000 requestPartitionState messages triggered by 
> triggerPartitionProducerStateCheck in a short time, which overwhelmed the JM 
> RPC main thread. This happens because downstream tasks can be started 
> earlier than upstream tasks in EAGER scheduling.
>  
> We'd suggest dropping the partition producer state check to avoid this 
> issue. The task can simply keep waiting for a while and retrying if the 
> partition does not exist. There are two cases in which the partition does 
> not exist:
>  # the partition is not started yet
>  # the partition has failed
> In case 1, retrying works. In case 2, a task failover will soon happen and 
> cancel the downstream tasks as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10413) requestPartitionState messages overwhelms JM RPC main thread

2018-09-24 Thread Zhu Zhu (JIRA)
Zhu Zhu created FLINK-10413:
---

 Summary: requestPartitionState messages overwhelms JM RPC main 
thread
 Key: FLINK-10413
 URL: https://issues.apache.org/jira/browse/FLINK-10413
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Affects Versions: 1.7.0
Reporter: Zhu Zhu


We tried to benchmark job scheduling performance with a 2000x2000 ALL-to-ALL 
streaming (EAGER) job. The input data is empty, so the tasks finish soon after 
they start.

In this case we see slow RPC responses, and TM/RM heartbeats to the JM 
eventually time out.

We found ~2,000,000 requestPartitionState messages triggered by 
triggerPartitionProducerStateCheck in a short time, which overwhelmed the JM 
RPC main thread. This happens because downstream tasks can be started earlier 
than upstream tasks in EAGER scheduling.

We'd suggest dropping the partition producer state check to avoid this issue. 
The task can simply keep waiting for a while and retrying if the partition 
does not exist. There are two cases in which the partition does not exist:
 # the partition is not started yet
 # the partition has failed

In case 1, retrying works. In case 2, a task failover will soon happen and 
cancel the downstream tasks as well.
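
A self-contained sketch of the proposed retry behavior (the two small types 
below are stand-ins, not Flink classes):

{code:java}
interface PartitionSource {
    /** Asks the producer's TM for the subpartition, bypassing the JM entirely. */
    void requestSubpartition() throws PartitionNotFoundException;
}

class PartitionNotFoundException extends Exception {}

final class RetryingPartitionRequester {
    static boolean requestWithRetry(PartitionSource source, long timeoutMs)
            throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            try {
                source.requestSubpartition();
                return true;           // case 1: the producer started in the meantime
            } catch (PartitionNotFoundException e) {
                Thread.sleep(100L);    // partition absent: wait and retry locally
            }
        }
        return false;                  // case 2: producer failed; failover cancels us
    }
}
{code}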



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10412) toString field in AbstractID should be transient to avoid been serialized

2018-09-24 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626712#comment-16626712
 ] 

vinoyang commented on FLINK-10412:
--

[~zhuzh] Good catch! Will fix it soon.

> toString field in AbstractID should be transient to avoid been serialized
> -
>
> Key: FLINK-10412
> URL: https://issues.apache.org/jira/browse/FLINK-10412
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Zhu Zhu
>Assignee: vinoyang
>Priority: Major
>  Labels: deploy,deployment, serialization
>
> The toString field in AbstractID is currently serialized, which makes RPC 
> message bodies such as InputChannelDeploymentDescriptor and PartitionInfo 
> larger (50%+).
> It adds more pressure on JM memory, especially in large-scale job scheduling 
> (1x1 ALL-to-ALL connection).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10412) toString field in AbstractID should be transient to avoid been serialized

2018-09-24 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-10412:


Assignee: vinoyang

> toString field in AbstractID should be transient to avoid been serialized
> -
>
> Key: FLINK-10412
> URL: https://issues.apache.org/jira/browse/FLINK-10412
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Zhu Zhu
>Assignee: vinoyang
>Priority: Major
>  Labels: deploy,deployment, serialization
>
> The toString field in AbstractID is currently serialized, which makes RPC 
> message bodies such as InputChannelDeploymentDescriptor and PartitionInfo 
> larger (50%+).
> It adds more pressure on JM memory, especially in large-scale job scheduling 
> (1x1 ALL-to-ALL connection).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10412) toString field in AbstractID should be transient to avoid been serialized

2018-09-24 Thread Zhu Zhu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-10412:

Description: 
The toString field in AbstractID is currently serialized, which makes RPC 
message bodies such as InputChannelDeploymentDescriptor and PartitionInfo 
larger (50%+).

It adds more pressure on JM memory, especially in large-scale job scheduling 
(1x1 ALL-to-ALL connection).

  was:The toString field in A


> toString field in AbstractID should be transient to avoid been serialized
> -
>
> Key: FLINK-10412
> URL: https://issues.apache.org/jira/browse/FLINK-10412
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Zhu Zhu
>Priority: Major
>  Labels: deploy,deployment, serialization
>
> The toString field in AbstractID is currently serialized, which makes RPC 
> message bodies such as InputChannelDeploymentDescriptor and PartitionInfo 
> larger (50%+).
> It adds more pressure on JM memory, especially in large-scale job scheduling 
> (1x1 ALL-to-ALL connection).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10412) toString field in AbstractID should be transient to avoid been serialized

2018-09-24 Thread Zhu Zhu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-10412:

Description: The toString field in A

> toString field in AbstractID should be transient to avoid been serialized
> -
>
> Key: FLINK-10412
> URL: https://issues.apache.org/jira/browse/FLINK-10412
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Zhu Zhu
>Priority: Major
>  Labels: deploy,deployment, serialization
>
> The toString field in A



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10412) toString field in AbstractID should be transient to avoid been serialized

2018-09-24 Thread Zhu Zhu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-10412:

Labels: deploy,deployment serialization  (was: )

> toString field in AbstractID should be transient to avoid been serialized
> -
>
> Key: FLINK-10412
> URL: https://issues.apache.org/jira/browse/FLINK-10412
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Zhu Zhu
>Priority: Major
>  Labels: deploy,deployment, serialization
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10412) toString field in AbstractID should be transient to avoid been serialized

2018-09-24 Thread Zhu Zhu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-10412:

Component/s: Distributed Coordination

> toString field in AbstractID should be transient to avoid been serialized
> -
>
> Key: FLINK-10412
> URL: https://issues.apache.org/jira/browse/FLINK-10412
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Zhu Zhu
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10412) toString field in AbstractID should be transient to avoid been serialized

2018-09-24 Thread Zhu Zhu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-10412:

Affects Version/s: 1.7.0

> toString field in AbstractID should be transient to avoid been serialized
> -
>
> Key: FLINK-10412
> URL: https://issues.apache.org/jira/browse/FLINK-10412
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Zhu Zhu
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10412) toString field in AbstractID should be transient to avoid been serialized

2018-09-24 Thread Zhu Zhu (JIRA)
Zhu Zhu created FLINK-10412:
---

 Summary: toString field in AbstractID should be transient to avoid 
been serialized
 Key: FLINK-10412
 URL: https://issues.apache.org/jira/browse/FLINK-10412
 Project: Flink
  Issue Type: Bug
Reporter: Zhu Zhu






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626693#comment-16626693
 ] 

ASF GitHub Bot commented on FLINK-10134:


XuQianJin-Stars edited a comment on issue #6710: [FLINK-10134] UTF-16 support 
for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#issuecomment-424188954
 
 
   > I think we need to revisit the way this fix works. At this point, it 
probably makes sense to close the PR and go back to a design discussion first. 
I would suggest to have that design discussion on the JIRA issue first.
   > 
   > (1) Where should the BOM be read? Beginning of the file only? Then it 
should most likely be part of split generation.
   > 
   > (2) Any design cannot have additional logic in the method that is called 
per record, otherwise it will have too much performance impact.
   
   @StephanEwen Thank you for your suggestion. I will close the PR first; then 
we can discuss how to fix this bug.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> UTF-16 support for TextInputFormat
> --
>
> Key: FLINK-10134
> URL: https://issues.apache.org/jira/browse/FLINK-10134
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.2
>Reporter: David Dreyfus
>Priority: Blocker
>  Labels: pull-request-available
>
> It does not appear that Flink supports a charset encoding of "UTF-16". In 
> particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM) 
> to establish whether a UTF-16 file is UTF-16LE or UTF-16BE.
>  
> TextInputFormat.setCharset("UTF-16") calls DelimitedInputFormat.setCharset(), 
> which sets TextInputFormat.charsetName and then modifies the previously set 
> delimiterString to construct the proper byte string encoding of the 
> delimiter. This same charsetName is also used in TextInputFormat.readRecord() 
> to interpret the bytes read from the file.
>  
> There are two problems that this implementation would seem to have when using 
> UTF-16.
>  # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java will 
> return a Big Endian byte sequence including the Byte Order Mark (BOM). The 
> actual text file will not contain a BOM at each line ending, so the delimiter 
> will never be read. Moreover, if the actual byte encoding of the file is 
> Little Endian, the bytes will be interpreted incorrectly.
>  # TextInputFormat.readRecord() will not see a BOM each time it decodes a 
> byte sequence with the String(bytes, offset, numBytes, charset) call. 
> Therefore, it will assume Big Endian, which may not always be correct. [1] 
> [https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95]
>  
> While there are likely many solutions, I would think that all of them would 
> have to start by reading the BOM from the file when a Split is opened, then 
> using that BOM to select a BOM-specific encoding when the caller doesn't 
> specify one, and overriding the caller's specification if the BOM conflicts 
> with it. That is, if the BOM indicates Little Endian and the caller indicates 
> UTF-16BE, Flink should rewrite the charsetName as UTF-16LE.
>  I hope this makes sense and that I haven't been testing incorrectly or 
> misreading the code.
>  
> I've verified the problem on version 1.4.2. I believe the problem exists on 
> all versions. 
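
The first problem is easy to demonstrate in plain Java, and the BOM sniff that 
any fix would need is only a few lines (the helper below is an illustration, 
not Flink code):

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class Utf16BomDemo {
    public static void main(String[] args) {
        // Problem 1: encoding the delimiter with "UTF-16" prepends a BOM, so the
        // delimiter bytes (FE FF 00 0A) never match the line endings in the file.
        System.out.println(Arrays.toString("\n".getBytes(StandardCharsets.UTF_16)));    // [-2, -1, 0, 10]

        // The endianness-specific charsets carry no BOM:
        System.out.println(Arrays.toString("\n".getBytes(StandardCharsets.UTF_16BE)));  // [0, 10]
        System.out.println(Arrays.toString("\n".getBytes(StandardCharsets.UTF_16LE)));  // [10, 0]
    }

    /** Sniffs the first two bytes of a file to pick the concrete charset. */
    static String resolveUtf16Charset(byte[] head) {
        if (head.length >= 2 && (head[0] & 0xFF) == 0xFE && (head[1] & 0xFF) == 0xFF) {
            return "UTF-16BE";
        }
        if (head.length >= 2 && (head[0] & 0xFF) == 0xFF && (head[1] & 0xFF) == 0xFE) {
            return "UTF-16LE";
        }
        return "UTF-16BE"; // no BOM: UTF-16 defaults to big-endian
    }
}
{code}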



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] XuQianJin-Stars edited a comment on issue #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed

2018-09-24 Thread GitBox
XuQianJin-Stars edited a comment on issue #6710: [FLINK-10134] UTF-16 support 
for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#issuecomment-424188954
 
 
   > I think we need to revisit the way this fix works. At this point, it 
probably makes sense to close the PR and go back to a design discussion first. 
I would suggest to have that design discussion on the JIRA issue first.
   > 
   > (1) Where should the BOM be read? Beginning of the file only? Then it 
should most likely be part of split generation.
   > 
   > (2) Any design cannot have additional logic in the method that is called 
per record, otherwise it will have too much performance impact.
   
   @StephanEwen Thank you for your suggestion. I will close the PR first; then 
we can discuss how to fix this bug.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626690#comment-16626690
 ] 

ASF GitHub Bot commented on FLINK-10134:


XuQianJin-Stars closed pull request #6710: [FLINK-10134] UTF-16 support for 
TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index c1ef344175b..8eb43424264 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -62,6 +62,9 @@
 	// Charset is not serializable
 	private transient Charset charset;
 
+	/** The charset of bom in the file to process. */
+	private String bomCharsetName;
+
 	/**
 	 * The default read buffer size = 1MB.
 	 */
@@ -221,6 +224,11 @@ public void setCharset(String charset) {
 		}
 	}
 
+	@PublicEvolving
+	public String getBomCharsetName() {
+		return this.bomCharsetName;
+	}
+
 	public byte[] getDelimiter() {
 		return delimiter;
 	}
@@ -341,7 +349,7 @@ public void configure(Configuration parameters) {
 	@Override
 	public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
 
-		final FileBaseStatistics cachedFileStats = cachedStats instanceof FileBaseStatistics ?
+		final FileBaseStatistics cachedFileStats = cachedStats instanceof FileInputFormat.FileBaseStatistics ?
 			(FileBaseStatistics) cachedStats : null;
 
 		// store properties
@@ -408,7 +416,9 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
 		while (samplesTaken < numSamples && fileNum < allFiles.size()) {
 			// make a split for the sample and use it to read a record
 			FileStatus file = allFiles.get(fileNum);
-			FileInputSplit split = new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null);
+			String bomCharsetName = getBomCharset(file);
+
+			FileInputSplit split = new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null, bomCharsetName);
 
 			// we open the split, read one line, and take its length
 			try {
@@ -467,6 +477,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
 	 */
 	@Override
 	public void open(FileInputSplit split) throws IOException {
+		this.bomCharsetName = split.getBomCharsetName();
 		super.open(split);
 		initBuffers();
 
@@ -736,7 +747,7 @@ public void reopen(FileInputSplit split, Long state) throws IOException {
 		Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
 		Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
 		Preconditions.checkArgument(state == -1 || state >= split.getStart(),
-			" Illegal offset "+ state +", smaller than the splits start=" + split.getStart());
+			" Illegal offset " + state + ", smaller than the splits start=" + split.getStart());
 
 		try {
 			this.open(split);
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 14cf647cd24..c58344fba6c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -34,7 +34,7 @@
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-
+import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,20 +55,20 @@
  * {@link #nextRecord(Object)} and {@link #reachedEnd()} methods need to be implemented.
  * Additionally, one may override {@link #open(FileInputSplit)} and {@link #close()} to
  * change the life cycle behavior.
- * 
+ *
  * After the {@link 

[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626689#comment-16626689
 ] 

ASF GitHub Bot commented on FLINK-10134:


XuQianJin-Stars commented on issue #6710: [FLINK-10134] UTF-16 support for 
TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#issuecomment-424188954
 
 
   > I think we need to revisit the way this fix works. At this point, it 
probably makes sense to close the PR and go back to a design discussion first. 
I would suggest to have that design discussion on the JIRA issue first.
   > 
   > (1) Where should the BOM be read? Beginning of the file only? Then it 
should most likely be part of split generation.
   > 
   > (2) Any design cannot have additional logic in the method that is called 
per record, otherwise it will have too much performance impact.
   
   @StephanEwen Thank you for your suggestion. I will close the PR first; then 
we can discuss how to fix this bug.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> UTF-16 support for TextInputFormat
> --
>
> Key: FLINK-10134
> URL: https://issues.apache.org/jira/browse/FLINK-10134
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.2
>Reporter: David Dreyfus
>Priority: Blocker
>  Labels: pull-request-available
>
> It does not appear that Flink supports a charset encoding of "UTF-16". In 
> particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM) 
> to establish whether a UTF-16 file is UTF-16LE or UTF-16BE.
>  
> TextInputFormat.setCharset("UTF-16") calls DelimitedInputFormat.setCharset(), 
> which sets TextInputFormat.charsetName and then modifies the previously set 
> delimiterString to construct the proper byte string encoding of the 
> delimiter. This same charsetName is also used in TextInputFormat.readRecord() 
> to interpret the bytes read from the file.
>  
> There are two problems that this implementation would seem to have when using 
> UTF-16.
>  # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java will 
> return a Big Endian byte sequence including the Byte Order Mark (BOM). The 
> actual text file will not contain a BOM at each line ending, so the delimiter 
> will never be read. Moreover, if the actual byte encoding of the file is 
> Little Endian, the bytes will be interpreted incorrectly.
>  # TextInputFormat.readRecord() will not see a BOM each time it decodes a 
> byte sequence with the String(bytes, offset, numBytes, charset) call. 
> Therefore, it will assume Big Endian, which may not always be correct. [1] 
> [https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95]
>  
> While there are likely many solutions, I would think that all of them would 
> have to start by reading the BOM from the file when a Split is opened, then 
> using that BOM to select a BOM-specific encoding when the caller doesn't 
> specify one, and overriding the caller's specification if the BOM conflicts 
> with it. That is, if the BOM indicates Little Endian and the caller indicates 
> UTF-16BE, Flink should rewrite the charsetName as UTF-16LE.
>  I hope this makes sense and that I haven't been testing incorrectly or 
> misreading the code.
>  
> I've verified the problem on version 1.4.2. I believe the problem exists on 
> all versions. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] XuQianJin-Stars commented on issue #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed

2018-09-24 Thread GitBox
XuQianJin-Stars commented on issue #6710: [FLINK-10134] UTF-16 support for 
TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#issuecomment-424188954
 
 
   > I think we need to revisit the way this fix works. At this point, it 
probably makes sense to close the PR and go back to a design discussion first. 
I would suggest to have that design discussion on the JIRA issue first.
   > 
   > (1) Where should the BOM be read? Beginning of the file only? Then it 
should most likely be part of split generation.
   > 
   > (2) Any design cannot have additional logic in the method that is called 
per record, otherwise it will have too much performance impact.
   
   @StephanEwen Thank you for your suggestion. I will close the PR first; then 
we can discuss how to fix this bug.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] XuQianJin-Stars closed pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed

2018-09-24 Thread GitBox
XuQianJin-Stars closed pull request #6710: [FLINK-10134] UTF-16 support for 
TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index c1ef344175b..8eb43424264 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -62,6 +62,9 @@
 	// Charset is not serializable
 	private transient Charset charset;
 
+	/** The charset of bom in the file to process. */
+	private String bomCharsetName;
+
 	/**
 	 * The default read buffer size = 1MB.
 	 */
@@ -221,6 +224,11 @@ public void setCharset(String charset) {
 		}
 	}
 
+	@PublicEvolving
+	public String getBomCharsetName() {
+		return this.bomCharsetName;
+	}
+
 	public byte[] getDelimiter() {
 		return delimiter;
 	}
@@ -341,7 +349,7 @@ public void configure(Configuration parameters) {
 	@Override
 	public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
 
-		final FileBaseStatistics cachedFileStats = cachedStats instanceof FileBaseStatistics ?
+		final FileBaseStatistics cachedFileStats = cachedStats instanceof FileInputFormat.FileBaseStatistics ?
 			(FileBaseStatistics) cachedStats : null;
 
 		// store properties
@@ -408,7 +416,9 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
 		while (samplesTaken < numSamples && fileNum < allFiles.size()) {
 			// make a split for the sample and use it to read a record
 			FileStatus file = allFiles.get(fileNum);
-			FileInputSplit split = new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null);
+			String bomCharsetName = getBomCharset(file);
+
+			FileInputSplit split = new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null, bomCharsetName);
 
 			// we open the split, read one line, and take its length
 			try {
@@ -467,6 +477,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
 	 */
 	@Override
 	public void open(FileInputSplit split) throws IOException {
+		this.bomCharsetName = split.getBomCharsetName();
 		super.open(split);
 		initBuffers();
 
@@ -736,7 +747,7 @@ public void reopen(FileInputSplit split, Long state) throws IOException {
 		Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
 		Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
 		Preconditions.checkArgument(state == -1 || state >= split.getStart(),
-			" Illegal offset "+ state +", smaller than the splits start=" + split.getStart());
+			" Illegal offset " + state + ", smaller than the splits start=" + split.getStart());
 
 		try {
 			this.open(split);
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 14cf647cd24..c58344fba6c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -34,7 +34,7 @@
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-
+import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,20 +55,20 @@
  * {@link #nextRecord(Object)} and {@link #reachedEnd()} methods need to be implemented.
  * Additionally, one may override {@link #open(FileInputSplit)} and {@link #close()} to
  * change the life cycle behavior.
- * 
+ *
  * After the {@link #open(FileInputSplit)} method completed, the file input data is available
  * from the {@link #stream} field.
  */
 @Public
 public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputSplit> {
-	
+
 	// 

[GitHub] yanghua commented on issue #6738: [FLINK-10126] There should be a Scala DataSource

2018-09-24 Thread GitBox
yanghua commented on issue #6738: [FLINK-10126] There should be a Scala 
DataSource
URL: https://github.com/apache/flink/pull/6738#issuecomment-424184780
 
 
   hi @StephanEwen, before providing the API directly, I evaluated 
implementing a peer `DataSource` in the flink-scala module. However, it would 
break the existing source API. I found that `ExecutionEnvironment.scala` 
exposes some `readXXX` APIs that return `DataSet.scala`. Inside these APIs, we 
get a `DataSource` (flink-java) and then call `wrap(source)` to convert it to 
a `DataSet.scala`. Considering that only two or three APIs are exposed in 
`DataSource.java` (one of which, `withParameters`, is already exposed through 
`DataSet`), I chose to expose the `DataSource` API on `DataSet.scala` with 
minimal changes and without breaking the current API. In fact, in this JIRA's 
description the reporter is mainly concerned with getting access to the APIs 
provided by `DataSource.java`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10126) There should be a Scala DataSource

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626676#comment-16626676
 ] 

ASF GitHub Bot commented on FLINK-10126:


yanghua commented on issue #6738: [FLINK-10126] There should be a Scala 
DataSource
URL: https://github.com/apache/flink/pull/6738#issuecomment-424184780
 
 
   hi @StephanEwen, before providing the API directly, I evaluated 
implementing a peer `DataSource` in the flink-scala module. However, it would 
break the existing source API. I found that `ExecutionEnvironment.scala` 
exposes some `readXXX` APIs that return `DataSet.scala`. Inside these APIs, we 
get a `DataSource` (flink-java) and then call `wrap(source)` to convert it to 
a `DataSet.scala`. Considering that only two or three APIs are exposed in 
`DataSource.java` (one of which, `withParameters`, is already exposed through 
`DataSet`), I chose to expose the `DataSource` API on `DataSet.scala` with 
minimal changes and without breaking the current API. In fact, in this JIRA's 
description the reporter is mainly concerned with getting access to the APIs 
provided by `DataSource.java`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> There should be a Scala DataSource
> --
>
> Key: FLINK-10126
> URL: https://issues.apache.org/jira/browse/FLINK-10126
> Project: Flink
>  Issue Type: Improvement
>Reporter: Alexis Sarda-Espinosa
>Assignee: vinoyang
>Priority: Minor
>  Labels: datasource, pull-request-available, scala
>
> In Java, an ExecutionEnvironment's createInput method returns a DataSource, 
> whereas the Scala version returns a DataSet. There is no Scala DataSource 
> wrapper, and the Scala DataSet does not provide the Java DataSource methods, 
> such as getSplitDataProperties.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10065) InstantiationUtil.deserializeObject(InputStream in, ClassLoader cl, boolean isFailureTolerant) will close the inputStream

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626673#comment-16626673
 ] 

ASF GitHub Bot commented on FLINK-10065:


klion26 commented on issue #6498: [FLINK-10065] 
InstantiationUtil.deserializeObject(InputStream in, ClassLoader cl, boolean 
isFailureTolerant) will close the inputStream
URL: https://github.com/apache/flink/pull/6498#issuecomment-424183722
 
 
   @tillrohrmann, @StefanRRichter, @tzulitai, thank you all for replying. Is 
there anything I need to do before continuing?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> InstantiationUtil.deserializeObject(InputStream in, ClassLoader cl, boolean 
> isFailureTolerant) will close the inputStream
> -
>
> Key: FLINK-10065
> URL: https://issues.apache.org/jira/browse/FLINK-10065
> Project: Flink
>  Issue Type: Bug
>Reporter: Congxian Qiu
>Assignee: Congxian Qiu
>Priority: Major
>  Labels: pull-request-available
>
> Now, the implementation of InstantiationUtil.deserializeObject(InputStream 
> in, ClassLoader cl, boolean isFailureTolerant) is 
> {code:java}
> @SuppressWarnings("unchecked")
> public static <T> T deserializeObject(InputStream in, ClassLoader cl, boolean isFailureTolerant)
> 		throws IOException, ClassNotFoundException {
> 
> 	final ClassLoader old = Thread.currentThread().getContextClassLoader();
> 	// not using resource try to avoid AutoClosable's close() on the given stream
> 	try (ObjectInputStream oois = isFailureTolerant
> 			? new InstantiationUtil.FailureTolerantObjectInputStream(in, cl)
> 			: new InstantiationUtil.ClassLoaderObjectInputStream(in, cl)) {
> 		Thread.currentThread().setContextClassLoader(cl);
> 		return (T) oois.readObject();
> 	}
> 	finally {
> 		Thread.currentThread().setContextClassLoader(old);
> 	}
> }
> {code}
> The ObjectInputStream wraps the given InputStream and is Closeable, so the 
> stream passed as a parameter will be closed after calling this method.
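
One common remedy, shown as a sketch (not necessarily the fix this PR adopts): 
shield the caller's stream behind a wrapper whose close() is a no-op, so that 
closing the ObjectInputStream cannot close the underlying stream.

{code:java}
import java.io.FilterInputStream;
import java.io.InputStream;

class NonClosingInputStream extends FilterInputStream {
    NonClosingInputStream(InputStream in) {
        super(in);
    }

    @Override
    public void close() {
        // deliberately empty: the caller keeps ownership of the stream
    }
}
{code}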



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] klion26 commented on issue #6498: [FLINK-10065] InstantiationUtil.deserializeObject(InputStream in, ClassLoader cl, boolean isFailureTolerant) will close the inputStream

2018-09-24 Thread GitBox
klion26 commented on issue #6498: [FLINK-10065] 
InstantiationUtil.deserializeObject(InputStream in, ClassLoader cl, boolean 
isFailureTolerant) will close the inputStream
URL: https://github.com/apache/flink/pull/6498#issuecomment-424183722
 
 
   @tillrohrmann, @StefanRRichter, @tzulitai, thank you all for replying. Is 
there anything I need to do before continuing?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10399) Refractor ParameterTool#fromArgs

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626599#comment-16626599
 ] 

ASF GitHub Bot commented on FLINK-10399:


TisonKun commented on issue #6737: [FLINK-10399] Refractor 
ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#issuecomment-424165441
 
 
   @StephanEwen Thanks for your review!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Refractor ParameterTool#fromArgs
> 
>
> Key: FLINK-10399
> URL: https://issues.apache.org/jira/browse/FLINK-10399
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> {{ParameterTool#fromArgs}} uses a convoluted implementation that Flink 
> developers cannot parse quickly.
> The main problem is that when parsing args we always want to extract a 
> key-value pair, but the implementation iterates with a {{for}} loop, which 
> introduces awkward flags, mutable variables, and branches.
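
A sketch of the key-value-at-a-time shape the refactoring aims for 
(illustrative; not the exact ParameterTool code):

{code:java}
import java.util.HashMap;
import java.util.Map;

final class ArgsParserSketch {
    static Map<String, String> fromArgs(String[] args) {
        Map<String, String> map = new HashMap<>();
        int i = 0;
        while (i < args.length) {
            // each loop iteration consumes one key and, optionally, its value
            String key = args[i].startsWith("--") ? args[i].substring(2)
                    : args[i].startsWith("-") ? args[i].substring(1)
                    : null;
            if (key == null || key.isEmpty()) {
                throw new IllegalArgumentException(
                        "Error parsing argument '" + args[i] + "' at position " + i);
            }
            i++;
            if (i < args.length && !args[i].startsWith("-")) {
                map.put(key, args[i]); // next token is this key's value
                i++;
            } else {
                map.put(key, "__NO_VALUE_KEY"); // flag-style argument without a value
            }
        }
        return map;
    }
}
{code}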



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on issue #6737: [FLINK-10399] Refractor ParameterTool#fromArgs

2018-09-24 Thread GitBox
TisonKun commented on issue #6737: [FLINK-10399] Refractor 
ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#issuecomment-424165441
 
 
   @StephanEwen Thanks for your review!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10289) Classify Exceptions to different category for apply different failover strategy

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626469#comment-16626469
 ] 

ASF GitHub Bot commented on FLINK-10289:


isunjin commented on issue #6739: [FLINK-10289] [JobManager] Classify 
Exceptions to different category for apply different failover strategy
URL: https://github.com/apache/flink/pull/6739#issuecomment-424127134
 
 
   > I really like this idea in principle.
   > 
   > Instead of attaching an enum classifier, could we solve this by exception 
inheritance? Having exceptions that represent the categories and then making 
the specific exceptions subclasses of the category exceptions?
   
   Thanks Stephan, using exception inheritance is certainly a good approach; 
however:
   - an enum explicitly tells us that there are exactly four types of 
exceptions, while exception classes are more scattered.
   - Java doesn't support multiple inheritance, which would make subclassing a 
little harder, but we could also use a Java interface or annotation.
   - A higher-level class needs to consume this classification logic, so the 
ThrowableClassifier here simply encapsulates that logic; e.g., if we used 
annotations, we would need logic to extract the annotation.
   
   So I would propose mixing those ideas together: keep the classification 
logic and use a Java annotation/interface. @StephanEwen, let me know what you 
think.
   
   Jin


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Classify Exceptions to different category for apply different failover 
> strategy
> ---
>
> Key: FLINK-10289
> URL: https://issues.apache.org/jira/browse/FLINK-10289
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>  Labels: pull-request-available
>
> We need to classify exceptions and treat them with different strategies. To 
> do this, we propose to introduce the following Throwable Types, and the 
> corresponding exceptions:
>  * NonRecoverable
>  ** We shouldn't retry if an exception was classified as NonRecoverable
>  ** For example, NoResourceAvailableException is a NonRecoverable Exception
>  ** Introduce a new Exception UserCodeException to wrap all exceptions that 
> are thrown from user code
>  * PartitionDataMissingError
>  ** In certain scenarios producer data was transferred in blocking mode or 
> data was saved in a persistent store. If the partition is missing, we need to 
> revoke/rerun the producer task to regenerate the data.
>  ** Introduce a new exception PartitionDataMissingException to wrap all those 
> kinds of issues.
>  * EnvironmentError
>  ** It happens due to hardware or software issues related to a specific 
> environment. The assumption is that a task will succeed if we run it in a 
> different environment, and other tasks run in this bad environment will very 
> likely fail. If multiple tasks fail on the same machine due to 
> EnvironmentError, we need to consider adding the bad machine to the 
> blacklist and avoiding scheduling tasks on it.
>  ** Introduce a new exception EnvironmentException to wrap all those kinds 
> of issues.
>  * Recoverable
>  ** We assume other issues are recoverable.
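To make the proposal concrete, here is a minimal sketch of the annotation-based 
variant discussed on the pull request. The names (ThrowableType, 
ThrowableAnnotation, ThrowableClassifier) follow the thread, but the exact 
signatures are illustrative assumptions, not a committed API:

{code:java}
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// the four categories named in the issue description
enum ThrowableType {
	NON_RECOVERABLE, PARTITION_DATA_MISSING_ERROR, ENVIRONMENT_ERROR, RECOVERABLE
}

// tags an exception class with its failover category
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface ThrowableAnnotation {
	ThrowableType value();
}

// the single place that extracts the classification, as argued in the thread
final class ThrowableClassifier {
	static ThrowableType getThrowableType(Throwable cause) {
		ThrowableAnnotation annotation =
			cause.getClass().getAnnotation(ThrowableAnnotation.class);
		// unannotated exceptions fall back to the Recoverable category
		return annotation == null ? ThrowableType.RECOVERABLE : annotation.value();
	}
}

// example: an exception carrying its category
@ThrowableAnnotation(ThrowableType.PARTITION_DATA_MISSING_ERROR)
class PartitionDataMissingException extends Exception {
	PartitionDataMissingException(String message) {
		super(message);
	}
}
{code}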



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626449#comment-16626449
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

bmeriaux commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219984964
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ##
 @@ -470,6 +472,34 @@ public void testCassandraTableSink() throws Exception {
Assert.assertTrue("The input data was not completely written to 
Cassandra", input.isEmpty());
}
 
+   @Test
+   public void testCassandraBatchPojoFormat() throws Exception {
+
+   OutputFormat<Tuple3<String, Integer, Integer>> sink = new 
CassandraTupleOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder);
+   sink.configure(new Configuration());
+   sink.open(0, 1);
+
+   for (Tuple3<String, Integer, Integer> value : collection) {
+   sink.writeRecord(value);
+   }
+
+   sink.close();
+
+   InputFormat<CustomCassandraAnnotatedPojo, InputSplit> source = 
new CassandraPojoInputFormat<>(injectTableName(SELECT_DATA_QUERY), builder, 
CustomCassandraAnnotatedPojo.class);
+   source.configure(new Configuration());
+   source.open(null);
+
+   List<CustomCassandraAnnotatedPojo> result = new ArrayList<>();
+
+   while (!source.reachedEnd()) {
+   CustomCassandraAnnotatedPojo temp = 
source.nextRecord(new CustomCassandraAnnotatedPojo());
+   result.add(temp);
+   }
+
+   source.close();
+   Assert.assertEquals(20, result.size());
 
 Review comment:
   To easily save a list of CustomCassandraAnnotatedPojo instances, I used the 
CassandraPojoSink
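   A hedged sketch of what that could look like: the pojo list and builder come 
from the surrounding test fixture, and the open/send wiring is an assumption 
based on the connector's sink base class, not the committed test code:

{code:java}
import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

class PojoWriteSketch {
	// writes the annotated pojos through CassandraPojoSink so the new
	// CassandraPojoInputFormat has something to read back
	static void writePojos(List<CustomCassandraAnnotatedPojo> pojos,
			ClusterBuilder builder) throws Exception {
		CassandraPojoSink<CustomCassandraAnnotatedPojo> sink =
			new CassandraPojoSink<>(CustomCassandraAnnotatedPojo.class, builder);
		sink.open(new Configuration());
		for (CustomCassandraAnnotatedPojo pojo : pojos) {
			sink.send(pojo);
		}
		sink.close();
	}
}
{code}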


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jeffrey Carter
>Assignee: Jeffrey Carter
>Priority: Minor
>  Labels: InputFormat, cassandra, features, pull-request-available
> Fix For: 1.7.0
>
> Attachments: CassandraPojoInputFormatText.rtf
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently the DataSet API only has the ability to output data received from 
> Cassandra as a source as a Tuple. This change would allow the data to be 
> output as a custom POJO that the user has created and annotated using the 
> Datastax API. This would remove the need for very long Tuples to be created 
> by the DataSet and then mapped to the custom POJO.
>  
> -The changes to the CassandraInputFormat object would be minimal, but would 
> require importing the Datastax API into the class-. Another option is to make 
> a similar, but slightly different class called CassandraPojoInputFormat.
> I have already gotten code for this working in my own project, but want 
> others' thoughts on the best way to implement it.
>  
> //Example of its use in main
> CassandraPojoInputFormat<CustomCassandraPojo> cassandraInputFormat = new 
> CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, 
> CustomCassandraPojo.class);
>  cassandraInputFormat.configure(null);
>  cassandraInputFormat.open(null);
> DataSet<CustomCassandraPojo> outputTestSet = 
> exEnv.createInput(cassandraInputFormat, TypeInformation.of(new 
> TypeHint<CustomCassandraPojo>(){}));
>  
> //The class that I currently have set up
> [^CassandraPojoInputFormatText.rtf]
>  
> Will make another Jira Issue for the Output version next if this is approved



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626435#comment-16626435
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

bmeriaux commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219980389
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java
 ##
 @@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
+import org.apache.flink.util.Preconditions;
+
+import com.datastax.driver.mapping.Mapper;
+import com.datastax.driver.mapping.MappingManager;
+import com.datastax.driver.mapping.Result;
+
+import java.io.IOException;
+
+/**
+ * InputFormat to read data from Apache Cassandra and generate a custom 
Cassandra annotated object.
+ *
+ * @param <OUT> type of inputClass
+ */
+public class CassandraPojoInputFormat<OUT> extends 
CassandraInputFormatBase<OUT> {
+
+   private static final long serialVersionUID = 1992091320180905115L;
+   private final MapperOptions mapperOptions;
+
+   private transient Result<OUT> resultSet;
+   private Class<OUT> inputClass;
+
+   public CassandraPojoInputFormat(String query, ClusterBuilder builder, 
Class<OUT> inputClass) {
+   this(query, builder, inputClass, null);
+   }
+
+   public CassandraPojoInputFormat(String query, ClusterBuilder builder, 
Class<OUT> inputClass, MapperOptions mapperOptions) {
+   super(query, builder);
+   this.mapperOptions = mapperOptions;
+   Preconditions.checkArgument(inputClass != null, "InputClass 
cannot be null");
 
 Review comment:
   Like the previous one, I think we should stick with checkArgument to keep an 
IllegalArgumentException
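   For clarity, the contrast under discussion in isolation (a minimal sketch; 
the parameter name is taken from the surrounding code):

{code:java}
import org.apache.flink.util.Preconditions;

class InputClassCheck {
	static <OUT> void requireInputClass(Class<OUT> inputClass) {
		// Preconditions.checkNotNull(inputClass, ...) would throw a
		// NullPointerException on a null argument; checkArgument throws an
		// IllegalArgumentException, which is what the reviewer wants to keep.
		Preconditions.checkArgument(inputClass != null, "InputClass cannot be null");
	}
}
{code}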


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jeffrey Carter
>Assignee: Jeffrey Carter
>Priority: Minor
>  Labels: InputFormat, cassandra, features, pull-request-available
> Fix For: 1.7.0
>
> Attachments: CassandraPojoInputFormatText.rtf
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently the DataSet API only has the ability to output data received from 
> Cassandra as a source as a Tuple. This change would allow the data to be 
> output as a custom POJO that the user has created and annotated using the 
> Datastax API. This would remove the need for very long Tuples to be created 
> by the DataSet and then mapped to the custom POJO.
>  
> -The changes to the CassandraInputFormat object would be minimal, but would 
> require importing the Datastax API into the class-. Another option is to make 
> a similar, but slightly different class called CassandraPojoInputFormat.
> I have already gotten code for this working in my own project, but want 
> others' thoughts on the best way to implement it.
>  
> //Example of its use in main
> CassandraPojoInputFormat<CustomCassandraPojo> cassandraInputFormat = new 
> CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, 
> 


[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626308#comment-16626308
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

bmeriaux commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219957391
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Strings;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * InputFormat to read data from Apache Cassandra and generate a custom 
Cassandra annotated object.
+ * @param <OUT> type of inputClass
+ */
+public abstract class CassandraInputFormatBase<OUT> extends 
RichInputFormat<OUT, InputSplit> implements NonParallelInput {
+   private static final Logger LOG = 
LoggerFactory.getLogger(CassandraInputFormatBase.class);
+
+   protected final String query;
+   protected final ClusterBuilder builder;
+
+   protected transient Cluster cluster;
+   protected transient Session session;
+
+   public CassandraInputFormatBase(String query, ClusterBuilder builder){
+   Preconditions.checkArgument(!Strings.isNullOrEmpty(query), 
"Query cannot be null or empty");
+   Preconditions.checkArgument(builder != null, "Builder cannot be 
null");
 
 Review comment:
   It will throw a NullPointerException instead of an IllegalArgumentException; 
like other classes of the connector, I think we should keep checkArgument


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jeffrey Carter
>Assignee: Jeffrey Carter
>Priority: Minor
>  Labels: InputFormat, cassandra, features, pull-request-available
> Fix For: 1.7.0
>
> Attachments: CassandraPojoInputFormatText.rtf
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently the DataSet API only has the ability to output data received from 
> Cassandra as a source as a Tuple. This change would allow the data to be 
> output as a custom POJO that the user has created and annotated using the 
> Datastax API. This would remove the need for very long Tuples to be created 
> by the DataSet and then mapped to the custom POJO.
>  
> -The changes to the CassandraInputFormat object would be minimal, but would 
> require importing the Datastax API into the class-. Another option is to make 
> a similar, but slightly different class 


[jira] [Commented] (FLINK-8819) Rework travis script to use build stages

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626305#comment-16626305
 ] 

ASF GitHub Bot commented on FLINK-8819:
---

GJL commented on a change in pull request #6642: [FLINK-8819][travis] Rework 
travis script to use stages
URL: https://github.com/apache/flink/pull/6642#discussion_r219956271
 
 

 ##
 File path: tools/travis_controller.sh
 ##
 @@ -0,0 +1,211 @@
+#!/usr/bin/env bash
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+CACHE_DIR="$HOME/flink_cache"
+CACHE_BUILD_DIR="$CACHE_DIR/$TRAVIS_BUILD_NUMBER"
+CACHE_FLINK_DIR="$CACHE_BUILD_DIR/flink"
+
+HERE="`dirname \"$0\"`"# relative
+HERE="`( cd \"$HERE\" && pwd )`"   # absolutized and normalized
+if [ -z "$HERE" ] ; then
+   # error; for some reason, the path is not accessible
+   # to the script (e.g. permissions re-evaled after suid)
+   exit 1  # fail
+fi
+
+source "${HERE}/travis/fold.sh"
+source "${HERE}/travis/stage.sh"
+source "${HERE}/travis/shade.sh"
+
+function deleteOldCaches() {
+   while read CACHE_DIR; do
+   local old_number="${CACHE_DIR##*/}"
+   if [ "$old_number" -lt "$TRAVIS_BUILD_NUMBER" ]; then
+   echo "Deleting old cache $CACHE_DIR"
+   rm -rf "$CACHE_DIR"
+   fi
+   done
+}
+
+# delete leftover caches from previous builds
+find "$CACHE_DIR" -mindepth 1 -maxdepth 1 | grep -v "$TRAVIS_BUILD_NUMBER" | 
deleteOldCaches
+
+function getCurrentStage() {
+   STAGE_NUMBER=$(echo "$TRAVIS_JOB_NUMBER" | cut -d'.' -f 2)
+   case $STAGE_NUMBER in
+   (1)
+   echo "$STAGE_COMPILE"
+   ;;
+   (2)
+   echo "$STAGE_COMPILE"
+   ;;
+   (3)
+   echo "$STAGE_CORE"
+   ;;
+   (4)
+   echo "$STAGE_LIBRARIES"
+   ;;
+   (5)
+   echo "$STAGE_CONNECTORS"
+   ;;
+   (6)
+   echo "$STAGE_TESTS"
+   ;;
+   (7)
+   echo "$STAGE_MISC"
+   ;;
+   (8)
+   echo "$STAGE_CORE"
+   ;;
+   (9)
+   echo "$STAGE_LIBRARIES"
+   ;;
+   (10)
+   echo "$STAGE_CONNECTORS"
+   ;;
+   (11)
+   echo "$STAGE_TESTS"
+   ;;
+   (12)
+   echo "$STAGE_MISC"
+   ;;
+   (13)
+   echo "$STAGE_CLEANUP"
+   ;;
+   (14)
+   echo "$STAGE_CLEANUP"
+   ;;
+   (*)
+   echo "Invalid stage detected ($STAGE_NUMBER)"
+   return 1
+   ;;
+   esac
+
+   return 0
+}
+
+STAGE=$(getCurrentStage)
+if [ $? != 0 ]; then
+   echo "Could not determine current stage."
+   exit 1
+fi
+echo "Current stage: \"$STAGE\""
+
+EXIT_CODE=0
+
+# Run actual compile steps
+if [ $STAGE == "$STAGE_COMPILE" ]; then
+   MVN="mvn clean install -nsu -Dflink.forkCount=2 
-Dflink.forkCountTestPackage=2 -Dmaven.javadoc.skip=true -B -DskipTests 
$PROFILE"
+   $MVN
+   EXIT_CODE=$?
+
+if [ $EXIT_CODE == 0 ]; then
+printf 
"\n\n==\n"
+printf "Checking dependency convergence\n"
+printf 
"==\n"
+
+./tools/check_dependency_convergence.sh
+EXIT_CODE=$?
+else
+printf 

[jira] [Commented] (FLINK-10399) Refractor ParameterTool#fromArgs

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626304#comment-16626304
 ] 

ASF GitHub Bot commented on FLINK-10399:


StephanEwen commented on issue #6737: [FLINK-10399] Refractor 
ParameterTool#fromArgs
URL: https://github.com/apache/flink/pull/6737#issuecomment-424092282
 
 
   **Consensus that the contribution should go into Flink**
 - +1 from my side
 - tests show the behavior is unchanged, so low risk
 - cleanup of bad code is desirable and low risk
   
   **Does not need specific attention**
 - No specific committer attention needed
   
   **Contribution description is fine**
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Refractor ParameterTool#fromArgs
> 
>
> Key: FLINK-10399
> URL: https://issues.apache.org/jira/browse/FLINK-10399
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> {{ParameterTool#fromArgs}} uses a weird implementation which Flink developers 
> would fail to parse quickly.
> The main problem is that, when parsing args, we always try to get a key-value 
> pair, but the implementation iterates with a {{for}} loop, thus introducing 
> weird flags/mutable variables and branches.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-8819) Rework travis script to use build stages

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626299#comment-16626299
 ] 

ASF GitHub Bot commented on FLINK-8819:
---

GJL commented on a change in pull request #6642: [FLINK-8819][travis] Rework 
travis script to use stages
URL: https://github.com/apache/flink/pull/6642#discussion_r219955526
 
 

 ##
 File path: tools/travis_controller.sh
 ##
 @@ -0,0 +1,211 @@
+#!/usr/bin/env bash
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+CACHE_DIR="$HOME/flink_cache"
+CACHE_BUILD_DIR="$CACHE_DIR/$TRAVIS_BUILD_NUMBER"
+CACHE_FLINK_DIR="$CACHE_BUILD_DIR/flink"
+
+HERE="`dirname \"$0\"`"# relative
+HERE="`( cd \"$HERE\" && pwd )`"   # absolutized and normalized
+if [ -z "$HERE" ] ; then
+   # error; for some reason, the path is not accessible
+   # to the script (e.g. permissions re-evaled after suid)
+   exit 1  # fail
+fi
+
+source "${HERE}/travis/fold.sh"
+source "${HERE}/travis/stage.sh"
+source "${HERE}/travis/shade.sh"
+
+function deleteOldCaches() {
+   while read CACHE_DIR; do
+   local old_number="${CACHE_DIR##*/}"
+   if [ "$old_number" -lt "$TRAVIS_BUILD_NUMBER" ]; then
+   echo "Deleting old cache $CACHE_DIR"
+   rm -rf "$CACHE_DIR"
+   fi
+   done
+}
+
+# delete leftover caches from previous builds
+find "$CACHE_DIR" -mindepth 1 -maxdepth 1 | grep -v "$TRAVIS_BUILD_NUMBER" | 
deleteOldCaches
+
+function getCurrentStage() {
+   STAGE_NUMBER=$(echo "$TRAVIS_JOB_NUMBER" | cut -d'.' -f 2)
+   case $STAGE_NUMBER in
+   (1)
 
 Review comment:
   ok


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Rework travis script to use build stages
> 
>
> Key: FLINK-8819
> URL: https://issues.apache.org/jira/browse/FLINK-8819
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System, Travis
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
>  Labels: pull-request-available
>
> This issue is for tracking efforts to rework our Travis scripts to use 
> [stages|https://docs.travis-ci.com/user/build-stages/].
> This feature allows us to define a sequence of jobs that are run one after 
> another. This implies that we can define dependencies between jobs, in 
> contrast to our existing jobs that have to be self-contained.
> As an example, we could have a compile stage, and a test stage with multiple 
> jobs.
> The main benefit here is that we no longer have to compile modules multiple 
> times, which would reduce our build times.
> The major issue here, however, is that there is no _proper_ support for 
> passing build artifacts from one stage to the next. According to this 
> [issue|https://github.com/travis-ci/beta-features/issues/28], it is on their 
> to-do list.
> In the meantime we could manually transfer the artifacts between stages by 
> either using the Travis cache or some other external storage. The cache 
> solution would work by setting up a cached directory (just like the mvn 
> cache) and creating build-scoped directories within it containing the 
> artifacts (I have a prototype that works like this).
> The major concern here is that of cleaning up the cache/storage.
>  We can clean things up if
>  * our script fails
>  * the last stage succeeds.
> We can *not* clean things up if
>  * the build is canceled
>  * travis fails the build due to a timeout or similar
> as apparently there is [no way to run a script at the end of a 
> 

[jira] [Commented] (FLINK-8819) Rework travis script to use build stages

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626300#comment-16626300
 ] 

ASF GitHub Bot commented on FLINK-8819:
---

GJL commented on a change in pull request #6642: [FLINK-8819][travis] Rework 
travis script to use stages
URL: https://github.com/apache/flink/pull/6642#discussion_r219955579
 
 

 ##
 File path: tools/travis_mvn_watchdog.sh
 ##
 @@ -160,10 +55,11 @@ esac
 # -nsu option forbids downloading snapshot artifacts. The only snapshot 
artifacts we depend are from
 # Flink, which however should all be built locally. see FLINK-7230
 MVN_LOGGING_OPTIONS="-Dlog.dir=${ARTIFACTS_DIR} 
-Dlog4j.configuration=file://$LOG4J_PROPERTIES 
-Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"
-MVN_COMMON_OPTIONS="-nsu -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -B 
$MVN_LOGGING_OPTIONS"
-MVN_COMPILE_OPTIONS="$MVN_COMPILE_OPTIONS -DskipTests"
+MVN_COMMON_OPTIONS="-nsu -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 
-Dfast -B $MVN_LOGGING_OPTIONS"
+MVN_COMPILE_OPTIONS="-DskipTests"
+MVN_TEST_OPTIONS="$MVN_LOGGING_OPTIONS"
 
-MVN_COMPILE="mvn $MVN_COMMON_OPTIONS $MVN_COMPILE_OPTIONS $PROFILE 
$MVN_COMPILE_MODULES clean install"
+MVN_COMPILE="mvn $MVN_COMMON_OPTIONS $MVN_COMPILE_OPTIONS $PROFILE 
$MVN_COMPILE_MODULES install"
 MVN_TEST="mvn $MVN_COMMON_OPTIONS $MVN_TEST_OPTIONS $PROFILE $MVN_TEST_MODULES 
verify"
 
 MVN_PID="${ARTIFACTS_DIR}/watchdog.mvn.pid"
 
 Review comment:
   ok


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Rework travis script to use build stages
> 
>
> Key: FLINK-8819
> URL: https://issues.apache.org/jira/browse/FLINK-8819
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System, Travis
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
>  Labels: pull-request-available
>
> This issue is for tracking efforts to rework our Travis scripts to use 
> [stages|https://docs.travis-ci.com/user/build-stages/].
> This feature allows us to define a sequence of jobs that are run one after 
> another. This implies that we can define dependencies between jobs, in 
> contrast to our existing jobs that have to be self-contained.
> As an example, we could have a compile stage, and a test stage with multiple 
> jobs.
> The main benefit here is that we no longer have to compile modules multiple 
> times, which would reduce our build times.
> The major issue here, however, is that there is no _proper_ support for 
> passing build artifacts from one stage to the next. According to this 
> [issue|https://github.com/travis-ci/beta-features/issues/28], it is on their 
> to-do list.
> In the meantime we could manually transfer the artifacts between stages by 
> either using the Travis cache or some other external storage. The cache 
> solution would work by setting up a cached directory (just like the mvn 
> cache) and creating build-scoped directories within it containing the 
> artifacts (I have a prototype that works like this).
> The major concern here is that of cleaning up the cache/storage.
>  We can clean things up if
>  * our script fails
>  * the last stage succeeds.
> We can *not* clean things up if
>  * the build is canceled
>  * travis fails the build due to a timeout or similar
> as apparently there is [no way to run a script at the end of a 
> build|https://github.com/travis-ci/travis-ci/issues/4221].
> Thus we would either have to periodically clear the cache, or encode more 
> information into the cached files that would allow _other_ builds to clean up 
> stale data (for example, the build number or date).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)




[jira] [Commented] (FLINK-10126) There should be a Scala DataSource

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626292#comment-16626292
 ] 

ASF GitHub Bot commented on FLINK-10126:


StephanEwen commented on issue #6738: [FLINK-10126] There should be a Scala 
DataSource
URL: https://github.com/apache/flink/pull/6738#issuecomment-424090397
 
 
   This fix seems to be a workaround for the real issue: the Scala API is 
missing a DataSource type.
   I would suggest seeing if that can be introduced without breaking the API.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> There should be a Scala DataSource
> --
>
> Key: FLINK-10126
> URL: https://issues.apache.org/jira/browse/FLINK-10126
> Project: Flink
>  Issue Type: Improvement
>Reporter: Alexis Sarda-Espinosa
>Assignee: vinoyang
>Priority: Minor
>  Labels: datasource, pull-request-available, scala
>
> In Java, an ExecutionEnvironment's createInput method returns a DataSource, 
> whereas the Scala version returns a DataSet. There is no Scala DataSource 
> wrapper, and the Scala DataSet does not provide the Java DataSource methods, 
> such as getSplitDataProperties.
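For reference, a minimal sketch of the Java API side described above; the input 
format and path are placeholders, and only createInput/getSplitDataProperties 
are the point:

{code:java}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.SplitDataProperties;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.core.fs.Path;

class JavaDataSourceExample {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		// in Java, createInput returns a DataSource, not just a DataSet ...
		DataSource<String> source =
			env.createInput(new TextInputFormat(new Path("/tmp/input")));
		// ... so split data properties are reachable; the Scala API hides this
		SplitDataProperties<String> props = source.getSplitDataProperties();
		DataSet<String> data = source;
	}
}
{code}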



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626240#comment-16626240
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219941080
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra.example;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat;
+import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
+import 
org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.mapping.Mapper;
+
+import java.util.ArrayList;
+
+/**
+ * This is an example showing how to use the {@link CassandraPojoInputFormat}-/CassandraOutputFormats in the Batch API.
 
 Review comment:
   does `-/CassandraOutputFormat` mean something?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jeffrey Carter
>Assignee: Jeffrey Carter
>Priority: Minor
>  Labels: InputFormat, cassandra, features, pull-request-available
> Fix For: 1.7.0
>
> Attachments: CassandraPojoInputFormatText.rtf
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently the DataSet API only has the ability to output data received from 
> Cassandra as a source as a Tuple. This would allow the data to be output as 
> a custom POJO that the user has created and annotated using the Datastax 
> API. This would remove the need for very long Tuples to be created by the 
> DataSet and then mapped to the custom POJO.
>  
> -The changes to the CassandraInputFormat object would be minimal, but would 
> require importing the Datastax API into the class-. Another option is to make 
> a similar, but slightly different class called CassandraPojoInputFormat.
> I have already gotten code for this working in my own project, but want other 
> thoughts as to the best way this should go about being implemented.
>  
> //Example of its use in main
> CassandraPojoInputFormat<CustomCassandraPojo> cassandraInputFormat = new 
> CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, 
> CustomCassandraPojo.class);
>  cassandraInputFormat.configure(null);
>  cassandraInputFormat.open(null);
> DataSet<CustomCassandraPojo> outputTestSet = 
> exEnv.createInput(cassandraInputFormat, TypeInformation.of(new 
> TypeHint<CustomCassandraPojo>(){}));
>  
> //The class that I currently have set up
> [^CassandraPojoInputFormatText.rtf]
>  
> Will make another Jira Issue for the Output version next if this is approved



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626239#comment-16626239
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219940842
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ##
 @@ -470,6 +472,34 @@ public void testCassandraTableSink() throws Exception {
	Assert.assertTrue("The input data was not completely written to Cassandra", input.isEmpty());
	}
 
+	@Test
+	public void testCassandraBatchPojoFormat() throws Exception {
+
+	OutputFormat<Tuple3<String, Integer, Integer>> sink = new CassandraTupleOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder);
+	sink.configure(new Configuration());
+	sink.open(0, 1);
+
+	for (Tuple3<String, Integer, Integer> value : collection) {
+	sink.writeRecord(value);
+	}
+
+	sink.close();
+
+	InputFormat<CustomCassandraAnnotatedPojo, InputSplit> source = new CassandraPojoInputFormat<>(injectTableName(SELECT_DATA_QUERY), builder, CustomCassandraAnnotatedPojo.class);
+	source.configure(new Configuration());
+	source.open(null);
+
+	List<CustomCassandraAnnotatedPojo> result = new ArrayList<>();
+
+	while (!source.reachedEnd()) {
+	CustomCassandraAnnotatedPojo temp = source.nextRecord(new CustomCassandraAnnotatedPojo());
+	result.add(temp);
+	}
+
+	source.close();
+	Assert.assertEquals(20, result.size());
 
 Review comment:
   Can we also convert `collection` of tuples into list of 
`CustomCassandraAnnotatedPojo` and check the contents as well, additionally to 
size?
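
A sketch of that check, assuming `CustomCassandraAnnotatedPojo` exposes a matching constructor, an id getter, and a value-based `equals()` (all hypothetical here; needs `java.util.Comparator` and `java.util.stream.Collectors`):

```
List<CustomCassandraAnnotatedPojo> expected = collection.stream()
	.map(t -> new CustomCassandraAnnotatedPojo(t.f0, t.f1, t.f2))
	.collect(Collectors.toList());

// Cassandra does not guarantee row order, so sort both sides before comparing.
Comparator<CustomCassandraAnnotatedPojo> byId = Comparator.comparing(CustomCassandraAnnotatedPojo::getId);
expected.sort(byId);
result.sort(byId);

Assert.assertEquals(expected, result);
```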


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jeffrey Carter
>Assignee: Jeffrey Carter
>Priority: Minor
>  Labels: InputFormat, cassandra, features, pull-request-available
> Fix For: 1.7.0
>
> Attachments: CassandraPojoInputFormatText.rtf
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently the DataSet API only has the ability to output data received from 
> Cassandra as a source as a Tuple. This would allow the data to be output as 
> a custom POJO that the user has created and annotated using the Datastax 
> API. This would remove the need for very long Tuples to be created by the 
> DataSet and then mapped to the custom POJO.
>  
> -The changes to the CassandraInputFormat object would be minimal, but would 
> require importing the Datastax API into the class-. Another option is to make 
> a similar, but slightly different class called CassandraPojoInputFormat.
> I have already gotten code for this working in my own project, but want other 
> thoughts as to the best way this should go about being implemented.
>  
> //Example of its use in main
> CassandraPojoInputFormat<CustomCassandraPojo> cassandraInputFormat = new 
> CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, 
> CustomCassandraPojo.class);
>  cassandraInputFormat.configure(null);
>  cassandraInputFormat.open(null);
> DataSet<CustomCassandraPojo> outputTestSet = 
> exEnv.createInput(cassandraInputFormat, TypeInformation.of(new 
> TypeHint<CustomCassandraPojo>(){}));
>  
> //The class that I currently have set up
> [^CassandraPojoInputFormatText.rtf]
>  
> Will make another Jira Issue for the Output version next if this is approved



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626244#comment-16626244
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219932299
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Strings;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * InputFormat to read data from Apache Cassandra and generate a custom 
Cassandra annotated object.
 
 Review comment:
   Comment:
   Base class for {@link RichInputFormat} to read data from Apache Cassandra.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jeffrey Carter
>Assignee: Jeffrey Carter
>Priority: Minor
>  Labels: InputFormat, cassandra, features, pull-request-available
> Fix For: 1.7.0
>
> Attachments: CassandraPojoInputFormatText.rtf
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently the DataSet API only has the ability to output data received from 
> Cassandra as a source as a Tuple. This would allow the data to be output as 
> a custom POJO that the user has created and annotated using the Datastax 
> API. This would remove the need for very long Tuples to be created by the 
> DataSet and then mapped to the custom POJO.
>  
> -The changes to the CassandraInputFormat object would be minimal, but would 
> require importing the Datastax API into the class-. Another option is to make 
> a similar, but slightly different class called CassandraPojoInputFormat.
> I have already gotten code for this working in my own project, but want other 
> thoughts as to the best way this should go about being implemented.
>  
> //Example of its use in main
> CassandraPojoInputFormat<CustomCassandraPojo> cassandraInputFormat = new 
> CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, 
> CustomCassandraPojo.class);
>  cassandraInputFormat.configure(null);
>  cassandraInputFormat.open(null);
> DataSet<CustomCassandraPojo> outputTestSet = 
> exEnv.createInput(cassandraInputFormat, TypeInformation.of(new 
> TypeHint<CustomCassandraPojo>(){}));
>  
> //The class that I currently have set up
> [^CassandraPojoInputFormatText.rtf]
>  
> Will make another Jira Issue for the Output version next if this is approved



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626237#comment-16626237
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219943237
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra.example;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat;
+import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
+import 
org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.mapping.Mapper;
+
+import java.util.ArrayList;
+
+/**
+ * This is an example showing how to use the {@link CassandraPojoInputFormat}-/CassandraOutputFormats in the Batch API.
+ *
+ * The example assumes that a table exists in a local cassandra database, 
according to the following queries:
+ * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 
'SimpleStrategy', 'replication_factor': ‘1’};
+ * CREATE TABLE IF NOT EXISTS test.batches (number int, strings text, PRIMARY 
KEY(number, strings));
+ */
+public class BatchPojoExample {
+   private static final String INSERT_QUERY = "INSERT INTO test.batches (number, strings) VALUES (?,?);";
+   private static final String SELECT_QUERY = "SELECT number, strings FROM test.batches;";
+
+   /*
+*  table script: "CREATE TABLE test.batches (number int, strings 
text, PRIMARY KEY(number, strings));"
+*/
+   public static void main(String[] args) throws Exception {
+
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
+
+   ArrayList<Tuple2<Integer, String>> collection = new ArrayList<>(20);
+   for (int i = 0; i < 20; i++) {
+   collection.add(new Tuple2<>(i, "string " + i));
+   }
+
+   DataSet<Tuple2<Integer, String>> dataSet = env.fromCollection(collection);
+
+   dataSet.output(new CassandraTupleOutputFormat<Tuple2<Integer, String>>(INSERT_QUERY, new ClusterBuilder() {
 
 Review comment:
   we can remove `Tuple2` and use empty `<>` (two places)
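
That is, with the diamond operator the element type is inferred from context; a sketch, with `clusterBuilder` standing in for the anonymous ClusterBuilder above:

```
DataSet<Tuple2<Integer, String>> dataSet = env.fromCollection(collection);
dataSet.output(new CassandraTupleOutputFormat<>(INSERT_QUERY, clusterBuilder));
```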


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jeffrey Carter
>Assignee: Jeffrey Carter
>Priority: Minor
>  Labels: InputFormat, cassandra, features, pull-request-available
> Fix For: 1.7.0
>
> Attachments: CassandraPojoInputFormatText.rtf
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently the DataSet API only has the ability to output data received from 
> Cassandra as a source as a Tuple. This would allow the data to be output as 
> a custom POJO that the user has created and annotated 
> 



[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626236#comment-16626236
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219935777
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/CustomCassandraAnnotatedPojo.java
 ##
 @@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import com.datastax.driver.mapping.annotations.Column;
+import com.datastax.driver.mapping.annotations.Table;
+
+/**
+ * Example of Cassandra Annotated POJO class for use with 
CassandraInputFormatter.
 
 Review comment:
   In doc comment:
   `CassandraInputFormatter` -> `{@link CassandraPojoInputFormat}`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jeffrey Carter
>Assignee: Jeffrey Carter
>Priority: Minor
>  Labels: InputFormat, cassandra, features, pull-request-available
> Fix For: 1.7.0
>
> Attachments: CassandraPojoInputFormatText.rtf
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently the DataSet API only has the ability to output data received from 
> Cassandra as a source as a Tuple. This would allow the data to be output as 
> a custom POJO that the user has created and annotated using the Datastax 
> API. This would remove the need for very long Tuples to be created by the 
> DataSet and then mapped to the custom POJO.
>  
> -The changes to the CassandraInputFormat object would be minimal, but would 
> require importing the Datastax API into the class-. Another option is to make 
> a similar, but slightly different class called CassandraPojoInputFormat.
> I have already gotten code for this working in my own project, but want other 
> thoughts as to the best way this should go about being implemented.
>  
> //Example of its use in main
> CassandraPojoInputFormat<CustomCassandraPojo> cassandraInputFormat = new 
> CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, 
> CustomCassandraPojo.class);
>  cassandraInputFormat.configure(null);
>  cassandraInputFormat.open(null);
> DataSet<CustomCassandraPojo> outputTestSet = 
> exEnv.createInput(cassandraInputFormat, TypeInformation.of(new 
> TypeHint<CustomCassandraPojo>(){}));
>  
> //The class that I currently have set up
> [^CassandraPojoInputFormatText.rtf]
>  
> Will make another Jira Issue for the Output version next if this is approved



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626241#comment-16626241
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219942318
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra.example;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat;
+import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
+import 
org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.mapping.Mapper;
+
+import java.util.ArrayList;
+
+/**
+ * This is an example showing how to use the {@link CassandraPojoInputFormat}-/CassandraOutputFormats in the Batch API.
+ *
+ * The example assumes that a table exists in a local cassandra database, 
according to the following queries:
+ * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 
'SimpleStrategy', 'replication_factor': ‘1’};
 
 Review comment:
   I think there is still some mismatch between queries, doc comments, tuples 
in `dataSet ` and fields of `CustomCassandraAnnotatedPojo`. 
`testCassandraBatchPojoFormat` looks better from this side.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jeffrey Carter
>Assignee: Jeffrey Carter
>Priority: Minor
>  Labels: InputFormat, cassandra, features, pull-request-available
> Fix For: 1.7.0
>
> Attachments: CassandraPojoInputFormatText.rtf
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently the DataSet API only has the ability to output data received from 
> Cassandra as a source as a Tuple. This would allow the data to be output as 
> a custom POJO that the user has created and annotated using the Datastax 
> API. This would remove the need for very long Tuples to be created by the 
> DataSet and then mapped to the custom POJO.
>  
> -The changes to the CassandraInputFormat object would be minimal, but would 
> require importing the Datastax API into the class-. Another option is to make 
> a similar, but slightly different class called CassandraPojoInputFormat.
> I have already gotten code for this working in my own project, but want other 
> thoughts as to the best way this should go about being implemented.
>  
> //Example of its use in main
> CassandraPojoInputFormat<CustomCassandraPojo> cassandraInputFormat = new 
> CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, 
> CustomCassandraPojo.class);
>  cassandraInputFormat.configure(null);
>  cassandraInputFormat.open(null);
> DataSet<CustomCassandraPojo> outputTestSet = 
> exEnv.createInput(cassandraInputFormat, TypeInformation.of(new 
> TypeHint<CustomCassandraPojo>(){}));
>  
> //The class that I currently have set up

[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626245#comment-16626245
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219934055
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java
 ##
 @@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
+import org.apache.flink.util.Preconditions;
+
+import com.datastax.driver.mapping.Mapper;
+import com.datastax.driver.mapping.MappingManager;
+import com.datastax.driver.mapping.Result;
+
+import java.io.IOException;
+
+/**
+ * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object.
+ *
+ * @param <OUT> type of inputClass
+ */
+public class CassandraPojoInputFormat<OUT> extends CassandraInputFormatBase<OUT> {
+
+   private static final long serialVersionUID = 1992091320180905115L;
+   private final MapperOptions mapperOptions;
+
+   private transient Result<OUT> resultSet;
+   private Class<OUT> inputClass;
 
 Review comment:
   can be `final`
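
i.e., a sketch of the suggested declaration:

```
private final Class<OUT> inputClass;
```

A final field also documents that the mapping target cannot change after construction.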


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jeffrey Carter
>Assignee: Jeffrey Carter
>Priority: Minor
>  Labels: InputFormat, cassandra, features, pull-request-available
> Fix For: 1.7.0
>
> Attachments: CassandraPojoInputFormatText.rtf
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently the DataSet API only has the ability to output data received from 
> Cassandra as a source as a Tuple. This would allow the data to be output as 
> a custom POJO that the user has created and annotated using the Datastax 
> API. This would remove the need for very long Tuples to be created by the 
> DataSet and then mapped to the custom POJO.
>  
> -The changes to the CassandraInputFormat object would be minimal, but would 
> require importing the Datastax API into the class-. Another option is to make 
> a similar, but slightly different class called CassandraPojoInputFormat.
> I have already gotten code for this working in my own project, but want other 
> thoughts as to the best way this should go about being implemented.
>  
> //Example of its use in main
> CassandraPojoInputFormat<CustomCassandraPojo> cassandraInputFormat = new 
> CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, 
> CustomCassandraPojo.class);
>  cassandraInputFormat.configure(null);
>  cassandraInputFormat.open(null);
> DataSet<CustomCassandraPojo> outputTestSet = 
> exEnv.createInput(cassandraInputFormat, TypeInformation.of(new 
> TypeHint<CustomCassandraPojo>(){}));
>  
> //The class that I currently have set up
> [^CassandraPojoInputFormatText.rtf]
>  
> Will make another Jira Issue for the Output version next if this is approved



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626232#comment-16626232
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219929097
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
 ##
 @@ -44,38 +31,18 @@
  *
  * @param  type of Tuple
  */
-public class CassandraInputFormat extends 
RichInputFormat implements NonParallelInput {
-   private static final Logger LOG = 
LoggerFactory.getLogger(CassandraInputFormat.class);
+public class CassandraInputFormat extends 
CassandraInputFormatBase {
 
 Review comment:
   please add `serialVersionUID` to `CassandraInputFormat` and 
`CassandraInputFormatBase`
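
For example (the value itself is arbitrary; it just pins the serialized form across refactorings):

```
public class CassandraInputFormat<OUT extends Tuple> extends CassandraInputFormatBase<OUT> {
	private static final long serialVersionUID = 1L;
	// ... rest of the class unchanged
}
```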


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jeffrey Carter
>Assignee: Jeffrey Carter
>Priority: Minor
>  Labels: InputFormat, cassandra, features, pull-request-available
> Fix For: 1.7.0
>
> Attachments: CassandraPojoInputFormatText.rtf
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently the DataSet API only has the ability to output data received from 
> Cassandra as a source as a Tuple. This would allow the data to be output as 
> a custom POJO that the user has created and annotated using the Datastax 
> API. This would remove the need for very long Tuples to be created by the 
> DataSet and then mapped to the custom POJO.
>  
> -The changes to the CassandraInputFormat object would be minimal, but would 
> require importing the Datastax API into the class-. Another option is to make 
> a similar, but slightly different class called CassandraPojoInputFormat.
> I have already gotten code for this working in my own project, but want other 
> thoughts as to the best way this should go about being implemented.
>  
> //Example of its use in main
> CassandraPojoInputFormat<CustomCassandraPojo> cassandraInputFormat = new 
> CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, 
> CustomCassandraPojo.class);
>  cassandraInputFormat.configure(null);
>  cassandraInputFormat.open(null);
> DataSet<CustomCassandraPojo> outputTestSet = 
> exEnv.createInput(cassandraInputFormat, TypeInformation.of(new 
> TypeHint<CustomCassandraPojo>(){}));
>  
> //The class that I currently have set up
> [^CassandraPojoInputFormatText.rtf]
>  
> Will make another Jira Issue for the Output version next if this is approved



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626234#comment-16626234
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219929908
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Strings;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object.
+ * @param <OUT> type of inputClass
+ */
+public abstract class CassandraInputFormatBase<OUT> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput {
+   private static final Logger LOG = LoggerFactory.getLogger(CassandraInputFormatBase.class);
+
+   protected final String query;
+   protected final ClusterBuilder builder;
+
+   protected transient Cluster cluster;
+   protected transient Session session;
+
+   public CassandraInputFormatBase(String query, ClusterBuilder builder){
+   Preconditions.checkArgument(!Strings.isNullOrEmpty(query), "Query cannot be null or empty");
+   Preconditions.checkArgument(builder != null, "Builder cannot be null");
 
 Review comment:
   I think we can use `checkNotNull`:
   ```
   Preconditions.checkNotNull(builder, "Builder cannot be null");
   ```
   for readability
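
Worth noting that `checkNotNull` throws NullPointerException rather than IllegalArgumentException, and that it returns its argument, so the check can be folded into the assignment:

```
this.builder = Preconditions.checkNotNull(builder, "Builder cannot be null");
```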


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jeffrey Carter
>Assignee: Jeffrey Carter
>Priority: Minor
>  Labels: InputFormat, cassandra, features, pull-request-available
> Fix For: 1.7.0
>
> Attachments: CassandraPojoInputFormatText.rtf
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently the DataSet API only has the ability to output data received from 
> Cassandra as a source as a Tuple. This would allow the data to be output as 
> a custom POJO that the user has created and annotated using the Datastax 
> API. This would remove the need for very long Tuples to be created by the 
> DataSet and then mapped to the custom POJO.
>  
> -The changes to the CassandraInputFormat object would be minimal, but would 
> require importing the Datastax API into the class-. Another option is to make 
> a similar, but slightly different class called CassandraPojoInputFormat.

[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626238#comment-16626238
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219930899
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Strings;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object.
+ * @param <OUT> type of inputClass
+ */
+public abstract class CassandraInputFormatBase<OUT> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput {
+   private static final Logger LOG = LoggerFactory.getLogger(CassandraInputFormatBase.class);
+
+   protected final String query;
+   protected final ClusterBuilder builder;
+
+   protected transient Cluster cluster;
+   protected transient Session session;
+
+   public CassandraInputFormatBase(String query, ClusterBuilder builder){
+   Preconditions.checkArgument(!Strings.isNullOrEmpty(query), "Query cannot be null or empty");
+   Preconditions.checkArgument(builder != null, "Builder cannot be null");
+
+   this.query = query;
+   this.builder = builder;
+   }
+
+   @Override
+   public void configure(Configuration parameters) {
+   this.cluster = builder.getCluster();
+   }
+
+   @Override
+   public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
 
 Review comment:
   We can remove exception declarations from all methods (also in other touched 
classes) where they are not actually thrown.
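
For instance, a sketch of `getStatistics` before and after (body illustrative):

```
// before: declares an exception it never throws
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
	return cachedStatistics;
}

// after
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
	return cachedStatistics;
}
```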


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jeffrey Carter
>Assignee: Jeffrey Carter
>Priority: Minor
>  Labels: InputFormat, cassandra, features, pull-request-available
> Fix For: 1.7.0
>
> Attachments: CassandraPojoInputFormatText.rtf
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently the DataSet API only has the ability to output data received from 
> Cassandra as a source as a Tuple. This would allow the data to be output as 
> a custom POJO that the user has created and annotated using the Datastax 
> API. 

[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626235#comment-16626235
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219942450
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra.example;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat;
+import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
+import 
org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.mapping.Mapper;
+
+import java.util.ArrayList;
+
+/**
+ * This is an example showing how to use the {@link CassandraPojoInputFormat}-/CassandraOutputFormats in the Batch API.
+ *
+ * The example assumes that a table exists in a local cassandra database, 
according to the following queries:
+ * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 
'SimpleStrategy', 'replication_factor': ‘1’};
+ * CREATE TABLE IF NOT EXISTS test.batches (number int, strings text, PRIMARY 
KEY(number, strings));
+ */
+public class BatchPojoExample {
+   private static final String INSERT_QUERY = "INSERT INTO test.batches (number, strings) VALUES (?,?);";
+   private static final String SELECT_QUERY = "SELECT number, strings FROM test.batches;";
+
+   /*
+*  table script: "CREATE TABLE test.batches (number int, strings 
text, PRIMARY KEY(number, strings));"
+*/
+   public static void main(String[] args) throws Exception {
+
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
+
+   ArrayList<Tuple2<Integer, String>> collection = new ArrayList<>(20);
+   for (int i = 0; i < 20; i++) {
+   collection.add(new Tuple2<>(i, "string " + i));
+   }
+
+   DataSet<Tuple2<Integer, String>> dataSet = env.fromCollection(collection);
+
+   dataSet.output(new CassandraTupleOutputFormat<Tuple2<Integer, String>>(INSERT_QUERY, new ClusterBuilder() {
+   @Override
+   protected Cluster buildCluster(Cluster.Builder builder) 
{
+   return 
builder.addContactPoints("127.0.0.1").build();
+   }
+   }));
+
 
 Review comment:
   I think `env.execute("Write");` should be here, not at the end.
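
A sketch of the suggested ordering (identifiers as in the example above; `clusterBuilder` stands in for the anonymous ClusterBuilder):

```
dataSet.output(new CassandraTupleOutputFormat<>(INSERT_QUERY, clusterBuilder));
env.execute("Write");  // run the write job before reading the table back

DataSet<CustomCassandraAnnotatedPojo> inputDS = env.createInput(
	new CassandraPojoInputFormat<>(SELECT_QUERY, clusterBuilder, CustomCassandraAnnotatedPojo.class));
inputDS.print();  // print() executes the read job eagerly
```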


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jeffrey Carter
>Assignee: Jeffrey Carter
>Priority: Minor
>  Labels: InputFormat, cassandra, features, pull-request-available
> Fix For: 1.7.0
>
> Attachments: CassandraPojoInputFormatText.rtf
>
>   Original Estimate: 24h
> 

[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626233#comment-16626233
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219934197
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java
 ##
 @@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
+import org.apache.flink.util.Preconditions;
+
+import com.datastax.driver.mapping.Mapper;
+import com.datastax.driver.mapping.MappingManager;
+import com.datastax.driver.mapping.Result;
+
+import java.io.IOException;
+
+/**
+ * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object.
+ *
+ * @param <OUT> type of inputClass
+ */
+public class CassandraPojoInputFormat<OUT> extends CassandraInputFormatBase<OUT> {
+
+   private static final long serialVersionUID = 1992091320180905115L;
+   private final MapperOptions mapperOptions;
+
+   private transient Result<OUT> resultSet;
+   private Class<OUT> inputClass;
+
+   public CassandraPojoInputFormat(String query, ClusterBuilder builder, Class<OUT> inputClass) {
+   this(query, builder, inputClass, null);
+   }
+
+   public CassandraPojoInputFormat(String query, ClusterBuilder builder, Class<OUT> inputClass, MapperOptions mapperOptions) {
+   super(query, builder);
+   this.mapperOptions = mapperOptions;
+   Preconditions.checkArgument(inputClass != null, "InputClass cannot be null");
 
 Review comment:
   `checkNotNull`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jeffrey Carter
>Assignee: Jeffrey Carter
>Priority: Minor
>  Labels: InputFormat, cassandra, features, pull-request-available
> Fix For: 1.7.0
>
> Attachments: CassandraPojoInputFormatText.rtf
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently the DataSet API only has the ability to output data received from 
> Cassandra as a source as a Tuple. This would allow the data to be output as 
> a custom POJO that the user has created and annotated using the Datastax 
> API. This would remove the need for very long Tuples to be created by the 
> DataSet and then mapped to the custom POJO.
>  
> -The changes to the CassandraInputFormat object would be minimal, but would 
> require importing the Datastax API into the class-. Another option is to make 
> a similar, but slightly different class called CassandraPojoInputFormat.
> I have already gotten code for this working in my own project, but want other 
> thoughts as to the best way this should go about being implemented.
>  
> //Example of its use in main
> CassandraPojoInputFormat cassandraInputFormat = new 
> CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, 
> CustomCassandraPojo.class);
>  cassandraInputFormat.configure(null);
>  

[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626246#comment-16626246
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219943693
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra.example;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat;
+import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
+import 
org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.mapping.Mapper;
+
+import java.util.ArrayList;
+
+/**
+ * This is an example showing how to use the {@link CassandraPojoInputFormat}-/CassandraOutputFormats in the Batch API.
+ *
+ * The example assumes that a table exists in a local cassandra database, according to the following queries:
+ * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
+ * CREATE TABLE IF NOT EXISTS test.batches (number int, strings text, PRIMARY KEY(number, strings));
+ */
+public class BatchPojoExample {
+   private static final String INSERT_QUERY = "INSERT INTO test.batches 
(number, strings) VALUES (?,?);";
+   private static final String SELECT_QUERY = "SELECT number, strings FROM 
test.batches;";
+
+   /*
+*  table script: "CREATE TABLE test.batches (number int, strings 
text, PRIMARY KEY(number, strings));"
+*/
+   public static void main(String[] args) throws Exception {
+
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
+
+   ArrayList<Tuple2<Integer, String>> collection = new ArrayList<>(20);
+   for (int i = 0; i < 20; i++) {
+   collection.add(new Tuple2<>(i, "string " + i));
+   }
+
+   DataSet<Tuple2<Integer, String>> dataSet = env.fromCollection(collection);
+
+   dataSet.output(new CassandraTupleOutputFormat<Tuple2<Integer, String>>(INSERT_QUERY, new ClusterBuilder() {
+   @Override
+   protected Cluster buildCluster(Cluster.Builder builder) {
+   return builder.addContactPoints("127.0.0.1").build();
+   }
+   }));
+
+   /*
+*  This is for the purpose of showing an example of creating a DataSet using CassandraPojoInputFormat.
+*/
+   DataSet<CustomCassandraAnnotatedPojo> inputDS = env
+   .createInput(new CassandraPojoInputFormat<>(SELECT_QUERY, new ClusterBuilder() {
 
 Review comment:
   `ClusterBuilder` should also have a `serialVersionUID`. It is also duplicated in 
2 places. We can create a variable to define it once.
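   A sketch of such a shared variable (illustrative only, not taken from the PR):
   ```
   // reused by both the output and the input format; the explicit
   // serialVersionUID keeps serialization stable across recompiles
   private static final ClusterBuilder CLUSTER_BUILDER = new ClusterBuilder() {
   private static final long serialVersionUID = 1L;

   @Override
   protected Cluster buildCluster(Cluster.Builder builder) {
   return builder.addContactPoints("127.0.0.1").build();
   }
   };
   ```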


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New 

[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626242#comment-16626242
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219931705
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Strings;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object.
+ * @param <OUT> type of inputClass
+ */
+public abstract class CassandraInputFormatBase<OUT> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput {
+   private static final Logger LOG = LoggerFactory.getLogger(CassandraInputFormatBase.class);
+
+   protected final String query;
+   protected final ClusterBuilder builder;
+
+   protected transient Cluster cluster;
+   protected transient Session session;
+
+   public CassandraInputFormatBase(String query, ClusterBuilder builder) {
+   Preconditions.checkArgument(!Strings.isNullOrEmpty(query), "Query cannot be null or empty");
+   Preconditions.checkArgument(builder != null, "Builder cannot be null");
+
+   this.query = query;
+   this.builder = builder;
+   }
+
+   @Override
+   public void configure(Configuration parameters) {
+   this.cluster = builder.getCluster();
+   }
+
+   @Override
+   public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
+   return cachedStatistics;
+   }
+
+   @Override
+   public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
+   GenericInputSplit[] split = {new GenericInputSplit(0, 1)};
+   return split;
 
 Review comment:
   `split` local variable is redundant, we can just return `{new 
GenericInputSplit(0, 1)}`
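   i.e. something like this sketch (note that Java requires the array type to be spelled out in a return statement):
   ```
   @Override
   public InputSplit[] createInputSplits(int minNumSplits) {
   return new GenericInputSplit[] {new GenericInputSplit(0, 1)};
   }
   ```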


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jeffrey Carter
>Assignee: Jeffrey Carter
>Priority: Minor
>  Labels: InputFormat, cassandra, features, pull-request-available
> Fix For: 1.7.0
>
> Attachments: CassandraPojoInputFormatText.rtf
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently the DataSet API 

[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626243#comment-16626243
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219930598
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Strings;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object.
+ * @param <OUT> type of inputClass
+ */
+public abstract class CassandraInputFormatBase<OUT> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput {
 
 Review comment:
   I think the whole class, constructor and fields can be package private


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jeffrey Carter
>Assignee: Jeffrey Carter
>Priority: Minor
>  Labels: InputFormat, cassandra, features, pull-request-available
> Fix For: 1.7.0
>
> Attachments: CassandraPojoInputFormatText.rtf
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently the DataSet API only has the ability to output data received from 
> Cassandra as a source as a Tuple. This change would allow the data to be 
> output as a custom POJO that the user has created and annotated using the 
> Datastax API. This would remove the need for very long Tuples to be created 
> by the DataSet and then mapped to the custom POJO.
>  
> -The changes to the CassandraInputFormat object would be minimal, but would 
> require importing the Datastax API into the class-. Another option is to make 
> a similar, but slightly different class called CassandraPojoInputFormat.
> I have already gotten code for this working in my own project, but want other 
> thoughts as to the best way this should go about being implemented.
>  
> //Example of its use in main
> CassandraPojoInputFormat<CustomCassandraPojo> cassandraInputFormat = new 
> CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, 
> CustomCassandraPojo.class);
>  cassandraInputFormat.configure(null);
>  cassandraInputFormat.open(null);
> DataSet<CustomCassandraPojo> outputTestSet = 
> exEnv.createInput(cassandraInputFormat, TypeInformation.of(new 
> TypeHint<CustomCassandraPojo>(){}));
>  
> //The class that I currently have set up
> 

[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo

2018-09-24 Thread GitBox
azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219942450
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra.example;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat;
+import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
+import 
org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.mapping.Mapper;
+
+import java.util.ArrayList;
+
+/**
+ * This is an example showing how to use the {@link CassandraPojoInputFormat}-/CassandraOutputFormats in the Batch API.
+ *
+ * The example assumes that a table exists in a local cassandra database, according to the following queries:
+ * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
+ * CREATE TABLE IF NOT EXISTS test.batches (number int, strings text, PRIMARY KEY(number, strings));
+ */
+public class BatchPojoExample {
+   private static final String INSERT_QUERY = "INSERT INTO test.batches 
(number, strings) VALUES (?,?);";
+   private static final String SELECT_QUERY = "SELECT number, strings FROM 
test.batches;";
+
+   /*
+*  table script: "CREATE TABLE test.batches (number int, strings 
text, PRIMARY KEY(number, strings));"
+*/
+   public static void main(String[] args) throws Exception {
+
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
+
+   ArrayList<Tuple2<Integer, String>> collection = new ArrayList<>(20);
+   for (int i = 0; i < 20; i++) {
+   collection.add(new Tuple2<>(i, "string " + i));
+   }
+
+   DataSet<Tuple2<Integer, String>> dataSet = env.fromCollection(collection);
+
+   dataSet.output(new CassandraTupleOutputFormat<Tuple2<Integer, String>>(INSERT_QUERY, new ClusterBuilder() {
+   @Override
+   protected Cluster buildCluster(Cluster.Builder builder) {
+   return builder.addContactPoints("127.0.0.1").build();
+   }
+   }));
+
 
 Review comment:
   I think `env.execute("Write");` should be here, not at the end.
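   A sketch of the suggested ordering (assuming a shared `clusterBuilder` variable; names are illustrative, not from the PR):
   ```
   dataSet.output(new CassandraTupleOutputFormat<Tuple2<Integer, String>>(INSERT_QUERY, clusterBuilder));
   env.execute("Write");  // run the write job first so the read below sees the inserted rows

   DataSet<CustomCassandraAnnotatedPojo> inputDS = env.createInput(
   new CassandraPojoInputFormat<>(SELECT_QUERY, clusterBuilder, CustomCassandraAnnotatedPojo.class));
   ```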


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo

2018-09-24 Thread GitBox
azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219931705
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Strings;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object.
+ * @param <OUT> type of inputClass
+ */
+public abstract class CassandraInputFormatBase<OUT> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput {
+   private static final Logger LOG = LoggerFactory.getLogger(CassandraInputFormatBase.class);
+
+   protected final String query;
+   protected final ClusterBuilder builder;
+
+   protected transient Cluster cluster;
+   protected transient Session session;
+
+   public CassandraInputFormatBase(String query, ClusterBuilder builder) {
+   Preconditions.checkArgument(!Strings.isNullOrEmpty(query), "Query cannot be null or empty");
+   Preconditions.checkArgument(builder != null, "Builder cannot be null");
+
+   this.query = query;
+   this.builder = builder;
+   }
+
+   @Override
+   public void configure(Configuration parameters) {
+   this.cluster = builder.getCluster();
+   }
+
+   @Override
+   public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
+   return cachedStatistics;
+   }
+
+   @Override
+   public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
+   GenericInputSplit[] split = {new GenericInputSplit(0, 1)};
+   return split;
 
 Review comment:
   `split` local variable is redundant, we can just return `{new 
GenericInputSplit(0, 1)}`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo

2018-09-24 Thread GitBox
azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219930598
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Strings;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object.
+ * @param <OUT> type of inputClass
+ */
+public abstract class CassandraInputFormatBase<OUT> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput {
 
 Review comment:
   I think the whole class, constructor and fields can be package private


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo

2018-09-24 Thread GitBox
azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219943693
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra.example;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat;
+import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
+import 
org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.mapping.Mapper;
+
+import java.util.ArrayList;
+
+/**
+ * This is an example showing how to use the {@link CassandraPojoInputFormat}-/CassandraOutputFormats in the Batch API.
+ *
+ * The example assumes that a table exists in a local cassandra database, according to the following queries:
+ * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
+ * CREATE TABLE IF NOT EXISTS test.batches (number int, strings text, PRIMARY KEY(number, strings));
+ */
+public class BatchPojoExample {
+   private static final String INSERT_QUERY = "INSERT INTO test.batches 
(number, strings) VALUES (?,?);";
+   private static final String SELECT_QUERY = "SELECT number, strings FROM 
test.batches;";
+
+   /*
+*  table script: "CREATE TABLE test.batches (number int, strings 
text, PRIMARY KEY(number, strings));"
+*/
+   public static void main(String[] args) throws Exception {
+
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
+
+   ArrayList<Tuple2<Integer, String>> collection = new ArrayList<>(20);
+   for (int i = 0; i < 20; i++) {
+   collection.add(new Tuple2<>(i, "string " + i));
+   }
+
+   DataSet<Tuple2<Integer, String>> dataSet = env.fromCollection(collection);
+
+   dataSet.output(new CassandraTupleOutputFormat<Tuple2<Integer, String>>(INSERT_QUERY, new ClusterBuilder() {
+   @Override
+   protected Cluster buildCluster(Cluster.Builder builder) {
+   return builder.addContactPoints("127.0.0.1").build();
+   }
+   }));
+
+   /*
+*  This is for the purpose of showing an example of creating a DataSet using CassandraPojoInputFormat.
+*/
+   DataSet<CustomCassandraAnnotatedPojo> inputDS = env
+   .createInput(new CassandraPojoInputFormat<>(SELECT_QUERY, new ClusterBuilder() {
 
 Review comment:
   `ClusterBuilder` should also have a `serialVersionUID`. It is also duplicated in 
2 places. We can create a variable to define it once.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo

2018-09-24 Thread GitBox
azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219935777
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/CustomCassandraAnnotatedPojo.java
 ##
 @@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import com.datastax.driver.mapping.annotations.Column;
+import com.datastax.driver.mapping.annotations.Table;
+
+/**
+ * Example of Cassandra Annotated POJO class for use with CassandraInputFormatter.
 
 Review comment:
   In doc comment:
   `CassandraInputFormatter` -> `{@link CassandraPojoInputFormat}`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo

2018-09-24 Thread GitBox
azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219943237
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra.example;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat;
+import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
+import 
org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.mapping.Mapper;
+
+import java.util.ArrayList;
+
+/**
+ * This is an example showing how to use the {@link CassandraPojoInputFormat}-/CassandraOutputFormats in the Batch API.
+ *
+ * The example assumes that a table exists in a local cassandra database, according to the following queries:
+ * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
+ * CREATE TABLE IF NOT EXISTS test.batches (number int, strings text, PRIMARY KEY(number, strings));
+ */
+public class BatchPojoExample {
+   private static final String INSERT_QUERY = "INSERT INTO test.batches 
(number, strings) VALUES (?,?);";
+   private static final String SELECT_QUERY = "SELECT number, strings FROM 
test.batches;";
+
+   /*
+*  table script: "CREATE TABLE test.batches (number int, strings 
text, PRIMARY KEY(number, strings));"
+*/
+   public static void main(String[] args) throws Exception {
+
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
+
+   ArrayList<Tuple2<Integer, String>> collection = new ArrayList<>(20);
+   for (int i = 0; i < 20; i++) {
+   collection.add(new Tuple2<>(i, "string " + i));
+   }
+
+   DataSet<Tuple2<Integer, String>> dataSet = env.fromCollection(collection);
+
+   dataSet.output(new CassandraTupleOutputFormat<Tuple2<Integer, String>>(INSERT_QUERY, new ClusterBuilder() {
 
 Review comment:
   we can remove `Tuple2` and use empty `<>` (two places)
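   A sketch of the diamond form (assuming a shared `clusterBuilder` variable; names are illustrative):
   ```
   dataSet.output(new CassandraTupleOutputFormat<>(INSERT_QUERY, clusterBuilder));
   ```
   The compiler infers `Tuple2<Integer, String>` from the element type of `dataSet`, so the explicit type argument is redundant.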


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo

2018-09-24 Thread GitBox
azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219942318
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra.example;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat;
+import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
+import 
org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.mapping.Mapper;
+
+import java.util.ArrayList;
+
+/**
+ * This is an example showing the to use the {@link 
CassandraPojoInputFormat}-/CassandraOutputFormats in the Batch API.
+ *
+ * The example assumes that a table exists in a local cassandra database, 
according to the following queries:
+ * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 
'SimpleStrategy', 'replication_factor': ‘1’};
 
 Review comment:
   I think there is still some mismatch between the queries, the doc comments, the 
tuples in `dataSet`, and the fields of `CustomCassandraAnnotatedPojo`. 
`testCassandraBatchPojoFormat` looks better in this respect.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo

2018-09-24 Thread GitBox
azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219930899
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Strings;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * InputFormat to read data from Apache Cassandra and generate a custom 
Cassandra annotated object.
+ * @param  type of inputClass
+ */
+public abstract class CassandraInputFormatBase extends 
RichInputFormat implements NonParallelInput {
+   private static final Logger LOG = 
LoggerFactory.getLogger(CassandraInputFormatBase.class);
+
+   protected final String query;
+   protected final ClusterBuilder builder;
+
+   protected transient Cluster cluster;
+   protected transient Session session;
+
+   public CassandraInputFormatBase(String query, ClusterBuilder builder){
+   Preconditions.checkArgument(!Strings.isNullOrEmpty(query), 
"Query cannot be null or empty");
+   Preconditions.checkArgument(builder != null, "Builder cannot be 
null");
+
+   this.query = query;
+   this.builder = builder;
+   }
+
+   @Override
+   public void configure(Configuration parameters) {
+   this.cluster = builder.getCluster();
+   }
+
+   @Override
+   public BaseStatistics getStatistics(BaseStatistics cachedStatistics) 
throws IOException {
 
 Review comment:
   We can remove exception declarations from all methods (also in other touched 
classes) where they are not actually thrown.
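   For example (sketch):
   ```
   @Override
   public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
   return cachedStatistics;  // never throws, so no IOException declaration needed
   }
   ```
   An override may always declare fewer checked exceptions than the method it overrides.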


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo

2018-09-24 Thread GitBox
azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219934197
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java
 ##
 @@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
+import org.apache.flink.util.Preconditions;
+
+import com.datastax.driver.mapping.Mapper;
+import com.datastax.driver.mapping.MappingManager;
+import com.datastax.driver.mapping.Result;
+
+import java.io.IOException;
+
+/**
+ * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object.
+ *
+ * @param <OUT> type of inputClass
+ */
+public class CassandraPojoInputFormat<OUT> extends CassandraInputFormatBase<OUT> {
+
+   private static final long serialVersionUID = 1992091320180905115L;
+   private final MapperOptions mapperOptions;
+
+   private transient Result<OUT> resultSet;
+   private Class<OUT> inputClass;
+
+   public CassandraPojoInputFormat(String query, ClusterBuilder builder, Class<OUT> inputClass) {
+   this(query, builder, inputClass, null);
+   }
+
+   public CassandraPojoInputFormat(String query, ClusterBuilder builder, Class<OUT> inputClass, MapperOptions mapperOptions) {
+   super(query, builder);
+   this.mapperOptions = mapperOptions;
+   Preconditions.checkArgument(inputClass != null, "InputClass cannot be null");
 
 Review comment:
   `checkNotNull`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo

2018-09-24 Thread GitBox
azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219934055
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java
 ##
 @@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
+import org.apache.flink.util.Preconditions;
+
+import com.datastax.driver.mapping.Mapper;
+import com.datastax.driver.mapping.MappingManager;
+import com.datastax.driver.mapping.Result;
+
+import java.io.IOException;
+
+/**
+ * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object.
+ *
+ * @param <OUT> type of inputClass
+ */
+public class CassandraPojoInputFormat<OUT> extends CassandraInputFormatBase<OUT> {
+
+   private static final long serialVersionUID = 1992091320180905115L;
+   private final MapperOptions mapperOptions;
+
+   private transient Result<OUT> resultSet;
+   private Class<OUT> inputClass;
 
 Review comment:
   can be `final`
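   i.e. (sketch): `private final Class<OUT> inputClass;`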


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo

2018-09-24 Thread GitBox
azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219940842
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ##
 @@ -470,6 +472,34 @@ public void testCassandraTableSink() throws Exception {
Assert.assertTrue("The input data was not completely written to 
Cassandra", input.isEmpty());
}
 
+   @Test
+   public void testCassandraBatchPojoFormat() throws Exception {
+
+   OutputFormat<Tuple3<String, Integer, Integer>> sink = new CassandraTupleOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder);
+   sink.configure(new Configuration());
+   sink.open(0, 1);
+
+   for (Tuple3<String, Integer, Integer> value : collection) {
+   sink.writeRecord(value);
+   }
+
+   sink.close();
+
+   InputFormat<CustomCassandraAnnotatedPojo, InputSplit> source = new CassandraPojoInputFormat<>(injectTableName(SELECT_DATA_QUERY), builder, CustomCassandraAnnotatedPojo.class);
+   source.configure(new Configuration());
+   source.open(null);
+
+   List<CustomCassandraAnnotatedPojo> result = new ArrayList<>();
+
+   while (!source.reachedEnd()) {
+   CustomCassandraAnnotatedPojo temp = source.nextRecord(new CustomCassandraAnnotatedPojo());
+   result.add(temp);
+   }
+
+   source.close();
+   Assert.assertEquals(20, result.size());
 
 Review comment:
   Can we also convert the `collection` of tuples into a list of 
`CustomCassandraAnnotatedPojo` and check the contents as well, in addition to 
the size?
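   A sketch of such a check (the pojo constructor and an `equals` implementation are assumptions, not part of the PR):
   ```
   import static org.hamcrest.MatcherAssert.assertThat;
   import static org.hamcrest.Matchers.containsInAnyOrder;
   import java.util.stream.Collectors;

   List<CustomCassandraAnnotatedPojo> expected = collection.stream()
   .map(tuple -> new CustomCassandraAnnotatedPojo(tuple.f0, tuple.f1, tuple.f2))  // hypothetical constructor
   .collect(Collectors.toList());
   // Cassandra does not guarantee row order, so compare ignoring order
   assertThat(result, containsInAnyOrder(expected.toArray()));
   ```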


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo

2018-09-24 Thread GitBox
azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219929908
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Strings;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object.
+ * @param <OUT> type of inputClass
+ */
+public abstract class CassandraInputFormatBase<OUT> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput {
+   private static final Logger LOG = LoggerFactory.getLogger(CassandraInputFormatBase.class);
+
+   protected final String query;
+   protected final ClusterBuilder builder;
+
+   protected transient Cluster cluster;
+   protected transient Session session;
+
+   public CassandraInputFormatBase(String query, ClusterBuilder builder) {
+   Preconditions.checkArgument(!Strings.isNullOrEmpty(query), "Query cannot be null or empty");
+   Preconditions.checkArgument(builder != null, "Builder cannot be null");
 
 Review comment:
   I think we can use `checkNotNull`:
   ```
   Preconditions.checkNotNull(builder, "Builder cannot be null");
   ```
   for readability


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo

2018-09-24 Thread GitBox
azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219932299
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Strings;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object.
 
 Review comment:
   Comment:
   Base class for {@link RichInputFormat} to read data from Apache Cassandra.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo

2018-09-24 Thread GitBox
azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r219929097
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
 ##
 @@ -44,38 +31,18 @@
  *
 * @param <OUT> type of Tuple
 */
-public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput {
-   private static final Logger LOG = LoggerFactory.getLogger(CassandraInputFormat.class);
+public class CassandraInputFormat<OUT extends Tuple> extends CassandraInputFormatBase<OUT> {
 
 Review comment:
   please add `serialVersionUID` to `CassandraInputFormat` and 
`CassandraInputFormatBase`
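   e.g. (sketch):
   ```
   private static final long serialVersionUID = 1L;  // any fixed value; keep it stable once released
   ```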


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10292) Generate JobGraph in StandaloneJobClusterEntrypoint only once

2018-09-24 Thread Ufuk Celebi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626190#comment-16626190
 ] 

Ufuk Celebi commented on FLINK-10292:
-

I understand that non-determinism may be an issue when generating the 
{{JobGraph}}, but do we have some data about how common that is for 
applications? Would it be possible to keep a fixed JobGraph in the image 
instead of persisting one in the {{SubmittedJobGraphStore}}?

I like our current approach, because it keeps the source of truth for the job 
in the image instead of the {{SubmittedJobGraphStore}}. I'm wondering about the 
following scenario:
 * A user creates a job cluster with high availability enabled (cluster ID for 
the logical application, e.g. myapp)
 ** This will persist the job with a fixed ID (after FLINK-10291) on first 
submission
 * The user kills the application *without* cancelling
 ** This will leave all data in the high availability store(s) such as job 
graphs or checkpoints
 * The user updates the image with a modified application and keeps the high 
availability configuration (e.g. cluster ID stays myapp)
 ** This will result in the job in the image being ignored, since we already 
have a job graph with the same (fixed) ID

I think in such a scenario it can be desirable to still have the checkpoints 
available, but it might be problematic if the job graph is recovered from the 
{{SubmittedJobGraphStore}} instead of using the job that is part of the image. 
What do you think about this scenario? Is it the responsibility of the user to 
handle this? If so, I think that the approach outlined in this ticket makes 
sense. If not, we may want to consider alternatives or ignore potential 
non-determinism.
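
To make the concern concrete, here is a hedged sketch (plain Java, not Flink 
code) of the recovery order described above: once a graph is persisted under 
the fixed ID, it shadows whatever job the updated image ships.

import java.util.HashMap;
import java.util.Map;

// Toy model of the recovery decision: the persisted graph wins over the image's job.
class JobGraphRecoverySketch {
    static final String FIXED_JOB_ID = "myapp"; // fixed per-application ID (cf. FLINK-10291)
    static final Map<String, String> submittedJobGraphStore = new HashMap<>();

    static String resolveJob(String jobFromImage) {
        // If HA storage already holds a graph under the fixed ID, it is recovered,
        // even if the image now ships a modified application.
        return submittedJobGraphStore.getOrDefault(FIXED_JOB_ID, jobFromImage);
    }

    public static void main(String[] args) {
        submittedJobGraphStore.put(FIXED_JOB_ID, "graph of the old application");
        // Prints the old graph: the updated image's job is ignored.
        System.out.println(resolveJob("graph from the updated image"));
    }
}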

> Generate JobGraph in StandaloneJobClusterEntrypoint only once
> -
>
> Key: FLINK-10292
> URL: https://issues.apache.org/jira/browse/FLINK-10292
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.7.0, 1.6.2
>
>
> Currently the {{StandaloneJobClusterEntrypoint}} generates the {{JobGraph}} 
> from the given user code every time it starts or is restarted. This can be 
> problematic if the {{JobGraph}} generation has side effects. Therefore, it 
> would be better to generate the {{JobGraph}} only once, store it in HA 
> storage, and retrieve it from there afterwards.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10209) Exclude jdk.tools dependency from hadoop when running with java 9

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626142#comment-16626142
 ] 

ASF GitHub Bot commented on FLINK-10209:


StephanEwen commented on issue #6663:  [FLINK-10209][build] Exclude jdk.tools 
dependency from hadoop 
URL: https://github.com/apache/flink/pull/6663#issuecomment-424054104
 
 
   This context may make my line of thinking easier to understand:
   
   The hadoop-shaded module is a convenience artifact, one that we previously 
discussed phasing out eventually. It is used (1) to compile against (for 
HDFS / YARN / Kerberos code) and (2) to add as a jar to the lib folder.
   
 - Concerning a general exclusion for the jdk.tools dependency: Since we 
don't compile Hadoop itself and we don't redistribute that dependency (it is a 
system dependency), I cannot see how a general exclusion would be a problem. It 
simplifies the build files, which is really good.
   
 - We should encourage use of HADOOP_CLASSPATH rather than use of our 
Hadoop fat jar anyway. That reduces the value of the second use of the 
hadoop-shaded project, the packaging into the dist lib folder. If we purely go 
for the HADOOP_CLASSPATH variant, we could remove that project altogether and 
simply have a provided or optional Hadoop dependency.
   
 - The fat hadoop jar is used for client-side functionality only, and since 
version 2, Hadoop claims to have a stable setup (HDFS protocol, Kerberos 
config, etc.), so we don't need each major/minor version; one of every 
major version should work. We should not need the vendor-specific versions 
either. And there is still the HADOOP_CLASSPATH workaround in case any of the 
vendor-specific versions has a compatibility problem after all.
   
 - Concerning moving Hadoop to flink-shaded: We don't have to find a setup 
that converges across Hadoop versions; that is exactly the point. We pick some 
Hadoop versions for which we want to build convenience jars and converge these 
manually or by shading.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Exclude jdk.tools dependency from hadoop when running with java 9
> -
>
> Key: FLINK-10209
> URL: https://issues.apache.org/jira/browse/FLINK-10209
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> {{hadoop-common}} has a {{jdk.tools}} dependency which cannot be resolved on 
> java 9. At least for compiling we have to exclude this dependency.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626107#comment-16626107
 ] 

ASF GitHub Bot commented on FLINK-10295:


azagrebin commented on issue #6754: [FLINK-10295] Add support of passing jar 
arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#issuecomment-424045988
 
 
   cc @zentol


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Tokenisation of Program Args resulting in unexpected results
> 
>
> Key: FLINK-10295
> URL: https://issues.apache.org/jira/browse/FLINK-10295
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Webfrontend
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Gaurav Singhania
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: sample_request.txt
>
>
> We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes 
> all the details to run the job as program args against a jarid, including sql 
> query and kafka details. In version 1.5 the program args are tokenised; as a 
> result, single quotes (') and double quotes (") are stripped from the arguments. 
> This results in malformed args.
> Attached a sample request for reference.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626106#comment-16626106
 ] 

ASF GitHub Bot commented on FLINK-10295:


azagrebin opened a new pull request #6754: [FLINK-10295] Add support of passing 
jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754
 
 
   ## What is the purpose of the change
   
   This PR adds one more option for specifying jar program arguments in REST API 
calls (run and get plan). Users can specify arguments either as one string or, 
now additionally, as a list of separate strings.
   
   ## Brief change log
   
 - add JarRequestBody as a base for JarRunRequestBody and JarPlanRequestBody
 - add programArgsList to JarRequestBody
 - add JarHandlerUtils.getProgramArgs to resolve program args from either 
programArgs or programArgsList, taken from the query path or the JSON body, in 
JarRunHandler or JarPlanHandler
 - users can use programArgs or programArgsList, but not both; a test covers 
this (see the sketch after this description)
   
   ## Verifying this change
   
   Unit tests
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (docs)
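
   A hedged sketch of the mutual-exclusion rule above; the names are 
illustrative and not the actual JarHandlerUtils API:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// A request may carry programArgs (one string) or programArgsList (separate
// strings), but not both.
class ProgramArgsResolverSketch {
    static List<String> resolve(String programArgs, List<String> programArgsList) {
        if (programArgs != null && programArgsList != null) {
            throw new IllegalArgumentException(
                    "Use either programArgs or programArgsList, not both.");
        }
        if (programArgsList != null) {
            return programArgsList; // already separate strings, quotes preserved
        }
        if (programArgs == null || programArgs.trim().isEmpty()) {
            return Collections.emptyList();
        }
        // naive whitespace tokenization of the single-string form; a real
        // tokenizer is what strips quotes and motivates the list variant
        return Arrays.asList(programArgs.trim().split("\\s+"));
    }
}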
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Tokenisation of Program Args resulting in unexpected results
> 
>
> Key: FLINK-10295
> URL: https://issues.apache.org/jira/browse/FLINK-10295
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Webfrontend
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Gaurav Singhania
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: sample_request.txt
>
>
> We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes 
> all the details to run the job as program args against a jarid, including sql 
> query and kafka details. In version 1.5 the program args are tokenised; as a 
> result, single quotes (') and double quotes (") are stripped from the arguments. 
> This results in malformed args.
> Attached a sample request for reference.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results

2018-09-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10295:
---
Labels: pull-request-available  (was: )

> Tokenisation of Program Args resulting in unexpected results
> 
>
> Key: FLINK-10295
> URL: https://issues.apache.org/jira/browse/FLINK-10295
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Webfrontend
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Gaurav Singhania
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: sample_request.txt
>
>
> We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes 
> all the details to run the job as program args against a jarid, including sql 
> query and kafka details. In version 1.5 the program args are tokenised; as a 
> result, single quotes (') and double quotes (") are stripped from the arguments. 
> This results in malformed args.
> Attached a sample request for reference.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10251) Handle oversized response messages in AkkaRpcActor

2018-09-24 Thread tison (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tison reassigned FLINK-10251:
-

Assignee: (was: tison)

> Handle oversized response messages in AkkaRpcActor
> --
>
> Key: FLINK-10251
> URL: https://issues.apache.org/jira/browse/FLINK-10251
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> The {{AkkaRpcActor}} should check that an RPC response sent to a remote 
> sender does not exceed the maximum framesize of the underlying 
> {{ActorSystem}}. If it does, we should fail fast instead. We can 
> achieve this by serializing the response and sending the serialized byte 
> array.
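
A hedged sketch (illustrative, not the actual AkkaRpcActor code) of the 
fail-fast check described above: serialize the response first, then refuse to 
send anything larger than the frame size.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class FrameSizeCheckSketch {
    static byte[] serializeWithinFrameSize(Serializable response, long maxFrameSizeBytes)
            throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(response); // serialize before sending
        }
        byte[] payload = bos.toByteArray();
        if (payload.length > maxFrameSizeBytes) {
            // fail fast instead of letting the transport silently drop the message
            throw new IOException("RPC response of " + payload.length
                    + " bytes exceeds maximum framesize " + maxFrameSizeBytes);
        }
        return payload;
    }
}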



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-1960) Add comments and docs for withForwardedFields and related operators

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-1960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626069#comment-16626069
 ] 

ASF GitHub Bot commented on FLINK-1960:
---

dmpalyvos opened a new pull request #6753: [FLINK-1960] [Documentation] Add 
docs for withForwardedFields and related operators in Scala API
URL: https://github.com/apache/flink/pull/6753
 
 
   ## What is the purpose of the change
   
   The withForwardedFields and related operators have no docs in the Scala API. 
This pull request adds documentation (javadoc) for them; a short usage sketch 
follows at the end of this description.
   
   ## Brief change log
   
   *Added documentation for the `withForwardedFields*` Scala functions based on 
the javadoc of the related java functions*:
 - *Added documentation to withForwardedFields*
 - *Added documentation to withForwardedFieldsFirst*
 - *Added documentation to withForwardedFieldsSecond*
   
   ## Verifying this change
   
   This change is a trivial documentation update without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? -
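
   A hedged usage sketch (Java API shown here; the PR adds the matching 
documentation on the Scala side). Declaring forwarded fields tells the 
optimizer that the UDF copies a field unchanged, so sorting or partitioning 
properties on that field can be reused downstream:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class ForwardedFieldsExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Long, String>> input =
                env.fromElements(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"));
        input
            .map(new MapFunction<Tuple2<Long, String>, Tuple2<Long, String>>() {
                @Override
                public Tuple2<Long, String> map(Tuple2<Long, String> t) {
                    // f0 is passed through untouched; only f1 changes
                    return Tuple2.of(t.f0, t.f1.toUpperCase());
                }
            })
            .withForwardedFields("f0") // promise: field 0 is forwarded as-is
            .print();
    }
}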
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add comments and docs for withForwardedFields and related operators
> ---
>
> Key: FLINK-1960
> URL: https://issues.apache.org/jira/browse/FLINK-1960
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Theodore Vasiloudis
>Assignee: hzhuangzhenxi
>Priority: Minor
>  Labels: documentation, pull-request-available, starter
>
> The withForwardedFields and related operators have no docs for the Scala API. 
> It would be useful to have code comments and example usage in the docs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-1960) Add comments and docs for withForwardedFields and related operators

2018-09-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-1960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-1960:
--
Labels: documentation pull-request-available starter  (was: documentation 
starter)

> Add comments and docs for withForwardedFields and related operators
> ---
>
> Key: FLINK-1960
> URL: https://issues.apache.org/jira/browse/FLINK-1960
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Theodore Vasiloudis
>Assignee: hzhuangzhenxi
>Priority: Minor
>  Labels: documentation, pull-request-available, starter
>
> The withForwardedFields and related operators have no docs for the Scala API. 
> It would be useful to have code comments and example usage in the docs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10396) Remove codebase switch from MiniClusterResource

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626038#comment-16626038
 ] 

ASF GitHub Bot commented on FLINK-10396:


zentol commented on a change in pull request #6748: [FLINK-10396] Remove 
CodebaseType
URL: https://github.com/apache/flink/pull/6748#discussion_r219883500
 
 

 ##
 File path: 
flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
 ##
 @@ -322,32 +320,20 @@ object ScalaShellITCase {
 
   @BeforeClass
   def beforeAll(): Unit = {
-    val isNew = TestBaseUtils.isNewCodebase()
-    if (isNew) {
-      configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
-      // set to different than default so not to interfere with ScalaShellLocalStartupITCase
-      configuration.setInteger(RestOptions.PORT, 8082)
-      val miniConfig = new MiniClusterConfiguration.Builder()
-        .setConfiguration(configuration)
-        .setNumSlotsPerTaskManager(parallelism)
-        .build()
-
-      val miniCluster = new MiniCluster(miniConfig)
-      miniCluster.start()
-      port = miniCluster.getRestAddress.getPort
-      hostname = miniCluster.getRestAddress.getHost
-
-      cluster = Some(Left(miniCluster))
-    } else {
-      configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE)
-      configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
-      val standaloneCluster = new StandaloneMiniCluster(configuration)
-
-      hostname = standaloneCluster.getHostname
-      port = standaloneCluster.getPort
-
-      cluster = Some(Right(standaloneCluster))
-    }
+    configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
+    // set to different than default so not to interfere with ScalaShellLocalStartupITCase
+    configuration.setInteger(RestOptions.PORT, 8082)
+    val miniConfig = new MiniClusterConfiguration.Builder()
+      .setConfiguration(configuration)
+      .setNumSlotsPerTaskManager(parallelism)
+      .build()
+
+    val miniCluster = new MiniCluster(miniConfig)
+    miniCluster.start()
+    port = miniCluster.getRestAddress.getPort
+    hostname = miniCluster.getRestAddress.getHost
+
+    cluster = Some(Left(miniCluster))
 
 Review comment:
   do we still need the `Either` here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove codebase switch from MiniClusterResource
> ---
>
> Key: FLINK-10396
> URL: https://issues.apache.org/jira/browse/FLINK-10396
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: 
> 0001-FLINK-10396-Remove-codebase-switch-in-UT-IT-tests.patch
>
>
> Remove the legacy codebase switch from {{MiniClusterResource}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10397) Remove CoreOptions#MODE

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626034#comment-16626034
 ] 

ASF GitHub Bot commented on FLINK-10397:


tillrohrmann opened a new pull request #6752: [FLINK-10397] Remove 
CoreOptions#MODE
URL: https://github.com/apache/flink/pull/6752
 
 
   ## What is the purpose of the change
   
   This PR is based on #6748 and #6751 and is part of the [removal of Flink's 
legacy mode](https://issues.apache.org/jira/browse/FLINK-10392).
   
   Removes the CoreOptions#MODE option used to switch between the new and 
legacy mode.
   
   ## Brief change log
   
   - Remove `CoreOptions#MODE`, `CoreOptions#NEW` and `CoreOptions#LEGACY`
   - Set the mode where it was switchable to new (e.g. `isNewMode = true` in 
`YarnTestBase`)
   
   ## Verifying this change
   
   - Not explicitly tested
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove CoreOptions#MODE
> ---
>
> Key: FLINK-10397
> URL: https://issues.apache.org/jira/browse/FLINK-10397
> Project: Flink
>  Issue Type: Sub-task
>  Components: Configuration
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Remove the {{CoreOptions#MODE}} since it is no longer needed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10397) Remove CoreOptions#MODE

2018-09-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10397:
---
Labels: pull-request-available  (was: )

> Remove CoreOptions#MODE
> ---
>
> Key: FLINK-10397
> URL: https://issues.apache.org/jira/browse/FLINK-10397
> Project: Flink
>  Issue Type: Sub-task
>  Components: Configuration
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Remove the {{CoreOptions#MODE}} since it is no longer needed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


