[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-08-13 Thread Fred Teunissen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579276#comment-16579276
 ] 

Fred Teunissen commented on FLINK-8500:
---

I rebased this PR onto the latest master branch yesterday evening.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.7.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'Kafka message 
> timestamp' parameter (taken from ConsumerRecord). In some business scenarios, 
> this is useful!
>  
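
A minimal sketch of what a timestamp-aware deserialize could look like (a hypothetical 
interface for illustration only, not the actual FLINK-8500 change; the extra long 
timestamp parameter is the point):

{code:java}
import java.io.IOException;
import java.io.Serializable;

// Hypothetical sketch only: a deserialize variant that also receives the record
// timestamp from Kafka's ConsumerRecord. Parameter order and the interface name
// are assumptions for illustration, not the actual FLINK-8500 API.
public interface TimestampedKeyedDeserializationSchema<T> extends Serializable {

    T deserialize(byte[] messageKey, byte[] message, String topic,
                  int partition, long offset, long timestamp) throws IOException;

    boolean isEndOfStream(T nextElement);
}
{code}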



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-13 Thread Thomas Weise (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579269#comment-16579269
 ] 

Thomas Weise commented on FLINK-10074:
--

I think configuring the behavior as a count of allowable consecutive failures 
would work well. Would this replace the existing setFailOnCheckpointingErrors 
(will that setting become irrelevant when the user already sets the count)?

[https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/environment/CheckpointConfig.html]

Regarding what happens once the job was allowed to fail and recovers only to 
fail again: Shouldn't the counter only be reset after the next successful 
checkpoint vs. on restart? 
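
A minimal sketch of how this could look on CheckpointConfig (setFailOnCheckpointingErrors 
exists today; the count-based setter below is only a placeholder name for the FLINK-10074 
proposal, not a real method):

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointFailureConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // checkpoint every 60 seconds

        // Existing all-or-nothing switch: either any checkpoint error fails the job,
        // or checkpoint errors are ignored entirely.
        env.getCheckpointConfig().setFailOnCheckpointingErrors(false);

        // Hypothetical method name for the FLINK-10074 proposal (not a real API at the
        // time of this discussion): tolerate up to N consecutive checkpoint failures.
        // env.getCheckpointConfig().setTolerableConsecutiveCheckpointFailures(3);
    }
}
{code}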

> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10133) finished job's jobgraph never been cleaned up in zookeeper for standalone clusters (HA mode with multiple masters)

2018-08-13 Thread Xiangyu Zhu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579248#comment-16579248
 ] 

Xiangyu Zhu commented on FLINK-10133:
-

[~Wosinsan] [~elevy] I have uploaded the logs with some sensitive info 
modified. If the logs look OK to you, this issue can be closed as a duplicate. 
Thanks!

> finished job's jobgraph never been cleaned up in zookeeper for standalone 
> clusters (HA mode with multiple masters)
> --
>
> Key: FLINK-10133
> URL: https://issues.apache.org/jira/browse/FLINK-10133
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.5.0, 1.5.2, 1.6.0
>Reporter: Xiangyu Zhu
>Priority: Major
> Attachments: client.log, namenode.log, standalonesession.log, 
> zookeeper.log
>
>
> Hi,
> We have 3 servers in our test environment, noted as node1-3. Setup is as 
> following:
>  * hadoop hdfs: node1 as namenode, node2,3 as datanode
>  * zookeeper: node1-3 as a quorum (but also tried node1 alone)
>  * flink: node1,2 as masters, node2,3 as slaves
> As I understand it, when a job finishes, the corresponding job's blob data is 
> expected to be deleted from the HDFS path, and the node under ZooKeeper's path 
> `/\{zk path root}/\{cluster-id}/jobgraphs/\{job id}` should be deleted after 
> that. However, we observe that whenever we submit a job and it finishes (via 
> `bin/flink run WordCount.jar`), the blob data is gone whereas the job id node 
> under ZooKeeper is still there, with a UUID-style lock node inside it. In 
> ZooKeeper's debug output we observed something like "cannot be deleted because 
> non empty". Because of this, as long as a job has finished and its jobgraph 
> node persists, restarting the cluster or killing one job manager (to test HA 
> mode) makes Flink try to recover the finished job, fail to find the blob data 
> under HDFS, and bring the whole cluster down.
> If we use only node1 as master and node2,3 as slaves, the jobgraphs node is 
> deleted successfully. When the jobgraphs path is clean, killing one job 
> manager promotes another standby JM to leader, so it is only this jobgraphs 
> issue that prevents HA from working.
> I'm not sure whether something is wrong with our configs, because this happens 
> every time for a finished job (we have only tested with WordCount.jar though). 
> I'm aware of FLINK-10011 and FLINK-10029, but unlike FLINK-10011 this happens 
> every time, rendering HA mode unusable for us.
> Any idea what might cause this?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10133) finished job's jobgraph never been cleaned up in zookeeper for standalone clusters (HA mode with multiple masters)

2018-08-13 Thread Xiangyu Zhu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiangyu Zhu updated FLINK-10133:

Attachment: client.log
namenode.log
zookeeper.log
standalonesession.log

> finished job's jobgraph never been cleaned up in zookeeper for standalone 
> clusters (HA mode with multiple masters)
> --
>
> Key: FLINK-10133
> URL: https://issues.apache.org/jira/browse/FLINK-10133
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.5.0, 1.5.2, 1.6.0
>Reporter: Xiangyu Zhu
>Priority: Major
> Attachments: client.log, namenode.log, standalonesession.log, 
> zookeeper.log
>
>
> Hi,
> We have 3 servers in our test environment, noted as node1-3. Setup is as 
> following:
>  * hadoop hdfs: node1 as namenode, node2,3 as datanode
>  * zookeeper: node1-3 as a quorum (but also tried node1 alone)
>  * flink: node1,2 as masters, node2,3 as slaves
> As I understand it, when a job finishes, the corresponding job's blob data is 
> expected to be deleted from the HDFS path, and the node under ZooKeeper's path 
> `/\{zk path root}/\{cluster-id}/jobgraphs/\{job id}` should be deleted after 
> that. However, we observe that whenever we submit a job and it finishes (via 
> `bin/flink run WordCount.jar`), the blob data is gone whereas the job id node 
> under ZooKeeper is still there, with a UUID-style lock node inside it. In 
> ZooKeeper's debug output we observed something like "cannot be deleted because 
> non empty". Because of this, as long as a job has finished and its jobgraph 
> node persists, restarting the cluster or killing one job manager (to test HA 
> mode) makes Flink try to recover the finished job, fail to find the blob data 
> under HDFS, and bring the whole cluster down.
> If we use only node1 as master and node2,3 as slaves, the jobgraphs node is 
> deleted successfully. When the jobgraphs path is clean, killing one job 
> manager promotes another standby JM to leader, so it is only this jobgraphs 
> issue that prevents HA from working.
> I'm not sure whether something is wrong with our configs, because this happens 
> every time for a finished job (we have only tested with WordCount.jar though). 
> I'm aware of FLINK-10011 and FLINK-10029, but unlike FLINK-10011 this happens 
> every time, rendering HA mode unusable for us.
> Any idea what might cause this?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on issue #6339: [FLINK-9859] [runtime] More Akka config

2018-08-13 Thread GitBox
TisonKun commented on issue #6339: [FLINK-9859] [runtime] More Akka config
URL: https://github.com/apache/flink/pull/6339#issuecomment-412743884
 
 
   ping @tillrohrmann  :-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9859) More Akka config

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579207#comment-16579207
 ] 

ASF GitHub Bot commented on FLINK-9859:
---

TisonKun commented on issue #6339: [FLINK-9859] [runtime] More Akka config
URL: https://github.com/apache/flink/pull/6339#issuecomment-412743884
 
 
   ping @tillrohrmann  :-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> More Akka config
> 
>
> Key: FLINK-9859
> URL: https://issues.apache.org/jira/browse/FLINK-9859
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Affects Versions: 1.5.1
>Reporter: 陈梓立
>Assignee: 陈梓立
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.3
>
>
> Add more Akka config options.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10135) The JobManager doesn't report the cluster-level metrics

2018-08-13 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-10135:


Assignee: vinoyang

> The JobManager doesn't report the cluster-level metrics
> ---
>
> Key: FLINK-10135
> URL: https://issues.apache.org/jira/browse/FLINK-10135
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.5.0
>Reporter: Joey Echeverria
>Assignee: vinoyang
>Priority: Major
>
> In [the documentation for 
> metrics|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#cluster]
>  in the Flink 1.5.0 release, it says that the following metrics are reported 
> by the JobManager:
> {noformat}
> numRegisteredTaskManagers
> numRunningJobs
> taskSlotsAvailable
> taskSlotsTotal
> {noformat}
> In the job manager REST endpoint 
> ({{http://:8081/jobmanager/metrics}}), those metrics don't 
> appear.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6437) Move history server configuration to a separate file

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579157#comment-16579157
 ] 

ASF GitHub Bot commented on FLINK-6437:
---

yanghua edited a comment on issue #6542: [FLINK-6437][History Server] Move 
history server configuration to a separate file
URL: https://github.com/apache/flink/pull/6542#issuecomment-412736043
 
 
   hi @StephanEwen @zentol ,
   
   Currently, the history server runs as a single JVM process, which means it 
is a separate component, so I think the configuration split is reasonable. If I 
only need to start the history server, I don't rely on other configurations and 
don't need to pay attention to them.
   
   Regarding the discussion of compatibility, I basically agree with @zentol's 
point of view.
   
   Based on the existing implementation, I think we can add more log warnings 
in the new method of `GlobalConfiguration`. The general idea is as follows:
   
   * Check for the new configuration file; if it does not exist, log a message 
informing the user that they are using the old configuration;
   * If both the new and the old history server configuration exist at the same 
time, warn the user that there are two configurations and that 
`flink-historyserver-conf.yaml` takes precedence;
   
   In addition, I will log a warning when the configuration file is loaded by 
the `HistoryServer#main` method, for example:
   
   > the history server configuration in `flink-conf.yaml` is kept only for 
backward compatibility; it is recommended to use the new configuration file 
`flink-historyserver-conf.yaml`.
   
   I will also comment out the configuration items in 
`flink-historyserver-conf.yaml`; given all this guidance, I believe users will 
be more likely to follow the correct path.
   
   A setup that relies only on the configuration already in `flink-conf.yaml` 
will still work with the current implementation. But if there are 
configurations in both files, we seem to have no better option than to warn 
that `flink-historyserver-conf.yaml` takes precedence.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Move history server configuration to a separate file
> 
>
> Key: FLINK-6437
> URL: https://issues.apache.org/jira/browse/FLINK-6437
> Project: Flink
>  Issue Type: Improvement
>  Components: History Server
>Affects Versions: 1.3.0
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> I suggest to keep the {{flink-conf.yaml}} leaner by moving configuration of 
> the History Server to a different file.
> In general, I would propose to move configurations of separate, independent 
> and optional components to individual config files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua edited a comment on issue #6542: [FLINK-6437][History Server] Move history server configuration to a separate file

2018-08-13 Thread GitBox
yanghua edited a comment on issue #6542: [FLINK-6437][History Server] Move 
history server configuration to a separate file
URL: https://github.com/apache/flink/pull/6542#issuecomment-412736043
 
 
   hi @StephanEwen @zentol ,
   
   Currently, the history server runs as a single JVM process, which means it 
is a separate component, so I think the configuration split is reasonable. If I 
only need to start the history server, I don't rely on other configurations and 
don't need to pay attention to them.
   
   Regarding the discussion of compatibility, I basically agree with @zentol's 
point of view.
   
   Based on the existing implementation, I think we can add more log warnings 
in the new method of `GlobalConfiguration`. The general idea is as follows:
   
   * Check for the new configuration file; if it does not exist, log a message 
informing the user that they are using the old configuration;
   * If both the new and the old history server configuration exist at the same 
time, warn the user that there are two configurations and that 
`flink-historyserver-conf.yaml` takes precedence;
   
   In addition, I will log a warning when the configuration file is loaded by 
the `HistoryServer#main` method, for example:
   
   > the history server configuration in `flink-conf.yaml` is kept only for 
backward compatibility; it is recommended to use the new configuration file 
`flink-historyserver-conf.yaml`.
   
   I will also comment out the configuration items in 
`flink-historyserver-conf.yaml`; given all this guidance, I believe users will 
be more likely to follow the correct path.
   
   A setup that relies only on the configuration already in `flink-conf.yaml` 
will still work with the current implementation. But if there are 
configurations in both files, we seem to have no better option than to warn 
that `flink-historyserver-conf.yaml` takes precedence.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-6437) Move history server configuration to a separate file

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579156#comment-16579156
 ] 

ASF GitHub Bot commented on FLINK-6437:
---

yanghua commented on issue #6542: [FLINK-6437][History Server] Move history 
server configuration to a separate file
URL: https://github.com/apache/flink/pull/6542#issuecomment-412736043
 
 
   hi @StephanEwen @zentol ,
   
   Currently, the history server runs as a standalone JVM process, which means 
it is a separate component, so I think the configuration split is reasonable. 
If I only need to start the history server, I don't rely on other 
configurations and don't need to pay attention to them.
   
   Regarding the discussion of compatibility, I basically agree with @zentol's 
point of view.
   
   Based on the existing implementation, I think we can add more log warnings 
in the new method of `GlobalConfiguration`. The general idea is as follows:
   
   * Check for the new configuration file; if it does not exist, log a message 
informing the user that they are using the old configuration;
   * If both the new and the old history server configuration exist at the same 
time, warn the user that there are two configurations and that 
`flink-historyserver-conf.yaml` takes precedence;
   
   In addition, I will log a warning when the configuration file is loaded by 
the `HistoryServer#main` method, for example:
   
   > the history server configuration in `flink-conf.yaml` is kept only for 
backward compatibility; it is recommended to use the new configuration file 
`flink-historyserver-conf.yaml`.
   
   I will also comment out the configuration items in 
`flink-historyserver-conf.yaml`; given all this guidance, I believe users will 
be more likely to follow the correct path.
   
   A setup that relies only on the configuration already in `flink-conf.yaml` 
will still work with the current implementation. But if there are 
configurations in both files, we seem to have no better option than to warn 
that `flink-historyserver-conf.yaml` takes precedence.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Move history server configuration to a separate file
> 
>
> Key: FLINK-6437
> URL: https://issues.apache.org/jira/browse/FLINK-6437
> Project: Flink
>  Issue Type: Improvement
>  Components: History Server
>Affects Versions: 1.3.0
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> I suggest to keep the {{flink-conf.yaml}} leaner by moving configuration of 
> the History Server to a different file.
> In general, I would propose to move configurations of separate, independent 
> and optional components to individual config files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6542: [FLINK-6437][History Server] Move history server configuration to a separate file

2018-08-13 Thread GitBox
yanghua commented on issue #6542: [FLINK-6437][History Server] Move history 
server configuration to a separate file
URL: https://github.com/apache/flink/pull/6542#issuecomment-412736043
 
 
   hi @StephanEwen @zentol ,
   
   Currently, the history server runs as a standalone JVM process, which means 
it is a separate component, so I think the configuration split is reasonable. 
If I only need to start the history server, I don't rely on other 
configurations and don't need to pay attention to them.
   
   Regarding the discussion of compatibility, I basically agree with @zentol's 
point of view.
   
   Based on the existing implementation, I think we can add more log warnings 
in the new method of `GlobalConfiguration`. The general idea is as follows:
   
   * Check for the new configuration file; if it does not exist, log a message 
informing the user that they are using the old configuration;
   * If both the new and the old history server configuration exist at the same 
time, warn the user that there are two configurations and that 
`flink-historyserver-conf.yaml` takes precedence;
   
   In addition, I will log a warning when the configuration file is loaded by 
the `HistoryServer#main` method, for example:
   
   > the history server configuration in `flink-conf.yaml` is kept only for 
backward compatibility; it is recommended to use the new configuration file 
`flink-historyserver-conf.yaml`.
   
   I will also comment out the configuration items in 
`flink-historyserver-conf.yaml`; given all this guidance, I believe users will 
be more likely to follow the correct path.
   
   A setup that relies only on the configuration already in `flink-conf.yaml` 
will still work with the current implementation. But if there are 
configurations in both files, we seem to have no better option than to warn 
that `flink-historyserver-conf.yaml` takes precedence.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] xccui commented on issue #6535: [FLINK-9977] [table][doc] Refine the SQL/Table built-in function docs

2018-08-13 Thread GitBox
xccui commented on issue #6535: [FLINK-9977] [table][doc] Refine the SQL/Table 
built-in function docs
URL: https://github.com/apache/flink/pull/6535#issuecomment-412717935
 
 
   Thanks for the review, @fhueske. Will merge this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9977) Refine the docs for Table/SQL built-in functions

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579094#comment-16579094
 ] 

ASF GitHub Bot commented on FLINK-9977:
---

xccui commented on issue #6535: [FLINK-9977] [table][doc] Refine the SQL/Table 
built-in function docs
URL: https://github.com/apache/flink/pull/6535#issuecomment-412717935
 
 
   Thanks for the review, @fhueske. Will merge this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Refine the docs for Table/SQL built-in functions
> 
>
> Key: FLINK-9977
> URL: https://issues.apache.org/jira/browse/FLINK-9977
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Minor
>  Labels: pull-request-available
> Attachments: Java.jpg, SQL.jpg, Scala.jpg
>
>
> There are some syntax errors and inconsistencies in the documentation and 
> Scala docs of the Table/SQL built-in functions. This issue aims to make some 
> improvements to them.
> Also, according to FLINK-10103, we should use single quotes to express 
> strings in SQL. For example, CONCAT("AA", "BB", "CC") should be replaced with 
> CONCAT('AA', 'BB', 'CC'). 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10135) The JobManager doesn't report the cluster-level metrics

2018-08-13 Thread Joey Echeverria (JIRA)
Joey Echeverria created FLINK-10135:
---

 Summary: The JobManager doesn't report the cluster-level metrics
 Key: FLINK-10135
 URL: https://issues.apache.org/jira/browse/FLINK-10135
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.5.0
Reporter: Joey Echeverria


In [the documentation for 
metrics|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#cluster]
 in the Flink 1.5.0 release, it says that the following metrics are reported by 
the JobManager:
{noformat}
numRegisteredTaskManagers
numRunningJobs
taskSlotsAvailable
taskSlotsTotal
{noformat}

In the job manager REST endpoint 
({{http://:8081/jobmanager/metrics}}), those metrics don't appear.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10134) UTF-16 support for TextInputFormat

2018-08-13 Thread David Dreyfus (JIRA)
David Dreyfus created FLINK-10134:
-

 Summary: UTF-16 support for TextInputFormat
 Key: FLINK-10134
 URL: https://issues.apache.org/jira/browse/FLINK-10134
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.4.2
Reporter: David Dreyfus


It does not appear that Flink supports a charset encoding of "UTF-16". In 
particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM) to 
establish whether a UTF-16 file is UTF-16LE or UTF-16BE.
 
TextInputFormat.setCharset("UTF-16") calls DelimitedInputFormat.setCharset(), 
which sets TextInputFormat.charsetName and then modifies the previously set 
delimiterString to construct the proper byte string encoding of the delimiter. 
This same charsetName is also used in TextInputFormat.readRecord() to interpret 
the bytes read from the file.
 
There are two problems that this implementation would seem to have when using 
UTF-16.
 # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java will 
return a Big Endian byte sequence including the Byte Order Mark (BOM). The 
actual text file will not contain a BOM at each line ending, so the delimiter 
will never be read. Moreover, if the actual byte encoding of the file is Little 
Endian, the bytes will be interpreted incorrectly.
 # TextInputFormat.readRecord() will not see a BOM each time it decodes a byte 
sequence with the String(bytes, offset, numBytes, charset) call. Therefore, it 
will assume Big Endian, which may not always be correct. [1] 
[https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95]

 
While there are likely many solutions, I would think that all of them would 
have to start by reading the BOM from the file when a Split is opened and then 
using that BOM to pick a byte-order-specific encoding when the caller doesn't 
specify one, or to override the caller's specification if the BOM conflicts 
with it. That is, if the BOM indicates Little Endian and the caller indicates 
UTF-16BE, Flink should rewrite the charsetName as UTF-16LE.
 I hope this makes sense and that I haven't been testing incorrectly or 
misreading the code.
 
I've verified the problem on version 1.4.2. I believe the problem exists on all 
versions. 
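
A minimal sketch of the BOM-based charset resolution described above; it only illustrates 
the idea and is not a proposed patch (the stream must be constructed with a pushback 
buffer of at least two bytes):

{code:java}
import java.io.IOException;
import java.io.PushbackInputStream;

public final class Utf16BomSniffer {

    // Returns "UTF-16LE" or "UTF-16BE" based on the BOM, consuming it; if no BOM is
    // present, pushes the bytes back and returns the caller's requested charset.
    static String resolveCharset(PushbackInputStream in, String requestedCharset) throws IOException {
        byte[] bom = new byte[2];
        int read = in.read(bom);
        if (read == 2 && (bom[0] & 0xFF) == 0xFF && (bom[1] & 0xFF) == 0xFE) {
            return "UTF-16LE"; // FF FE: little endian, overrides the caller's choice
        }
        if (read == 2 && (bom[0] & 0xFF) == 0xFE && (bom[1] & 0xFF) == 0xFF) {
            return "UTF-16BE"; // FE FF: big endian, overrides the caller's choice
        }
        if (read > 0) {
            in.unread(bom, 0, read); // no BOM: keep the bytes and the requested charset
        }
        return requestedCharset;
    }
}
{code}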



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10127) Add TypeInformation and serializers for JDK8 Instant

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578935#comment-16578935
 ] 

ASF GitHub Bot commented on FLINK-10127:


alexeyt820 opened a new pull request #6549: [FLINK-10127] Add Instant to basic 
types
URL: https://github.com/apache/flink/pull/6549
 
 
   ## What is the purpose of the change
   This pull request adds the JDK8 Instant type as a basic type to the Flink type system
   ## Brief change log
 - *InstantSerializer* added 
 - *InstantComparator* added
 - *BasicTypeInfo* modified to include *INSTANT_TYPE_INFO*
 - "Types" modified to include *INSTANT*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   - *Added unit tests for InstantSerializer and InstantComparator*
   - *Modified BasicTypeInfoTest to include Instant*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: yes
 - The runtime per-record code paths (performance sensitive):  don't know
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: don't know
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/types_serialization.html#flinks-typeinformation-class
 should be modified to include Instant  
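
A sketch of one possible encoding for Instant on Flink's DataOutputView/DataInputView 
(epoch seconds plus nanos); the actual InstantSerializer in this PR may differ, and the 
full TypeSerializer contract is intentionally omitted:

{code:java}
import java.io.IOException;
import java.time.Instant;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

public final class InstantCodecSketch {

    static void write(Instant value, DataOutputView target) throws IOException {
        target.writeLong(value.getEpochSecond()); // seconds since the epoch
        target.writeInt(value.getNano());         // nanosecond adjustment within the second
    }

    static Instant read(DataInputView source) throws IOException {
        long seconds = source.readLong();
        int nanos = source.readInt();
        return Instant.ofEpochSecond(seconds, nanos);
    }
}
{code}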
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add TypeInformation  and serializers for JDK8 Instant
> -
>
> Key: FLINK-10127
> URL: https://issues.apache.org/jira/browse/FLINK-10127
> Project: Flink
>  Issue Type: Improvement
>Reporter: Alexey Trenikhin
>Priority: Minor
>  Labels: pull-request-available
>
> Currently, Flink's basic types include all Java primitives and their boxed 
> forms, plus {{void}}, {{String}}, {{Date}}, {{BigDecimal}}, and 
> {{BigInteger}}. The new JDK8 {{Instant}} type should be added as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10127) Add TypeInformation and serializers for JDK8 Instant

2018-08-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10127:
---
Labels: pull-request-available  (was: )

> Add TypeInformation  and serializers for JDK8 Instant
> -
>
> Key: FLINK-10127
> URL: https://issues.apache.org/jira/browse/FLINK-10127
> Project: Flink
>  Issue Type: Improvement
>Reporter: Alexey Trenikhin
>Priority: Minor
>  Labels: pull-request-available
>
> Currently, Flink's basic types include all Java primitives and their boxed 
> forms, plus {{void}}, {{String}}, {{Date}}, {{BigDecimal}}, and 
> {{BigInteger}}. The new JDK8 {{Instant}} type should be added as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] alexeyt820 opened a new pull request #6549: [FLINK-10127] Add Instant to basic types

2018-08-13 Thread GitBox
alexeyt820 opened a new pull request #6549: [FLINK-10127] Add Instant to basic 
types
URL: https://github.com/apache/flink/pull/6549
 
 
   ## What is the purpose of the change
   This pull request adds the JDK8 Instant type as a basic type to the Flink type system
   ## Brief change log
 - *InstantSerializer* added 
 - *InstantComparator* added
 - *BasicTypeInfo* modified to include *INSTANT_TYPE_INFO*
 - "Types" modified to include *INSTANT*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   - *Added unit tests for InstantSerializer and InstantComparator*
   - *Modified BasicTypeInfoTest to include Instant*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: yes
 - The runtime per-record code paths (performance sensitive):  don't know
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: don't know
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/types_serialization.html#flinks-typeinformation-class
 should be modified to include Instant  
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-10019) Fix Composite getResultType of UDF cannot be chained with other operators

2018-08-13 Thread Rong Rong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578793#comment-16578793
 ] 

Rong Rong edited comment on FLINK-10019 at 8/13/18 7:00 PM:


Dug a little deeper in Calcite and found that in:
https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/InferTypes.java#L68

It seems that if the return type is inferred as a record, i.e. {{isStruct()}}, "it 
must have the same number of fields as the number of operands", which is clearly 
not the case here, since the expression **{{AS(func(a), "myRow")}}** only passes 
**{{func(a)}}** over for type inference, but not the alias **{{"myRow"}}**.


was (Author: walterddr):
Dug a little deeper in Calcite and found that in:
https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/InferTypes.java#L68

Seems like if the return type is inferred as a record, or {{isStruct()}}, "it 
must have the same number of fields as the number of operands." which clearly 
is not the case here since the following expression: {{AS(func(a), "myRow")}} 
only passes over the {{func(a)}} for type inference, but not the alias 
{{"myRow"}}

> Fix Composite getResultType of UDF cannot be chained with other operators
> -
>
> Key: FLINK-10019
> URL: https://issues.apache.org/jira/browse/FLINK-10019
> Project: Flink
>  Issue Type: Bug
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> If a CompositeType is explicitly returned from {{udf.getResultType}}, it will 
> result in failures in chained operators.
> For example: consider a simple UDF,
> {code:scala}
> object Func extends ScalarFunction {
>   def eval(row: Row): Row = {
> row
>   }
>   override def getParameterTypes(signature: Array[Class[_]]): 
> Array[TypeInformation[_]] =
> Array(Types.ROW(Types.INT))
>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
> Types.ROW(Types.INT)
> }
> {code}
> This should work perfectly since it's just a simple pass through, however
> {code:scala}
>   @Test
>   def testRowType(): Unit = {
> val data = List(
>   Row.of(Row.of(12.asInstanceOf[Integer]), "1")
> )
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val stream = env.fromCollection(data)(Types.ROW(Types.ROW(Types.INT), 
> Types.STRING))
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val table = stream.toTable(tEnv, 'a, 'b)
> tEnv.registerFunction("func", Func)
> tEnv.registerTable("t", table)
> // This works perfectly
> val result1 = tEnv.sqlQuery("SELECT func(a) FROM t").toAppendStream[Row]
> result1.addSink(new StreamITCase.StringSink[Row])
> // This throws exception
> val result2 = tEnv.sqlQuery("SELECT func(a) as myRow FROM 
> t").toAppendStream[Row]
> result2.addSink(new StreamITCase.StringSink[Row])
> env.execute()
>   }
> {code}
> Exception code:
> {code:java}
> java.lang.IndexOutOfBoundsException: index (1) must be less than size (1)
>   at 
> com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:310)
>   at 
> com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:293)
>   at 
> com.google.common.collect.SingletonImmutableList.get(SingletonImmutableList.java:41)
>   at 
> org.apache.calcite.sql.type.InferTypes$2.inferOperandTypes(InferTypes.java:83)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1777)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:459)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandStar(SqlValidatorImpl.java:349)
> ...
> {code}
> This is due to the fact that Calcite inferOperandTypes does not expect to 
> infer a struct RelDataType.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10019) Fix Composite getResultType of UDF cannot be chained with other operators

2018-08-13 Thread Rong Rong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578793#comment-16578793
 ] 

Rong Rong commented on FLINK-10019:
---

Dug a little deeper in Calcite and found that in:
https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/InferTypes.java#L68

It seems that if the return type is inferred as a record, i.e. {{isStruct()}}, "it 
must have the same number of fields as the number of operands", which is clearly 
not the case here, since the expression {{AS(func(a), "myRow")}} only passes 
{{func(a)}} over for type inference, but not the alias {{"myRow"}}.

> Fix Composite getResultType of UDF cannot be chained with other operators
> -
>
> Key: FLINK-10019
> URL: https://issues.apache.org/jira/browse/FLINK-10019
> Project: Flink
>  Issue Type: Bug
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> If a CompositeType is explicitly returned from {{udf.getResultType}}, it will 
> result in failures in chained operators.
> For example: consider a simple UDF,
> {code:scala}
> object Func extends ScalarFunction {
>   def eval(row: Row): Row = {
> row
>   }
>   override def getParameterTypes(signature: Array[Class[_]]): 
> Array[TypeInformation[_]] =
> Array(Types.ROW(Types.INT))
>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
> Types.ROW(Types.INT)
> }
> {code}
> This should work perfectly since it's just a simple pass through, however
> {code:scala}
>   @Test
>   def testRowType(): Unit = {
> val data = List(
>   Row.of(Row.of(12.asInstanceOf[Integer]), "1")
> )
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val stream = env.fromCollection(data)(Types.ROW(Types.ROW(Types.INT), 
> Types.STRING))
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val table = stream.toTable(tEnv, 'a, 'b)
> tEnv.registerFunction("func", Func)
> tEnv.registerTable("t", table)
> // This works perfectly
> val result1 = tEnv.sqlQuery("SELECT func(a) FROM t").toAppendStream[Row]
> result1.addSink(new StreamITCase.StringSink[Row])
> // This throws exception
> val result2 = tEnv.sqlQuery("SELECT func(a) as myRow FROM 
> t").toAppendStream[Row]
> result2.addSink(new StreamITCase.StringSink[Row])
> env.execute()
>   }
> {code}
> Exception code:
> {code:java}
> java.lang.IndexOutOfBoundsException: index (1) must be less than size (1)
>   at 
> com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:310)
>   at 
> com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:293)
>   at 
> com.google.common.collect.SingletonImmutableList.get(SingletonImmutableList.java:41)
>   at 
> org.apache.calcite.sql.type.InferTypes$2.inferOperandTypes(InferTypes.java:83)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1777)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:459)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandStar(SqlValidatorImpl.java:349)
> ...
> {code}
> This is due to the fact that Calcite inferOperandTypes does not expect to 
> infer a struct RelDataType.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10133) finished job's jobgraph never been cleaned up in zookeeper for standalone clusters (HA mode with multiple masters)

2018-08-13 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578792#comment-16578792
 ] 

Elias Levy commented on FLINK-10133:


[~Frefreak] this is likely the same issue as FLINK-10011.  If so, mark this one 
as a duplicate.

> finished job's jobgraph never been cleaned up in zookeeper for standalone 
> clusters (HA mode with multiple masters)
> --
>
> Key: FLINK-10133
> URL: https://issues.apache.org/jira/browse/FLINK-10133
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.5.0, 1.5.2, 1.6.0
>Reporter: Xiangyu Zhu
>Priority: Major
>
> Hi,
> We have 3 servers in our test environment, noted as node1-3. Setup is as 
> following:
>  * hadoop hdfs: node1 as namenode, node2,3 as datanode
>  * zookeeper: node1-3 as a quorum (but also tried node1 alone)
>  * flink: node1,2 as masters, node2,3 as slaves
> As I understand it, when a job finishes, the corresponding job's blob data is 
> expected to be deleted from the HDFS path, and the node under ZooKeeper's path 
> `/\{zk path root}/\{cluster-id}/jobgraphs/\{job id}` should be deleted after 
> that. However, we observe that whenever we submit a job and it finishes (via 
> `bin/flink run WordCount.jar`), the blob data is gone whereas the job id node 
> under ZooKeeper is still there, with a UUID-style lock node inside it. In 
> ZooKeeper's debug output we observed something like "cannot be deleted because 
> non empty". Because of this, as long as a job has finished and its jobgraph 
> node persists, restarting the cluster or killing one job manager (to test HA 
> mode) makes Flink try to recover the finished job, fail to find the blob data 
> under HDFS, and bring the whole cluster down.
> If we use only node1 as master and node2,3 as slaves, the jobgraphs node is 
> deleted successfully. When the jobgraphs path is clean, killing one job 
> manager promotes another standby JM to leader, so it is only this jobgraphs 
> issue that prevents HA from working.
> I'm not sure whether something is wrong with our configs, because this happens 
> every time for a finished job (we have only tested with WordCount.jar though). 
> I'm aware of FLINK-10011 and FLINK-10029, but unlike FLINK-10011 this happens 
> every time, rendering HA mode unusable for us.
> Any idea what might cause this?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9999) Add ISNUMERIC supported in Table API/SQL

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578781#comment-16578781
 ] 

ASF GitHub Bot commented on FLINK-:
---

walterddr commented on a change in pull request #6473: [FLINK-] [table] Add 
ISNUMERIC supported in Table API/SQL
URL: https://github.com/apache/flink/pull/6473#discussion_r209719307
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ##
 @@ -450,6 +450,63 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   "")
   }
 
+  @Test
+  def testIsNumeric(): Unit = {
 
 Review comment:
   Originally what I meant is, since this function only supports 
string/varchar, let's have a test that specifies `ISNUMERIC(1L)` throws 
`ValidationException`. 
   
   Regarding the usage of this in general, I think this is useful when chained 
with many other operators that have strict type constraints, such as `CASE WHEN 
ISNUMERIC(...) THEN ... ELSE ...`, where the `THEN` clause requires strict 
numeric values. That's why I was wondering if we should provide better support 
beyond just the STRING/VARCHAR type. 
   
   I will comment on the JIRA. Thanks for bringing this up. 
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ISNUMERIC supported in Table API/SQL
> 
>
> Key: FLINK-
> URL: https://issues.apache.org/jira/browse/FLINK-
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The ISNUMERIC function is used to verify that an expression is a valid numeric type.
> documentation: 
> https://docs.microsoft.com/en-us/sql/t-sql/functions/isnumeric-transact-sql?view=sql-server-2017



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] walterddr commented on a change in pull request #6473: [FLINK-9999] [table] Add ISNUMERIC supported in Table API/SQL

2018-08-13 Thread GitBox
walterddr commented on a change in pull request #6473: [FLINK-] [table] Add 
ISNUMERIC supported in Table API/SQL
URL: https://github.com/apache/flink/pull/6473#discussion_r209719307
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ##
 @@ -450,6 +450,63 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   "")
   }
 
+  @Test
+  def testIsNumeric(): Unit = {
 
 Review comment:
   Originally what I meant is, since this function only supports 
string/varchar, let's have a test that specifies `ISNUMERIC(1L)` throws 
`ValidationException`. 
   
   Regarding the usage of this in general, I think this is useful when chained 
with many other operators that have strict type constraints, such as `CASE WHEN 
ISNUMERIC(...) THEN ... ELSE ...`, where the `THEN` clause requires strict 
numeric values. That's why I was wondering if we should provide better support 
beyond just the STRING/VARCHAR type. 
   
   I will comment on the JIRA. Thanks for bringing this up. 
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-6437) Move history server configuration to a separate file

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578778#comment-16578778
 ] 

ASF GitHub Bot commented on FLINK-6437:
---

zentol commented on issue #6542: [FLINK-6437][History Server] Move history 
server configuration to a separate file
URL: https://github.com/apache/flink/pull/6542#issuecomment-412622673
 
 
   Well, I still like the idea of separate config files, but the JIRA 
discussion happened more than a year ago, _before we had even released the 
HistoryServer_. Now we have to think about backwards compatibility and will 
thus naturally end up adding complexity. I'm not sure this is really worth it, 
especially since this issue has never been raised again since the HS was 
released.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Move history server configuration to a separate file
> 
>
> Key: FLINK-6437
> URL: https://issues.apache.org/jira/browse/FLINK-6437
> Project: Flink
>  Issue Type: Improvement
>  Components: History Server
>Affects Versions: 1.3.0
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> I suggest to keep the {{flink-conf.yaml}} leaner by moving configuration of 
> the History Server to a different file.
> In general, I would propose to move configurations of separate, independent 
> and optional components to individual config files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on issue #6542: [FLINK-6437][History Server] Move history server configuration to a separate file

2018-08-13 Thread GitBox
zentol commented on issue #6542: [FLINK-6437][History Server] Move history 
server configuration to a separate file
URL: https://github.com/apache/flink/pull/6542#issuecomment-412622673
 
 
   Well, I still like the idea of separate config files, but the JIRA 
discussion happened more than a year ago, _before we had even released the 
HistoryServer_. Now we have to think about backwards compatibility and will 
thus naturally end up adding complexity. I'm not sure this is really worth it, 
especially since this issue has never been raised again since the HS was 
released.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-6437) Move history server configuration to a separate file

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578759#comment-16578759
 ] 

ASF GitHub Bot commented on FLINK-6437:
---

StephanEwen commented on issue #6542: [FLINK-6437][History Server] Move history 
server configuration to a separate file
URL: https://github.com/apache/flink/pull/6542#issuecomment-412618428
 
 
   Sure, so the compatibility story needs a more detailed design, and that 
needs to be discussed before looking at concrete code. Agreed.
   
   On whether we want this feature or not - the discussion in Jira was in favor 
of doing this. Curious what is the reasoning for the push back now, @zentol ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Move history server configuration to a separate file
> 
>
> Key: FLINK-6437
> URL: https://issues.apache.org/jira/browse/FLINK-6437
> Project: Flink
>  Issue Type: Improvement
>  Components: History Server
>Affects Versions: 1.3.0
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> I suggest to keep the {{flink-conf.yaml}} leaner by moving configuration of 
> the History Server to a different file.
> In general, I would propose to move configurations of separate, independent 
> and optional components to individual config files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] bowenli86 commented on a change in pull request #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition

2018-08-13 Thread GitBox
bowenli86 commented on a change in pull request #6544: [FLINK-8532] [Streaming] 
modify RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#discussion_r209712274
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
 ##
 @@ -228,9 +229,29 @@ private static void 
verifyRebalancePartitioning(List> re
new Tuple2(2, "c"),
new Tuple2(0, "a"));
 
-   assertEquals(
-   new HashSet>(expected),
-   new HashSet>(rebalancePartitionResult));
+   List> expected1 = Arrays.asList(
+   new Tuple2(1, "a"),
+   new Tuple2(2, "b"),
+   new Tuple2(0, "b"),
+   new Tuple2(1, "a"),
+   new Tuple2(2, "a"),
+   new Tuple2(0, "c"),
+   new Tuple2(1, "a"));
+
+   List> expected2 = Arrays.asList(
+   new Tuple2(2, "a"),
+   new Tuple2(0, "b"),
+   new Tuple2(1, "b"),
+   new Tuple2(2, "a"),
+   new Tuple2(0, "a"),
+   new Tuple2(1, "c"),
+   new Tuple2(2, "a"));
+
+   assertTrue(
 
 Review comment:
   to be more robust for people to modify `expectedX`, shall this assert that 
exactly one case match and the other two unmatch? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-8532) RebalancePartitioner should use Random value for its first partition

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578760#comment-16578760
 ] 

ASF GitHub Bot commented on FLINK-8532:
---

bowenli86 commented on a change in pull request #6544: [FLINK-8532] [Streaming] 
modify RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#discussion_r209712274
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
 ##
 @@ -228,9 +229,29 @@ private static void 
verifyRebalancePartitioning(List> re
new Tuple2(2, "c"),
new Tuple2(0, "a"));
 
-   assertEquals(
-   new HashSet>(expected),
-   new HashSet>(rebalancePartitionResult));
+   List> expected1 = Arrays.asList(
+   new Tuple2(1, "a"),
+   new Tuple2(2, "b"),
+   new Tuple2(0, "b"),
+   new Tuple2(1, "a"),
+   new Tuple2(2, "a"),
+   new Tuple2(0, "c"),
+   new Tuple2(1, "a"));
+
+   List> expected2 = Arrays.asList(
+   new Tuple2(2, "a"),
+   new Tuple2(0, "b"),
+   new Tuple2(1, "b"),
+   new Tuple2(2, "a"),
+   new Tuple2(0, "a"),
+   new Tuple2(1, "c"),
+   new Tuple2(2, "a"));
+
+   assertTrue(
 
 Review comment:
   to be more robust for people to modify `expectedX`, shall this assert that 
exactly one case match and the other two unmatch? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> RebalancePartitioner should use Random value for its first partition
> 
>
> Key: FLINK-8532
> URL: https://issues.apache.org/jira/browse/FLINK-8532
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Yuta Morisawa
>Priority: Minor
>  Labels: pull-request-available
>
> In some conditions, RebalancePartitioner doesn't balance data correctly 
> because it uses the same value for selecting the next operator.
> RebalancePartitioner initializes its partition id with the same value in 
> every thread, so it does balance data overall, but at any given moment the 
> amount of data in each operator is skewed.
> Particularly, when the data rates of the upstream operators are equal, the 
> skew becomes severe.
>  
> Example:
> Consider a simple operator chain.
> -> map1 -> rebalance -> map2 ->
> Each map operator (map1, map2) contains three subtasks (subtasks 1-3 and 4-6 
> respectively).
> map1          map2
>  st1              st4
>  st2              st5
>  st3              st6
>  
> At the beginning, every subtask in map1 sends data to st4 in map2 because 
> they all use the same initial partition id.
> The next time map1 receives data, st1, st2, and st3 send it to st5, because 
> each increments its partition id after processing the previous record.
> In my environment, processing takes twice as long with RebalancePartitioner 
> as with other partitioners (rescale, keyBy).
>  
> To solve this problem, in my opinion, RebalancePartitioner should use its own 
> operator id for the initial value.
>  
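
A standalone sketch of the behavior discussed above: round-robin channel selection that 
starts at a random channel instead of a fixed one, so parallel senders do not all hit the 
same downstream subtask first. The class and method names are made up for illustration; 
this is not Flink's RebalancePartitioner implementation:

{code:java}
import java.util.concurrent.ThreadLocalRandom;

public final class RandomStartRoundRobin {

    private int nextChannel = -1; // lazily initialized with a random starting channel

    int selectChannel(int numberOfChannels) {
        if (nextChannel < 0) {
            nextChannel = ThreadLocalRandom.current().nextInt(numberOfChannels);
        } else {
            nextChannel = (nextChannel + 1) % numberOfChannels;
        }
        return nextChannel;
    }
}
{code}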



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8532) RebalancePartitioner should use Random value for its first partition

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578757#comment-16578757
 ] 

ASF GitHub Bot commented on FLINK-8532:
---

bowenli86 commented on a change in pull request #6544: [FLINK-8532] [Streaming] 
modify RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#discussion_r209712274
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
 ##
 @@ -228,9 +229,29 @@ private static void 
verifyRebalancePartitioning(List> re
new Tuple2(2, "c"),
new Tuple2(0, "a"));
 
-   assertEquals(
-   new HashSet>(expected),
-   new HashSet>(rebalancePartitionResult));
+   List> expected1 = Arrays.asList(
+   new Tuple2(1, "a"),
+   new Tuple2(2, "b"),
+   new Tuple2(0, "b"),
+   new Tuple2(1, "a"),
+   new Tuple2(2, "a"),
+   new Tuple2(0, "c"),
+   new Tuple2(1, "a"));
+
+   List> expected2 = Arrays.asList(
+   new Tuple2(2, "a"),
+   new Tuple2(0, "b"),
+   new Tuple2(1, "b"),
+   new Tuple2(2, "a"),
+   new Tuple2(0, "a"),
+   new Tuple2(1, "c"),
+   new Tuple2(2, "a"));
+
+   assertTrue(
 
 Review comment:
   To be more robust, shall this assert that exactly one case matches and the 
other two do not? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> RebalancePartitioner should use Random value for its first partition
> 
>
> Key: FLINK-8532
> URL: https://issues.apache.org/jira/browse/FLINK-8532
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Yuta Morisawa
>Priority: Minor
>  Labels: pull-request-available
>
> In some conditions, RebalancePartitioner doesn't balance data correctly 
> because it uses the same value for selecting the next operator.
> RebalancePartitioner initializes its partition id with the same value in 
> every thread, so it does balance data overall, but at any given moment the 
> amount of data in each operator is skewed.
> The skew becomes particularly severe when the data rates of the upstream 
> operators are equal.
>  
>  
> Example:
> Consider a simple operator chain.
> -> map1 -> rebalance -> map2 ->
> Each map operator (map1, map2) contains three subtasks (subtasks 1-6 in 
> total).
> map1          map2
>  st1              st4
>  st2              st5
>  st3              st6
>  
> At the beginning, every subtask in map1 sends data to st4 in map2 because 
> they all use the same initial partition id.
> When map1 receives the next records, st1, st2, and st3 all send them to st5 
> because each incremented its partition id after processing the previous 
> record.
> In my environment, processing takes twice as long with RebalancePartitioner 
> as with other partitioners (rescale, keyBy).
>  
> To solve this problem, in my opinion, RebalancePartitioner should use its own 
> operator id for the initial value.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StephanEwen commented on issue #6542: [FLINK-6437][History Server] Move history server configuration to a separate file

2018-08-13 Thread GitBox
StephanEwen commented on issue #6542: [FLINK-6437][History Server] Move history 
server configuration to a separate file
URL: https://github.com/apache/flink/pull/6542#issuecomment-412618428
 
 
   Sure, so the compatibility story needs a more detailed design, and that 
needs to be discussed before looking at concrete code. Agreed.
   
   On whether we want this feature or not - the discussion in Jira was in favor 
of doing this. Curious what the reasoning for the pushback is now, @zentol?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] bowenli86 commented on a change in pull request #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition

2018-08-13 Thread GitBox
bowenli86 commented on a change in pull request #6544: [FLINK-8532] [Streaming] 
modify RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#discussion_r209712274
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
 ##
 @@ -228,9 +229,29 @@ private static void 
verifyRebalancePartitioning(List> re
new Tuple2(2, "c"),
new Tuple2(0, "a"));
 
-   assertEquals(
-   new HashSet>(expected),
-   new HashSet>(rebalancePartitionResult));
+   List> expected1 = Arrays.asList(
+   new Tuple2(1, "a"),
+   new Tuple2(2, "b"),
+   new Tuple2(0, "b"),
+   new Tuple2(1, "a"),
+   new Tuple2(2, "a"),
+   new Tuple2(0, "c"),
+   new Tuple2(1, "a"));
+
+   List> expected2 = Arrays.asList(
+   new Tuple2(2, "a"),
+   new Tuple2(0, "b"),
+   new Tuple2(1, "b"),
+   new Tuple2(2, "a"),
+   new Tuple2(0, "a"),
+   new Tuple2(1, "c"),
+   new Tuple2(2, "a"));
+
+   assertTrue(
 
 Review comment:
   To be more robust, shall this assert that exactly one case matches and the 
other two do not? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-10066) Keep only archived version of previous executions

2018-08-13 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-10066.
--
Resolution: Fixed

Merged in:
master: 160dc56fdf
release-1.6: 74323d50b0
release-1.5: 2217c09c88

> Keep only archived version of previous executions
> -
>
> Key: FLINK-10066
> URL: https://issues.apache.org/jira/browse/FLINK-10066
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Affects Versions: 1.4.3, 1.5.2, 1.6.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the execution vertex stores a limited amount of previous 
> executions in a bounded list. This happens primarily for archiving purposes 
> and to remember previous locations and allocation ids. We remember the whole 
> execution to eventually convert it into an archived execution.
> This seems unnecessary and dangerous as we have observed that this strategy 
> is prone to memory leaks in the job manager. With a very high vertex count or 
> parallelism, remembering complete executions can become very memory 
> intensive. Instead I suggest to eagerly transform the executions into the 
> archived version before adding them to the list, i.e. only the archived 
> version is ever still referenced after the execution becomes obsolete. This 
> gives better control over which information about the execution should really 
> be kept in memory.
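A rough sketch of that idea, keeping only an eagerly archived copy of each 
finished attempt in a bounded history; the class and method names below are 
illustrative, not Flink's actual code:

import java.util.ArrayDeque;
import java.util.function.Function;

public class BoundedArchivedHistory<E, A> {

	private final int maxSize;
	private final Function<E, A> archiver;
	private final ArrayDeque<A> history = new ArrayDeque<>();

	BoundedArchivedHistory(int maxSize, Function<E, A> archiver) {
		this.maxSize = maxSize;
		this.archiver = archiver;
	}

	// archive eagerly, so the bounded history never holds a reference
	// to the full (memory-heavy) execution object
	void add(E execution) {
		if (history.size() == maxSize) {
			history.removeFirst(); // drop the oldest archived entry
		}
		history.addLast(archiver.apply(execution));
	}
}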



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10066) Keep only archived version of previous executions

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578712#comment-16578712
 ] 

ASF GitHub Bot commented on FLINK-10066:


asfgit closed pull request #6500: [FLINK-10066] Keep only archived version of 
previous executions
URL: https://github.com/apache/flink/pull/6500
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
index 4b1c62fe707..ab8c94c0188 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
@@ -40,6 +41,8 @@
 
private final TaskManagerLocation assignedResourceLocation; // for the 
archived execution
 
+   private final AllocationID assignedAllocationID;
+
/* Continuously updated map of user-defined accumulators */
private final StringifiedAccumulatorResult[] userAccumulators;
 
@@ -48,21 +51,24 @@
private final IOMetrics ioMetrics;
 
public ArchivedExecution(Execution execution) {
-   this.userAccumulators = 
execution.getUserAccumulatorsStringified();
-   this.attemptId = execution.getAttemptId();
-   this.attemptNumber = execution.getAttemptNumber();
-   this.stateTimestamps = execution.getStateTimestamps();
-   this.parallelSubtaskIndex = 
execution.getVertex().getParallelSubtaskIndex();
-   this.state = execution.getState();
-   this.failureCause = 
ExceptionUtils.stringifyException(execution.getFailureCause());
-   this.assignedResourceLocation = 
execution.getAssignedResourceLocation();
-   this.ioMetrics = execution.getIOMetrics();
+   this(
+   execution.getUserAccumulatorsStringified(),
+   execution.getIOMetrics(),
+   execution.getAttemptId(),
+   execution.getAttemptNumber(),
+   execution.getState(),
+   
ExceptionUtils.stringifyException(execution.getFailureCause()),
+   execution.getAssignedResourceLocation(),
+   execution.getAssignedAllocationID(),
+   execution.getVertex().getParallelSubtaskIndex(),
+   execution.getStateTimestamps());
}
 
public ArchivedExecution(
StringifiedAccumulatorResult[] userAccumulators, 
IOMetrics ioMetrics,
ExecutionAttemptID attemptId, int attemptNumber, 
ExecutionState state, String failureCause,
-   TaskManagerLocation assignedResourceLocation, int 
parallelSubtaskIndex, long[] stateTimestamps) {
+   TaskManagerLocation assignedResourceLocation, 
AllocationID assignedAllocationID,  int parallelSubtaskIndex,
+   long[] stateTimestamps) {
this.userAccumulators = userAccumulators;
this.ioMetrics = ioMetrics;
this.failureCause = failureCause;
@@ -72,6 +78,7 @@ public ArchivedExecution(
this.state = state;
this.stateTimestamps = stateTimestamps;
this.parallelSubtaskIndex = parallelSubtaskIndex;
+   this.assignedAllocationID = assignedAllocationID;
}
 
// 

@@ -103,6 +110,10 @@ public TaskManagerLocation getAssignedResourceLocation() {
return assignedResourceLocation;
}
 
+   public AllocationID getAssignedAllocationID() {
+   return assignedAllocationID;
+   }
+
@Override
public String getFailureCauseAsString() {
return failureCause;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
index 36669d34be7..04efa048fb6 100644
--- 

[GitHub] asfgit closed pull request #6500: [FLINK-10066] Keep only archived version of previous executions

2018-08-13 Thread GitBox
asfgit closed pull request #6500: [FLINK-10066] Keep only archived version of 
previous executions
URL: https://github.com/apache/flink/pull/6500
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
index 4b1c62fe707..ab8c94c0188 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
@@ -40,6 +41,8 @@
 
private final TaskManagerLocation assignedResourceLocation; // for the 
archived execution
 
+   private final AllocationID assignedAllocationID;
+
/* Continuously updated map of user-defined accumulators */
private final StringifiedAccumulatorResult[] userAccumulators;
 
@@ -48,21 +51,24 @@
private final IOMetrics ioMetrics;
 
public ArchivedExecution(Execution execution) {
-   this.userAccumulators = 
execution.getUserAccumulatorsStringified();
-   this.attemptId = execution.getAttemptId();
-   this.attemptNumber = execution.getAttemptNumber();
-   this.stateTimestamps = execution.getStateTimestamps();
-   this.parallelSubtaskIndex = 
execution.getVertex().getParallelSubtaskIndex();
-   this.state = execution.getState();
-   this.failureCause = 
ExceptionUtils.stringifyException(execution.getFailureCause());
-   this.assignedResourceLocation = 
execution.getAssignedResourceLocation();
-   this.ioMetrics = execution.getIOMetrics();
+   this(
+   execution.getUserAccumulatorsStringified(),
+   execution.getIOMetrics(),
+   execution.getAttemptId(),
+   execution.getAttemptNumber(),
+   execution.getState(),
+   
ExceptionUtils.stringifyException(execution.getFailureCause()),
+   execution.getAssignedResourceLocation(),
+   execution.getAssignedAllocationID(),
+   execution.getVertex().getParallelSubtaskIndex(),
+   execution.getStateTimestamps());
}
 
public ArchivedExecution(
StringifiedAccumulatorResult[] userAccumulators, 
IOMetrics ioMetrics,
ExecutionAttemptID attemptId, int attemptNumber, 
ExecutionState state, String failureCause,
-   TaskManagerLocation assignedResourceLocation, int 
parallelSubtaskIndex, long[] stateTimestamps) {
+   TaskManagerLocation assignedResourceLocation, 
AllocationID assignedAllocationID,  int parallelSubtaskIndex,
+   long[] stateTimestamps) {
this.userAccumulators = userAccumulators;
this.ioMetrics = ioMetrics;
this.failureCause = failureCause;
@@ -72,6 +78,7 @@ public ArchivedExecution(
this.state = state;
this.stateTimestamps = stateTimestamps;
this.parallelSubtaskIndex = parallelSubtaskIndex;
+   this.assignedAllocationID = assignedAllocationID;
}
 
// 

@@ -103,6 +110,10 @@ public TaskManagerLocation getAssignedResourceLocation() {
return assignedResourceLocation;
}
 
+   public AllocationID getAssignedAllocationID() {
+   return assignedAllocationID;
+   }
+
@Override
public String getFailureCauseAsString() {
return failureCause;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
index 36669d34be7..04efa048fb6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
@@ -40,7 +40,7 @@
 
public ArchivedExecutionVertex(ExecutionVertex vertex) 

[GitHub] Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition

2018-08-13 Thread GitBox
Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify 
RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#issuecomment-412602068
 
 
   @yanghua thanks for your suggestion, I have made an update. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-8532) RebalancePartitioner should use Random value for its first partition

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578678#comment-16578678
 ] 

ASF GitHub Bot commented on FLINK-8532:
---

Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify 
RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#issuecomment-412602068
 
 
   @yanghua thanks for your suggestion, I have made an update. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> RebalancePartitioner should use Random value for its first partition
> 
>
> Key: FLINK-8532
> URL: https://issues.apache.org/jira/browse/FLINK-8532
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Yuta Morisawa
>Priority: Minor
>  Labels: pull-request-available
>
> In some conditions, RebalancePartitioner doesn't balance data correctly 
> because it uses the same value for selecting the next operator.
> RebalancePartitioner initializes its partition id with the same value in 
> every thread, so it does balance data overall, but at any given moment the 
> amount of data in each operator is skewed.
> The skew becomes particularly severe when the data rates of the upstream 
> operators are equal.
>  
>  
> Example:
> Consider a simple operator chain.
> -> map1 -> rebalance -> map2 ->
> Each map operator (map1, map2) contains three subtasks (subtasks 1-6 in 
> total).
> map1          map2
>  st1              st4
>  st2              st5
>  st3              st6
>  
> At the beginning, every subtask in map1 sends data to st4 in map2 because 
> they all use the same initial partition id.
> When map1 receives the next records, st1, st2, and st3 all send them to st5 
> because each incremented its partition id after processing the previous 
> record.
> In my environment, processing takes twice as long with RebalancePartitioner 
> as with other partitioners (rescale, keyBy).
>  
> To solve this problem, in my opinion, RebalancePartitioner should use its own 
> operator id for the initial value.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9878) IO worker threads BLOCKED on SSL Session Cache while CMS full gc

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578653#comment-16578653
 ] 

ASF GitHub Bot commented on FLINK-9878:
---

NicoK commented on issue #6355: [FLINK-9878][network][ssl] add more low-level 
ssl options
URL: https://github.com/apache/flink/pull/6355#issuecomment-412597634
 
 
   I pushed a rework of this PR which has a lighter footprint on the changes in 
SSLUtils by using a wrapper around `SSLContext` as @pnowojski suggested.
   
   I kept all existing logic though, including the `@Nullable` fields (vs. 
`Optional`) for these reasons:
   1) there are already conflicts when applying this to `release-1.6` and I'd 
like to keep the footprint small (some of the suggestions already make the diff 
bigger)
   2) there are several `null` checks which would need refactoring
   3) this seems to be out of scope of this PR, especially since no nullable 
field is added (any more)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> IO worker threads BLOCKED on SSL Session Cache while CMS full gc
> 
>
> Key: FLINK-9878
> URL: https://issues.apache.org/jira/browse/FLINK-9878
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.3, 1.6.1, 1.7.0
>
>
> According to https://github.com/netty/netty/issues/832, there is a JDK issue 
> during garbage collection when the SSL session cache is not limited. We 
> should allow the user to configure this and further (advanced) SSL parameters 
> for fine-tuning to fix this and similar issues. In particular, the following 
> parameters should be configurable:
> - SSL session cache size
> - SSL session timeout
> - SSL handshake timeout
> - SSL close notify flush timeout
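A rough sketch of where these four parameters could be applied, using the 
standard JSSE session-context settings plus Netty's SslHandler timeouts; the 
values are placeholders and this is not Flink's actual SSLUtils code:

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

import io.netty.handler.ssl.SslHandler;

public class SslTuningSketch {

	static SslHandler clientSslHandler(SSLContext sslContext, String host, int port) {
		// bound the JDK-side session cache instead of leaving it unlimited
		sslContext.getClientSessionContext().setSessionCacheSize(10_000);     // entries
		sslContext.getClientSessionContext().setSessionTimeout(24 * 60 * 60); // seconds

		SSLEngine engine = sslContext.createSSLEngine(host, port);
		engine.setUseClientMode(true);

		SslHandler handler = new SslHandler(engine);
		handler.setHandshakeTimeoutMillis(10_000);  // SSL handshake timeout
		handler.setCloseNotifyTimeoutMillis(3_000); // close_notify flush timeout
		return handler;
	}
}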



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] NicoK commented on issue #6355: [FLINK-9878][network][ssl] add more low-level ssl options

2018-08-13 Thread GitBox
NicoK commented on issue #6355: [FLINK-9878][network][ssl] add more low-level 
ssl options
URL: https://github.com/apache/flink/pull/6355#issuecomment-412597634
 
 
   I pushed a rework of this PR which has a lighter footprint on the changes in 
SSLUtils by using a wrapper around `SSLContext` as @pnowojski suggested.
   
   I kept all existing logic though, including the `@Nullable` fields (vs. 
`Optional`) for these reasons:
   1) there are already conflicts when applying this to `release-1.6` and I'd 
like to keep the footprint small (some of the suggestions already make the diff 
bigger)
   2) there are several `null` checks which would need refactoring
   3) this seems to be out of scope of this PR, especially since no nullable 
field is added (any more)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9878) IO worker threads BLOCKED on SSL Session Cache while CMS full gc

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578646#comment-16578646
 ] 

ASF GitHub Bot commented on FLINK-9878:
---

NicoK commented on a change in pull request #6355: [FLINK-9878][network][ssl] 
add more low-level ssl options
URL: https://github.com/apache/flink/pull/6355#discussion_r209690587
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
 ##
 @@ -175,7 +183,6 @@ ChannelFuture connect(final InetSocketAddress 
serverSocketAddress) {
bootstrap.handler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel channel) throws 
Exception {
-
// SSL handler should be added first in the 
pipeline
if (clientSSLContext != null) {
 
 Review comment:
   if SSL is disabled, for example


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> IO worker threads BLOCKED on SSL Session Cache while CMS full gc
> 
>
> Key: FLINK-9878
> URL: https://issues.apache.org/jira/browse/FLINK-9878
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.3, 1.6.1, 1.7.0
>
>
> According to https://github.com/netty/netty/issues/832, there is a JDK issue 
> during garbage collection when the SSL session cache is not limited. We 
> should allow the user to configure this and further (advanced) SSL parameters 
> for fine-tuning to fix this and similar issues. In particular, the following 
> parameters should be configurable:
> - SSL session cache size
> - SSL session timeout
> - SSL handshake timeout
> - SSL close notify flush timeout



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] NicoK commented on a change in pull request #6355: [FLINK-9878][network][ssl] add more low-level ssl options

2018-08-13 Thread GitBox
NicoK commented on a change in pull request #6355: [FLINK-9878][network][ssl] 
add more low-level ssl options
URL: https://github.com/apache/flink/pull/6355#discussion_r209690587
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
 ##
 @@ -175,7 +183,6 @@ ChannelFuture connect(final InetSocketAddress 
serverSocketAddress) {
bootstrap.handler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel channel) throws 
Exception {
-
// SSL handler should be added first in the 
pipeline
if (clientSSLContext != null) {
 
 Review comment:
   if SSL is disabled, for example


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9878) IO worker threads BLOCKED on SSL Session Cache while CMS full gc

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578643#comment-16578643
 ] 

ASF GitHub Bot commented on FLINK-9878:
---

NicoK commented on a change in pull request #6355: [FLINK-9878][network][ssl] 
add more low-level ssl options
URL: https://github.com/apache/flink/pull/6355#discussion_r209690309
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
 ##
 @@ -52,6 +56,9 @@
 
private Bootstrap bootstrap;
 
 Review comment:
   out of scope of this PR - there's also more around this package, if you 
wanted to mark/change these accordingly


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> IO worker threads BLOCKED on SSL Session Cache while CMS full gc
> 
>
> Key: FLINK-9878
> URL: https://issues.apache.org/jira/browse/FLINK-9878
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.3, 1.6.1, 1.7.0
>
>
> According to https://github.com/netty/netty/issues/832, there is a JDK issue 
> during garbage collection when the SSL session cache is not limited. We 
> should allow the user to configure this and further (advanced) SSL parameters 
> for fine-tuning to fix this and similar issues. In particular, the following 
> parameters should be configurable:
> - SSL session cache size
> - SSL session timeout
> - SSL handshake timeout
> - SSL close notify flush timeout



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9878) IO worker threads BLOCKED on SSL Session Cache while CMS full gc

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578644#comment-16578644
 ] 

ASF GitHub Bot commented on FLINK-9878:
---

NicoK commented on a change in pull request #6355: [FLINK-9878][network][ssl] 
add more low-level ssl options
URL: https://github.com/apache/flink/pull/6355#discussion_r209690309
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
 ##
 @@ -52,6 +56,9 @@
 
private Bootstrap bootstrap;
 
 Review comment:
   out of scope of this PR - there's also even more around this package, if you 
wanted to mark/change these accordingly


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> IO worker threads BLOCKED on SSL Session Cache while CMS full gc
> 
>
> Key: FLINK-9878
> URL: https://issues.apache.org/jira/browse/FLINK-9878
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.3, 1.6.1, 1.7.0
>
>
> According to https://github.com/netty/netty/issues/832, there is a JDK issue 
> during garbage collection when the SSL session cache is not limited. We 
> should allow the user to configure this and further (advanced) SSL parameters 
> for fine-tuning to fix this and similar issues. In particular, the following 
> parameters should be configurable:
> - SSL session cache size
> - SSL session timeout
> - SSL handshake timeout
> - SSL close notify flush timeout



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] NicoK commented on a change in pull request #6355: [FLINK-9878][network][ssl] add more low-level ssl options

2018-08-13 Thread GitBox
NicoK commented on a change in pull request #6355: [FLINK-9878][network][ssl] 
add more low-level ssl options
URL: https://github.com/apache/flink/pull/6355#discussion_r209690309
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
 ##
 @@ -52,6 +56,9 @@
 
private Bootstrap bootstrap;
 
 Review comment:
   out of scope of this PR - there's also even more around this package, if you 
wanted to mark/change these accordingly


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] NicoK commented on a change in pull request #6355: [FLINK-9878][network][ssl] add more low-level ssl options

2018-08-13 Thread GitBox
NicoK commented on a change in pull request #6355: [FLINK-9878][network][ssl] 
add more low-level ssl options
URL: https://github.com/apache/flink/pull/6355#discussion_r209690309
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
 ##
 @@ -52,6 +56,9 @@
 
private Bootstrap bootstrap;
 
 Review comment:
   out of scope of this PR - there's also more around this package, if you 
wanted to mark/change these accordingly


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9977) Refine the docs for Table/SQL built-in functions

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578630#comment-16578630
 ] 

ASF GitHub Bot commented on FLINK-9977:
---

fhueske commented on issue #6535: [FLINK-9977] [table][doc] Refine the 
SQL/Table built-in function docs
URL: https://github.com/apache/flink/pull/6535#issuecomment-412593462
 
 
   Thanks for the update @xccui.
   
   +1 to merge


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Refine the docs for Table/SQL built-in functions
> 
>
> Key: FLINK-9977
> URL: https://issues.apache.org/jira/browse/FLINK-9977
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Minor
>  Labels: pull-request-available
> Attachments: Java.jpg, SQL.jpg, Scala.jpg
>
>
> There exist some syntax errors or inconsistencies in documents and Scala docs 
> of the Table/SQL built-in functions. This issue aims to make some 
> improvements to them.
> Also, according to FLINK-10103, we should use single quotes to express 
> strings in SQL. For example, CONCAT("AA", "BB", "CC") should be replaced with 
> CONCAT('AA', 'BB', 'CC'). 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9878) IO worker threads BLOCKED on SSL Session Cache while CMS full gc

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578629#comment-16578629
 ] 

ASF GitHub Bot commented on FLINK-9878:
---

NicoK commented on a change in pull request #6355: [FLINK-9878][network][ssl] 
add more low-level ssl options
URL: https://github.com/apache/flink/pull/6355#discussion_r209682805
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
 ##
 @@ -163,80 +163,188 @@ public static void setSSLVerifyHostname(Configuration 
sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* Configuration settings and key/trustmanager instances to set up an 
SSL client connection.
+*/
+   public static class SSLClientConfiguration {
 
 Review comment:
   good idea - that makes the change even smaller...well, at least the 
important parts of the change ;)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> IO worker threads BLOCKED on SSL Session Cache while CMS full gc
> 
>
> Key: FLINK-9878
> URL: https://issues.apache.org/jira/browse/FLINK-9878
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.3, 1.6.1, 1.7.0
>
>
> According to https://github.com/netty/netty/issues/832, there is a JDK issue 
> during garbage collection when the SSL session cache is not limited. We 
> should allow the user to configure this and further (advanced) SSL parameters 
> for fine-tuning to fix this and similar issues. In particular, the following 
> parameters should be configurable:
> - SSL session cache size
> - SSL session timeout
> - SSL handshake timeout
> - SSL close notify flush timeout



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] NicoK commented on a change in pull request #6355: [FLINK-9878][network][ssl] add more low-level ssl options

2018-08-13 Thread GitBox
NicoK commented on a change in pull request #6355: [FLINK-9878][network][ssl] 
add more low-level ssl options
URL: https://github.com/apache/flink/pull/6355#discussion_r209682805
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
 ##
 @@ -163,80 +163,188 @@ public static void setSSLVerifyHostname(Configuration 
sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* Configuration settings and key/trustmanager instances to set up an 
SSL client connection.
+*/
+   public static class SSLClientConfiguration {
 
 Review comment:
   good idea - that makes the change even smaller...well, at least the 
important parts of the change ;)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on issue #6535: [FLINK-9977] [table][doc] Refine the SQL/Table built-in function docs

2018-08-13 Thread GitBox
fhueske commented on issue #6535: [FLINK-9977] [table][doc] Refine the 
SQL/Table built-in function docs
URL: https://github.com/apache/flink/pull/6535#issuecomment-412593462
 
 
   Thanks for the update @xccui.
   
   +1 to merge


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on a change in pull request #6535: [FLINK-9977] [table][doc] Refine the SQL/Table built-in function docs

2018-08-13 Thread GitBox
fhueske commented on a change in pull request #6535: [FLINK-9977] [table][doc] 
Refine the SQL/Table built-in function docs
URL: https://github.com/apache/flink/pull/6535#discussion_r209687761
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -24,7 +24,7 @@ under the License.
 
 Flink Table API & SQL provides users with a set of built-in functions for data 
transformations. This page gives a brief overview of them.
 If a function that you need is not supported yet, you can implement a user-defined function.
-Or if you think the function is general enough, please <a href="https://issues.apache.org/jira/secure/CreateIssue!default.jspa">open a 
JIRA issue</a> for it.
+If you think that the function is general enough, please open a Jira issue for 
it with a detailed description.
 
 Review comment:
   Oh, please keep the link :-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9977) Refine the docs for Table/SQL built-in functions

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578625#comment-16578625
 ] 

ASF GitHub Bot commented on FLINK-9977:
---

fhueske commented on a change in pull request #6535: [FLINK-9977] [table][doc] 
Refine the SQL/Table built-in function docs
URL: https://github.com/apache/flink/pull/6535#discussion_r209687761
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -24,7 +24,7 @@ under the License.
 
 Flink Table API & SQL provides users with a set of built-in functions for data 
transformations. This page gives a brief overview of them.
 If a function that you need is not supported yet, you can implement a user-defined function.
-Or if you think the function is general enough, please <a href="https://issues.apache.org/jira/secure/CreateIssue!default.jspa">open a 
JIRA issue</a> for it.
+If you think that the function is general enough, please open a Jira issue for 
it with a detailed description.
 
 Review comment:
   Oh, please keep the link :-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Refine the docs for Table/SQL built-in functions
> 
>
> Key: FLINK-9977
> URL: https://issues.apache.org/jira/browse/FLINK-9977
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Minor
>  Labels: pull-request-available
> Attachments: Java.jpg, SQL.jpg, Scala.jpg
>
>
> There exist some syntax errors or inconsistencies in documents and Scala docs 
> of the Table/SQL built-in functions. This issue aims to make some 
> improvements to them.
> Also, according to FLINK-10103, we should use single quotes to express 
> strings in SQL. For example, CONCAT("AA", "BB", "CC") should be replaced with 
> CONCAT('AA', 'BB', 'CC'). 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6437) Move history server configuration to a separate file

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578620#comment-16578620
 ] 

ASF GitHub Bot commented on FLINK-6437:
---

zentol commented on issue #6542: [FLINK-6437][History Server] Move history 
server configuration to a separate file
URL: https://github.com/apache/flink/pull/6542#issuecomment-412590513
 
 
   That strategy is already implemented, but it doesn't really address 
backwards compatibility imo. I would assume that when people upgrade they'll 
end up with the default `flink-historyserver-conf.yaml` being present in `conf` 
overwriting everything in `flink-conf.yaml`.
   
   We could comment out everything in the HS config file, always read both 
and prioritize the contents of the HS config. This wouldn't affect old users 
(we could also guide them with logging messages if settings are found in 
`flink-conf.yaml`), nor should it affect new users, as either a) they have to 
set a key anyway or b) a sane default handles this case.
   
   Still, I'm wondering whether there's really a benefit here. If we start 
splitting config files, I'd prefer we do the same for the client.
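A tiny sketch of the "read both files, history-server config wins" idea, using 
plain java.util.Properties as a stand-in for Flink's Configuration; the file 
names in the comments are illustrative only:

import java.util.Properties;

public class LayeredConfigSketch {

	// merges the common config with the history-server-specific config,
	// letting keys from the latter override the former
	static Properties merge(Properties flinkConf, Properties historyServerConf) {
		Properties merged = new Properties();
		merged.putAll(flinkConf);         // e.g. loaded from flink-conf.yaml
		merged.putAll(historyServerConf); // e.g. loaded from flink-historyserver-conf.yaml
		return merged;
	}
}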


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Move history server configuration to a separate file
> 
>
> Key: FLINK-6437
> URL: https://issues.apache.org/jira/browse/FLINK-6437
> Project: Flink
>  Issue Type: Improvement
>  Components: History Server
>Affects Versions: 1.3.0
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> I suggest to keep the {{flink-conf.yaml}} leaner by moving configuration of 
> the History Server to a different file.
> In general, I would propose to move configurations of separate, independent 
> and optional components to individual config files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on issue #6542: [FLINK-6437][History Server] Move history server configuration to a separate file

2018-08-13 Thread GitBox
zentol commented on issue #6542: [FLINK-6437][History Server] Move history 
server configuration to a separate file
URL: https://github.com/apache/flink/pull/6542#issuecomment-412590513
 
 
   That strategy is already implemented, but it doesn't really address 
backwards compatibility imo. I would assume that when people upgrade they'll 
end up with the default `flink-historyserver-conf.yaml` being present in `conf` 
overwriting everything in `flink-conf.yaml`.
   
   We could comment out everything in the HS config file, always read both 
and prioritize the contents of the HS config. This wouldn't affect old users 
(we could also guide them with logging messages if settings are found in 
`flink-conf.yaml`), nor should it affect new users, as either a) they have to 
set a key anyway or b) a sane default handles this case.
   
   Still, I'm wondering whether there's really a benefit here. If we start 
splitting config files, I'd prefer we do the same for the client.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9853) add hex support in table api and sql

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578616#comment-16578616
 ] 

ASF GitHub Bot commented on FLINK-9853:
---

xueyumusic commented on a change in pull request #6337: [FLINK-9853] [table] 
Add HEX support 
URL: https://github.com/apache/flink/pull/6337#discussion_r209684500
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ##
 @@ -392,6 +392,93 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   "äää1234512345")
   }
 
+  @Test
+  def testHex(): Unit = {
+testAllApis(
+  100.hex(),
+  "100.hex()",
+  "HEX(100)",
+  "64")
+
+testAllApis(
+  'f2.hex(),
+  "f2.hex()",
+  "HEX(f2)",
+  "2a")
 
 Review comment:
   Yes, we should do it as well. Thanks @twalthr, I updated the code.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> add hex support in table api and sql
> 
>
> Key: FLINK-9853
> URL: https://issues.apache.org/jira/browse/FLINK-9853
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: xueyu
>Priority: Major
>  Labels: pull-request-available
>
> Like in MySQL, HEX can take an int or string argument. For an integer argument 
> N, it returns a hexadecimal string representation of the value of N. For a 
> string argument str, it returns a hexadecimal string representation of str 
> where each byte of each character in str is converted to two hexadecimal 
> digits. 
> Syntax:
> HEX(100) = 64
> HEX('This is a test String.') = '546869732069732061207465737420537472696e672e'
> See more: [link 
> MySQL|https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_hex]
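An illustrative sketch of these MySQL-style HEX semantics (not the PR's actual 
Flink implementation); the lowercase output matches the string example above:

import java.nio.charset.StandardCharsets;

public class HexSketch {

	// HEX(100) -> "64": hexadecimal representation of the integer value
	static String hex(long n) {
		return Long.toHexString(n);
	}

	// HEX('This is a test String.') -> "546869732069732061207465737420537472696e672e":
	// each byte of the string becomes two hexadecimal digits
	static String hex(String s) {
		StringBuilder sb = new StringBuilder();
		for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
			sb.append(String.format("%02x", b));
		}
		return sb.toString();
	}
}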



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] xueyumusic commented on a change in pull request #6337: [FLINK-9853] [table] Add HEX support

2018-08-13 Thread GitBox
xueyumusic commented on a change in pull request #6337: [FLINK-9853] [table] 
Add HEX support 
URL: https://github.com/apache/flink/pull/6337#discussion_r209684500
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ##
 @@ -392,6 +392,93 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   "äää1234512345")
   }
 
+  @Test
+  def testHex(): Unit = {
+testAllApis(
+  100.hex(),
+  "100.hex()",
+  "HEX(100)",
+  "64")
+
+testAllApis(
+  'f2.hex(),
+  "f2.hex()",
+  "HEX(f2)",
+  "2a")
 
 Review comment:
   Yes, we should do it as well. Thanks @twalthr, I updated the code.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9878) IO worker threads BLOCKED on SSL Session Cache while CMS full gc

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578611#comment-16578611
 ] 

ASF GitHub Bot commented on FLINK-9878:
---

NicoK commented on a change in pull request #6355: [FLINK-9878][network][ssl] 
add more low-level ssl options
URL: https://github.com/apache/flink/pull/6355#discussion_r209682805
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
 ##
 @@ -163,80 +163,188 @@ public static void setSSLVerifyHostname(Configuration 
sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* Configuration settings and key/trustmanager instances to set up an 
SSL client connection.
+*/
+   public static class SSLClientConfiguration {
 
 Review comment:
   good idea - that makes the change even smaller


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> IO worker threads BLOCKED on SSL Session Cache while CMS full gc
> 
>
> Key: FLINK-9878
> URL: https://issues.apache.org/jira/browse/FLINK-9878
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.3, 1.6.1, 1.7.0
>
>
> According to https://github.com/netty/netty/issues/832, there is a JDK issue 
> during garbage collection when the SSL session cache is not limited. We 
> should allow the user to configure this and further (advanced) SSL parameters 
> for fine-tuning to fix this and similar issues. In particular, the following 
> parameters should be configurable:
> - SSL session cache size
> - SSL session timeout
> - SSL handshake timeout
> - SSL close notify flush timeout



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] NicoK commented on a change in pull request #6355: [FLINK-9878][network][ssl] add more low-level ssl options

2018-08-13 Thread GitBox
NicoK commented on a change in pull request #6355: [FLINK-9878][network][ssl] 
add more low-level ssl options
URL: https://github.com/apache/flink/pull/6355#discussion_r209682805
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
 ##
 @@ -163,80 +163,188 @@ public static void setSSLVerifyHostname(Configuration 
sslConfig, SSLParameters s
}
 
/**
-* Creates the SSL Context for the client if SSL is configured.
+* Configuration settings and key/trustmanager instances to set up an 
SSL client connection.
+*/
+   public static class SSLClientConfiguration {
 
 Review comment:
   good idea - that makes the change even smaller


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] NicoK commented on a change in pull request #6355: [FLINK-9878][network][ssl] add more low-level ssl options

2018-08-13 Thread GitBox
NicoK commented on a change in pull request #6355: [FLINK-9878][network][ssl] 
add more low-level ssl options
URL: https://github.com/apache/flink/pull/6355#discussion_r209682663
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
 ##
 @@ -65,6 +68,60 @@ public void testValidSslConnection() throws Exception {
 
Channel ch = NettyTestUtil.connect(serverAndClient);
 
+   SslHandler sslHandler = (SslHandler) ch.pipeline().get("ssl");
+   assertTrue("default value should not be propagated", 
sslHandler.getHandshakeTimeoutMillis() >= 0);
+   assertTrue("default value should not be propagated", 
sslHandler.getCloseNotifyTimeoutMillis() >= 0);
+
+   // should be able to send text data
+   ch.pipeline().addLast(new StringDecoder()).addLast(new 
StringEncoder());
+   assertTrue(ch.writeAndFlush("test").await().isSuccess());
+
+   NettyTestUtil.shutdown(serverAndClient);
+   }
+
+   /**
+* Verify valid (advanced) ssl configuration and connection.
+*/
+   @Test
+   public void testValidSslConnectionAdvanced() throws Exception {
 
 Review comment:
   Actually, this test verifies that the session cache size and session timeout 
are set in Netty's ssl handler and that should be enough. Whether they are 
actually being used should be tested in Netty's tests.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9977) Refine the docs for Table/SQL built-in functions

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578590#comment-16578590
 ] 

ASF GitHub Bot commented on FLINK-9977:
---

xccui commented on issue #6535: [FLINK-9977] [table][doc] Refine the SQL/Table 
built-in function docs
URL: https://github.com/apache/flink/pull/6535#issuecomment-412582564
 
 
   Thanks for looking into this, @fhueske. I've just updated the PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Refine the docs for Table/SQL built-in functions
> 
>
> Key: FLINK-9977
> URL: https://issues.apache.org/jira/browse/FLINK-9977
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Minor
>  Labels: pull-request-available
> Attachments: Java.jpg, SQL.jpg, Scala.jpg
>
>
> There exist some syntax errors or inconsistencies in documents and Scala docs 
> of the Table/SQL built-in functions. This issue aims to make some 
> improvements to them.
> Also, according to FLINK-10103, we should use single quotes to express 
> strings in SQL. For example, CONCAT("AA", "BB", "CC") should be replaced with 
> CONCAT('AA', 'BB', 'CC'). 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] xccui commented on issue #6535: [FLINK-9977] [table][doc] Refine the SQL/Table built-in function docs

2018-08-13 Thread GitBox
xccui commented on issue #6535: [FLINK-9977] [table][doc] Refine the SQL/Table 
built-in function docs
URL: https://github.com/apache/flink/pull/6535#issuecomment-412582564
 
 
   Thanks for looking into this, @fhueske. I've just updated the PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10061) Fix unsupported reconfiguration in KafkaTableSink

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578548#comment-16578548
 ] 

ASF GitHub Bot commented on FLINK-10061:


tragicjun commented on issue #6495: [FLINK-10061] [table] [kafka] Fix 
unsupported reconfiguration in KafkaTableSink
URL: https://github.com/apache/flink/pull/6495#issuecomment-412575692
 
 
   @fhueske @twalthr have you decided that `Table.writeToSink()` will be 
dropped? If so, this pull request could be closed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix unsupported reconfiguration in KafkaTableSink
> -
>
> Key: FLINK-10061
> URL: https://issues.apache.org/jira/browse/FLINK-10061
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Assignee: Jun Zhang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.1, 1.7.0
>
>
> When using KafkaTableSink in "table.writeToSink(), the following exception is 
> thrown:
> {quote} java.lang.UnsupportedOperationException: Reconfiguration of this sink 
> is not supported.
> {quote}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tragicjun commented on issue #6495: [FLINK-10061] [table] [kafka] Fix unsupported reconfiguration in KafkaTableSink

2018-08-13 Thread GitBox
tragicjun commented on issue #6495: [FLINK-10061] [table] [kafka] Fix 
unsupported reconfiguration in KafkaTableSink
URL: https://github.com/apache/flink/pull/6495#issuecomment-412575692
 
 
   @fhueske @twalthr have you decided that `Table.writeToSink()` will be 
dropped? If so, this pull request could be closed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9739) Regression in supported filesystems for RocksDB

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578516#comment-16578516
 ] 

ASF GitHub Bot commented on FLINK-9739:
---

sampathBhat commented on issue #6452: [FLINK-9739] Regression in supported 
filesystems for RocksDB
URL: https://github.com/apache/flink/pull/6452#issuecomment-412570942
 
 
   @stephanEwen I agree with your point, but from the user's perspective wouldn't 
it be odd to provide a scheme for some config options and not for others? Moreover, in 
the proposed solution an exception is thrown for selected configurations if a 
scheme is provided.
   As an example, for the checkpoint dir the user can give the scheme as either hdfs or 
file. So if the user has to provide the scheme "file" for the checkpoint dir but 
must not provide a scheme for other configurations, that would only lead to 
confusion.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Regression in supported filesystems for RocksDB
> ---
>
> Key: FLINK-9739
> URL: https://issues.apache.org/jira/browse/FLINK-9739
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Sampath Bhat
>Priority: Major
>  Labels: pull-request-available
>
> A user reported on the [mailing 
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpointing-in-Flink-1-5-0-tt21173.html]
>  that the {{RocksDBStateBackend}} no longer supports GlusterFS 
> mounted volumes.
> Configuring {{file:///home/abc/share}} led to an exception claiming that the 
> path is not absolute.
> This was working fine in 1.4.2.
> In FLINK-6557 the {{RocksDBStateBackend}} was refactored to use java 
> {{Files}} instead of Flink {{Paths}}, potentially causing the issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] sampathBhat commented on issue #6452: [FLINK-9739] Regression in supported filesystems for RocksDB

2018-08-13 Thread GitBox
sampathBhat commented on issue #6452: [FLINK-9739] Regression in supported 
filesystems for RocksDB
URL: https://github.com/apache/flink/pull/6452#issuecomment-412570942
 
 
   @stephanEwen I agree with your point, but from the user's perspective wouldn't 
it be odd to provide a scheme for some config options and not for others? Moreover, in 
the proposed solution an exception is thrown for selected configurations if a 
scheme is provided.
   As an example, for the checkpoint dir the user can give the scheme as either hdfs or 
file. So if the user has to provide the scheme "file" for the checkpoint dir but 
must not provide a scheme for other configurations, that would only lead to 
confusion.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-6437) Move history server configuration to a separate file

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578511#comment-16578511
 ] 

ASF GitHub Bot commented on FLINK-6437:
---

StephanEwen commented on issue #6542: [FLINK-6437][History Server] Move history 
server configuration to a separate file
URL: https://github.com/apache/flink/pull/6542#issuecomment-412570277
 
 
   @yanghua You are right, this was not marked as "later" or anything in JIRA.
   There is never really a time where we don't need to worry about backwards 
compatibility, so any change that addresses this needs to take this into 
account.
   
   One way to do the backwards compatibility would be to look for the dedicated 
`flink-historyserver-conf.yaml` config file and fall back to the 
`flink-conf.yaml` file if the former does not exist.
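   For illustration, a minimal sketch of that fallback lookup, assuming a 
hypothetical helper class and method (this is not existing Flink code):

```java
import java.io.File;

// Sketch only: illustrates the proposed lookup order for the history server
// configuration. The class and method names here are hypothetical.
public final class HistoryServerConfigResolver {

    /**
     * Returns the configuration file the history server should load:
     * the dedicated flink-historyserver-conf.yaml if it exists,
     * otherwise the legacy flink-conf.yaml for backwards compatibility.
     */
    public static File resolveConfigFile(File confDir) {
        File dedicated = new File(confDir, "flink-historyserver-conf.yaml");
        if (dedicated.exists()) {
            return dedicated;
        }
        // Backwards compatibility: fall back to the combined configuration file.
        return new File(confDir, "flink-conf.yaml");
    }

    private HistoryServerConfigResolver() {}
}
```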
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Move history server configuration to a separate file
> 
>
> Key: FLINK-6437
> URL: https://issues.apache.org/jira/browse/FLINK-6437
> Project: Flink
>  Issue Type: Improvement
>  Components: History Server
>Affects Versions: 1.3.0
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> I suggest to keep the {{flink-conf.yaml}} leaner by moving configuration of 
> the History Server to a different file.
> In general, I would propose to move configurations of separate, independent 
> and optional components to individual config files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StephanEwen commented on issue #6542: [FLINK-6437][History Server] Move history server configuration to a separate file

2018-08-13 Thread GitBox
StephanEwen commented on issue #6542: [FLINK-6437][History Server] Move history 
server configuration to a separate file
URL: https://github.com/apache/flink/pull/6542#issuecomment-412570277
 
 
   @yanghua You are right, this was not marked as "later" or anything in JIRA.
   There is never really a time where we don't need to worry about backwards 
compatibility, so any change that addresses this needs to take this into 
account.
   
   One way to do the backwards compatibility would be to look for the dedicated 
`flink-historyserver-conf.yaml` config file and fall back to the 
`flink-conf.yaml` file if the former does not exist.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9739) Regression in supported filesystems for RocksDB

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578496#comment-16578496
 ] 

ASF GitHub Bot commented on FLINK-9739:
---

StephanEwen commented on issue #6452: [FLINK-9739] Regression in supported 
filesystems for RocksDB
URL: https://github.com/apache/flink/pull/6452#issuecomment-412566310
 
 
   @sampathBhat There are two very different config parameter types: URIs 
across file systems (like where checkpoints go to) and local directories on the 
local file system (like temp dirs). We cannot have one way to handle both; they 
have conflicting requirements.
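   To illustrate the distinction (a sketch only, not Flink's actual option 
parsing; the values shown are made-up examples):

```java
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;

// Sketch: the two kinds of directory options discussed above.
public class ConfigParamKinds {
    public static void main(String[] args) {
        // 1) A URI across file systems, e.g. where checkpoints go to.
        URI checkpointDir = URI.create("hdfs://namenode:8020/flink/checkpoints");

        // 2) A local directory list, e.g. temp/RocksDB dirs: scheme-less paths.
        //    ':' is commonly used as the list separator, which is why URIs
        //    (which also contain ':') clash with it.
        String localDirList = "/data1/tmp:/data2/tmp";
        for (String dir : localDirList.split(":")) {
            Path localDir = Paths.get(dir);
            System.out.println(checkpointDir + " vs " + localDir);
        }
    }
}
```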


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Regression in supported filesystems for RocksDB
> ---
>
> Key: FLINK-9739
> URL: https://issues.apache.org/jira/browse/FLINK-9739
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Sampath Bhat
>Priority: Major
>  Labels: pull-request-available
>
> A user reported on the [mailing 
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpointing-in-Flink-1-5-0-tt21173.html]
>  that the {{RocksDBStateBackend}} no longer supports GlusterFS 
> mounted volumes.
> Configuring {{file:///home/abc/share}} led to an exception claiming that the 
> path is not absolute.
> This was working fine in 1.4.2.
> In FLINK-6557 the {{RocksDBStateBackend}} was refactored to use java 
> {{Files}} instead of Flink {{Paths}}, potentially causing the issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StephanEwen commented on issue #6452: [FLINK-9739] Regression in supported filesystems for RocksDB

2018-08-13 Thread GitBox
StephanEwen commented on issue #6452: [FLINK-9739] Regression in supported 
filesystems for RocksDB
URL: https://github.com/apache/flink/pull/6452#issuecomment-412566310
 
 
   @sampathBhat There are two very different config parameter types: URIs 
across file systems (like where checkpoints go to) and local directories on the 
local file system (like temp dirs). We cannot have one way to handle both; they 
have conflicting requirements.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9739) Regression in supported filesystems for RocksDB

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578483#comment-16578483
 ] 

ASF GitHub Bot commented on FLINK-9739:
---

sampathBhat commented on issue #6452: [FLINK-9739] Regression in supported 
filesystems for RocksDB
URL: https://github.com/apache/flink/pull/6452#issuecomment-412565010
 
 
   If we decide to provide file paths without the schema then the same 
provision must be provided for all other configuration options that involves 
file path. To maintain uniformity. Also the same must be documented.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Regression in supported filesystems for RocksDB
> ---
>
> Key: FLINK-9739
> URL: https://issues.apache.org/jira/browse/FLINK-9739
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Sampath Bhat
>Priority: Major
>  Labels: pull-request-available
>
> A user reported on the [mailing 
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpointing-in-Flink-1-5-0-tt21173.html]
>  that the {{RocksDBStateBackend}} no longer supports GlusterFS 
> mounted volumes.
> Configuring {{file:///home/abc/share}} led to an exception claiming that the 
> path is not absolute.
> This was working fine in 1.4.2.
> In FLINK-6557 the {{RocksDBStateBackend}} was refactored to use java 
> {{Files}} instead of Flink {{Paths}}, potentially causing the issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9739) Regression in supported filesystems for RocksDB

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578484#comment-16578484
 ] 

ASF GitHub Bot commented on FLINK-9739:
---

StephanEwen commented on issue #6452: [FLINK-9739] Regression in supported 
filesystems for RocksDB
URL: https://github.com/apache/flink/pull/6452#issuecomment-412565039
 
 
   Given this implication on Yarn / Mesos, I am +1 to remove the absolute path 
requirement (my mistake to introduce that in the first place, I was also 
motivated by better predictability here). Would even merge that back to 1.5 and 
1.6


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Regression in supported filesystems for RocksDB
> ---
>
> Key: FLINK-9739
> URL: https://issues.apache.org/jira/browse/FLINK-9739
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Sampath Bhat
>Priority: Major
>  Labels: pull-request-available
>
> A user reported on the [mailing 
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpointing-in-Flink-1-5-0-tt21173.html]
>  that the {{RocksDBStateBackend}} no longer supports GlusterFS 
> mounted volumes.
> Configuring {{file:///home/abc/share}} led to an exception claiming that the 
> path is not absolute.
> This was working fine in 1.4.2.
> In FLINK-6557 the {{RocksDBStateBackend}} was refactored to use java 
> {{Files}} instead of Flink {{Paths}}, potentially causing the issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] sampathBhat commented on issue #6452: [FLINK-9739] Regression in supported filesystems for RocksDB

2018-08-13 Thread GitBox
sampathBhat commented on issue #6452: [FLINK-9739] Regression in supported 
filesystems for RocksDB
URL: https://github.com/apache/flink/pull/6452#issuecomment-412565010
 
 
   If we decide to provide file paths without the schema then the same 
provision must be provided for all other configuration options that involves 
file path. To maintain uniformity. Also the same must be documented.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StephanEwen commented on issue #6452: [FLINK-9739] Regression in supported filesystems for RocksDB

2018-08-13 Thread GitBox
StephanEwen commented on issue #6452: [FLINK-9739] Regression in supported 
filesystems for RocksDB
URL: https://github.com/apache/flink/pull/6452#issuecomment-412565039
 
 
   Given this implication on Yarn / Mesos, I am +1 to remove the absolute path 
requirement (my mistake to introduce that in the first place, I was also 
motivated by better predictability here). Would even merge that back to 1.5 and 
1.6


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9739) Regression in supported filesystems for RocksDB

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578480#comment-16578480
 ] 

ASF GitHub Bot commented on FLINK-9739:
---

StephanEwen commented on issue #6452: [FLINK-9739] Regression in supported 
filesystems for RocksDB
URL: https://github.com/apache/flink/pull/6452#issuecomment-412564568
 
 
   My understanding is that relative paths may make sense in setups like YARN 
or Mesos, where the exact temp directories are filled in dynamically by the 
Yarn/Mesos node upon starting the TaskManager.
   
   It may even be that every TM has different temp dirs, because they look like 
`/data/yarn/node-id/executions-attempt-uuid/local/temp/` or so.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Regression in supported filesystems for RocksDB
> ---
>
> Key: FLINK-9739
> URL: https://issues.apache.org/jira/browse/FLINK-9739
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Sampath Bhat
>Priority: Major
>  Labels: pull-request-available
>
> A user reported on the [mailing 
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpointing-in-Flink-1-5-0-tt21173.html]
>  that the {{RocksDBStateBackend}} no longer supports GlusterFS 
> mounted volumes.
> Configuring {{file:///home/abc/share}} led to an exception claiming that the 
> path is not absolute.
> This was working fine in 1.4.2.
> In FLINK-6557 the {{RocksDBStateBackend}} was refactored to use java 
> {{Files}} instead of Flink {{Paths}}, potentially causing the issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StephanEwen commented on issue #6452: [FLINK-9739] Regression in supported filesystems for RocksDB

2018-08-13 Thread GitBox
StephanEwen commented on issue #6452: [FLINK-9739] Regression in supported 
filesystems for RocksDB
URL: https://github.com/apache/flink/pull/6452#issuecomment-412564568
 
 
   My understanding is that relative paths may make sense in setups like YARN 
or Mesos, where the exact temp directories are filled in dynamically by the 
Yarn/Mesos node upon starting the TaskManager.
   
   It may even be that every TM has different temp dirs, because they look like 
`/data/yarn/node-id/executions-attempt-uuid/local/temp/` or so.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9222) Add a Gradle Quickstart

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578467#comment-16578467
 ] 

ASF GitHub Bot commented on FLINK-9222:
---

twalthr commented on a change in pull request #105: [FLINK-9222] add gradle 
quickstart script
URL: https://github.com/apache/flink-web/pull/105#discussion_r209653708
 
 

 ##
 File path: q/gradle-quickstart.sh
 ##
 @@ -0,0 +1,226 @@
+#!/bin/bash
+
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+declare -r TRUE=0
+declare -r FALSE=1
+
+# takes a string and returns true if it seems to represent "yes"
+function isYes() {
+  local x=$1
+  [ $x = "y" -o $x = "Y" -o $x = "yes" ] && echo $TRUE; return
+  echo $FALSE
+}
+
+function mkDir() {
+  local x=$1
+  echo ${x// /-} | tr '[:upper:]' '[:lower:]'
+}
+
+function mkPackage() {
+  local x=$1
+  echo ${x//./\/}
+}
+
+defaultProjectName="quickstart"
+defaultOrganization="org.myorg.quickstart"
+defaultVersion="0.1-SNAPSHOT"
+defaultScalaBinaryVersion="2.11"
+defaultFlinkVersion="1.6-SNAPSHOT"
+
+echo "This script creates a Flink project using Java and Gradle."
+
+while [ $TRUE ]; do
+
+  echo ""
+  read -p "Project name ($defaultProjectName): " projectName
+  projectName=${projectName:-$defaultProjectName}
+  read -p "Organization ($defaultOrganization): " organization
+  organization=${organization:-$defaultOrganization}
+  read -p "Version ($defaultVersion): " version
+  version=${version:-$defaultVersion}
+  read -p "Scala version ($defaultScalaBinaryVersion): " scalaBinaryVersion
+  scalaBinaryVersion=${scalaBinaryVersion:-$defaultScalaBinaryVersion}
+  read -p "Flink version ($defaultFlinkVersion): " flinkVersion
+  flinkVersion=${flinkVersion:-$defaultFlinkVersion}
+
+  echo ""
+  echo "---"
+  echo "Project Name: ${projectName}"
+  echo "Organization: ${organization}"
+  echo "Version: ${version}"
+  echo "Scala binary version: ${scalaBinaryVersion}"
+  echo "Flink version: ${flinkVersion}"
+  echo "---"
+  read -p "Create Project? (Y/n): " createProject
+  createProject=${createProject:-y}
+
+  [ "$(isYes "${createProject}")" = "$TRUE" ] && break
+
+done
+
+directoryName=$(mkDir "${projectName}")
+
+echo "Creating Flink project under ${directoryName}"
+
+mkdir -p ${directoryName}
+cd ${directoryName}
+
+# Create the README file
+
+cat > README <
+cat > settings.gradle <
+cat > build.gradle <
+// Explicitly define the libraries we want to be included in the 
+// "flinkShadowJar" configuration!
+configurations {
+flinkShadowJar // dependencies which go into the shadowJar
+
+// always exclude these (also from transitive dependencies) since they are 
provided by Flink
+flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
+flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
+flinkShadowJar.exclude group: 'org.slf4j'
+flinkShadowJar.exclude group: 'log4j'
+}
+
+// declare the dependencies for your production and test code
+dependencies {
+compile "org.apache.flink:flink-java:\${flinkVersion}"
+compile 
"org.apache.flink:flink-streaming-java_\${scalaBinaryVersion}:\${flinkVersion}"
+
+// Add connector dependencies here.
 
 Review comment:
   Not only `connector`. In general I would make this section more prominent. 
Maybe using
   ```
   // --
   // Add dependencies here that should NOT be part of the 
   // shadow jar and are provided in the lib folder of Flink
   // --
   ...
   // 

[jira] [Commented] (FLINK-9222) Add a Gradle Quickstart

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578466#comment-16578466
 ] 

ASF GitHub Bot commented on FLINK-9222:
---

twalthr commented on a change in pull request #105: [FLINK-9222] add gradle 
quickstart script
URL: https://github.com/apache/flink-web/pull/105#discussion_r209652918
 
 

 ##
 File path: q/gradle-quickstart.sh
 ##
 @@ -0,0 +1,226 @@
+#!/bin/bash
+
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+declare -r TRUE=0
+declare -r FALSE=1
+
+# takes a string and returns true if it seems to represent "yes"
+function isYes() {
+  local x=$1
+  [ $x = "y" -o $x = "Y" -o $x = "yes" ] && echo $TRUE; return
+  echo $FALSE
+}
+
+function mkDir() {
+  local x=$1
+  echo ${x// /-} | tr '[:upper:]' '[:lower:]'
+}
+
+function mkPackage() {
+  local x=$1
+  echo ${x//./\/}
+}
+
+defaultProjectName="quickstart"
+defaultOrganization="org.myorg.quickstart"
+defaultVersion="0.1-SNAPSHOT"
+defaultScalaBinaryVersion="2.11"
+defaultFlinkVersion="1.6-SNAPSHOT"
+
+echo "This script creates a Flink project using Java and Gradle."
+
+while [ $TRUE ]; do
+
+  echo ""
+  read -p "Project name ($defaultProjectName): " projectName
+  projectName=${projectName:-$defaultProjectName}
+  read -p "Organization ($defaultOrganization): " organization
+  organization=${organization:-$defaultOrganization}
+  read -p "Version ($defaultVersion): " version
+  version=${version:-$defaultVersion}
+  read -p "Scala version ($defaultScalaBinaryVersion): " scalaBinaryVersion
+  scalaBinaryVersion=${scalaBinaryVersion:-$defaultScalaBinaryVersion}
+  read -p "Flink version ($defaultFlinkVersion): " flinkVersion
+  flinkVersion=${flinkVersion:-$defaultFlinkVersion}
+
+  echo ""
+  echo "---"
+  echo "Project Name: ${projectName}"
+  echo "Organization: ${organization}"
+  echo "Version: ${version}"
+  echo "Scala binary version: ${scalaBinaryVersion}"
+  echo "Flink version: ${flinkVersion}"
+  echo "---"
+  read -p "Create Project? (Y/n): " createProject
+  createProject=${createProject:-y}
+
+  [ "$(isYes "${createProject}")" = "$TRUE" ] && break
+
+done
+
+directoryName=$(mkDir "${projectName}")
+
+echo "Creating Flink project under ${directoryName}"
+
+mkdir -p ${directoryName}
+cd ${directoryName}
+
+# Create the README file
+
+cat > README <

> Add a Gradle Quickstart
> ---
>
> Key: FLINK-9222
> URL: https://issues.apache.org/jira/browse/FLINK-9222
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website, Quickstarts
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Critical
>  Labels: pull-request-available
>
> Having a proper project template helps a lot in getting dependencies right. 
> For example, setting the core dependencies to "provided", the connector / 
> library dependencies to "compile", etc.
> The Maven quickstarts are in good shape by now, but there is none for Gradle, 
> and Gradle users get this wrong quite often.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9739) Regression in supported filesystems for RocksDB

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578456#comment-16578456
 ] 

ASF GitHub Bot commented on FLINK-9739:
---

zentol commented on issue #6452: [FLINK-9739] Regression in supported 
filesystems for RocksDB
URL: https://github.com/apache/flink/pull/6452#issuecomment-412561511
 
 
   When would a relative path make sense, and wouldn't this always result in 
effectively random behavior?
   
   I would be fine with dropping support for schemes; if we either reject or 
ignore schemes anyway we may as well be explicit about it up-front.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Regression in supported filesystems for RocksDB
> ---
>
> Key: FLINK-9739
> URL: https://issues.apache.org/jira/browse/FLINK-9739
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Sampath Bhat
>Priority: Major
>  Labels: pull-request-available
>
> A user reported on the [mailing 
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpointing-in-Flink-1-5-0-tt21173.html]
>  that the {{RocksDBStateBackend}} no longer supports GlusterFS 
> mounted volumes.
> Configuring {{file:///home/abc/share}} led to an exception claiming that the 
> path is not absolute.
> This was working fine in 1.4.2.
> In FLINK-6557 the {{RocksDBStateBackend}} was refactored to use java 
> {{Files}} instead of Flink {{Paths}}, potentially causing the issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on issue #6452: [FLINK-9739] Regression in supported filesystems for RocksDB

2018-08-13 Thread GitBox
zentol commented on issue #6452: [FLINK-9739] Regression in supported 
filesystems for RocksDB
URL: https://github.com/apache/flink/pull/6452#issuecomment-412561511
 
 
   When would a relative path make sense, and wouldn't this always result in 
effectively random behavior?
   
   I would be fine with dropping support for schemes; if we either reject or 
ignore schemes anyway we may as well be explicit about it up-front.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10133) finished job's jobgraph never been cleaned up in zookeeper for standalone clusters (HA mode with multiple masters)

2018-08-13 Thread Xiangyu Zhu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578452#comment-16578452
 ] 

Xiangyu Zhu commented on FLINK-10133:
-

[~Wosinsan] Sure, but I have no access to my servers right now. I will post the 
logs tomorrow.

> finished job's jobgraph never been cleaned up in zookeeper for standalone 
> clusters (HA mode with multiple masters)
> --
>
> Key: FLINK-10133
> URL: https://issues.apache.org/jira/browse/FLINK-10133
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.5.0, 1.5.2, 1.6.0
>Reporter: Xiangyu Zhu
>Priority: Major
>
> Hi,
> We have 3 servers in our test environment, noted as node1-3. Setup is as 
> following:
>  * hadoop hdfs: node1 as namenode, node2,3 as datanode
>  * zookeeper: node1-3 as a quorum (but also tried node1 alone)
>  * flink: node1,2 as masters, node2,3 as slaves
> My understanding is that when a job finishes, the corresponding job's blob data is 
> expected to be deleted from the hdfs path, and the node under zookeeper's path `/\{zk 
> path root}/\{cluster-id}/jobgraphs/\{job id}` should be deleted after that. 
> However we observe that whenever we submitted a job and it finished (via 
> `bin/flink run WordCount.jar`), the blob data is gone whereas job id node 
> under zookeeper is still there, with a uuid style lock node inside it. From 
> the debug node in zookeeper we observed something like "cannot be deleted 
> because non empty". Because of this, as long as a job has finished and the 
> jobgraph node persists, if we restart the clusters or kill one manager (to test 
> HA mode), it tries to recover a finished job, cannot find the blob data 
> under hdfs, and the whole cluster is down.
> If we use only node1 as master and node2,3 as slaves, the jobgraphs node can 
> be deleted successfully. If the jobgraphs node is clean, killing one job manager 
> makes another stand-by JM become the leader, so it is only this jobgraphs 
> issue preventing HA from working.
> I'm not sure if there's something wrong with our configs because this happens 
> every time for finished jobs (we only tested with wordcount.jar though). I'm 
> aware of FLINK-10011 and FLINK-10029, but unlike FLINK-10011 this happens 
> every time, rendering HA mode unusable for us.
> Any idea what might cause this?
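As a diagnostic aid for this report, a small sketch using Apache Curator to list 
the leftover job-graph nodes and their lock children; the connection string and 
the `/flink/default/jobgraphs` path are placeholders that depend on the 
`high-availability.zookeeper.*` settings of the cluster:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

// Diagnostic sketch only: lists children of the HA jobgraphs path so leftover
// job ids and their lock nodes can be inspected.
public class JobGraphNodeInspector {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "node1:2181,node2:2181,node3:2181",
                new ExponentialBackoffRetry(1000, 3));
        client.start();
        String jobGraphsPath = "/flink/default/jobgraphs";
        for (String jobId : client.getChildren().forPath(jobGraphsPath)) {
            System.out.println("leftover job graph node: " + jobId);
            // A remaining UUID-style child here is the lock node that keeps
            // the job graph from being removed.
            for (String lock : client.getChildren().forPath(jobGraphsPath + "/" + jobId)) {
                System.out.println("  lock node: " + lock);
            }
        }
        client.close();
    }
}
```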



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9739) Regression in supported filesystems for RocksDB

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578448#comment-16578448
 ] 

ASF GitHub Bot commented on FLINK-9739:
---

StephanEwen commented on issue #6452: [FLINK-9739] Regression in supported 
filesystems for RocksDB
URL: https://github.com/apache/flink/pull/6452#issuecomment-412559130
 
 
   Do we want to restore the previous behavior? I mean, RocksDB paths must be 
local file paths. Having URIs with file system schemes in there seems like a 
mistake of the past. That is also the reason why splitting at `:` causes 
trouble, because URIs and directory lists don't go together well.
   
   We could simply drop the old behavior and help smooth the transition.
   
 - Drop the check for absolute path and allow relative paths
 - If "file" occurs as a segment in the path list, abort with the proper 
exception that mentions that URIs are not supported any more and the user 
should switch to file paths only.
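
   A minimal sketch of that proposed parsing, assuming the directory list stays 
':'-separated; the class and method names are illustrative, not the actual 
RocksDBStateBackend code:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Sketch of the proposed behavior: relative paths are allowed, but a "file"
// segment (i.e. a leftover "file://..." URI split at ':') aborts with a hint.
public class LocalDirListParser {
    public static List<File> parse(String dirList) {
        List<File> dirs = new ArrayList<>();
        for (String segment : dirList.split(":")) {
            if (segment.isEmpty()) {
                continue;
            }
            if (segment.equals("file")) {
                throw new IllegalArgumentException(
                    "URIs such as 'file://...' are no longer supported for local "
                        + "directories; please configure plain file paths instead.");
            }
            dirs.add(new File(segment)); // no absolute-path check any more
        }
        return dirs;
    }
}
```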


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Regression in supported filesystems for RocksDB
> ---
>
> Key: FLINK-9739
> URL: https://issues.apache.org/jira/browse/FLINK-9739
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Sampath Bhat
>Priority: Major
>  Labels: pull-request-available
>
> A user reported on the [mailing 
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpointing-in-Flink-1-5-0-tt21173.html]
>  that the {{RocksDBStateBackend}} no longer supports GlusterFS 
> mounted volumes.
> Configuring {{file:///home/abc/share}} led to an exception claiming that the 
> path is not absolute.
> This was working fine in 1.4.2.
> In FLINK-6557 the {{RocksDBStateBackend}} was refactored to use java 
> {{Files}} instead of Flink {{Paths}}, potentially causing the issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StephanEwen commented on issue #6452: [FLINK-9739] Regression in supported filesystems for RocksDB

2018-08-13 Thread GitBox
StephanEwen commented on issue #6452: [FLINK-9739] Regression in supported 
filesystems for RocksDB
URL: https://github.com/apache/flink/pull/6452#issuecomment-412559130
 
 
   Do we want to restore the previous behavior? I mean, RocksDB paths must be 
local file paths. Having URIs with file system schemes in there seems like a 
mistake of the past. That is also the reason why splitting at `:` causes 
trouble, because URIs and directory lists don't go together well.
   
   We could simply drop the old behavior and help smooth the transition.
   
 - Drop the check for absolute path and allow relative paths
 - If "file" occurs as a segment in the path list, abort with the proper 
exception that mentions that URIs are not supported any more and the user 
should switch to file paths only.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9400) change import statement for flink-scala

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578401#comment-16578401
 ] 

ASF GitHub Bot commented on FLINK-9400:
---

zentol closed pull request #6042: [FLINK-9400] normalize import statement style 
for flink-scala
URL: https://github.com/apache/flink/pull/6042
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala
index aa6b47b9649..4510e52e6fd 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala
@@ -23,9 +23,9 @@ import org.apache.flink.annotation.{Internal, Public}
 import org.apache.flink.api.common.InvalidProgramException
 import org.apache.flink.api.common.functions.{CoGroupFunction, Partitioner, 
RichCoGroupFunction}
 import org.apache.flink.api.common.operators.{Keys, Order}
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
-import Keys.ExpressionKeys
 import org.apache.flink.api.java.operators._
 import org.apache.flink.util.Collector
 
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 71037c3ae1d..6ba6455c17f 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -23,7 +23,8 @@ import 
org.apache.flink.api.common.accumulators.SerializedListAccumulator
 import org.apache.flink.api.common.aggregators.Aggregator
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.io.{FileOutputFormat, OutputFormat}
-import org.apache.flink.api.common.operators.{Keys, Order, ResourceSpec}
+import org.apache.flink.api.common.operators.{Order, ResourceSpec}
+import org.apache.flink.api.common.operators.Keys.{ExpressionKeys, 
SelectorFunctionKeys}
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
 import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
 import 
org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod
@@ -32,7 +33,6 @@ import org.apache.flink.api.java.Utils.CountHelper
 import org.apache.flink.api.java.aggregation.Aggregations
 import org.apache.flink.api.java.functions.{FirstReducer, KeySelector}
 import org.apache.flink.api.java.io.{PrintingOutputFormat, TextOutputFormat}
-import Keys.ExpressionKeys
 import org.apache.flink.api.java.operators._
 import org.apache.flink.api.java.operators.join.JoinType
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
@@ -837,7 +837,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
 }
 wrap(new DistinctOperator[T](
   javaSet,
-  new Keys.SelectorFunctionKeys[T, K](
+  new SelectorFunctionKeys[T, K](
 keyExtractor, javaSet.getType, implicitly[TypeInformation[K]]),
 getCallLocationName()))
   }
@@ -865,7 +865,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def distinct(fields: Int*): DataSet[T] = {
 wrap(new DistinctOperator[T](
   javaSet,
-  new Keys.ExpressionKeys[T](fields.toArray, javaSet.getType),
+  new ExpressionKeys[T](fields.toArray, javaSet.getType),
   getCallLocationName()))
   }
 
@@ -888,7 +888,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def distinct(firstField: String, otherFields: String*): DataSet[T] = {
 wrap(new DistinctOperator[T](
   javaSet,
-  new Keys.ExpressionKeys[T](firstField +: otherFields.toArray, 
javaSet.getType),
+  new ExpressionKeys[T](firstField +: otherFields.toArray, 
javaSet.getType),
   getCallLocationName()))
   }
 
@@ -911,7 +911,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def getKey(in: T) = cleanFun(in)
 }
 new GroupedDataSet[T](this,
-  new Keys.SelectorFunctionKeys[T, K](keyExtractor, javaSet.getType, 
keyType))
+  new SelectorFunctionKeys[T, K](keyExtractor, javaSet.getType, keyType))
   }
 
   /**
@@ -926,7 +926,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def groupBy(fields: Int*): GroupedDataSet[T] = {
 new GroupedDataSet[T](
   this,
-  new Keys.ExpressionKeys[T](fields.toArray, javaSet.getType))
+  new ExpressionKeys[T](fields.toArray, javaSet.getType))
   }
 
   /**
@@ -940,11 +940,11 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def groupBy(firstField: String, 

[jira] [Commented] (FLINK-10133) finished job's jobgraph never been cleaned up in zookeeper for standalone clusters (HA mode with multiple masters)

2018-08-13 Thread JIRA


[ 
https://issues.apache.org/jira/browse/FLINK-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578403#comment-16578403
 ] 

Dominik Wosiński commented on FLINK-10133:
--

I have faced this issue when I was trying to deploy multiple JobManagers. I was 
not able to solve it. 
But it's weird that it still affects 1.5.2 and 1.6.0, since my idea was that the 
jobs were failing because the removal of blobs is independent of the removal of 
the job graph, and that was fixed in FLINK-9575. Could you post some logs here?

> finished job's jobgraph never been cleaned up in zookeeper for standalone 
> clusters (HA mode with multiple masters)
> --
>
> Key: FLINK-10133
> URL: https://issues.apache.org/jira/browse/FLINK-10133
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.5.0, 1.5.2, 1.6.0
>Reporter: Xiangyu Zhu
>Priority: Major
>
> Hi,
> We have 3 servers in our test environment, noted as node1-3. Setup is as 
> following:
>  * hadoop hdfs: node1 as namenode, node2,3 as datanode
>  * zookeeper: node1-3 as a quorum (but also tried node1 alone)
>  * flink: node1,2 as masters, node2,3 as slaves
> My understanding is that when a job finishes, the corresponding job's blob data is 
> expected to be deleted from the hdfs path, and the node under zookeeper's path `/\{zk 
> path root}/\{cluster-id}/jobgraphs/\{job id}` should be deleted after that. 
> However we observe that whenever we submitted a job and it finished (via 
> `bin/flink run WordCount.jar`), the blob data is gone whereas job id node 
> under zookeeper is still there, with a uuid style lock node inside it. From 
> the debug node in zookeeper we observed something like "cannot be deleted 
> because non empty". Because of this, as long as a job has finished and the 
> jobgraph node persists, if we restart the clusters or kill one manager (to test 
> HA mode), it tries to recover a finished job, cannot find the blob data 
> under hdfs, and the whole cluster is down.
> If we use only node1 as master and node2,3 as slaves, the jobgraphs node can 
> be deleted successfully. If the jobgraphs node is clean, killing one job manager 
> makes another stand-by JM become the leader, so it is only this jobgraphs 
> issue preventing HA from working.
> I'm not sure if there's something wrong with our configs because this happens 
> every time for finished jobs (we only tested with wordcount.jar though). I'm 
> aware of FLINK-10011 and FLINK-10029, but unlike FLINK-10011 this happens 
> every time, rendering HA mode unusable for us.
> Any idea what might cause this?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9400) change import statement for flink-scala

2018-08-13 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9400.
---
   Resolution: Won't Fix
Fix Version/s: (was: 1.7.0)

> change import statement for flink-scala
> ---
>
> Key: FLINK-9400
> URL: https://issues.apache.org/jira/browse/FLINK-9400
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala API
>Affects Versions: 1.4.1
>Reporter: thinkerou
>Priority: Trivial
>  Labels: easyfix, pull-request-available
>
> The `flink-scala` project has the following import statements:
>  
> ```scala
> import org.apache.flink.api.common.operators.Keys
> import Keys.ExpressionKeys
> ```
> So I want to submit a pull request to fix it.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9400) change import statement for flink-scala

2018-08-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9400:
--
Labels: easyfix pull-request-available  (was: easyfix)

> change import statement for flink-scala
> ---
>
> Key: FLINK-9400
> URL: https://issues.apache.org/jira/browse/FLINK-9400
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala API
>Affects Versions: 1.4.1
>Reporter: thinkerou
>Priority: Trivial
>  Labels: easyfix, pull-request-available
>
> The `flink-scala` project has the following import statements:
>  
> ```scala
> import org.apache.flink.api.common.operators.Keys
> import Keys.ExpressionKeys
> ```
> So I want to submit a pull request to fix it.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol closed pull request #6042: [FLINK-9400] normalize import statement style for flink-scala

2018-08-13 Thread GitBox
zentol closed pull request #6042: [FLINK-9400] normalize import statement style 
for flink-scala
URL: https://github.com/apache/flink/pull/6042
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala
index aa6b47b9649..4510e52e6fd 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/CoGroupDataSet.scala
@@ -23,9 +23,9 @@ import org.apache.flink.annotation.{Internal, Public}
 import org.apache.flink.api.common.InvalidProgramException
 import org.apache.flink.api.common.functions.{CoGroupFunction, Partitioner, 
RichCoGroupFunction}
 import org.apache.flink.api.common.operators.{Keys, Order}
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
-import Keys.ExpressionKeys
 import org.apache.flink.api.java.operators._
 import org.apache.flink.util.Collector
 
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 71037c3ae1d..6ba6455c17f 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -23,7 +23,8 @@ import 
org.apache.flink.api.common.accumulators.SerializedListAccumulator
 import org.apache.flink.api.common.aggregators.Aggregator
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.io.{FileOutputFormat, OutputFormat}
-import org.apache.flink.api.common.operators.{Keys, Order, ResourceSpec}
+import org.apache.flink.api.common.operators.{Order, ResourceSpec}
+import org.apache.flink.api.common.operators.Keys.{ExpressionKeys, 
SelectorFunctionKeys}
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
 import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
 import 
org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod
@@ -32,7 +33,6 @@ import org.apache.flink.api.java.Utils.CountHelper
 import org.apache.flink.api.java.aggregation.Aggregations
 import org.apache.flink.api.java.functions.{FirstReducer, KeySelector}
 import org.apache.flink.api.java.io.{PrintingOutputFormat, TextOutputFormat}
-import Keys.ExpressionKeys
 import org.apache.flink.api.java.operators._
 import org.apache.flink.api.java.operators.join.JoinType
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
@@ -837,7 +837,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
 }
 wrap(new DistinctOperator[T](
   javaSet,
-  new Keys.SelectorFunctionKeys[T, K](
+  new SelectorFunctionKeys[T, K](
 keyExtractor, javaSet.getType, implicitly[TypeInformation[K]]),
 getCallLocationName()))
   }
@@ -865,7 +865,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def distinct(fields: Int*): DataSet[T] = {
 wrap(new DistinctOperator[T](
   javaSet,
-  new Keys.ExpressionKeys[T](fields.toArray, javaSet.getType),
+  new ExpressionKeys[T](fields.toArray, javaSet.getType),
   getCallLocationName()))
   }
 
@@ -888,7 +888,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def distinct(firstField: String, otherFields: String*): DataSet[T] = {
 wrap(new DistinctOperator[T](
   javaSet,
-  new Keys.ExpressionKeys[T](firstField +: otherFields.toArray, 
javaSet.getType),
+  new ExpressionKeys[T](firstField +: otherFields.toArray, 
javaSet.getType),
   getCallLocationName()))
   }
 
@@ -911,7 +911,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def getKey(in: T) = cleanFun(in)
 }
 new GroupedDataSet[T](this,
-  new Keys.SelectorFunctionKeys[T, K](keyExtractor, javaSet.getType, 
keyType))
+  new SelectorFunctionKeys[T, K](keyExtractor, javaSet.getType, keyType))
   }
 
   /**
@@ -926,7 +926,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def groupBy(fields: Int*): GroupedDataSet[T] = {
 new GroupedDataSet[T](
   this,
-  new Keys.ExpressionKeys[T](fields.toArray, javaSet.getType))
+  new ExpressionKeys[T](fields.toArray, javaSet.getType))
   }
 
   /**
@@ -940,11 +940,11 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def groupBy(firstField: String, otherFields: String*): GroupedDataSet[T] = {
 new GroupedDataSet[T](
   this,
-  new Keys.ExpressionKeys[T](firstField +: otherFields.toArray, 
javaSet.getType))
+  new ExpressionKeys[T](firstField +: otherFields.toArray, 

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578393#comment-16578393
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209615214
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema.Column;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Converting functions that related to {@link CsvSchema}.
+ * In {@link CsvSchema}, there are four types(string,number,boolean
+ * and array), in order to satisfy various flink types, this class
+ * sorts out instances of {@link TypeInformation} and convert them to
+ * one of CsvSchema's types.
+ */
+public class CsvRowSchemaConverter {
+
+   /**
+* Types that can be converted to ColumnType.NUMBER.
+*/
+   private static final List<TypeInformation<?>> NUMBER_TYPES =
+   Arrays.asList(Types.LONG, Types.INT, Types.DOUBLE, Types.FLOAT,
 
 Review comment:
   Use a `HashSet` instead.
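
   For example, the suggestion could look roughly like this (a sketch, not the 
code from the PR):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// Sketch: constant-time membership checks for the NUMBER types.
public class NumberTypes {
    private static final Set<TypeInformation<?>> NUMBER_TYPES = new HashSet<>(
        Arrays.asList(Types.LONG, Types.INT, Types.DOUBLE, Types.FLOAT,
            Types.BIG_DEC, Types.BIG_INT));

    static boolean isNumberType(TypeInformation<?> type) {
        return NUMBER_TYPES.contains(type);
    }
}
```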


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578397#comment-16578397
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209619878
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema.Column;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Converting functions that related to {@link CsvSchema}.
+ * In {@link CsvSchema}, there are four types(string,number,boolean
+ * and array), in order to satisfy various flink types, this class
+ * sorts out instances of {@link TypeInformation} and convert them to
+ * one of CsvSchema's types.
+ */
+public class CsvRowSchemaConverter {
+
+   /**
+* Types that can be converted to ColumnType.NUMBER.
+*/
+   private static final List<TypeInformation<?>> NUMBER_TYPES =
+   Arrays.asList(Types.LONG, Types.INT, Types.DOUBLE, Types.FLOAT,
+   Types.BIG_DEC, Types.BIG_INT);
+
+   /**
+* Types that can be converted to ColumnType.STRING.
+*/
+   private static final List<TypeInformation<?>> STRING_TYPES =
+   Arrays.asList(Types.STRING, Types.SQL_DATE, Types.SQL_TIME, 
Types.SQL_TIMESTAMP);
+
+   /**
+* Types that can be converted to ColumnType.BOOLEAN.
+*/
+   private static final List<TypeInformation<?>> BOOLEAN_TYPES =
+   Collections.singletonList(Types.BOOLEAN);
+
+   /**
+* Convert {@link RowTypeInfo} to {@link CsvSchema}.
+* @param rowType
+* @return {@link CsvSchema}
+*/
+   public static CsvSchema rowTypeToCsvSchema(RowTypeInfo rowType) {
+   Builder builder = new CsvSchema.Builder();
+   String[] fields = rowType.getFieldNames();
+   TypeInformation[] infos = rowType.getFieldTypes();
+   for (int i = 0; i < rowType.getArity(); i++) {
+   builder.addColumn(new Column(i, fields[i], 
convertType(infos[i])));
 
 Review comment:
   Is the converter considering the global properties for array separation? I 
guess yes, right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look 

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578389#comment-16578389
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209612567
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowFormatFactory.java
 ##
 @@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.descriptors.CsvValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FormatDescriptorValidator;
+import org.apache.flink.table.factories.DeserializationSchemaFactory;
+import org.apache.flink.table.factories.SerializationSchemaFactory;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Table format for providing configured instances of CSV-to-row {@link 
SerializationSchema}
+ * and {@link DeserializationSchema}.
+ */
+public class CsvRowFormatFactory implements SerializationSchemaFactory<Row>,
+   DeserializationSchemaFactory<Row> {
+
+   @Override
+   public Map<String, String> requiredContext() {
+   final Map<String, String> context = new HashMap<>();
+   context.put(FormatDescriptorValidator.FORMAT_TYPE(), 
CsvValidator.FORMAT_TYPE_VALUE());
+   
context.put(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION(), "1");
+   return context;
+   }
+
+   @Override
+   public boolean supportsSchemaDerivation() {
+   return false;
 
 Review comment:
   Could we add support for schema derivation as well? It gets more and more 
complicated in the future if each format supports different features. We should 
add all features in one PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578383#comment-16578383
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209605365
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from CSV to Flink types.
+ *
+ * Deserializes a byte[] message as a {@link JsonNode} and
+ * convert it to {@link Row}.
+ *
+ * Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class CsvRowDeserializationSchema implements DeserializationSchema<Row> {
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation<Row> rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
+
+
+   /**
+* Create a csv row DeserializationSchema with given {@link 
TypeInformation}.
+*/
+   CsvRowDeserializationSchema(TypeInformation<Row> rowTypeInfo) {
+   Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not 
be null !");
+   this.rowTypeInfo = rowTypeInfo;
+   this.csvSchema = 
CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   JsonNode root = csvMapper.readerFor(JsonNode.class)
+   .with(csvSchema).readValue(message);
+   return convertRow(root, (RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public boolean isEndOfStream(Row nextElement) {
+   return false;
+   }
+
+   @Override
+   public TypeInformation<Row> getProducedType() {
+   return rowTypeInfo;
+   }
+
+   /**
+*
+* @param root json node that contains a row's data.
+* @param rowTypeInfo type information for root.
+* @return result row
+*/
+   private Row convertRow(JsonNode root, RowTypeInfo rowTypeInfo) {
+   String[] fields = rowTypeInfo.getFieldNames();
+   TypeInformation[] types = rowTypeInfo.getFieldTypes();
+   Row row = new Row(fields.length);
+
+   for (int i = 0; i < fields.length; i++) {
+   String columnName = fields[i];
+   JsonNode node = root.get(columnName);
+   row.setField(i, convert(node, types[i]));
+   }
+   return row;
+   }
+
+   /**
+*
+* @param node array node that contains a row's data.

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578400#comment-16578400
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209625213
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from CSV to Flink types.
+ *
+ * Deserializes a byte[] message as a {@link JsonNode} and
+ * convert it to {@link Row}.
+ *
+ * Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class CsvRowDeserializationSchema implements DeserializationSchema<Row> {
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation<Row> rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
+
+
+   /**
+* Create a csv row DeserializationSchema with given {@link 
TypeInformation}.
+*/
+   CsvRowDeserializationSchema(TypeInformation<Row> rowTypeInfo) {
+   Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not 
be null !");
+   this.rowTypeInfo = rowTypeInfo;
+   this.csvSchema = 
CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   JsonNode root = csvMapper.readerFor(JsonNode.class)
 
 Review comment:
   The method calls here are expensive for every record. Can we initialize the 
mapper before and only call `readValue` here?
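
A minimal sketch of that suggestion, reusing the existing csvMapper/csvSchema fields and building the reader only once; the `objectReader` field is an assumption added for illustration, not part of the PR:

   import com.fasterxml.jackson.databind.ObjectReader;

   /** Preconfigured reader, created once and reused for every record (assumed new field). */
   private transient ObjectReader objectReader;

   @Override
   public Row deserialize(byte[] message) throws IOException {
           if (objectReader == null) {
                   // Built lazily so the schema object does not have to be serializable with the class.
                   objectReader = csvMapper.readerFor(JsonNode.class).with(csvSchema);
           }
           // readValue is now the only per-record call.
           JsonNode root = objectReader.readValue(message);
           return convertRow(root, (RowTypeInfo) rowTypeInfo);
   }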


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we 

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578398#comment-16578398
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209620482
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema.Column;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Conversion functions related to {@link CsvSchema}.
+ * A {@link CsvSchema} only knows four column types (string, number, boolean
+ * and array), so to cover the various Flink types this class maps
+ * instances of {@link TypeInformation} to one of these column types.
+ */
+public class CsvRowSchemaConverter {
+
+   /**
+* Types that can be converted to ColumnType.NUMBER.
+*/
+   private static final List<TypeInformation<?>> NUMBER_TYPES =
+   Arrays.asList(Types.LONG, Types.INT, Types.DOUBLE, Types.FLOAT,
+   Types.BIG_DEC, Types.BIG_INT);
+
+   /**
+* Types that can be converted to ColumnType.STRING.
+*/
+   private static final List<TypeInformation<?>> STRING_TYPES =
+   Arrays.asList(Types.STRING, Types.SQL_DATE, Types.SQL_TIME, 
Types.SQL_TIMESTAMP);
+
+   /**
+* Types that can be converted to ColumnType.BOOLEAN.
+*/
+   private static final List<TypeInformation<?>> BOOLEAN_TYPES =
+   Collections.singletonList(Types.BOOLEAN);
+
+   /**
+* Convert {@link RowTypeInfo} to {@link CsvSchema}.
+* @param rowType
+* @return {@link CsvSchema}
+*/
+   public static CsvSchema rowTypeToCsvSchema(RowTypeInfo rowType) {
+   Builder builder = new CsvSchema.Builder();
+   String[] fields = rowType.getFieldNames();
+   TypeInformation[] infos = rowType.getFieldTypes();
+   for (int i = 0; i < rowType.getArity(); i++) {
+   builder.addColumn(new Column(i, fields[i], 
convertType(infos[i])));
+   }
+   return builder.build();
+   }
+
+   /**
+* Convert {@link TypeInformation} to {@link CsvSchema.ColumnType}
+* based on their catogories.
+* @param info
+* @return {@link CsvSchema.ColumnType}
+*/
+   private static CsvSchema.ColumnType convertType(TypeInformation<?> info) {
+   if (STRING_TYPES.contains(info)) {
+   return CsvSchema.ColumnType.STRING;
+   } else if (NUMBER_TYPES.contains(info)) {
+   return CsvSchema.ColumnType.NUMBER;
+   } else if (BOOLEAN_TYPES.contains(info)) {
+   return CsvSchema.ColumnType.BOOLEAN;
+   } else if (info instanceof ObjectArrayTypeInfo
+   || info instanceof BasicArrayTypeInfo
+   || info instanceof RowTypeInfo) {
+   return CsvSchema.ColumnType.ARRAY;
+   } else if (info instanceof PrimitiveArrayTypeInfo &&
+   ((PrimitiveArrayTypeInfo) info).getComponentType() == 
Types.BYTE) {
+   return CsvSchema.ColumnType.STRING;
+   } else {
+   throw new RuntimeException("Unable to support " 
+ info.toString()
 
 Review 

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578399#comment-16578399
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209632669
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ContainerNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Serialization schema that serializes an object of Flink types into a CSV 
bytes.
+ *
+ * Serializes the input row into a {@link ObjectNode} and
+ * converts it into byte[].
+ *
+ * Result byte[] messages can be deserialized using {@link 
CsvRowDeserializationSchema}.
+ */
+@PublicEvolving
+public class CsvRowSerializationSchema implements SerializationSchema<Row> {
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation<Row> rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Reusable object node. */
+   private ObjectNode root;
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
+
+   /**
+* Create a {@link CsvRowSerializationSchema} with given {@link 
TypeInformation}.
+* @param rowTypeInfo type information used to create the schema.
+*/
+   CsvRowSerializationSchema(TypeInformation<Row> rowTypeInfo) {
+   Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not 
be null !");
+   this.rowTypeInfo = rowTypeInfo;
+   this.csvSchema = 
CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public byte[] serialize(Row row) {
+   if (root == null) {
+   root = csvMapper.createObjectNode();
+   }
+   try {
+   convertRow(root, row, (RowTypeInfo) rowTypeInfo);
+   return 
csvMapper.writer(csvSchema).writeValueAsBytes(root);
+   } catch (JsonProcessingException e) {
+   throw new RuntimeException("Could not serialize row '" 
+ row + "'. " +
+   "Make sure that the schema matches the input.", 
e);
+   }
+   }
+
+   private void convertRow(ObjectNode reuse, Row row, RowTypeInfo 
rowTypeInfo) {
+   if (reuse == null) {
+   reuse = csvMapper.createObjectNode();
+   }
+   if (row.getArity() != rowTypeInfo.getFieldNames().length) {
+   throw new IllegalStateException(String.format(
+   "Number of elements in the row '%s' is 
different from number of field names: %d",
+

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578391#comment-16578391
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209610788
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from CSV to Flink types.
+ *
+ * Deserializes a byte[] message as a {@link JsonNode} and
+ * convert it to {@link Row}.
+ *
+ * Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class CsvRowDeserializationSchema implements DeserializationSchema<Row> {
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation<Row> rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
+
+
+   /**
+* Create a csv row DeserializationSchema with given {@link 
TypeInformation}.
+*/
+   CsvRowDeserializationSchema(TypeInformation<Row> rowTypeInfo) {
+   Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not 
be null !");
+   this.rowTypeInfo = rowTypeInfo;
+   this.csvSchema = 
CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   JsonNode root = csvMapper.readerFor(JsonNode.class)
+   .with(csvSchema).readValue(message);
+   return convertRow(root, (RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public boolean isEndOfStream(Row nextElement) {
+   return false;
+   }
+
+   @Override
+   public TypeInformation<Row> getProducedType() {
+   return rowTypeInfo;
+   }
+
+   /**
+*
+* @param root json node that contains a row's data.
+* @param rowTypeInfo type information for root.
+* @return result row
+*/
+   private Row convertRow(JsonNode root, RowTypeInfo rowTypeInfo) {
+   String[] fields = rowTypeInfo.getFieldNames();
+   TypeInformation[] types = rowTypeInfo.getFieldTypes();
+   Row row = new Row(fields.length);
+
+   for (int i = 0; i < fields.length; i++) {
+   String columnName = fields[i];
+   JsonNode node = root.get(columnName);
+   row.setField(i, convert(node, types[i]));
+   }
+   return row;
+   }
+
+   /**
+*
+* @param node array node that contains a row's data.

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578379#comment-16578379
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209602000
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from CSV to Flink types.
+ *
+ * Deserializes a byte[] message as a {@link JsonNode} and
+ * convert it to {@link Row}.
+ *
+ * Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class CsvRowDeserializationSchema implements DeserializationSchema<Row> {
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation<Row> rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
+
+
+   /**
+* Create a csv row DeserializationSchema with given {@link 
TypeInformation}.
+*/
+   CsvRowDeserializationSchema(TypeInformation<Row> rowTypeInfo) {
 
 Review comment:
   Make constructor public?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578388#comment-16578388
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209607038
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from CSV to Flink types.
+ *
+ * Deserializes a byte[] message as a {@link JsonNode} and
+ * convert it to {@link Row}.
+ *
+ * Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class CsvRowDeserializationSchema implements DeserializationSchema<Row> {
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
 
 Review comment:
   I'm a big fan of immutability. I think all members of this class that are 
not runtime specific should be added to the constructor with `final` modifier.
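
For illustration, a hedged sketch of how the non-runtime members could be made immutable (field names follow the PR; choosing exactly these fields as final is an assumption):

   private final TypeInformation<Row> rowTypeInfo;
   private final CsvSchema csvSchema;
   private final CsvMapper csvMapper = new CsvMapper();

   CsvRowDeserializationSchema(TypeInformation<Row> rowTypeInfo) {
           // checkNotNull returns its argument, so the final fields can be assigned directly.
           this.rowTypeInfo = Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not be null!");
           this.csvSchema = CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
   }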


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578387#comment-16578387
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209610191
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from CSV to Flink types.
+ *
+ * Deserializes a byte[] message as a {@link JsonNode} and
+ * convert it to {@link Row}.
+ *
+ * Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class CsvRowDeserializationSchema implements DeserializationSchema<Row> {
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation<Row> rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
 
 Review comment:
   Make this `static final`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578382#comment-16578382
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209602349
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from CSV to Flink types.
+ *
+ * Deserializes a byte[] message as a {@link JsonNode} and
+ * convert it to {@link Row}.
+ *
+ * Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class CsvRowDeserializationSchema implements DeserializationSchema<Row> {
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation<Row> rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
+
 
 Review comment:
   Remove double empty line.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578396#comment-16578396
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209630166
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ContainerNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Serialization schema that serializes an object of Flink types into a CSV 
bytes.
+ *
+ * Serializes the input row into a {@link ObjectNode} and
+ * converts it into byte[].
+ *
+ * Result byte[] messages can be deserialized using {@link 
CsvRowDeserializationSchema}.
+ */
+@PublicEvolving
+public class CsvRowSerializationSchema implements SerializationSchema<Row> {
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation<Row> rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Reusable object node. */
+   private ObjectNode root;
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
+
+   /**
+* Create a {@link CsvRowSerializationSchema} with given {@link 
TypeInformation}.
+* @param rowTypeInfo type information used to create the schema.
+*/
+   CsvRowSerializationSchema(TypeInformation<Row> rowTypeInfo) {
+   Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not 
be null !");
+   this.rowTypeInfo = rowTypeInfo;
+   this.csvSchema = 
CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public byte[] serialize(Row row) {
+   if (root == null) {
+   root = csvMapper.createObjectNode();
+   }
+   try {
+   convertRow(root, row, (RowTypeInfo) rowTypeInfo);
+   return 
csvMapper.writer(csvSchema).writeValueAsBytes(root);
+   } catch (JsonProcessingException e) {
+   throw new RuntimeException("Could not serialize row '" 
+ row + "'. " +
+   "Make sure that the schema matches the input.", 
e);
+   }
+   }
+
+   private void convertRow(ObjectNode reuse, Row row, RowTypeInfo 
rowTypeInfo) {
+   if (reuse == null) {
+   reuse = csvMapper.createObjectNode();
+   }
+   if (row.getArity() != rowTypeInfo.getFieldNames().length) {
+   throw new IllegalStateException(String.format(
+   "Number of elements in the row '%s' is 
different from number of field names: %d",
+

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578380#comment-16578380
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209598354
 
 

 ##
 File path: flink-formats/flink-csv/pom.xml
 ##
 @@ -0,0 +1,88 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-formats
+   1.7-SNAPSHOT
+   ..
+   
+
+   flink-csv
+   flink-csv
+
+   jar
+
+   
+
+   
+
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   provided
+   
+
+   
+   org.apache.flink
+   
+   flink-table_2.11
+   ${project.version}
+   provided
+   
+   true
+   
+
+   
+   
+   com.fasterxml.jackson.dataformat
 
 Review comment:
   @zentol I guess we need to add this to flink-shaded similar as for YAML 
right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578392#comment-16578392
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209616703
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema.Column;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Conversion functions related to {@link CsvSchema}.
+ * A {@link CsvSchema} only knows four column types (string, number, boolean
+ * and array), so to cover the various Flink types this class maps
+ * instances of {@link TypeInformation} to one of these column types.
 
 Review comment:
   We should also document internals of Jackson. In particular when trimmed 
(leading/trailing white space) and the special meaning of literals "null", 
"true" and "false".


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

