[jira] [Created] (FLINK-10239) Register eventtime timer only once in eventtrigger

2018-08-28 Thread buptljy (JIRA)
buptljy created FLINK-10239:
---

 Summary: Register eventtime timer only once in eventtrigger
 Key: FLINK-10239
 URL: https://issues.apache.org/jira/browse/FLINK-10239
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.5.3
Reporter: buptljy


I've found that we call ctx.registerEventTimeTimer(window.maxTimestamp()) every 
time an element is received in the window. Even though this doesn't affect the 
result, because the timer service stores timers in a Set, I think it would still 
be an improvement to register the timer only once.
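
A rough sketch of the idea, assuming a custom trigger modeled on EventTimeTrigger; 
the ValueState-based "registered" flag is only an illustration, not the existing 
Flink implementation:
{code:java}
// Fragment of a Trigger<Object, TimeWindow> subclass: register the event-time
// timer only for the first element that arrives in the window.
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        // the watermark already passed the end of the window
        return TriggerResult.FIRE;
    }
    ValueState<Boolean> registered = ctx.getPartitionedState(
        new ValueStateDescriptor<>("timer-registered", Types.BOOLEAN));
    if (registered.value() == null) {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        registered.update(true);
    }
    // (the flag should also be cleared in Trigger#clear())
    return TriggerResult.CONTINUE;
}
{code}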



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10202) Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment

2018-08-25 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy closed FLINK-10202.
---
Resolution: Fixed

[~StephanEwen] Yes, you're right... It's already fixed...

> Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment
> ---
>
> Key: FLINK-10202
> URL: https://issues.apache.org/jira/browse/FLINK-10202
> Project: Flink
>  Issue Type: Improvement
>    Reporter: buptljy
>Priority: Major
>
> Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we 
> run a flink job locally, and we're not able to set state.checkpoint.dir for 
> background wrapped cluster, which will cause 
> {code:java}
> throw new IllegalStateException("CheckpointConfig says to persist periodic " +
>   "checkpoints, but no checkpoint directory has been configured. You can 
> " +
>   "configure configure one via key '" + 
> ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
> {code}
> I wonder if we could provide a public method in *StreamExecutionEnvironment* 
> so that developers can use it to set state.checkpoint.dir for job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10206) Add hbase stream connector

2018-08-24 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591288#comment-16591288
 ] 

buptljy commented on FLINK-10206:
-

[~igloo1986] I think we should have an HBaseBatchTableSink first? From what I 
see in org.apache.flink.addons.hbase.example.HBaseWriteExample, that is not a 
proper way to sink data to HBase.
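
For reference, a minimal sketch of what a streaming HBase sink could look like, 
assuming the standard HBase client API; the class name, table name, and column 
family are hypothetical placeholders:
{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical streaming sink: tuple field 0 is the row key, field 1 is one column value.
public class HBaseSinkFunction extends RichSinkFunction<Tuple2<String, String>> {

    private transient Connection connection;
    private transient BufferedMutator mutator;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
        mutator = connection.getBufferedMutator(TableName.valueOf("example_table"));
    }

    @Override
    public void invoke(Tuple2<String, String> value, Context context) throws Exception {
        Put put = new Put(Bytes.toBytes(value.f0));
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(value.f1));
        mutator.mutate(put);
    }

    @Override
    public void close() throws Exception {
        if (mutator != null) {
            mutator.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
{code}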

> Add hbase stream connector
> --
>
> Key: FLINK-10206
> URL: https://issues.apache.org/jira/browse/FLINK-10206
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.6.0
>Reporter: Igloo
>Priority: Critical
> Fix For: 1.6.1
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Now, there is a hbase connector for batch operation. 
>  
> In some cases, we need to save Streaming result into hbase.  Just like 
> cassandra streaming sink implementations. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10202) Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment

2018-08-24 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591188#comment-16591188
 ] 

buptljy commented on FLINK-10202:
-

[~StephanEwen] Here is my code. I think the checkpoint directory in 
CheckpointCoordinator is still empty even if you set the directory in 
FsStateBackend.
{code:java}
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(15000)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.setStateBackend(new FsStateBackend(checkpointDir))
{code}
1. If I don't set state.checkpoint.dir, it throws the exception above.

2. If I remove the third line, it works fine. (A possible workaround sketch is below.)
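
A minimal sketch of a possible workaround until such a setter exists, assuming the 
job runs in a local environment created with an explicit Configuration; the 
checkpoint path is a placeholder:
{code:java}
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Pass the checkpoint directory to the locally started cluster via its Configuration.
Configuration conf = new Configuration();
conf.setString(ConfigConstants.CHECKPOINTS_DIRECTORY_KEY, "file:///tmp/flink-checkpoints");
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment(1, conf);
{code}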

> Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment
> ---
>
> Key: FLINK-10202
> URL: https://issues.apache.org/jira/browse/FLINK-10202
> Project: Flink
>  Issue Type: Improvement
>    Reporter: buptljy
>Priority: Major
>
> Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we 
> run a flink job locally, and we're not able to set state.checkpoint.dir for 
> background wrapped cluster, which will cause 
> {code:java}
> throw new IllegalStateException("CheckpointConfig says to persist periodic " +
>   "checkpoints, but no checkpoint directory has been configured. You can 
> " +
>   "configure configure one via key '" + 
> ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
> {code}
> I wonder if we could provide a public method in *StreamExecutionEnvironment* 
> so that developers can use it to set state.checkpoint.dir for job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10202) Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment

2018-08-23 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy updated FLINK-10202:

Description: 
Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we 
run a flink job locally, and we're not able to set state.checkpoint.dir for the 
cluster that is wrapped and started in the background, which causes:
{code:java}
throw new IllegalStateException("CheckpointConfig says to persist periodic " +
  "checkpoints, but no checkpoint directory has been configured. You can " +
  "configure configure one via key '" + 
ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
{code}
I wonder if we could provide a public method in *StreamExecutionEnvironment* so 
that developers can use it to set state.checkpoint.dir for the job.

  was:
Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we 
run a flink job locally, and we're not able to set state.checkpoint.dir, which 
will cause 
{code:java}
throw new IllegalStateException("CheckpointConfig says to persist periodic " +
  "checkpoints, but no checkpoint directory has been configured. You can " +
  "configure configure one via key '" + 
ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
{code}
I wonder if we could provide a public method in *StreamExecutionEnvironment* so 
that developers can use it to set state.checkpoint.dir for job.


> Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment
> ---
>
> Key: FLINK-10202
> URL: https://issues.apache.org/jira/browse/FLINK-10202
> Project: Flink
>  Issue Type: Improvement
>Reporter: buptljy
>Priority: Major
>
> Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we 
> run a flink job locally, and we're not able to set state.checkpoint.dir for 
> background wrapped cluster, which will cause 
> {code:java}
> throw new IllegalStateException("CheckpointConfig says to persist periodic " +
>   "checkpoints, but no checkpoint directory has been configured. You can 
> " +
>   "configure configure one via key '" + 
> ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
> {code}
> I wonder if we could provide a public method in *StreamExecutionEnvironment* 
> so that developers can use it to set state.checkpoint.dir for job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10202) Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment

2018-08-23 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy updated FLINK-10202:

Summary: Enable configuration for state.checkpoint.dir in 
StreamExecutionEnvironment  (was: Enable configuration for state.checkpoint.dir 
in environment)

> Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment
> ---
>
> Key: FLINK-10202
> URL: https://issues.apache.org/jira/browse/FLINK-10202
> Project: Flink
>  Issue Type: Improvement
>    Reporter: buptljy
>Priority: Major
>
> Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we 
> run a flink job locally, and we're not able to set state.checkpoint.dir, 
> which will cause 
> {code:java}
> throw new IllegalStateException("CheckpointConfig says to persist periodic " +
>   "checkpoints, but no checkpoint directory has been configured. You can 
> " +
>   "configure configure one via key '" + 
> ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
> {code}
> I wonder if we could provide a public method in *StreamExecutionEnvironment* 
> so that developers can use it to set state.checkpoint.dir for job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10202) Enable configuration for state.checkpoint.dir with environment

2018-08-23 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy updated FLINK-10202:

Description: 
Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we 
run a flink job locally, and we're not able to set state.checkpoint.dir, which 
will cause 
{code:java}
throw new IllegalStateException("CheckpointConfig says to persist periodic " +
  "checkpoints, but no checkpoint directory has been configured. You can " +
  "configure configure one via key '" + 
ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
{code}
I wonder if we could provide a public method in *StreamExecutionEnvironment* so 
that developers can use it to set state.checkpoint.dir for job.

  was:
Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we 
run a flink job locally, and we're not able to set state.checkpoint.dir, which 
will cause 
{code:java}
throw new IllegalStateException("CheckpointConfig says to persist periodic " +
  "checkpoints, but no checkpoint directory has been configured. You can " +
  "configure configure one via key '" + 
ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");

{code}


> Enable configuration for state.checkpoint.dir with environment
> --
>
> Key: FLINK-10202
> URL: https://issues.apache.org/jira/browse/FLINK-10202
> Project: Flink
>  Issue Type: Improvement
>Reporter: buptljy
>Priority: Major
>
> Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we 
> run a flink job locally, and we're not able to set state.checkpoint.dir, 
> which will cause 
> {code:java}
> throw new IllegalStateException("CheckpointConfig says to persist periodic " +
>   "checkpoints, but no checkpoint directory has been configured. You can 
> " +
>   "configure configure one via key '" + 
> ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
> {code}
> I wonder if we could provide a public method in *StreamExecutionEnvironment* 
> so that developers can use it to set state.checkpoint.dir for job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10202) Enable configuration for state.checkpoint.dir in environment

2018-08-23 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy updated FLINK-10202:

Summary: Enable configuration for state.checkpoint.dir in environment  
(was: Enable configuration for state.checkpoint.dir with environment)

> Enable configuration for state.checkpoint.dir in environment
> 
>
> Key: FLINK-10202
> URL: https://issues.apache.org/jira/browse/FLINK-10202
> Project: Flink
>  Issue Type: Improvement
>    Reporter: buptljy
>Priority: Major
>
> Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we 
> run a flink job locally, and we're not able to set state.checkpoint.dir, 
> which will cause 
> {code:java}
> throw new IllegalStateException("CheckpointConfig says to persist periodic " +
>   "checkpoints, but no checkpoint directory has been configured. You can 
> " +
>   "configure configure one via key '" + 
> ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
> {code}
> I wonder if we could provide a public method in *StreamExecutionEnvironment* 
> so that developers can use it to set state.checkpoint.dir for job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10202) Enable configuration for state.checkpoint.dir with environment

2018-08-23 Thread buptljy (JIRA)
buptljy created FLINK-10202:
---

 Summary: Enable configuration for state.checkpoint.dir with 
environment
 Key: FLINK-10202
 URL: https://issues.apache.org/jira/browse/FLINK-10202
 Project: Flink
  Issue Type: Improvement
Reporter: buptljy


Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we 
run a flink job locally, and we're not able to set state.checkpoint.dir, which 
will cause 
{code:java}
throw new IllegalStateException("CheckpointConfig says to persist periodic " +
  "checkpoints, but no checkpoint directory has been configured. You can " +
  "configure configure one via key '" + 
ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");

{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (FLINK-10170) Support string representation for map types in descriptor-based Table API

2018-08-19 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy updated FLINK-10170:

Comment: was deleted

(was: I think this is duplicated by FLINK-10120, can you help implement the 
same thing on other types like array?

You can assign that issue to you by yourself.)

> Support string representation for map types in descriptor-based Table API
> -
>
> Key: FLINK-10170
> URL: https://issues.apache.org/jira/browse/FLINK-10170
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Affects Versions: 1.6.1, 1.7.0
>Reporter: Jun Zhang
>Assignee: Jun Zhang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
> Attachments: flink-10170
>
>
> Since 1.6 the recommended way of creating source/sink table is using 
> connector/format/schema/ descriptors. However, when declaring map types in 
> the schema descriptor, the following exception would be thrown:
> {quote}org.apache.flink.table.api.TableException: A string representation for 
> map types is not supported yet.{quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10170) Support string representation for map types in descriptor-based Table API

2018-08-19 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16585210#comment-16585210
 ] 

buptljy commented on FLINK-10170:
-

I think this duplicates FLINK-10120, can you help implement the same thing on 
other types like array? You can assign that issue to yourself.

> Support string representation for map types in descriptor-based Table API
> -
>
> Key: FLINK-10170
> URL: https://issues.apache.org/jira/browse/FLINK-10170
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Affects Versions: 1.6.1, 1.7.0
>Reporter: Jun Zhang
>Assignee: Jun Zhang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
> Attachments: flink-10170
>
>
> Since 1.6 the recommended way of creating source/sink table is using 
> connector/format/schema/ descriptors. However, when declaring map types in 
> the schema descriptor, the following exception would be thrown:
> {quote}org.apache.flink.table.api.TableException: A string representation for 
> map types is not supported yet.{quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10170) Support string representation for map types in descriptor-based Table API

2018-08-19 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16585204#comment-16585204
 ] 

buptljy commented on FLINK-10170:
-

I think this duplicates FLINK-10120. Could you help implement the same thing for 
other types like array?

You can assign that issue to yourself.

> Support string representation for map types in descriptor-based Table API
> -
>
> Key: FLINK-10170
> URL: https://issues.apache.org/jira/browse/FLINK-10170
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Affects Versions: 1.6.1, 1.7.0
>Reporter: Jun Zhang
>Assignee: Jun Zhang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
> Attachments: flink-10170
>
>
> Since 1.6 the recommended way of creating source/sink table is using 
> connector/format/schema/ descriptors. However, when declaring map types in 
> the schema descriptor, the following exception would be thrown:
> {quote}org.apache.flink.table.api.TableException: A string representation for 
> map types is not supported yet.{quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10168) support filtering files by modified/created time in StreamExecutionEnvironment.readFile()

2018-08-19 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16585121#comment-16585121
 ] 

buptljy commented on FLINK-10168:
-

[~phoenixjiangnan] You're right. The second solution makes more sense to me.

I think we can provide a new FileFilter, which allows developers to define the 
filter (see the sketch below) based on: 
 # File name.
 # Modified time.
 # Creation time.
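
A rough sketch of what such a filter could look like, assuming a FileStatus-based 
signature; the interface name and the accept method are illustrative assumptions, 
not an existing Flink API:
{code:java}
import java.io.Serializable;

import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.Path;

// Hypothetical generic filter that FileInputFormat could accept instead of a pure path filter.
public interface FileFilter extends Serializable {
    boolean accept(Path path, FileStatus fileStatus);
}

// Example: keep only files whose name ends with ".log" and that were
// modified after a given timestamp.
FileFilter filter = (path, status) ->
        path.getName().endsWith(".log") && status.getModificationTime() > 1534636800000L;
{code}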

> support filtering files by modified/created time in 
> StreamExecutionEnvironment.readFile()
> -
>
> Key: FLINK-10168
> URL: https://issues.apache.org/jira/browse/FLINK-10168
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.6.0
>Reporter: Bowen Li
>Assignee: buptljy
>Priority: Major
> Fix For: 1.7.0
>
>
> support filtering files by modified/created time in 
> {{StreamExecutionEnvironment.readFile()}}
> for example, in a source dir with lots of file, we only want to read files 
> that is created or modified after a specific time.
> This API can expose a generic filter function of files, and let users define 
> filtering rules. Currently Flink only supports filtering files by path. What 
> this means is that, currently the API is 
> {{FileInputFormat.setFilesFilters(PathFiter)}} that takes only one file path 
> filter. A more generic API that can take more filters can look like this 1) 
> {{FileInputFormat.setFilesFilters(List (PathFiter, ModifiedTileFilter, ... 
> ))}}
> 2) or {{FileInputFormat.setFilesFilters(FileFiter),}} and {{FileFilter}} 
> exposes all file attributes that Flink's file system can provide, like path 
> and modified time
> I lean towards the 2nd option, because it gives users more flexibility to 
> define complex filtering rules based on combinations of file attributes.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10168) support filtering files by modified/created time in StreamExecutionEnvironment.readFile()

2018-08-18 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16584675#comment-16584675
 ] 

buptljy commented on FLINK-10168:
-

[~phoenixjiangnan] Thanks! I think this will be a good improvement. We can 
define some readFile variants based on the prefix and suffix of file names and 
the last modified time.

However, is it necessary to expose a generic filter function and let developers 
define their own file filters? Do we really have that many different application 
scenarios for the readFile function? As far as I know, most cases can be covered 
by the three functions above.

> support filtering files by modified/created time in 
> StreamExecutionEnvironment.readFile()
> -
>
> Key: FLINK-10168
> URL: https://issues.apache.org/jira/browse/FLINK-10168
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.6.0
>Reporter: Bowen Li
>Assignee: buptljy
>Priority: Major
> Fix For: 1.7.0
>
>
> support filtering files by modified/created time in 
> {{StreamExecutionEnvironment.readFile()}}
> for example, in a source dir with lots of file, we only want to read files 
> that is created or modified after a specific time.
> This API can expose a generic filter function of files, and let users define 
> filtering rules. Currently Flink only supports filtering files by path.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10168) support filtering files by modified/created time in StreamExecutionEnvironment.readFile()

2018-08-18 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy reassigned FLINK-10168:
---

Assignee: buptljy

> support filtering files by modified/created time in 
> StreamExecutionEnvironment.readFile()
> -
>
> Key: FLINK-10168
> URL: https://issues.apache.org/jira/browse/FLINK-10168
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.6.0
>Reporter: Bowen Li
>Assignee: buptljy
>Priority: Major
> Fix For: 1.7.0
>
>
> support filtering files by modified/created time in 
> {{StreamExecutionEnvironment.readFile()}}
> for example, in a source dir with lots of file, we only want to read files 
> that is created or modified after a specific time.
> This API can expose a generic filter function of files, and let users define 
> filtering rules. Currently Flink only supports filtering files by path.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10119) JsonRowDeserializationSchema deserialize kafka message

2018-08-16 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582731#comment-16582731
 ] 

buptljy commented on FLINK-10119:
-

[~twalthr] Your suggestion sounds good to me. Can you review my PR?

> JsonRowDeserializationSchema deserialize kafka message
> --
>
> Key: FLINK-10119
> URL: https://issues.apache.org/jira/browse/FLINK-10119
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.1
> Environment: none
>Reporter: sean.miao
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> Recently, we are using Kafka010JsonTableSource to process kafka's json 
> messages.We turned on checkpoint and auto-restart strategy .
> We found that as long as the format of a message is not json, it will cause 
> the job to not be pulled up. Of course, this is to ensure that only once 
> processing or at least once processing, but the resulting application is not 
> available and has a greater impact on us.
> the code is :
> class : JsonRowDeserializationSchema
> function :
> @Override
>  public Row deserialize(byte[] message) throws IOException {
>  try
> { final JsonNode root = objectMapper.readTree(message); return 
> convertRow(root, (RowTypeInfo) typeInfo); }
> catch (Throwable t)
> { throw new IOException("Failed to deserialize JSON object.", t); }
> }
> now ,i change it to  :
> public Row deserialize(byte[] message) throws IOException {
>  try
> { JsonNode root = this.objectMapper.readTree(message); return 
> this.convertRow(root, (RowTypeInfo)this.typeInfo); }
> catch (Throwable var4) {
>  message = this.objectMapper.writeValueAsBytes("{}");
>  JsonNode root = this.objectMapper.readTree(message);
>  return this.convertRow(root, (RowTypeInfo)this.typeInfo);
>  }
>  }
>  
> I think that data format errors are inevitable during network transmission, 
> so can we add a new column to the table for the wrong data format? like spark 
> sql does。
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10119) JsonRowDeserializationSchema deserialize kafka message

2018-08-16 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579779#comment-16579779
 ] 

buptljy edited comment on FLINK-10119 at 8/16/18 9:55 AM:
--

[~twalthr] 
So I am going to add two optional properties.
1. ignore-when-error: ignore the malformed line if this is configured to be true.
2. additional-error-field: add the error message to an additional field and leave 
the other fields null.


was (Author: wind_ljy):
[~twalthr] I don't agree about  setting all fields null, it is too weired and 
doesn't make sense to pass an empty row. I think we should ignore this line.
So I am going to add two optional properties.
1. ignore-when-error: ignore the error line if this is configured to be true.
2. additinal-error-field: add error messages to the additional row and the 
others are null.

> JsonRowDeserializationSchema deserialize kafka message
> --
>
> Key: FLINK-10119
> URL: https://issues.apache.org/jira/browse/FLINK-10119
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.1
> Environment: none
>Reporter: sean.miao
>Assignee: buptljy
>Priority: Major
>
> Recently, we are using Kafka010JsonTableSource to process kafka's json 
> messages.We turned on checkpoint and auto-restart strategy .
> We found that as long as the format of a message is not json, it will cause 
> the job to not be pulled up. Of course, this is to ensure that only once 
> processing or at least once processing, but the resulting application is not 
> available and has a greater impact on us.
> the code is :
> class : JsonRowDeserializationSchema
> function :
> @Override
>  public Row deserialize(byte[] message) throws IOException {
>  try
> { final JsonNode root = objectMapper.readTree(message); return 
> convertRow(root, (RowTypeInfo) typeInfo); }
> catch (Throwable t)
> { throw new IOException("Failed to deserialize JSON object.", t); }
> }
> now ,i change it to  :
> public Row deserialize(byte[] message) throws IOException {
>  try
> { JsonNode root = this.objectMapper.readTree(message); return 
> this.convertRow(root, (RowTypeInfo)this.typeInfo); }
> catch (Throwable var4) {
>  message = this.objectMapper.writeValueAsBytes("{}");
>  JsonNode root = this.objectMapper.readTree(message);
>  return this.convertRow(root, (RowTypeInfo)this.typeInfo);
>  }
>  }
>  
> I think that data format errors are inevitable during network transmission, 
> so can we add a new column to the table for the wrong data format? like spark 
> sql does。
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10119) JsonRowDeserializationSchema deserialize kafka message

2018-08-16 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579779#comment-16579779
 ] 

buptljy edited comment on FLINK-10119 at 8/16/18 7:43 AM:
--

[~twalthr] I don't agree with setting all fields to null; it is too weird and 
doesn't make sense to pass an empty row. I think we should ignore this line.
So I am going to add two optional properties.
1. ignore-when-error: ignore the malformed line if this is configured to be true.
2. additional-error-field: add the error message to an additional field and leave 
the other fields null.


was (Author: wind_ljy):
[~twalthr] Okay, I am going to add two optional properties.
1. ignore-when-error: ignore the error line if this is configured to be true.
2. additinal-error-field: add error messages to the additional row and the 
others are null.

> JsonRowDeserializationSchema deserialize kafka message
> --
>
> Key: FLINK-10119
> URL: https://issues.apache.org/jira/browse/FLINK-10119
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.1
> Environment: none
>Reporter: sean.miao
>Assignee: buptljy
>Priority: Major
>
> Recently, we are using Kafka010JsonTableSource to process kafka's json 
> messages.We turned on checkpoint and auto-restart strategy .
> We found that as long as the format of a message is not json, it will cause 
> the job to not be pulled up. Of course, this is to ensure that only once 
> processing or at least once processing, but the resulting application is not 
> available and has a greater impact on us.
> the code is :
> class : JsonRowDeserializationSchema
> function :
> @Override
>  public Row deserialize(byte[] message) throws IOException {
>  try
> { final JsonNode root = objectMapper.readTree(message); return 
> convertRow(root, (RowTypeInfo) typeInfo); }
> catch (Throwable t)
> { throw new IOException("Failed to deserialize JSON object.", t); }
> }
> now ,i change it to  :
> public Row deserialize(byte[] message) throws IOException {
>  try
> { JsonNode root = this.objectMapper.readTree(message); return 
> this.convertRow(root, (RowTypeInfo)this.typeInfo); }
> catch (Throwable var4) {
>  message = this.objectMapper.writeValueAsBytes("{}");
>  JsonNode root = this.objectMapper.readTree(message);
>  return this.convertRow(root, (RowTypeInfo)this.typeInfo);
>  }
>  }
>  
> I think that data format errors are inevitable during network transmission, 
> so can we add a new column to the table for the wrong data format? like spark 
> sql does。
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10119) JsonRowDeserializationSchema deserialize kafka message

2018-08-14 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579779#comment-16579779
 ] 

buptljy commented on FLINK-10119:
-

[~twalthr] Okay, I am going to add two optional properties (a rough sketch of 
the intended behavior is below).
1. ignore-when-error: ignore the malformed line if this is configured to be true.
2. additional-error-field: add the error message to an additional field and leave 
the other fields null.
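
A minimal sketch of how deserialize could honor the two properties, assuming 
hypothetical flags ignoreWhenError and errorFieldIndex derived from the format 
properties; this is an illustration, not the actual implementation:
{code:java}
// Fragment of JsonRowDeserializationSchema#deserialize with the two proposed,
// hypothetical options wired in.
@Override
public Row deserialize(byte[] message) throws IOException {
    try {
        final JsonNode root = objectMapper.readTree(message);
        return convertRow(root, (RowTypeInfo) typeInfo);
    } catch (Throwable t) {
        if (ignoreWhenError) {
            // drop the malformed record entirely
            return null;
        }
        if (errorFieldIndex >= 0) {
            // emit a row where only the error field is populated
            Row row = new Row(((RowTypeInfo) typeInfo).getArity());
            row.setField(errorFieldIndex, t.getMessage());
            return row;
        }
        throw new IOException("Failed to deserialize JSON object.", t);
    }
}
{code}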

> JsonRowDeserializationSchema deserialize kafka message
> --
>
> Key: FLINK-10119
> URL: https://issues.apache.org/jira/browse/FLINK-10119
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.1
> Environment: none
>Reporter: sean.miao
>Assignee: buptljy
>Priority: Major
>
> Recently, we are using Kafka010JsonTableSource to process kafka's json 
> messages.We turned on checkpoint and auto-restart strategy .
> We found that as long as the format of a message is not json, it will cause 
> the job to not be pulled up. Of course, this is to ensure that only once 
> processing or at least once processing, but the resulting application is not 
> available and has a greater impact on us.
> the code is :
> class : JsonRowDeserializationSchema
> function :
> @Override
>  public Row deserialize(byte[] message) throws IOException {
>  try
> { final JsonNode root = objectMapper.readTree(message); return 
> convertRow(root, (RowTypeInfo) typeInfo); }
> catch (Throwable t)
> { throw new IOException("Failed to deserialize JSON object.", t); }
> }
> now ,i change it to  :
> public Row deserialize(byte[] message) throws IOException {
>  try
> { JsonNode root = this.objectMapper.readTree(message); return 
> this.convertRow(root, (RowTypeInfo)this.typeInfo); }
> catch (Throwable var4) {
>  message = this.objectMapper.writeValueAsBytes("{}");
>  JsonNode root = this.objectMapper.readTree(message);
>  return this.convertRow(root, (RowTypeInfo)this.typeInfo);
>  }
>  }
>  
> I think that data format errors are inevitable during network transmission, 
> so can we add a new column to the table for the wrong data format? like spark 
> sql does。
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10126) There should be a Scala DataSource

2018-08-13 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577877#comment-16577877
 ] 

buptljy commented on FLINK-10126:
-

[~yanghua] Ok, it makes sense to me.

> There should be a Scala DataSource
> --
>
> Key: FLINK-10126
> URL: https://issues.apache.org/jira/browse/FLINK-10126
> Project: Flink
>  Issue Type: Improvement
>Reporter: Alexis Sarda-Espinosa
>Assignee: vinoyang
>Priority: Minor
>  Labels: datasource, scala
>
> In Java, an ExecutionEnvironment's createInput method returns a DataSource, 
> whereas the Scala version returns a DataSet. There is no Scala DataSource 
> wrapper, and the Scala DataSet does not provide the Java DataSource methods, 
> such as getSplitDataProperties.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577862#comment-16577862
 ] 

buptljy commented on FLINK-9964:


[~twalthr] Could you review my PR?

> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10128) Timeout on transferring jars on travis-ci

2018-08-12 Thread buptljy (JIRA)
buptljy created FLINK-10128:
---

 Summary: Timeout on transferring jars on travis-ci
 Key: FLINK-10128
 URL: https://issues.apache.org/jira/browse/FLINK-10128
 Project: Flink
  Issue Type: Bug
  Components: Build System
Reporter: buptljy


07:55:06.549 [ERROR] Failed to execute goal on project flink-parent: Could not 
resolve dependencies for project 
org.apache.flink:flink-parent:pom:1.7-SNAPSHOT: Could not transfer artifact 
com.google.code.findbugs:jsr305:jar:1.3.9 from/to central 
(http://repo.maven.apache.org/maven2): Connect to repo.maven.apache.org:80 
[repo.maven.apache.org/151.101.184.215] failed: Connection timed out 
(Connection timed out) -> [Help 1]

[Travis Page|https://travis-ci.org/apache/flink/jobs/415047040]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10126) There should be a Scala DataSource

2018-08-12 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577785#comment-16577785
 ] 

buptljy commented on FLINK-10126:
-

[~yanghua] Hi, I'm interested in how you're going to improve this. I'd be 
grateful if you could share some implementation details in the comments.

> There should be a Scala DataSource
> --
>
> Key: FLINK-10126
> URL: https://issues.apache.org/jira/browse/FLINK-10126
> Project: Flink
>  Issue Type: Improvement
>Reporter: Alexis Sarda-Espinosa
>Assignee: vinoyang
>Priority: Minor
>  Labels: datasource, scala
>
> In Java, an ExecutionEnvironment's createInput method returns a DataSource, 
> whereas the Scala version returns a DataSet. There is no Scala DataSource 
> wrapper, and the Scala DataSet does not provide the Java DataSource methods, 
> such as getSplitDataProperties.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10120) Support string representation for types like array

2018-08-12 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy reassigned FLINK-10120:
---

Assignee: buptljy

> Support string representation for types like array
> --
>
> Key: FLINK-10120
> URL: https://issues.apache.org/jira/browse/FLINK-10120
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>    Reporter: buptljy
>Assignee: buptljy
>Priority: Minor
>
> In TypeStringUtils.readTypeInfo:
> {code:java}
> case _: BasicArrayTypeInfo[_, _] | _: ObjectArrayTypeInfo[_, _] |
>  _: PrimitiveArrayTypeInfo[_] =>
>   throw new TableException("A string representation for array types is 
> not supported yet.")
> {code}
> This exception makes us unable to create a table schema or format schema with 
> a array type field.
> I'm not sure whether this is an improvement or not, because you throw an 
> exception explicitly here.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10119) JsonRowDeserializationSchema deserialize kafka message

2018-08-12 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577448#comment-16577448
 ] 

buptljy commented on FLINK-10119:
-

[~twalthr] I can help with this. 
Do you mean adding an additional field to the deserialized row in 
org.apache.flink.formats.json.JsonRowDeserializationSchema#deserialize if this 
property is configured in the format?

> JsonRowDeserializationSchema deserialize kafka message
> --
>
> Key: FLINK-10119
> URL: https://issues.apache.org/jira/browse/FLINK-10119
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.1
> Environment: none
>Reporter: sean.miao
>Assignee: buptljy
>Priority: Major
>
> Recently, we are using Kafka010JsonTableSource to process kafka's json 
> messages.We turned on checkpoint and auto-restart strategy .
> We found that as long as the format of a message is not json, it will cause 
> the job to not be pulled up. Of course, this is to ensure that only once 
> processing or at least once processing, but the resulting application is not 
> available and has a greater impact on us.
> the code is :
> class : JsonRowDeserializationSchema
> function :
> @Override
>  public Row deserialize(byte[] message) throws IOException {
>  try
> { final JsonNode root = objectMapper.readTree(message); return 
> convertRow(root, (RowTypeInfo) typeInfo); }
> catch (Throwable t)
> { throw new IOException("Failed to deserialize JSON object.", t); }
> }
> now ,i change it to  :
> public Row deserialize(byte[] message) throws IOException {
>  try
> { JsonNode root = this.objectMapper.readTree(message); return 
> this.convertRow(root, (RowTypeInfo)this.typeInfo); }
> catch (Throwable var4) {
>  message = this.objectMapper.writeValueAsBytes("{}");
>  JsonNode root = this.objectMapper.readTree(message);
>  return this.convertRow(root, (RowTypeInfo)this.typeInfo);
>  }
>  }
>  
> I think that data format errors are inevitable during network transmission, 
> so can we add a new column to the table for the wrong data format? like spark 
> sql does。
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10119) JsonRowDeserializationSchema deserialize kafka message

2018-08-12 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy reassigned FLINK-10119:
---

Assignee: buptljy

> JsonRowDeserializationSchema deserialize kafka message
> --
>
> Key: FLINK-10119
> URL: https://issues.apache.org/jira/browse/FLINK-10119
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.1
> Environment: none
>Reporter: sean.miao
>Assignee: buptljy
>Priority: Major
>
> Recently, we are using Kafka010JsonTableSource to process kafka's json 
> messages.We turned on checkpoint and auto-restart strategy .
> We found that as long as the format of a message is not json, it will cause 
> the job to not be pulled up. Of course, this is to ensure that only once 
> processing or at least once processing, but the resulting application is not 
> available and has a greater impact on us.
> the code is :
> class : JsonRowDeserializationSchema
> function :
> @Override
>  public Row deserialize(byte[] message) throws IOException {
>  try
> { final JsonNode root = objectMapper.readTree(message); return 
> convertRow(root, (RowTypeInfo) typeInfo); }
> catch (Throwable t)
> { throw new IOException("Failed to deserialize JSON object.", t); }
> }
> now ,i change it to  :
> public Row deserialize(byte[] message) throws IOException {
>  try
> { JsonNode root = this.objectMapper.readTree(message); return 
> this.convertRow(root, (RowTypeInfo)this.typeInfo); }
> catch (Throwable var4) {
>  message = this.objectMapper.writeValueAsBytes("{}");
>  JsonNode root = this.objectMapper.readTree(message);
>  return this.convertRow(root, (RowTypeInfo)this.typeInfo);
>  }
>  }
>  
> I think that data format errors are inevitable during network transmission, 
> so can we add a new column to the table for the wrong data format? like spark 
> sql does。
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10120) Support string representation for types like array

2018-08-10 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16575975#comment-16575975
 ] 

buptljy commented on FLINK-10120:
-

[~fhueske] Sorry, the previous description was a bit confusing; I've updated it.

> Support string representation for types like array
> --
>
> Key: FLINK-10120
> URL: https://issues.apache.org/jira/browse/FLINK-10120
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>    Reporter: buptljy
>Priority: Minor
>
> In TypeStringUtils.readTypeInfo:
> {code:java}
> case _: BasicArrayTypeInfo[_, _] | _: ObjectArrayTypeInfo[_, _] |
>  _: PrimitiveArrayTypeInfo[_] =>
>   throw new TableException("A string representation for array types is 
> not supported yet.")
> {code}
> This exception makes us unable to create a table schema or format schema with 
> a array type field.
> I'm not sure whether this is an improvement or not, because you throw an 
> exception explicitly here.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10120) Support string representation for types like array

2018-08-10 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy updated FLINK-10120:

Description: 
In TypeStringUtils.readTypeInfo:
{code:java}
case _: BasicArrayTypeInfo[_, _] | _: ObjectArrayTypeInfo[_, _] |
 _: PrimitiveArrayTypeInfo[_] =>
  throw new TableException("A string representation for array types is not 
supported yet.")
{code}
This exception makes it impossible to create a table schema or format schema with 
an array type field.

I'm not sure whether this counts as an improvement or not, because the exception 
is thrown explicitly here.


  was:
Now we have to use row type to replace array type when we want to create a 
table schema or format schema with nested types. For example, we have to use

{code:java}
Types.ROW(
new String[]{"a", "b", "c"},
new TypeInformation[]{Types.STRING(), Types.INT(), Types.ROW(
new String[]{"a", "b", "c"},
new TypeInformation[]{Types.STRING(), 
Types.STRING(),Types.STRING()})}
{code}
instead of 

{code:java}
Types.ROW(
new String[]{"a", "b", "c"},
new TypeInformation[]{Types.STRING(), Types.INT(), 
BasicArrayTypeInfo.STRING_TYPE_INFO})
{code}
because the program will fail in TypeStringUtils.readTypeInfo.



> Support string representation for types like array
> --
>
> Key: FLINK-10120
> URL: https://issues.apache.org/jira/browse/FLINK-10120
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: buptljy
>Priority: Minor
>
> In TypeStringUtils.readTypeInfo:
> {code:java}
> case _: BasicArrayTypeInfo[_, _] | _: ObjectArrayTypeInfo[_, _] |
>  _: PrimitiveArrayTypeInfo[_] =>
>   throw new TableException("A string representation for array types is 
> not supported yet.")
> {code}
> This exception makes us unable to create a table schema or format schema with 
> a array type field.
> I'm not sure whether this is an improvement or not, because you throw an 
> exception explicitly here.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10120) Support string representation for types like array

2018-08-10 Thread buptljy (JIRA)
buptljy created FLINK-10120:
---

 Summary: Support string representation for types like array
 Key: FLINK-10120
 URL: https://issues.apache.org/jira/browse/FLINK-10120
 Project: Flink
  Issue Type: Improvement
  Components: Type Serialization System
Reporter: buptljy


Now we have to use a row type in place of an array type when we want to create a 
table schema or format schema with nested types. For example, we have to use

{code:java}
Types.ROW(
    new String[]{"a", "b", "c"},
    new TypeInformation[]{Types.STRING(), Types.INT(), Types.ROW(
        new String[]{"a", "b", "c"},
        new TypeInformation[]{Types.STRING(), Types.STRING(), Types.STRING()})})
{code}
instead of

{code:java}
Types.ROW(
    new String[]{"a", "b", "c"},
    new TypeInformation[]{Types.STRING(), Types.INT(), 
        BasicArrayTypeInfo.STRING_TYPE_INFO})
{code}
because the program will fail in TypeStringUtils.readTypeInfo.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-09 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16574750#comment-16574750
 ] 

buptljy commented on FLINK-9964:


[~twalthr] I mean a JSON schema for CSV format data. For example, I could use 
a JSON string {"a": "string", "b": "integer"} to define the schema of our CSV 
data. Should we support this?

> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-08 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16574323#comment-16574323
 ] 

buptljy commented on FLINK-9964:


[~twalthr] Should we support a JSON schema for the CSV format? 
If a JSON schema is needed for the CSV format, we may need to write a duplicate 
converter like JsonRowSchemaConverter unless we import the flink-json module, 
which would be inappropriate.

> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10039) FlinkKafkaProducer - Serializer Error

2018-08-03 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567834#comment-16567834
 ] 

buptljy commented on FLINK-10039:
-

I think only ByteArrayDeserializer is supported, according to setDeserializer 
in *FlinkKafkaConsumer09.java:298*.

Maybe we can improve this?
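
For reference, a minimal sketch of the producer setup without the conflicting 
key/value serializer properties; the connector serializes records itself via the 
provided SimpleStringSchema and hands byte arrays to the Kafka client. The broker 
address and topic name are placeholders:
{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "xxx:9092");
// No key.serializer / value.serializer entries: the connector always writes
// the bytes produced by the provided SerializationSchema.
FlinkKafkaProducer010<String> producer =
        new FlinkKafkaProducer010<>("my-topic", new SimpleStringSchema(), props);
// 'stream' is the DataStream<String> created by the consumer in the issue description.
stream.addSink(producer);
{code}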

> FlinkKafkaProducer - Serializer Error
> -
>
> Key: FLINK-10039
> URL: https://issues.apache.org/jira/browse/FLINK-10039
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.2
>Reporter: Akshay Nagpal
>Priority: Major
>
> I am working on a use case where I input the data using Kafka's console 
> producer, read the same data in my program using FlinkKafkaConsumer and write 
> it back to another Kafka topic using FlinkKafkaProducer. 
> I am using 1.4.2 version of the following dependencies:
> flink-java
> flink-streaming-java_2.11
> flink-connector-kafka-0.10_2.11
>  
> The codes are as follows:
> KafkaConsoleProducer:
> {code:java}
> ./bin/kafka-console-producer --broker-list xxx:9092 --topic test1 --property 
> "parse.key=true" --property "key.separator=:" --key-serializer 
> org.apache.kafka.common.serialization.StringSerializer --value-serializer 
> org.apache.kafka.common.serialization.StringSerializer
> {code}
> KafkaFlinkConsumer:
> {code:java}
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "xxx:9092");
> properties.setProperty("zookeeper.connect", "xxx:2181");
> properties.setProperty("group.id", "test");
> properties.setProperty("key.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer");
> properties.setProperty("value.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer");
> FlinkKafkaConsumer010 myConsumer = new 
> FlinkKafkaConsumer010("test1", 
> new SimpleStringSchema(),
> properties);
> DataStream stream = env.addSource(myConsumer);
> {code}
> KafkaFlinkProducer:
> {code:java}
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "xxx:9092");
> properties.setProperty("zookeeper.connect", "xxx:2181");
> properties.setProperty("group.id", "test");
> properties.setProperty("key.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
> properties1.setProperty("value.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
> FlinkKafkaProducer010 myProducer = new 
> FlinkKafkaProducer010("my-topic", 
> new SimpleStringSchema(), 
> properties);
> stream.addSink(myProducer);
> {code}
> When I specify key and value serializer as StringSerializer in 
> FlinkKafkaProducer, it gives me the following error in the logs:
>  
> {code:java}
> org.apache.kafka.common.errors.SerializationException: Can't convert value of 
> class [B to class org.apache.kafka.common.serialization.StringSerializer 
> specified in value.serializer
> {code}
> Though it's giving me this error, it's still producing the data in the topic.
> When I am using ByteArraySerializer though with the producer, it is not 
> giving me the error in the logs. It is also giving me the output.
> Moreover, DataStream's print method is not printing the data on the console.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10036) Flink's CSV output format is not consistent with the standard.

2018-08-02 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567721#comment-16567721
 ] 

buptljy edited comment on FLINK-10036 at 8/3/18 3:39 AM:
-

This is very similar to FLINK-9964 
(https://issues.apache.org/jira/browse/FLINK-9964), and I will submit a PR for 
that issue in the next few days. Feel free to discuss it with me if you have any 
questions.


was (Author: wind_ljy):
This is very similar to this 
[JIRA](https://issues.apache.org/jira/browse/FLINK-9964) , and I will submit a 
PR these days. You can discuss with me if you have any questions.

> Flink's CSV output format is not consistent with the standard.
> --
>
> Key: FLINK-10036
> URL: https://issues.apache.org/jira/browse/FLINK-10036
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Reporter: Caizhi Weng
>Priority: Minor
>
> h2. What's the problem
> Flink's CSV output format is not consistent with the standard 
> ([https://tools.ietf.org/html/rfc4180]).
> In CSV format file, if a field contains comma, quotes or new line, this field 
> should be surrounded with quotes (see section 2.6 in the standard). 
> Specifically, if a field contains quotes, the quotes should be escaped by 
> double quotes (see section 2.7 in the standard).
> For example, to express these two fields in a CSV file:
> {noformat}
> Hello,World
> "Quoted" "String"
> {noformat}
> The CSV file should look like this:
> {noformat}
> "Hello,World","""Quoted"" ""String"""
> {noformat}
> But if we run the following Flink code to output these fields
> {code:java}
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env, config)
> val data = List(
>   ("Hello,World", "\"Quoted\" \"String\"")
> )
> val ds = env.fromCollection(data).toTable(tEnv).as('a, 'b)
> ds.select('a, 'b)
> val sink = new CsvTableSink("test.csv", ",", 1, WriteMode.OVERWRITE)
> ds.writeToSink(sink)
> env.execute()
> {code}
> We get the following CSV:
> {noformat}
> Hello,World,"Quoted" "String"
> {noformat}
> which is not correct (there are actually 3 fields instead of 2 in this CSV 
> file, and the last field is not valid).
> h2. How am I going to fix it
> I'm going to fix the writeRecord method in CsvOutputFormat.java in flink-java 
> module, and add some test cases to ensure that my fix is correct.
> h2. What's affected
> This fix will change the output of CsvTableSink, and will affect the test 
> cases whose results are written to a CSV file.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10036) Flink's CSV output format is not consistent with the standard.

2018-08-02 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567721#comment-16567721
 ] 

buptljy commented on FLINK-10036:
-

This is very similar to FLINK-9964 
(https://issues.apache.org/jira/browse/FLINK-9964), and I will submit a PR in the 
next few days. Feel free to discuss it with me if you have any questions.
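
For reference, a minimal sketch of the RFC 4180 quoting rule described in the 
issue (quote a field that contains the delimiter, a quote, or a line break, and 
escape embedded quotes by doubling them); the helper name is a placeholder:
{code:java}
// Hypothetical helper illustrating the escaping rule from RFC 4180 sections 2.6 and 2.7.
static String quoteCsvField(String field, char delimiter) {
    boolean needsQuoting = field.indexOf(delimiter) >= 0
            || field.indexOf('"') >= 0
            || field.indexOf('\n') >= 0
            || field.indexOf('\r') >= 0;
    if (needsQuoting) {
        return "\"" + field.replace("\"", "\"\"") + "\"";
    }
    return field;
}

// Applying this to the two example fields from the issue yields the expected line:
// "Hello,World","""Quoted"" ""String"""
{code}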

> Flink's CSV output format is not consistent with the standard.
> --
>
> Key: FLINK-10036
> URL: https://issues.apache.org/jira/browse/FLINK-10036
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Reporter: Caizhi Weng
>Priority: Minor
>
> h2. What's the problem
> Flink's CSV output format is not consistent with the standard 
> ([https://tools.ietf.org/html/rfc4180]).
> In CSV format file, if a field contains comma, quotes or new line, this field 
> should be surrounded with quotes (see section 2.6 in the standard). 
> Specifically, if a field contains quotes, the quotes should be escaped by 
> double quotes (see section 2.7 in the standard).
> For example, to express these two fields in a CSV file:
> {noformat}
> Hello,World
> "Quoted" "String"
> {noformat}
> The CSV file should look like this:
> {noformat}
> "Hello,World","""Quoted"" ""String"""
> {noformat}
> But if we run the following Flink code to output these fields
> {code:java}
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env, config)
> val data = List(
>   ("Hello,World", "\"Quoted\" \"String\"")
> )
> val ds = env.fromCollection(data).toTable(tEnv).as('a, 'b)
> ds.select('a, 'b)
> val sink = new CsvTableSink("test.csv", ",", 1, WriteMode.OVERWRITE)
> ds.writeToSink(sink)
> env.execute()
> {code}
> We get the following CSV:
> {noformat}
> Hello,World,"Quoted" "String"
> {noformat}
> which is not correct (there are actually 3 fields instead of 2 in this CSV 
> file, and the last field is not valid).
> h2. How am I going to fix it
> I'm going to fix the writeRecord method in CsvOutputFormat.java in flink-java 
> module, and add some test cases to ensure that my fix is correct.
> h2. What's affected
> This fix will change the output of CsvTableSink, and will affect the test 
> cases whose results are written to a CSV file.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9964) Add a CSV table format factory

2018-07-27 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559502#comment-16559502
 ] 

buptljy edited comment on FLINK-9964 at 7/27/18 9:50 AM:
-

[~twalthr] I've tried some tests on the Jackson library, and it looks good 
except that it cannot support nested Array values like *String[][].class*, but 
I think we should do what you just said as a first step.
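
For reference, this is roughly the kind of round trip I tried (a minimal sketch, assuming the 
jackson-dataformat-csv module is on the classpath): flat rows map cleanly to Array[String], but 
nested arrays such as String[][] have no direct CSV representation and would need to be flattened first.

{code:scala}
// Minimal Jackson CSV sketch: each CSV row is read as a flat Array[String].
import com.fasterxml.jackson.databind.MappingIterator
import com.fasterxml.jackson.dataformat.csv.{CsvMapper, CsvParser}

object JacksonCsvSketch {
  def main(args: Array[String]): Unit = {
    val mapper = new CsvMapper()
    // Treat the whole document as a sequence of rows, each exposed as an array.
    mapper.enable(CsvParser.Feature.WRAP_AS_ARRAY)

    val csv = "Hello,World\n\"Quoted\",\"String\"\n"
    val rows: MappingIterator[Array[String]] =
      mapper.readerFor(classOf[Array[String]]).readValues(csv)
    while (rows.hasNext) {
      println(rows.next().mkString(" | "))
    }
  }
}
{code}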


was (Author: wind_ljy):
[~twalthr] I've tried some tests on the Jackson library, and it looks good 
except that it cannot support nested Array values like *String[][].class*, but 
I think we should do the same as you said at the first step.

> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-07-27 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559502#comment-16559502
 ] 

buptljy commented on FLINK-9964:


[~twalthr] I've tried some tests on the Jackson library, and it looks good 
except that it cannot support nested Array values like *String[][].class*, but 
I think we should do the same as you said at the first step.

> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-07-27 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559381#comment-16559381
 ] 

buptljy commented on FLINK-9964:


[~twalthr] Thank you for your description above; it already helps me a lot. I 
will discuss it with you if there is anything I'm not sure about.

> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9966) Add a ORC table factory

2018-07-26 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy reassigned FLINK-9966:
--

Assignee: buptljy

> Add a ORC table factory
> ---
>
> Key: FLINK-9966
> URL: https://issues.apache.org/jira/browse/FLINK-9966
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>
> We should allow to define an {{OrcTableSource}} using a table factory. How we 
> split connector and format is up for discussion. An ORC format might also be 
> necessary for the new streaming file sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9964) Add a CSV table format factory

2018-07-26 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy reassigned FLINK-9964:
--

Assignee: buptljy

> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6412: [FLINK-9941] Flush in ScalaCsvOutputFormat before close

2018-07-25 Thread buptljy
Github user buptljy commented on the issue:

https://github.com/apache/flink/pull/6412
  
Duplicate. Close.


---


[GitHub] flink pull request #6412: [FLINK-9941] Flush in ScalaCsvOutputFormat before ...

2018-07-25 Thread buptljy
Github user buptljy closed the pull request at:

https://github.com/apache/flink/pull/6412


---


[GitHub] flink pull request #6412: [FLINK-9941] Flush in ScalaCsvOutputFormat before ...

2018-07-25 Thread buptljy
GitHub user buptljy opened a pull request:

https://github.com/apache/flink/pull/6412

[FLINK-9941] Flush in ScalaCsvOutputFormat before close

## What is the purpose of the change
- Flush in ScalaCsvOutputFormat before close. This is already done in 
org.apache.flink.api.java.io.CsvOutputFormat.

## Brief change log
- add flush in ScalaCsvOutputFormat before close.
## Verifying this change
- unit tests.
## Does this pull request potentially affect one of the following parts:
- no

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/buptljy/flink FLINK-9941

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6412.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6412


commit 636456d2398bef69a805b96dfb0945459cfcfada
Author: wind 
Date:   2018-07-25T06:01:36Z

flush ScalaCsvOutputFormat before close




---


[jira] [Commented] (FLINK-9941) Flush in ScalaCsvOutputFormat before close method

2018-07-24 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555175#comment-16555175
 ] 

buptljy commented on FLINK-9941:


[~lemonjing] You're right. I'll fix it.

> Flush in ScalaCsvOutputFormat before close method
> -
>
> Key: FLINK-9941
> URL: https://issues.apache.org/jira/browse/FLINK-9941
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala API
>Affects Versions: 1.5.1
>Reporter: Rannn Tao
>Assignee: buptljy
>Priority: Major
>
> Because not every stream's close method will flush, in order to ensure the 
> stability of continuous integration, we need to manually call flush() before 
> close().
> I noticed that CsvOutputFormat (Java API) has done this. As follows. 
> {code:java}
> //CsvOutputFormat
> public void close() throws IOException {
> if (wrt != null) {
> this.wrt.flush();
> this.wrt.close();
> }
> super.close();
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9941) Flush in ScalaCsvOutputFormat before close method

2018-07-24 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy reassigned FLINK-9941:
--

Assignee: buptljy

> Flush in ScalaCsvOutputFormat before close method
> -
>
> Key: FLINK-9941
> URL: https://issues.apache.org/jira/browse/FLINK-9941
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala API
>Affects Versions: 1.5.1
>Reporter: Rannn Tao
>Assignee: buptljy
>Priority: Major
>
> Because not every stream's close method will flush, in order to ensure the 
> stability of continuous integration, we need to manually call flush() before 
> close().
> I noticed that CsvOutputFormat (Java API) has done this. As follows. 
> {code:java}
> //CsvOutputFormat
> public void close() throws IOException {
> if (wrt != null) {
> this.wrt.flush();
> this.wrt.close();
> }
> super.close();
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6935) Integration of SQL and CEP

2018-07-24 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555165#comment-16555165
 ] 

buptljy commented on FLINK-6935:


I am very interested in the CEP part. My team is exploring this feature and 
trying to integrate it into our products, and I also want to contribute to it. 
However, I find that many tasks are not in the correct status and many 
pull requests have not been reviewed for a long time. Is there anyone who can help 
manage these Jira issues?

> Integration of SQL and CEP
> --
>
> Key: FLINK-6935
> URL: https://issues.apache.org/jira/browse/FLINK-6935
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP, Table API  SQL
>Reporter: Jark Wu
>Assignee: Dian Fu
>Priority: Major
>
> Flink's CEP library is a great library for complex event processing, more and 
> more customers are expressing their interests in it. But it also has some 
> limitations that users usually have to write a lot of code even for a very 
> simple pattern match use case as it currently only supports the Java API.
> CEP DSLs and SQLs strongly resemble each other. CEP's additional features 
> compared to SQL boil down to pattern detection. So It will be awesome to 
> consolidate CEP and SQL. It makes SQL more powerful to support more usage 
> scenario. And it gives users the ability to easily and quickly to build CEP 
> applications.
> The FLIP can be found here:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-20%3A+Integration+of+SQL+and+CEP
> This is an umbrella issue for the FLIP. We should wait for Calcite 1.13 to 
> start this work.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-6938) IterativeCondition should support RichFunction interface

2018-07-24 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy reassigned FLINK-6938:
--

Assignee: Jark Wu  (was: buptljy)

> IterativeCondition should support RichFunction interface
> 
>
> Key: FLINK-6938
> URL: https://issues.apache.org/jira/browse/FLINK-6938
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>
> In FLIP-20, we need IterativeCondition to support an {{open()}} method to 
> compile the generated code once. We do not want to insert an if condition in 
> the {{filter()}} method. So I suggest making IterativeCondition support the 
> {{RichFunction}} interface.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-6938) IterativeCondition should support RichFunction interface

2018-07-24 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy reassigned FLINK-6938:
--

Assignee: buptljy  (was: Jark Wu)

> IterativeCondition should support RichFunction interface
> 
>
> Key: FLINK-6938
> URL: https://issues.apache.org/jira/browse/FLINK-6938
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Jark Wu
>Assignee: buptljy
>Priority: Major
>
> In FLIP-20, we need IterativeCondition to support an {{open()}} method to 
> compile the generated code once. We do not want to insert an if condition in 
> the {{filter()}} method. So I suggest making IterativeCondition support the 
> {{RichFunction}} interface.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9834) Unable to support scala BasicArrayTypeInfo

2018-07-24 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy closed FLINK-9834.
--
Resolution: Won't Do

> Unable to support scala BasicArrayTypeInfo
> --
>
> Key: FLINK-9834
> URL: https://issues.apache.org/jira/browse/FLINK-9834
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>    Reporter: buptljy
>Assignee: buptljy
>Priority: Major
>
> BasicArrayTypeInfo does not support Scala types in some circumstances. For 
> example,
> {code:scala}
> // we set a descriptor here and get value from it.
> val datas: mutable.Map[String, Array[Byte]] = mutable.Map()
> val descriptor = new MapStateDescriptor("realtime-state",
> BasicTypeInfo.STRING_TYPE_INFO, BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO)
> val state = context.getKeyedStateStore.getMapState(descriptor)
> val iter = state.entries().iterator()
> while (iter.hasNext) {
>val entry = iter.next()
>datas.put(entry.getKey, entry.getValue)
> }
> {code}
> The codes above cannot be compiled successfully because the "state" is using 
> java.lang.Byte but the "datas" is using scala.Byte, and we have to iterate 
> all values of the Array like "datas.put(entry.getKey, entry.getValue.map(byte 
> => byte.asInstanceOf[Byte]))", which is definitely not what we want.
> I suggest that we create a ScalaBasicArrayTypeInfo like the 
> "BasicArrayTypeInfo" for scala.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9834) Unable to support scala BasicArrayTypeInfo

2018-07-24 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555157#comment-16555157
 ] 

buptljy commented on FLINK-9834:


It seems that we can use _*implicitly[TypeInformation[Array[Byte]]]*_ to do 
that, although I don't think it's the most appropriate way.
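
A minimal sketch of that workaround (assuming the Scala API implicits from 
{{org.apache.flink.api.scala._}} are in scope), so the value type is treated as the Scala 
Array[Byte] rather than going through BasicArrayTypeInfo with java.lang.Byte:

{code:scala}
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._

// The implicit TypeInformation macro derives Array[Byte] info directly,
// so the MapState value type stays a Scala Array[Byte].
val descriptor = new MapStateDescriptor[String, Array[Byte]](
  "realtime-state",
  implicitly[TypeInformation[String]],
  implicitly[TypeInformation[Array[Byte]]])
{code}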

> Unable to support scala BasicArrayTypeInfo
> --
>
> Key: FLINK-9834
> URL: https://issues.apache.org/jira/browse/FLINK-9834
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>    Reporter: buptljy
>Assignee: buptljy
>Priority: Major
>
> BasicArrayTypeInfo does not support Scala types in some circumstances. For 
> example,
> {code:scala}
> // we set a descriptor here and get value from it.
> val datas: mutable.Map[String, Array[Byte]] = mutable.Map()
> val descriptor = new MapStateDescriptor("realtime-state",
> BasicTypeInfo.STRING_TYPE_INFO, BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO)
> val state = context.getKeyedStateStore.getMapState(descriptor)
> val iter = state.entries().iterator()
> while (iter.hasNext) {
>val entry = iter.next()
>datas.put(entry.getKey, entry.getValue)
> }
> {code}
> The codes above cannot be compiled successfully because the "state" is using 
> java.lang.Byte but the "datas" is using scala.Byte, and we have to iterate 
> all values of the Array like "datas.put(entry.getKey, entry.getValue.map(byte 
> => byte.asInstanceOf[Byte]))", which is definitely not what we want.
> I suggest that we create a ScalaBasicArrayTypeInfo like the 
> "BasicArrayTypeInfo" for scala.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9834) Unable to support scala BasicArrayTypeInfo

2018-07-24 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554185#comment-16554185
 ] 

buptljy commented on FLINK-9834:


[~aljoscha]

I think we should implement *createSerializer* in *TraversableTypeInfo* like what we do in 
*BasicArrayTypeInfo*, so that we can directly use *TraversableTypeInfo* to create 
TypeInformation for Scala collections, as in *TraversableTypeInfoTest*.

> Unable to support scala BasicArrayTypeInfo
> --
>
> Key: FLINK-9834
> URL: https://issues.apache.org/jira/browse/FLINK-9834
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>    Reporter: buptljy
>Assignee: buptljy
>Priority: Major
>
> BasicArrayTypeInfo does not support Scala types in some circumstances. For 
> example,
> {code:scala}
> // we set a descriptor here and get value from it.
> val datas: mutable.Map[String, Array[Byte]] = mutable.Map()
> val descriptor = new MapStateDescriptor("realtime-state",
> BasicTypeInfo.STRING_TYPE_INFO, BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO)
> val state = context.getKeyedStateStore.getMapState(descriptor)
> val iter = state.entries().iterator()
> while (iter.hasNext) {
>val entry = iter.next()
>datas.put(entry.getKey, entry.getValue)
> }
> {code}
> The codes above cannot be compiled successfully because the "state" is using 
> java.lang.Byte but the "datas" is using scala.Byte, and we have to iterate 
> all values of the Array like "datas.put(entry.getKey, entry.getValue.map(byte 
> => byte.asInstanceOf[Byte]))", which is definitely not what we want.
> I suggest that we create a ScalaBasicArrayTypeInfo like the 
> "BasicArrayTypeInfo" for scala.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9834) Unable to support scala BasicArrayTypeInfo

2018-07-22 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy reassigned FLINK-9834:
--

Assignee: buptljy

> Unable to support scala BasicArrayTypeInfo
> --
>
> Key: FLINK-9834
> URL: https://issues.apache.org/jira/browse/FLINK-9834
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>    Reporter: buptljy
>Assignee: buptljy
>Priority: Major
>
> BasicArrayTypeInfo does not support Scala types in some circumstances. For 
> example,
> {code:scala}
> // we set a descriptor here and get value from it.
> val datas: mutable.Map[String, Array[Byte]] = mutable.Map()
> val descriptor = new MapStateDescriptor("realtime-state",
> BasicTypeInfo.STRING_TYPE_INFO, BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO)
> val state = context.getKeyedStateStore.getMapState(descriptor)
> val iter = state.entries().iterator()
> while (iter.hasNext) {
>val entry = iter.next()
>datas.put(entry.getKey, entry.getValue)
> }
> {code}
> The codes above cannot be compiled successfully because the "state" is using 
> java.lang.Byte but the "datas" is using scala.Byte, and we have to iterate 
> all values of the Array like "datas.put(entry.getKey, entry.getValue.map(byte 
> => byte.asInstanceOf[Byte]))", which is definitely not what we want.
> I suggest that we create a ScalaBasicArrayTypeInfo like the 
> "BasicArrayTypeInfo" for scala.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6381: [FLINK-7205] [table]Add UUID supported in SQL ...

2018-07-21 Thread buptljy
GitHub user buptljy opened a pull request:

https://github.com/apache/flink/pull/6381

[FLINK-7205] [table]Add UUID supported in SQL and TableApi

## What is the purpose of the change
* Add UUID support in SQL and the Table API.
## Brief change log
* Add UUID function.
## Verifying this change
* Unit tests.

## Documentation
* add in table.md and sql.md

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/buptljy/flink FLINK-7205

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6381.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6381


commit 5cde30f8feb2feb49dc1381af3d1d288c39122f0
Author: wind 
Date:   2018-07-21T15:20:21Z

add uuid table function

commit 8829de68bee64c6709d55efd17c09beabdb7a8be
Author: wind 
Date:   2018-07-21T15:32:42Z

add docs for uuid




---


[jira] [Commented] (FLINK-7205) Add UUID supported in TableAPI/SQL

2018-07-21 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551722#comment-16551722
 ] 

buptljy commented on FLINK-7205:


[~fhueske]

I find that we cannot test the _*UUID*_ function in *_ScalarFunctionsTest_* because 
we're not able to provide an expected UUID...

In my opinion, we can test _*UUID*_ in another way, like checking the 
length of the string and the positions of the separator "-", though it does not 
look very appropriate.
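
Something along these lines, just a rough sketch of the shape check rather than actual 
ScalarFunctionsTest code:

{code:scala}
// Shape-based check for a UUID string: 36 characters with '-' at positions 8, 13, 18 and 23.
import org.junit.Assert.assertEquals
import org.junit.Test

class UuidFormatSketchTest {

  @Test
  def testUuidShape(): Unit = {
    // Stand-in for the value the SQL UUID() function would return.
    val uuid = java.util.UUID.randomUUID().toString
    assertEquals(36, uuid.length)
    assertEquals(List(8, 13, 18, 23),
      uuid.zipWithIndex.collect { case ('-', i) => i }.toList)
  }
}
{code}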

> Add UUID supported in TableAPI/SQL
> --
>
> Key: FLINK-7205
> URL: https://issues.apache.org/jira/browse/FLINK-7205
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: buptljy
>Priority: Major
>
> UUID() returns a value that conforms to UUID version 1 as described in RFC 
> 4122. The value is a 128-bit number represented as a utf8 string of five 
> hexadecimal numbers in ---- format:
> The first three numbers are generated from the low, middle, and high parts of 
> a timestamp. The high part also includes the UUID version number.
> The fourth number preserves temporal uniqueness in case the timestamp value 
> loses monotonicity (for example, due to daylight saving time).
> The fifth number is an IEEE 802 node number that provides spatial uniqueness. 
> A random number is substituted if the latter is not available (for example, 
> because the host device has no Ethernet card, or it is unknown how to find 
> the hardware address of an interface on the host operating system). In this 
> case, spatial uniqueness cannot be guaranteed. Nevertheless, a collision 
> should have very low probability.
> See: [RFC 4122: 
> http://www.ietf.org/rfc/rfc4122.txt|http://www.ietf.org/rfc/rfc4122.txt]
> See detailed semantics:
>MySql: 
> [https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_uuid|https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_uuid]
> Welcome anybody feedback -:).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7205) Add UUID supported in TableAPI/SQL

2018-07-21 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551569#comment-16551569
 ] 

buptljy commented on FLINK-7205:


We can use java.util.UUID (which also follows RFC 4122) to implement this.
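
A minimal sketch; note that java.util.UUID.randomUUID() produces a version 4 (random) UUID under 
RFC 4122, not the time-based version 1 described in the issue, so we would need to decide whether 
that is acceptable:

{code:scala}
val uuid: java.util.UUID = java.util.UUID.randomUUID()
uuid.toString   // "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx"-shaped string
uuid.version()  // 4: a random UUID rather than the time-based version 1
{code}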

 

> Add UUID supported in TableAPI/SQL
> --
>
> Key: FLINK-7205
> URL: https://issues.apache.org/jira/browse/FLINK-7205
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: buptljy
>Priority: Major
>
> UUID() returns a value that conforms to UUID version 1 as described in RFC 
> 4122. The value is a 128-bit number represented as a utf8 string of five 
> hexadecimal numbers in ---- format:
> The first three numbers are generated from the low, middle, and high parts of 
> a timestamp. The high part also includes the UUID version number.
> The fourth number preserves temporal uniqueness in case the timestamp value 
> loses monotonicity (for example, due to daylight saving time).
> The fifth number is an IEEE 802 node number that provides spatial uniqueness. 
> A random number is substituted if the latter is not available (for example, 
> because the host device has no Ethernet card, or it is unknown how to find 
> the hardware address of an interface on the host operating system). In this 
> case, spatial uniqueness cannot be guaranteed. Nevertheless, a collision 
> should have very low probability.
> See: [RFC 4122: 
> http://www.ietf.org/rfc/rfc4122.txt|http://www.ietf.org/rfc/rfc4122.txt]
> See detailed semantics:
>MySql: 
> [https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_uuid|https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_uuid]
> Welcome anybody feedback -:).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-7205) Add UUID supported in TableAPI/SQL

2018-07-21 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy reassigned FLINK-7205:
--

Assignee: buptljy

> Add UUID supported in TableAPI/SQL
> --
>
> Key: FLINK-7205
> URL: https://issues.apache.org/jira/browse/FLINK-7205
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: buptljy
>Priority: Major
>
> UUID() returns a value that conforms to UUID version 1 as described in RFC 
> 4122. The value is a 128-bit number represented as a utf8 string of five 
> hexadecimal numbers in ---- format:
> The first three numbers are generated from the low, middle, and high parts of 
> a timestamp. The high part also includes the UUID version number.
> The fourth number preserves temporal uniqueness in case the timestamp value 
> loses monotonicity (for example, due to daylight saving time).
> The fifth number is an IEEE 802 node number that provides spatial uniqueness. 
> A random number is substituted if the latter is not available (for example, 
> because the host device has no Ethernet card, or it is unknown how to find 
> the hardware address of an interface on the host operating system). In this 
> case, spatial uniqueness cannot be guaranteed. Nevertheless, a collision 
> should have very low probability.
> See: [RFC 4122: 
> http://www.ietf.org/rfc/rfc4122.txt|http://www.ietf.org/rfc/rfc4122.txt]
> See detailed semantics:
>MySql: 
> [https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_uuid|https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_uuid]
> Welcome anybody feedback -:).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6359: [FLINK-6895][table]Add STR_TO_DATE supported in SQ...

2018-07-18 Thread buptljy
Github user buptljy commented on a diff in the pull request:

https://github.com/apache/flink/pull/6359#discussion_r203268495
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/StrToDateCallGen.scala
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
+import 
org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}
+import org.apache.flink.table.runtime.functions.DateTimeFunctions
+
+class StrToDateCallGen extends CallGenerator {
--- End diff --

Fixed.


---


[jira] [Comment Edited] (FLINK-6895) Add STR_TO_DATE supported in SQL

2018-07-18 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546071#comment-16546071
 ] 

buptljy edited comment on FLINK-6895 at 7/18/18 6:13 AM:
-

[~twalthr]
I've submitted a PR about this issue. But I find it's difficult to return 
different types according to the value of the literal, because we cannot get 
the value of the literal in CallGenerator#generate.
Maybe we need additional functions called "StrToTime" and "StrToDatetime"?
[~nssalian] Do you have any good ideas?


was (Author: wind_ljy):
[~twalthr]
I think we can use "isOperandLiteral" function in SqlOperatorBinding class.
For str_to_date("20-10-2015", "%d-%m-%Y"), the "isOperandLiteral(1, false)" 
returns true,
and for str_to_date("20-10-2015", myField), the "isOperandLiteral(1, false)" 
return false because myField is an identifier.
And we can do the same judgement in "CallGenerator".

> Add STR_TO_DATE supported in SQL
> 
>
> Key: FLINK-6895
> URL: https://issues.apache.org/jira/browse/FLINK-6895
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> STR_TO_DATE(str,format) This is the inverse of the DATE_FORMAT() function. It 
> takes a string str and a format string format. STR_TO_DATE() returns a 
> DATETIME value if the format string contains both date and time parts, or a 
> DATE or TIME value if the string contains only date or time parts. If the 
> date, time, or datetime value extracted from str is illegal, STR_TO_DATE() 
> returns NULL and produces a warning.
> * Syntax:
> STR_TO_DATE(str,format) 
> * Arguments
> **str: -
> **format: -
> * Return Types
>   DATETIME/DATE/TIME
> * Example:
>   STR_TO_DATE('01,5,2013','%d,%m,%Y') -> '2013-05-01'
>   SELECT STR_TO_DATE('a09:30:17','a%h:%i:%s') -> '09:30:17'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_str-to-date]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5628: [FLINK-6895][table]Add STR_TO_DATE supported in SQL

2018-07-18 Thread buptljy
Github user buptljy commented on the issue:

https://github.com/apache/flink/pull/5628
  
Submitted another [Pull Request](https://github.com/apache/flink/pull/6359) 
according to the discussion in FLINK-6895.


---


[GitHub] flink pull request #5628: [FLINK-6895][table]Add STR_TO_DATE supported in SQ...

2018-07-18 Thread buptljy
Github user buptljy closed the pull request at:

https://github.com/apache/flink/pull/5628


---


[GitHub] flink pull request #6359: [FLINK-6895][table]Add STR_TO_DATE supported in SQ...

2018-07-18 Thread buptljy
GitHub user buptljy opened a pull request:

https://github.com/apache/flink/pull/6359

[FLINK-6895][table]Add STR_TO_DATE supported in SQL

## What is the purpose of the change
Add support for the STR_TO_DATE function in SQL
## Brief change log
 * STR_TO_DATE(str string, format string)  
\- if the format is a literal, the return type will be TIMESTAMP; 
otherwise it's DATE.
 * Add tests in ScalarFunctionsTest.scala
 * Add docs in sql.md
## Verifying this change
 * Run unit tests in  ScalarFunctionsTest.scala
## Does this pull request potentially affect one of the following parts:
 * A new sql function
## Documentation
  * Add docs in sql.md

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/buptljy/flink 6895-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6359.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6359


commit 339a9e37c746dc8d3d283570c65f175edfee67b4
Author: wind 
Date:   2018-07-18T04:33:04Z

add StrToDate function




---


[jira] [Comment Edited] (FLINK-6895) Add STR_TO_DATE supported in SQL

2018-07-17 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546071#comment-16546071
 ] 

buptljy edited comment on FLINK-6895 at 7/17/18 6:10 AM:
-

[~twalthr]
I think we can use the "isOperandLiteral" function in the SqlOperatorBinding class.
For str_to_date("20-10-2015", "%d-%m-%Y"), "isOperandLiteral(1, false)" 
returns true,
and for str_to_date("20-10-2015", myField), "isOperandLiteral(1, false)" 
returns false because myField is an identifier.
We can make the same check in "CallGenerator".


was (Author: wind_ljy):
[~twalthr]
I think we can use "isOperandLiteral" function in SqlOperatorBinding class.
For str_to_date("20-10-2015", "%d-%m-%Y"), the "isOperandLiteral(1, false)" 
returns true,
and for str_to_date("20-10-2015", myField), the "isOperandLiteral(1, false)" 
return false because myField is an identifier.

> Add STR_TO_DATE supported in SQL
> 
>
> Key: FLINK-6895
> URL: https://issues.apache.org/jira/browse/FLINK-6895
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> STR_TO_DATE(str,format) This is the inverse of the DATE_FORMAT() function. It 
> takes a string str and a format string format. STR_TO_DATE() returns a 
> DATETIME value if the format string contains both date and time parts, or a 
> DATE or TIME value if the string contains only date or time parts. If the 
> date, time, or datetime value extracted from str is illegal, STR_TO_DATE() 
> returns NULL and produces a warning.
> * Syntax:
> STR_TO_DATE(str,format) 
> * Arguments
> **str: -
> **format: -
> * Return Types
>   DATETIME/DATE/TIME
> * Example:
>   STR_TO_DATE('01,5,2013','%d,%m,%Y') -> '2013-05-01'
>   SELECT STR_TO_DATE('a09:30:17','a%h:%i:%s') -> '09:30:17'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_str-to-date]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6895) Add STR_TO_DATE supported in SQL

2018-07-17 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546071#comment-16546071
 ] 

buptljy commented on FLINK-6895:


[~twalthr]
I think we can use the "isOperandLiteral" function in the SqlOperatorBinding class.
For str_to_date("20-10-2015", "%d-%m-%Y"), "isOperandLiteral(1, false)" 
returns true,
and for str_to_date("20-10-2015", myField), "isOperandLiteral(1, false)" 
returns false because myField is an identifier.
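
A rough sketch of the kind of return type inference I have in mind (assuming Calcite's 
SqlOperatorBinding API; this is not the actual Flink code):

{code:scala}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql.SqlOperatorBinding
import org.apache.calcite.sql.`type`.{SqlReturnTypeInference, SqlTypeName}

// If the format operand (index 1) is a literal, we could infer TIMESTAMP,
// otherwise fall back to DATE.
object StrToDateReturnTypeSketch extends SqlReturnTypeInference {
  override def inferReturnType(opBinding: SqlOperatorBinding): RelDataType = {
    val typeName =
      if (opBinding.isOperandLiteral(1, false)) SqlTypeName.TIMESTAMP else SqlTypeName.DATE
    opBinding.getTypeFactory.createSqlType(typeName)
  }
}
{code}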

> Add STR_TO_DATE supported in SQL
> 
>
> Key: FLINK-6895
> URL: https://issues.apache.org/jira/browse/FLINK-6895
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> STR_TO_DATE(str,format) This is the inverse of the DATE_FORMAT() function. It 
> takes a string str and a format string format. STR_TO_DATE() returns a 
> DATETIME value if the format string contains both date and time parts, or a 
> DATE or TIME value if the string contains only date or time parts. If the 
> date, time, or datetime value extracted from str is illegal, STR_TO_DATE() 
> returns NULL and produces a warning.
> * Syntax:
> STR_TO_DATE(str,format) 
> * Arguments
> **str: -
> **format: -
> * Return Types
>   DATETIME/DATE/TIME
> * Example:
>   STR_TO_DATE('01,5,2013','%d,%m,%Y') -> '2013-05-01'
>   SELECT STR_TO_DATE('a09:30:17','a%h:%i:%s') -> '09:30:17'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_str-to-date]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5628: [FLINK-6895][table]Add STR_TO_DATE supported in SQL

2018-07-17 Thread buptljy
Github user buptljy commented on the issue:

https://github.com/apache/flink/pull/5628
  
@twalthr 
Sorry for the late reply here.
This can be closed now, and I'll submit another PR this week.
I've posted my solution in FLINK-6895; we can have a discussion there.



---


[jira] [Created] (FLINK-9834) Unable to support scala BasicArrayTypeInfo

2018-07-12 Thread buptljy (JIRA)
buptljy created FLINK-9834:
--

 Summary: Unable to support scala BasicArrayTypeInfo
 Key: FLINK-9834
 URL: https://issues.apache.org/jira/browse/FLINK-9834
 Project: Flink
  Issue Type: Bug
  Components: Type Serialization System
Reporter: buptljy


BasicArrayTypeInfo does not support Scala types in some circumstances. For 
example,

{code:scala}
// we set a descriptor here and get value from it.
val datas: mutable.Map[String, Array[Byte]] = mutable.Map()
val descriptor = new MapStateDescriptor("realtime-state",
BasicTypeInfo.STRING_TYPE_INFO, BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO)
val state = context.getKeyedStateStore.getMapState(descriptor)
val iter = state.entries().iterator()
while (iter.hasNext) {
   val entry = iter.next()
   datas.put(entry.getKey, entry.getValue)
}
{code}
The code above cannot be compiled successfully because the "state" is using 
java.lang.Byte but the "datas" is using scala.Byte, and we have to iterate all 
values of the Array like "datas.put(entry.getKey, entry.getValue.map(byte => 
byte.asInstanceOf[Byte]))", which is definitely not what we want.

I suggest that we create a ScalaBasicArrayTypeInfo like the 
"BasicArrayTypeInfo" for scala.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9178) Add rate control for kafka source

2018-07-03 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16530933#comment-16530933
 ] 

buptljy commented on FLINK-9178:


[~tzulitai] I'd like to add something about this issue, which is very similar 
to a problem I've met recently.
The program receives realtime data, counts distinct IPs 
within a 10-minute window, and sinks the aggregated data into HBase. (The window 
is based on event time.) Now something went wrong and we want to re-consume all 
data from Kafka's earliest offset, but it doesn't work very well because there 
will be too many event-time windows in memory.
I think it would be okay if we used ProcessingTime instead, because there would be 
only a single window even if you consume from the earliest offset. So I 
wonder if we can add a parameter to control the rate of receiving data, like an 
upper bound on the consuming rate?
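
In the meantime, a user-side workaround I've been considering (my own sketch, not an existing 
connector option) is to throttle inside the DeserializationSchema, e.g. with a Guava RateLimiter, 
so the source cannot emit more than a fixed number of records per second:

{code:scala}
import com.google.common.util.concurrent.RateLimiter
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}

// Hypothetical throttled schema: deserialize() blocks until a permit is available,
// which backpressures the Kafka fetcher and caps the records/second rate.
class ThrottledStringSchema(recordsPerSecond: Double) extends DeserializationSchema[String] {

  @transient private lazy val limiter = RateLimiter.create(recordsPerSecond)

  override def deserialize(message: Array[Byte]): String = {
    limiter.acquire()
    new String(message, "UTF-8")
  }

  override def isEndOfStream(nextElement: String): Boolean = false

  override def getProducedType: TypeInformation[String] = BasicTypeInfo.STRING_TYPE_INFO
}
{code}

This only limits the per-subtask rate and does not solve the underlying problem of too many open 
event-time windows, but it can make a full replay from the earliest offset less bursty.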

> Add rate control for kafka source
> -
>
> Key: FLINK-9178
> URL: https://issues.apache.org/jira/browse/FLINK-9178
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>    Reporter: buptljy
>Assignee: Tarush Grover
>Priority: Major
>
> When I want to run the Flink program from the earliest offset in Kafka, it'll 
> be very easy to cause OOM if there is too much data, because of too many 
> HeapMemorySegments in NetworkBufferPool.
> Maybe we should have some settings to control the rate of the receiving data?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5877: [FLINK-8785][Job-Submission]Handle JobSubmissionEx...

2018-06-13 Thread buptljy
Github user buptljy closed the pull request at:

https://github.com/apache/flink/pull/5877


---


[jira] [Commented] (FLINK-7554) Add a testing RuntimeContext to test utilities

2018-06-11 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509099#comment-16509099
 ] 

buptljy commented on FLINK-7554:


[~twalthr] Can you help review my PR?


> Add a testing RuntimeContext to test utilities
> --
>
> Key: FLINK-7554
> URL: https://issues.apache.org/jira/browse/FLINK-7554
> Project: Flink
>  Issue Type: New Feature
>  Components: Tests
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: starter
>
> When unit testing user-defined functions it would be useful to have an 
> official testing {{RuntimeContext}} that uses Java collections for storing 
> state, metrics, etc.
> After executing the business logic, the user could then verify how the state 
> of the UDF changed or which metrics have been collected.
> This issue includes documentation for the "Testing" section.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6148: [FLINK-7554][Tests] Add a testing RuntimeContext t...

2018-06-11 Thread buptljy
GitHub user buptljy opened a pull request:

https://github.com/apache/flink/pull/6148

[FLINK-7554][Tests] Add a testing RuntimeContext to test utilities

## What is the purpose of the change
This pull request adds a TestingRuntimeContext to help developers test 
their own user-defined functions like CoProcessFunction instead of testing the 
whole pipeline.

## Brief change log
 - TestingRuntimeContext can be used directly in unit tests of both 
DataSet and DataStream functions, including support for 
broadcast variables, state, accumulators and so on.
 - Documentation is written in udf_test, including Java and Scala examples.

## Verifying this change

 - This change has already been verified by the new unit test in the test 
directory, 
org.apache.flink.streaming.api.functions.test.TestingRuntimeContextTest.

## Documentation

  - This pull request is a new feature.
  - Related documents are written in docs directory.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/buptljy/flink 7554

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6148.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6148


commit 8535b22a7cf1545381bcd7b5febc0d4763f925aa
Author: wind 
Date:   2018-06-11T12:40:19Z

add TestingRuntimeContext




---


[jira] [Commented] (FLINK-9178) Add rate control for kafka source

2018-04-26 Thread buptljy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16455677#comment-16455677
 ] 

buptljy commented on FLINK-9178:


[~app-tarush] I think so, and another option is to use a per-second byte limit, 
but I think the strategy of using the number of events is better.

> Add rate control for kafka source
> -
>
> Key: FLINK-9178
> URL: https://issues.apache.org/jira/browse/FLINK-9178
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>    Reporter: buptljy
>Priority: Major
>
> When I want to run the flink program from the earliest offset in Kafka, it'll 
> be very easy to cause OOM if there are too much data, because of too many 
> HeapMemorySegment in NetworkBufferPool.
> Maybe we should have some settings to control the rate of the receiving data?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9178) Add rate control for kafka source

2018-04-26 Thread buptljy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16453686#comment-16453686
 ] 

buptljy edited comment on FLINK-9178 at 4/26/18 8:46 AM:
-

[~app-tarush] It seems that I don't have the permission to assign this to you. 
[~tzulitai] Can you do that?


was (Author: wind_ljy):
[~app-tarush] Of course. Thanks !

> Add rate control for kafka source
> -
>
> Key: FLINK-9178
> URL: https://issues.apache.org/jira/browse/FLINK-9178
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>    Reporter: buptljy
>Priority: Major
>
> When I want to run the flink program from the earliest offset in Kafka, it'll 
> be very easy to cause OOM if there are too much data, because of too many 
> HeapMemorySegment in NetworkBufferPool.
> Maybe we should have some settings to control the rate of the receiving data?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9178) Add rate control for kafka source

2018-04-26 Thread buptljy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16453686#comment-16453686
 ] 

buptljy commented on FLINK-9178:


[~app-tarush] Of course. Thanks!

> Add rate control for kafka source
> -
>
> Key: FLINK-9178
> URL: https://issues.apache.org/jira/browse/FLINK-9178
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>    Reporter: buptljy
>Priority: Major
>
> When I want to run the flink program from the earliest offset in Kafka, it'll 
> be very easy to cause OOM if there are too much data, because of too many 
> HeapMemorySegment in NetworkBufferPool.
> Maybe we should have some settings to control the rate of the receiving data?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5877: [FLINK-8785][Job-Submission]Handle JobSubmissionException...

2018-04-19 Thread buptljy
Github user buptljy commented on the issue:

https://github.com/apache/flink/pull/5877
  
@zentol OK, I thought it was a small change before. So you mean we 
can improve the exception messages and have them reported 
more properly? 
I will test all the cases in JobSubmissionFailsITCase. 


---


[jira] [Commented] (FLINK-8785) JobSubmitHandler does not handle JobSubmissionExceptions

2018-04-18 Thread buptljy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16443563#comment-16443563
 ] 

buptljy commented on FLINK-8785:


[~Zentol] Can you help review my PR?

> JobSubmitHandler does not handle JobSubmissionExceptions
> 
>
> Key: FLINK-8785
> URL: https://issues.apache.org/jira/browse/FLINK-8785
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission, JobManager, REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: buptljy
>Priority: Critical
>  Labels: flip-6
>
> If the job submission, i.e. {{DispatcherGateway#submitJob}} fails with a 
> {{JobSubmissionException}} the {{JobSubmissionHandler}} returns "Internal 
> server error" instead of signaling the failed job submission.
> This can for example occur if the transmitted execution graph is faulty, as 
> tested by the \{{JobSubmissionFailsITCase}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5877: [FLINK-8785][Job-Submission]Handle JobSubmissionEx...

2018-04-18 Thread buptljy
GitHub user buptljy opened a pull request:

https://github.com/apache/flink/pull/5877

[FLINK-8785][Job-Submission]Handle JobSubmissionExceptions

## What is the purpose of the change

We will get an "Internal server error" exception if we submit a jobgraph 
with a restclusterclient. This PR helps us get more details and causes of the 
exception, such as "The jobgraph is empty" message.

## Brief change log
Add the causes and details of exceptions that happen during job submission.

## Verifying this change  

## Does this pull request potentially affect one of the following parts:  

## Documentation  


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/buptljy/flink 8785

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5877.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5877


commit 443a3c0fda861cd5324083df93ed5080c2f9f476
Author: wind <bupt_ljy@...>
Date:   2018-04-18T17:29:14Z

add error messages




---


[jira] [Assigned] (FLINK-6895) Add STR_TO_DATE supported in SQL

2018-04-18 Thread buptljy (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy reassigned FLINK-6895:
--

Assignee: buptljy  (was: Aegeaner)

> Add STR_TO_DATE supported in SQL
> 
>
> Key: FLINK-6895
> URL: https://issues.apache.org/jira/browse/FLINK-6895
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: buptljy
>Priority: Major
>
> STR_TO_DATE(str,format) This is the inverse of the DATE_FORMAT() function. It 
> takes a string str and a format string format. STR_TO_DATE() returns a 
> DATETIME value if the format string contains both date and time parts, or a 
> DATE or TIME value if the string contains only date or time parts. If the 
> date, time, or datetime value extracted from str is illegal, STR_TO_DATE() 
> returns NULL and produces a warning.
> * Syntax:
> STR_TO_DATE(str,format) 
> * Arguments
> **str: -
> **format: -
> * Return Types
>   DATETIME/DATE/TIME
> * Example:
>   STR_TO_DATE('01,5,2013','%d,%m,%Y') -> '2013-05-01'
>   SELECT STR_TO_DATE('a09:30:17','a%h:%i:%s') -> '09:30:17'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_str-to-date]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9178) Add rate control for kafka source

2018-04-18 Thread buptljy (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy updated FLINK-9178:
---
Description: 
When I want to run the Flink program from the earliest offset in Kafka, it'll 
be very easy to cause OOM if there is too much data, because of too many 
HeapMemorySegments in NetworkBufferPool.

Maybe we should have some settings to control the rate of the receiving data?

  was:
When I want to run the flink program from the earliest offset in Kafka, it'll 
be very easy to cause OOM because of too many HeapMemorySegment in 
NetworkBufferPool.

I think we should support this "rerun" situation, which is very common for most 
businesses.


> Add rate control for kafka source
> -
>
> Key: FLINK-9178
> URL: https://issues.apache.org/jira/browse/FLINK-9178
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: buptljy
>Priority: Major
>
> When I want to run the flink program from the earliest offset in Kafka, it'll 
> be very easy to cause OOM if there are too much data, because of too many 
> HeapMemorySegment in NetworkBufferPool.
> Maybe we should have some settings to control the rate of the receiving data?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9178) Add rate control for kafka source

2018-04-18 Thread buptljy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16442918#comment-16442918
 ] 

buptljy commented on FLINK-9178:


[~till.rohrmann] Do you have any ideas about this issue? Maybe it can be a new 
feature for the source?

> Add rate control for kafka source
> -
>
> Key: FLINK-9178
> URL: https://issues.apache.org/jira/browse/FLINK-9178
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>    Reporter: buptljy
>Priority: Major
>
> When I want to run the flink program from the earliest offset in Kafka, it'll 
> be very easy to cause OOM because of too many HeapMemorySegment in 
> NetworkBufferPool.
> I think we should support this "rerun" situation, which is very common for 
> most businesses.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8785) JobSubmitHandler does not handle JobSubmissionExceptions

2018-04-17 Thread buptljy (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy reassigned FLINK-8785:
--

Assignee: buptljy

> JobSubmitHandler does not handle JobSubmissionExceptions
> 
>
> Key: FLINK-8785
> URL: https://issues.apache.org/jira/browse/FLINK-8785
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission, JobManager, REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: buptljy
>Priority: Critical
>  Labels: flip-6
>
> If the job submission, i.e. {{DispatcherGateway#submitJob}} fails with a 
> {{JobSubmissionException}} the {{JobSubmissionHandler}} returns "Internal 
> server error" instead of signaling the failed job submission.
> This can for example occur if the transmitted execution graph is faulty, as 
> tested by the \{{JobSubmissionFailsITCase}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8785) JobSubmitHandler does not handle JobSubmissionExceptions

2018-04-17 Thread buptljy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16441047#comment-16441047
 ] 

buptljy commented on FLINK-8785:


[~Zentol] 

Okay, I already know how to do this. I will assign this task to myself if 
you don't mind.

> JobSubmitHandler does not handle JobSubmissionExceptions
> 
>
> Key: FLINK-8785
> URL: https://issues.apache.org/jira/browse/FLINK-8785
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission, JobManager, REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Critical
>  Labels: flip-6
>
> If the job submission, i.e. {{DispatcherGateway#submitJob}} fails with a 
> {{JobSubmissionException}} the {{JobSubmissionHandler}} returns "Internal 
> server error" instead of signaling the failed job submission.
> This can for example occur if the transmitted execution graph is faulty, as 
> tested by the \{{JobSubmissionFailsITCase}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9178) Add rate control for kafka source

2018-04-16 Thread buptljy (JIRA)
buptljy created FLINK-9178:
--

 Summary: Add rate control for kafka source
 Key: FLINK-9178
 URL: https://issues.apache.org/jira/browse/FLINK-9178
 Project: Flink
  Issue Type: Improvement
  Components: Kafka Connector
Reporter: buptljy


When I want to run the Flink program from the earliest offset in Kafka, it'll 
be very easy to cause OOM because of too many HeapMemorySegments in 
NetworkBufferPool.

I think we should support this "rerun" situation, which is very common for most 
businesses.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7554) Add a testing RuntimeContext to test utilities

2018-04-06 Thread buptljy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16428541#comment-16428541
 ] 

buptljy commented on FLINK-7554:


[~twalthr] I find that it seems a bit harder to test streaming 
functions like "CoProcessFunction", which has a Context, because I can't 
create a Context directly. My idea is to create a testing function for it, like:

 
{code:java}
public class TestingCoProcessFunction extends CoProcessFunction<Object, Object, Object> {

   private TestingRuntimeContext ctx;

   private Context context = new Context() {
      @Override
      public Long timestamp() {
         return -1L;
      }

      @Override
      public TimerService timerService() {
         return new TestingTimerService();
      }

      @Override
      public <X> void output(OutputTag<X> outputTag, X value) {
         ctx.addSideOutput(outputTag, value);
      }
   };

   public TestingCoProcessFunction(TestingRuntimeContext ctx) {
      this.ctx = ctx;
   }

   public Context getContext() {
      return context;
   }

   @Override
   public void processElement1(Object value, Context ctx, Collector<Object> out) throws Exception {}

   @Override
   public void processElement2(Object value, Context ctx, Collector<Object> out) throws Exception {}
}

{code}
And the test will be like this:
{code:java}
@Test
public void testEnrichmentFunction() throws Exception {
   TestingRuntimeContext ctx = new TestingRuntimeContext(true);
   EnrichmentFunction func = new EnrichmentFunction();
   func.setRuntimeContext(ctx);
   CoProcessFunction.Context context = new TestingCoProcessFunction(ctx).getContext();
   ValueStateDescriptor<TaxiRide> rideStateDesc = new ValueStateDescriptor<>("saved ride", TaxiRide.class);
   ValueStateDescriptor<TaxiFare> fareStateDesc = new ValueStateDescriptor<>("saved fare", TaxiFare.class);
   ctx.setState(rideStateDesc, new TestingValueState<>(null));
   ctx.setState(fareStateDesc, new TestingValueState<>(null));
   func.open(new Configuration());

   TaxiRide ride1 = new TaxiRide(1);
   func.processElement1(ride1, context, ctx.getCollector());
   Assert.assertEquals(ctx.getState(rideStateDesc).value(), ride1);
}
{code}
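For reference, the TestingValueState used above could be as simple as a ValueState 
backed by a plain field. This is only a sketch of what I have in mind, not final code:
{code:java}
import java.io.IOException;

import org.apache.flink.api.common.state.ValueState;

/** In-memory ValueState for tests; no backend, no serialization, just a field. */
public class TestingValueState<T> implements ValueState<T> {

   private T value;

   public TestingValueState(T initialValue) {
      this.value = initialValue;
   }

   @Override
   public T value() throws IOException {
      return value;
   }

   @Override
   public void update(T value) throws IOException {
      this.value = value;
   }

   @Override
   public void clear() {
      value = null;
   }
}
{code}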
In this way, I would create a testing function for each of "BroadcastProcessFunction", 
"CoProcessFunction", etc. I can do this, but I wonder if you have any better ideas?

 

> Add a testing RuntimeContext to test utilities
> --
>
> Key: FLINK-7554
> URL: https://issues.apache.org/jira/browse/FLINK-7554
> Project: Flink
>  Issue Type: New Feature
>  Components: Tests
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: starter
>
> When unit testing user-defined functions it would be useful to have an 
> official testing {{RuntimeContext}} that uses Java collections for storing 
> state, metrics, etc.
> After executing the business logic, the user could then verify how the state 
> of the UDF changed or which metrics have been collected.
> This issue includes documentation for the "Testing" section.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5638: [FLINK-6924][table]ADD LOG(X) supported in TableAPI

2018-03-20 Thread buptljy
Github user buptljy commented on the issue:

https://github.com/apache/flink/pull/5638
  
@walterddr What else should I do for this PR?


---


[GitHub] flink issue #5638: [FLINK-6924][table]ADD LOG(X) supported in TableAPI

2018-03-19 Thread buptljy
Github user buptljy commented on the issue:

https://github.com/apache/flink/pull/5638
  
@walterddr As explained in 
[FLINK-8930](https://issues.apache.org/jira/browse/FLINK-8930), the reason that 
"LOG(-1)" and "LOG(1, 100)" succeed is that they are constant-folded during Calcite's 
simplification, while "testTableApi" does not go through the same process. That is why 
I cannot write a "testTableApi" case for the "LOG" function.
If you change "LOG(1, 100)" to "LOG(f30, 100)", which Calcite cannot simplify, 
you will get the same error that I get when writing "testTableApi".


---


[GitHub] flink pull request #5638: [FLINK-6924][table]ADD LOG(X) supported in TableAP...

2018-03-12 Thread buptljy
Github user buptljy commented on a diff in the pull request:

https://github.com/apache/flink/pull/5638#discussion_r173897783
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -1130,4 +1130,13 @@ object concat_ws {
   }
 }
 
+object log {
+  def apply(base: Expression, antilogarithm: Expression): Expression = {
+Log(base, antilogarithm)
+  }
+  def apply(antilogarithm: Expression): Expression = {
+new Log(antilogarithm)
--- End diff --

1. I've tried that before, but it seems we can't merge log(x) and ln(x) into one 
structure, because the SQL expression is mapped directly to an instance of the 
corresponding case class. Please let me know if you figure it out.
2. Actually we should use log(base, antilogarithm).


---


[GitHub] flink issue #5638: [FLINK-6924][table]ADD LOG(X) supported in TableAPI

2018-03-12 Thread buptljy
Github user buptljy commented on the issue:

https://github.com/apache/flink/pull/5638
  
@walterddr I'm not able to add a validation test because I am blocked by 
[FLINK-8930](https://issues.apache.org/jira/browse/FLINK-8930). 


---


[GitHub] flink issue #5638: [FLINK-6924][table]ADD LOG(X) supported in TableAPI

2018-03-12 Thread buptljy
Github user buptljy commented on the issue:

https://github.com/apache/flink/pull/5638
  
@suez1224 Docs are added for both Java and Scala.


---


[GitHub] flink pull request #5638: [FLINK-6924][table]ADD LOG(X) supported in TableAP...

2018-03-05 Thread buptljy
GitHub user buptljy opened a pull request:

https://github.com/apache/flink/pull/5638

[FLINK-6924][table]ADD LOG(X) supported in TableAPI

## What is the purpose of the change
  * Add LOG(X) function in TableAPI.
## Brief change log
 * Add LOG(X) function in TableAPI.
 * Modify LOG(X) unit tests from "testSqlApi" to "testAllApis".
## Verifying this change
 * This change is verified by the modified unit tests.
## Does this pull request potentially affect one of the following parts:
 * No
## Documentation
 * 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/buptljy/flink log

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5638.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5638


commit 376cee8a6bc25afdf8df50b65c3fa8f35a5e4b7c
Author: Liao Jiayi <liaojiayi@...>
Date:   2018-03-04T09:15:10Z

add log table function

commit 9ab7a07c0b614cb7af0fee7e69f6d58bf5004b28
Author: Liao Jiayi <liaojiayi@...>
Date:   2018-03-04T09:15:23Z

Merge branch 'master' of github.com:apache/flink into log




---


[GitHub] flink issue #5628: [FLINK-6895][table]Add STR_TO_DATE supported in SQL

2018-03-02 Thread buptljy
Github user buptljy commented on the issue:

https://github.com/apache/flink/pull/5628
  
Actually I am not sure whether it is appropriate to return a "string", because the 
function should be the inverse of "DATE_FORMAT()". However, if I return 
DATE/TIME/DATETIME as the JIRA issue describes, the type the user receives would be 
uncertain (one of DATE/TIME/DATETIME).
I will optimize it if anyone can give me some suggestions. 


---


[GitHub] flink pull request #5628: [FLINK-6895][table]Add STR_TO_DATE supported in SQ...

2018-03-02 Thread buptljy
GitHub user buptljy opened a pull request:

https://github.com/apache/flink/pull/5628

[FLINK-6895][table]Add STR_TO_DATE supported in SQL

## What is the purpose of the change
Add STR_TO_DATE Function supported in SQL
## Brief change log
 * STR_TO_DATE(str string, format string)  
\- str is the string that needs to be transformed.
\- format is the format pattern used to parse "str"
 * Add tests in ScalarFunctionsTest.scala
 * Add docs in sql.md
## Verifying this change
 * Run unit tests in  ScalarFunctionsTest.scala
## Does this pull request potentially affect one of the following parts:
 * A new sql function
## Documentation
  * Add docs in sql.md
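
A hypothetical usage sketch (the table name, column name and format tokens below are 
made up for illustration; the exact pattern syntax is whatever the PR documents in 
sql.md):

```java
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class StrToDateExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // A tiny in-memory table with a single string column to parse.
        DataStream<Tuple1<String>> input = env.fromElements(Tuple1.of("2018-03-03 07:00:13"));
        tEnv.registerDataStream("orders", input, "order_time");

        // STR_TO_DATE parses the string column according to the given format pattern.
        Table result = tEnv.sqlQuery(
            "SELECT STR_TO_DATE(order_time, 'yyyy-MM-dd HH:mm:ss') AS ts FROM orders");

        tEnv.toAppendStream(result, Row.class).print();
        env.execute();
    }
}
```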

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/buptljy/flink str_to_date2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5628.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5628


commit 4650f5ba5314bc65ded5134a1f76d457455b75d5
Author: Liao Jiayi <liaojiayi@...>
Date:   2018-03-03T07:00:13Z

add str_to_date sql function

commit 9cfa5245d88dcd08763c3fb7588382a969906412
Author: Liao Jiayi <liaojiayi@...>
Date:   2018-03-03T07:00:32Z

Merge branch 'master' of github.com:apache/flink into str_to_date2




---


[GitHub] flink pull request #5618: [FLINK-6895][table]Add STR_TO_DATE supported in SQ...

2018-03-02 Thread buptljy
Github user buptljy closed the pull request at:

https://github.com/apache/flink/pull/5618


---


[GitHub] flink pull request #5618: [FLINK-6895][table]Add STR_TO_DATE supported in SQ...

2018-03-02 Thread buptljy
GitHub user buptljy opened a pull request:

https://github.com/apache/flink/pull/5618

[FLINK-6895][table]Add STR_TO_DATE supported in SQL

## What is the purpose of the change
Add STR_TO_DATE Function supported in SQL
## Brief change log
 * STR_TO_DATE(str string, format string)  
\- str is the string that needs to be transformed.
\- format is the format pattern used to parse "str"
 * Add tests in ScalarFunctionsTest.scala
 * Add docs in sql.md
## Verifying this change
 * Run unit tests in  ScalarFunctionsTest.scala
## Does this pull request potentially affect one of the following parts:
 * A new sql function
## Documentation
  * Add docs in sql.md

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/buptljy/flink str_to_date

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5618.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5618


commit 59752143ee438cb11969ae4bdda1fac5fc32813c
Author: Liao Jiayi <liaojiayi@...>
Date:   2018-03-01T11:58:08Z

add str_to_date sql function

commit 63f71e4b3d6378f2114aa04ba4d1128f1ec3bc38
Author: Liao Jiayi <liaojiayi@...>
Date:   2018-03-01T11:58:41Z

Merge branch 'master' of github.com:apache/flink

commit 3ec6d2ec487151928032d144de94ca9113d63f01
Author: Liao Jiayi <liaojiayi@...>
Date:   2018-03-01T15:30:04Z

fix checkstyle error




---


[GitHub] flink issue #5618: [FLINK-6895][table]Add STR_TO_DATE supported in SQL

2018-03-02 Thread buptljy
Github user buptljy commented on the issue:

https://github.com/apache/flink/pull/5618
  
Actually I am not sure whether it is appropriate to return a "string", because the 
function should be the inverse of "DATE_FORMAT()". However, if I return 
DATE/TIME/DATETIME as the JIRA issue describes, the type the user receives would be 
uncertain (one of DATE/TIME/DATETIME).
Do you have any suggestions? I will optimize it.


---


[GitHub] flink pull request #5615: [FLINK-6895][table]Add STR_TO_DATE supported in SQ...

2018-03-02 Thread buptljy
Github user buptljy closed the pull request at:

https://github.com/apache/flink/pull/5615


---


[GitHub] flink issue #5615: [FLINK-6895][table]Add STR_TO_DATE supported in SQL

2018-03-01 Thread buptljy
Github user buptljy commented on the issue:

https://github.com/apache/flink/pull/5615
  
Actually I am not sure whether it is appropriate to return a "string", because the 
function should be the inverse of "DATE_FORMAT()". However, if I return 
DATE/TIME/DATETIME as the JIRA issue describes, the type the user receives would be 
uncertain (one of DATE/TIME/DATETIME).
Do you have any suggestions? I will optimize it.


---


[GitHub] flink pull request #5615: [FLINK-6895][table]Add STR_TO_DATE supported in SQ...

2018-03-01 Thread buptljy
GitHub user buptljy opened a pull request:

https://github.com/apache/flink/pull/5615

[FLINK-6895][table]Add STR_TO_DATE supported in SQL

## What is the purpose of the change
Add STR_TO_DATE Function supported in SQL
## Brief change log
 * STR_TO_DATE(str string, format string)  
\- str is the string that needs to be transformed.
\- format is the format pattern used to parse "str"
 * Add tests in ScalarFunctionsTest.scala
 * Add docs in sql.md
## Verifying this change
 * Run unit tests in  ScalarFunctionsTest.scala
## Does this pull request potentially affect one of the following parts:
 * A new sql function
## Documentation
  * Add docs in sql.md

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/buptljy/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5615.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5615


commit 59752143ee438cb11969ae4bdda1fac5fc32813c
Author: Liao Jiayi <liaojiayi@...>
Date:   2018-03-01T11:58:08Z

add str_to_date sql function

commit 63f71e4b3d6378f2114aa04ba4d1128f1ec3bc38
Author: Liao Jiayi <liaojiayi@...>
Date:   2018-03-01T11:58:41Z

Merge branch 'master' of github.com:apache/flink




---