[jira] [Comment Edited] (FLINK-10074) Allowable number of checkpoint failures

2018-08-14 Thread Thomas Weise (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580624#comment-16580624
 ] 

Thomas Weise edited comment on FLINK-10074 at 8/15/18 4:09 AM:
---

[~till.rohrmann] it is probably too difficult to retain the count in the JM 
failure case, but how about making it correct for other failures? When the JM 
fails - which is very unlikely - we would reset / not retain the count. The 
majority of failures that we are concerned with are transient issues in TMs 
where subtasks just get redeployed; TM machine failures are much rarer. For 
all of these we could get more accurate behavior by retaining the count and 
failing right away on the next checkpoint failure.


was (Author: thw):
[~till.rohrmann] it is probably unreasonably hard to retain the count in the 
JM failure case, but how about making it correct for other failures? When the 
JM fails - which is very unlikely - we would reset / not retain the count. The 
majority of failures that we are concerned with are transient issues in TMs 
where subtasks just get redeployed; TM machine failures are much rarer. For 
all of these we could get more accurate behavior by retaining the count and 
failing right away on the next checkpoint failure.

> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish not 
> to incur the expense of a restart in such a scenario, and this could be 
> expressed with a failure threshold (number of consecutive checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  
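For illustration, a minimal sketch of such a threshold combined with a list of
tolerated exceptions; the class and method names here are hypothetical, not
actual Flink API:

{code:java}
import java.util.Set;

// Hypothetical sketch of the proposed mechanism; names are illustrative only.
public class CheckpointFailureTracker {

    private final int tolerableConsecutiveFailures;          // e.g. 3
    private final Set<Class<? extends Throwable>> tolerated; // e.g. transient S3 errors
    private int consecutiveFailures;

    public CheckpointFailureTracker(
            int tolerableConsecutiveFailures,
            Set<Class<? extends Throwable>> tolerated) {
        this.tolerableConsecutiveFailures = tolerableConsecutiveFailures;
        this.tolerated = tolerated;
    }

    /** A completed checkpoint resets the consecutive-failure count. */
    public void onCheckpointSuccess() {
        consecutiveFailures = 0;
    }

    /** Returns true if the job should fail (and restart) after this failure. */
    public boolean onCheckpointFailure(Throwable cause) {
        if (!tolerated.contains(cause.getClass())) {
            return true; // not a tolerated exception: fail immediately
        }
        return ++consecutiveFailures > tolerableConsecutiveFailures;
    }
}
{code}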



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-14 Thread Thomas Weise (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580699#comment-16580699
 ] 

Thomas Weise commented on FLINK-10074:
--

[~yanghua] this is what I meant as well; I updated my comment for clarity.

> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish not 
> to incur the expense of a restart in such a scenario, and this could be 
> expressed with a failure threshold (number of consecutive checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10074) Allowable number of checkpoint failures

2018-08-14 Thread Thomas Weise (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580624#comment-16580624
 ] 

Thomas Weise edited comment on FLINK-10074 at 8/15/18 4:07 AM:
---

[~till.rohrmann] it is probably unreasonably hard to retain the count in the 
JM failure case, but how about making it correct for other failures? When the 
JM fails - which is very unlikely - we would reset / not retain the count. The 
majority of failures that we are concerned with are transient issues in TMs 
where subtasks just get redeployed; TM machine failures are much rarer. For 
all of these we could get more accurate behavior by retaining the count and 
failing right away on the next checkpoint failure.


was (Author: thw):
[~till.rohrmann] it is probably unreasonably hard to do for the JM failure 
case, but how about making it correct on a best-effort basis? Only when the JM 
fails - which is very unlikely - would we not retain the count. The majority 
of failures are transient issues in TMs where subtasks just get redeployed; 
TM machine failures are much rarer. For all of these we could get more 
accurate behavior by retaining the count and failing right away on the next 
checkpoint failure.

> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish not 
> to incur the expense of a restart in such a scenario, and this could be 
> expressed with a failure threshold (number of consecutive checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-14 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580687#comment-16580687
 ] 

vinoyang commented on FLINK-10074:
--

[~thw] I personally prefer that the counter is reset once the JM fails over. I 
don't think it's necessary to introduce too much complexity for this: if we 
needed to maintain a global counter across JM processes, we would have to use 
a third-party component such as ZooKeeper. I think it is appropriate to 
maintain this counter for the life of a JM process. Once the JM fails over, 
the job is restored (re-deployed and run), and it is reasonable to reset the 
counter for a new runtime environment.
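
For illustration, a minimal sketch of the lifecycle described above, with
hypothetical names: the count is plain in-memory state of a JM-side component,
deliberately not part of checkpointed or HA state, so it survives TM-level
failures but resets when a new JM process takes over:

{code:java}
// Hypothetical sketch: the counter lives and dies with the JM process.
public class JmScopedFailureCounter {

    // Not persisted anywhere: recreated (and thus reset to 0) when a new JM
    // restores and redeploys the job after a failover.
    private int consecutiveCheckpointFailures;

    public void onCheckpointSuccess() {
        consecutiveCheckpointFailures = 0;
    }

    public void onCheckpointFailure() {
        // TM-level failures keep the count, because the JM process survives them.
        consecutiveCheckpointFailures++;
    }

    public int get() {
        return consecutiveCheckpointFailures;
    }
}
{code}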

> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish not 
> to incur the expense of a restart in such a scenario, and this could be 
> expressed with a failure threshold (number of consecutive checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8532) RebalancePartitioner should use Random value for its first partition

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580677#comment-16580677
 ] 

ASF GitHub Bot commented on FLINK-8532:
---

Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify 
RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#issuecomment-413082655
 
 
   cc @tillrohrmann could you help review this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> RebalancePartitioner should use Random value for its first partition
> 
>
> Key: FLINK-8532
> URL: https://issues.apache.org/jira/browse/FLINK-8532
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Yuta Morisawa
>Priority: Minor
>  Labels: pull-request-available
>
> In some conditions, RebalancePartitioner doesn't balance data correctly 
> because it uses the same value for selecting the next operator.
> RebalancePartitioner initializes its partition id with the same value in 
> every thread, so it does balance data over time, but at any one moment the 
> amount of data in each operator is skewed.
> The skew becomes particularly severe when the data rates of the upstream 
> operators are equal.
>  
>  
> Example:
> Consider a simple operator chain.
> -> map1 -> rebalance -> map2 ->
> Each map operator (map1, map2) contains three subtasks (st1-st3 and st4-st6, 
> respectively).
> map1          map2
>  st1              st4
>  st2              st5
>  st3              st6
>  
> At the beginning, every subtask in map1 sends data to st4 in map2 because 
> they all use the same initial partition id.
> The next time map1 receives data, st1, st2, and st3 all send to st5, because 
> each incremented its partition id after processing the previous data.
> In my environment, processing takes twice as long with RebalancePartitioner 
> as with other partitioners (rescale, keyBy).
>  
> To solve this problem, in my opinion, RebalancePartitioner should use its own 
> operator id for the initial value.
>  
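For illustration, a minimal sketch of the proposed fix: seed the round-robin
counter with a random channel per subtask instead of a fixed initial value, so
parallel senders do not all target the same channel first (modeled loosely on
the ChannelSelector interface of that era; names may differ from the actual
patch):

{code:java}
import java.util.concurrent.ThreadLocalRandom;

// Sketch: round-robin channel selection with a randomized starting channel.
public class RandomStartRebalanceSelector<T> {

    private int nextChannel = -1;

    public int selectChannel(T record, int numberOfChannels) {
        if (nextChannel < 0) {
            // First record: start at a random channel instead of channel 0,
            // so st1, st2, st3 are unlikely to all pick st4 first.
            nextChannel = ThreadLocalRandom.current().nextInt(numberOfChannels);
        } else {
            nextChannel = (nextChannel + 1) % numberOfChannels;
        }
        return nextChannel;
    }
}
{code}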



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition

2018-08-14 Thread GitBox
Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify 
RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#issuecomment-413082655
 
 
   cc @tillrohrmann could you help review this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9999) Add ISNUMERIC supported in Table API/SQL

2018-08-14 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580627#comment-16580627
 ] 

vinoyang commented on FLINK-9999:
-

[~walterddr] I don't know whether this is a SQL-standard scalar function, but 
there are databases that provide it. I don't know whether the functions in 
Flink Table/SQL are all SQL-standard functions; it seems some are not, such as 
"similar to". Where can I see which standard functions exist? I think it's 
reasonable to add some very common functions, and I am going to try to make 
this function support more types. [~twalthr]

> Add ISNUMERIC supported in Table API/SQL
> 
>
> Key: FLINK-9999
> URL: https://issues.apache.org/jira/browse/FLINK-9999
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The ISNUMERIC function is used to verify that an expression is a valid numeric type.
> documentation : 
> https://docs.microsoft.com/en-us/sql/t-sql/functions/isnumeric-transact-sql?view=sql-server-2017
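
For illustration, a user-defined version of such a function is straightforward
to sketch in the Table API (illustrative only; the built-in proposed here may
accept more types and behave differently):

{code:java}
import java.math.BigDecimal;
import org.apache.flink.table.functions.ScalarFunction;

// Illustrative sketch of an ISNUMERIC-style scalar function for strings.
public class IsNumeric extends ScalarFunction {

    public boolean eval(String value) {
        if (value == null) {
            return false;
        }
        try {
            // BigDecimal accepts integral, decimal, and scientific notation.
            new BigDecimal(value.trim());
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }
}
{code}

It could then be registered with {{tableEnv.registerFunction("ISNUMERIC", new 
IsNumeric())}} and used as {{ISNUMERIC(col)}} in SQL.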



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-14 Thread Thomas Weise (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580624#comment-16580624
 ] 

Thomas Weise commented on FLINK-10074:
--

[~till.rohrmann] it is probably unreasonably hard to do for the JM failure 
case, but how about making it correct on a best-effort basis? Only when the JM 
fails - which is very unlikely - would we not retain the count. The majority 
of failures are transient issues in TMs where subtasks just get redeployed; 
TM machine failures are much rarer. For all of these we could get more 
accurate behavior by retaining the count and failing right away on the next 
checkpoint failure.

> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish not 
> to incur the expense of a restart in such a scenario, and this could be 
> expressed with a failure threshold (number of consecutive checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8953) Resolve unresolved field references in FieldComputer expressions

2018-08-14 Thread Renjie Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Renjie Liu reassigned FLINK-8953:
-

Assignee: (was: Renjie Liu)

> Resolve unresolved field references in FieldComputer expressions
> 
>
> Key: FLINK-8953
> URL: https://issues.apache.org/jira/browse/FLINK-8953
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Priority: Major
>
> When implementing the {{FieldComputer.getExpression}} method, it is not 
> possible to use API classes but only internal expression case classes.
> It would be great to also define timestamp extractors like:
> {code}
>   def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
> // 'x.cast(Types.LONG)
> // ExpressionParser.parseExpression("x.cast(LONG)")
>   }
> {code}
> An even better solution would be to provide different `getExpression()` 
> methods that an implementor can override. The general goal should be to 
> define this as naturally as possible. In the future we should also support SQL:
> {code}
>   def getJavaExpression(fieldAccesses: Array[ResolvedFieldReference]): String = {
> "x.cast(LONG)"
>   }
>   def getSQLExpression(fieldAccesses: Array[ResolvedFieldReference]): String = {
> "CAST(x AS LONG)"
>   }
> {code}
> The final design is still up for discussion. These are just ideas.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10101) Mesos web ui url is missing.

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580531#comment-16580531
 ] 

ASF GitHub Bot commented on FLINK-10101:


liurenjie1024 commented on issue #6522: [FLINK-10101][mesos] Add web ui url for 
mesos.
URL: https://github.com/apache/flink/pull/6522#issuecomment-413045382
 
 
   @GJL I've fixed your comments.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Mesos web ui url is missing.
> 
>
> Key: FLINK-10101
> URL: https://issues.apache.org/jira/browse/FLINK-10101
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 1.5.0, 1.5.1, 1.5.2
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Major
>  Labels: pull-request-available
>
> Mesos web ui url is missing in the new deployment mode.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] liurenjie1024 commented on issue #6522: [FLINK-10101][mesos] Add web ui url for mesos.

2018-08-14 Thread GitBox
liurenjie1024 commented on issue #6522: [FLINK-10101][mesos] Add web ui url for 
mesos.
URL: https://github.com/apache/flink/pull/6522#issuecomment-413045382
 
 
   @GJL I've fixed your comments.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9488) Create common entry point for master and workers

2018-08-14 Thread Prithvi Raj (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580468#comment-16580468
 ] 

Prithvi Raj commented on FLINK-9488:


I'm not sure whether this is the right place to ask, but how does one make use 
of this change when running Flink on K8s? Are there docs available somewhere?

> Create common entry point for master and workers
> 
>
> Key: FLINK-9488
> URL: https://issues.apache.org/jira/browse/FLINK-9488
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> To make the container setup easier, we should provide a single cluster entry 
> point which uses leader election to become either the master or a worker 
> which runs the {{TaskManager}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10022) Add metrics for input/output buffers

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580343#comment-16580343
 ] 

ASF GitHub Bot commented on FLINK-10022:


NicoK closed pull request #6551: [FLINK-10022][network][metrics] add metrics 
for input/output buffers
URL: https://github.com/apache/flink/pull/6551
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index bf0aee1ed42..bac40f00066 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1217,6 +1217,27 @@ Thus, in order to infer the metric identifier:
   The number of bytes this task reads from a remote source per 
second.
   Meter
 
+
+  Task
+  numBuffersInLocal
+  The total number of network buffers this task has read from a local 
source.
+  Counter
+
+
+  numBuffersInLocalPerSecond
+  The number of network buffers this task reads from a local source 
per second.
+  Meter
+
+
+  numBuffersInRemote
+  The total number of network buffers this task has read from a remote 
source.
+  Counter
+
+
+  numBuffersInRemotePerSecond
+  The number of network buffers this task reads from a remote source 
per second.
+  Meter
+
 
   numBytesOut
   The total number of bytes this task has emitted.
@@ -1227,6 +1248,16 @@ Thus, in order to infer the metric identifier:
   The number of bytes this task emits per second.
   Meter
 
+
+  numBuffersOut
+  The total number of network buffers this task has emitted.
+  Counter
+
+
+  numBuffersOutPerSecond
+  The number of network buffers this task emits per second.
+  Meter
+
 
   Task/Operator
   numRecordsIn
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index e3a8e49316f..970795c0564 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -71,6 +71,8 @@
 
private Counter numBytesOut = new SimpleCounter();
 
+   private Counter numBuffersOut = new SimpleCounter();
+
public RecordWriter(ResultPartitionWriter writer) {
this(writer, new RoundRobinChannelSelector<T>());
}
@@ -184,6 +186,7 @@ public void clearBuffers() {
  */
public void setMetricGroup(TaskIOMetricGroup metrics) {
numBytesOut = metrics.getNumBytesOutCounter();
+   numBuffersOut = metrics.getNumBuffersOutCounter();
}
 
/**
@@ -200,6 +203,7 @@ private boolean tryFinishCurrentBufferBuilder(int targetChannel, RecordSerialize
bufferBuilders[targetChannel] = Optional.empty();
 
numBytesOut.inc(bufferBuilder.finish());
+   numBuffersOut.inc();
serializer.clear();
return true;
}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 3ce58661281..2a7cedf6949 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -64,6 +64,8 @@
 
protected final Counter numBytesIn;
 
+   protected final Counter numBuffersIn;
+
/** The current backoff (in ms) */
private int currentBackoff;
 
@@ -73,7 +75,8 @@ protected InputChannel(
ResultPartitionID partitionId,
int initialBackoff,
int maxBackoff,
-   Counter numBytesIn) {
+   Counter numBytesIn,
+   Counter numBuffersIn) {
 
checkArgument(channelIndex >= 0);
 
@@ -91,6 +94,7 @@ protected InputChannel(
this.currentBackoff = initial == 0 ? -1 : 0;
 
this.numBytesIn = numBytesIn;
+   this.numBuffersIn = numBuffersIn;
}
 
// 

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java

[GitHub] NicoK closed pull request #6551: [FLINK-10022][network][metrics] add metrics for input/output buffers

2018-08-14 Thread GitBox
NicoK closed pull request #6551: [FLINK-10022][network][metrics] add metrics 
for input/output buffers
URL: https://github.com/apache/flink/pull/6551
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index bf0aee1ed42..bac40f00066 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1217,6 +1217,27 @@ Thus, in order to infer the metric identifier:
   The number of bytes this task reads from a remote source per 
second.
   Meter
 
+
+  Task
+  numBuffersInLocal
+  The total number of network buffers this task has read from a local 
source.
+  Counter
+
+
+  numBuffersInLocalPerSecond
+  The number of network buffers this task reads from a local source 
per second.
+  Meter
+
+
+  numBuffersInRemote
+  The total number of network buffers this task has read from a remote 
source.
+  Counter
+
+
+  numBuffersInRemotePerSecond
+  The number of network buffers this task reads from a remote source 
per second.
+  Meter
+
 
   numBytesOut
   The total number of bytes this task has emitted.
@@ -1227,6 +1248,16 @@ Thus, in order to infer the metric identifier:
   The number of bytes this task emits per second.
   Meter
 
+
+  numBuffersOut
+  The total number of network buffers this task has emitted.
+  Counter
+
+
+  numBuffersOutPerSecond
+  The number of network buffers this task emits per second.
+  Meter
+
 
   Task/Operator
   numRecordsIn
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index e3a8e49316f..970795c0564 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -71,6 +71,8 @@
 
private Counter numBytesOut = new SimpleCounter();
 
+   private Counter numBuffersOut = new SimpleCounter();
+
public RecordWriter(ResultPartitionWriter writer) {
this(writer, new RoundRobinChannelSelector<T>());
}
@@ -184,6 +186,7 @@ public void clearBuffers() {
  */
public void setMetricGroup(TaskIOMetricGroup metrics) {
numBytesOut = metrics.getNumBytesOutCounter();
+   numBuffersOut = metrics.getNumBuffersOutCounter();
}
 
/**
@@ -200,6 +203,7 @@ private boolean tryFinishCurrentBufferBuilder(int targetChannel, RecordSerialize
bufferBuilders[targetChannel] = Optional.empty();
 
numBytesOut.inc(bufferBuilder.finish());
+   numBuffersOut.inc();
serializer.clear();
return true;
}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 3ce58661281..2a7cedf6949 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -64,6 +64,8 @@
 
protected final Counter numBytesIn;
 
+   protected final Counter numBuffersIn;
+
/** The current backoff (in ms) */
private int currentBackoff;
 
@@ -73,7 +75,8 @@ protected InputChannel(
ResultPartitionID partitionId,
int initialBackoff,
int maxBackoff,
-   Counter numBytesIn) {
+   Counter numBytesIn,
+   Counter numBuffersIn) {
 
checkArgument(channelIndex >= 0);
 
@@ -91,6 +94,7 @@ protected InputChannel(
this.currentBackoff = initial == 0 ? -1 : 0;
 
this.numBytesIn = numBytesIn;
+   this.numBuffersIn = numBuffersIn;
}
 
// 

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index f9c75addd51..4b3a8ff9773 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ 

[jira] [Updated] (FLINK-10042) Extract snapshot algorithms from inner classes into full classes

2018-08-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10042:
---
Labels: pull-request-available  (was: )

> Extract snapshot algorithms from inner classes into full classes
> 
>
> Key: FLINK-10042
> URL: https://issues.apache.org/jira/browse/FLINK-10042
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10042) Extract snapshot algorithms from inner classes into full classes

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580284#comment-16580284
 ] 

ASF GitHub Bot commented on FLINK-10042:


StefanRRichter commented on issue #6556: [FLINK-10042][state] Extract snapshot 
algorithms from inner classes of RocksDBKeyedStateBackend into full classes
URL: https://github.com/apache/flink/pull/6556#issuecomment-412980594
 
 
   CC @azagrebin 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extract snapshot algorithms from inner classes into full classes
> 
>
> Key: FLINK-10042
> URL: https://issues.apache.org/jira/browse/FLINK-10042
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10042) Extract snapshot algorithms from inner classes into full classes

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580283#comment-16580283
 ] 

ASF GitHub Bot commented on FLINK-10042:


StefanRRichter opened a new pull request #6556: [FLINK-10042][state] Extract 
snapshot algorithms from inner classes of RocksDBKeyedStateBackend into full 
classes
URL: https://github.com/apache/flink/pull/6556
 
 
   ## What is the purpose of the change
   
   This PR is part 2 in the refactoring/decomposition of 
`RocksDBKeyedStateBackend`. All snapshotting algorithms are extracted from the 
backend file into their own full classes.
   In a second step, the PR also introduces `AsyncSnapshotCallable` as a 
general strategy for asynchronous snapshots that outlines resource 
de/allocation, closing, and cancellation.
   
   ## Verifying this change
   
   This change is already covered by existing tests, and I added 
`AsyncSnapshotCallableTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
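   
   As an illustration of the pattern described above (not the actual 
`AsyncSnapshotCallable` code), an asynchronous snapshot callable roughly 
combines a cancellation flag with guaranteed resource cleanup:
   
   ```java
   import java.util.concurrent.Callable;
   import java.util.concurrent.CancellationException;
   import java.util.concurrent.atomic.AtomicBoolean;

   // Generic sketch: run the snapshot in callInternal(), support cancellation,
   // and guarantee that acquired resources are released on every code path.
   public abstract class AsyncCallableSketch<T> implements Callable<T> {

       private final AtomicBoolean cancelled = new AtomicBoolean(false);

       @Override
       public final T call() throws Exception {
           try {
               if (cancelled.get()) {
                   throw new CancellationException("snapshot was cancelled");
               }
               return callInternal();
           } finally {
               cleanupResources(); // runs on success, failure, and cancellation
           }
       }

       public void cancel() {
           cancelled.set(true);
       }

       /** The actual snapshot work. */
       protected abstract T callInternal() throws Exception;

       /** Release resources acquired for the snapshot. */
       protected abstract void cleanupResources();
   }
   ```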
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extract snapshot algorithms from inner classes into full classes
> 
>
> Key: FLINK-10042
> URL: https://issues.apache.org/jira/browse/FLINK-10042
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StefanRRichter commented on issue #6556: [FLINK-10042][state] Extract snapshot algorithms from inner classes of RocksDBKeyedStateBackend into full classes

2018-08-14 Thread GitBox
StefanRRichter commented on issue #6556: [FLINK-10042][state] Extract snapshot 
algorithms from inner classes of RocksDBKeyedStateBackend into full classes
URL: https://github.com/apache/flink/pull/6556#issuecomment-412980594
 
 
   CC @azagrebin 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StefanRRichter opened a new pull request #6556: [FLINK-10042][state] Extract snapshot algorithms from inner classes of RocksDBKeyedStateBackend into full classes

2018-08-14 Thread GitBox
StefanRRichter opened a new pull request #6556: [FLINK-10042][state] Extract 
snapshot algorithms from inner classes of RocksDBKeyedStateBackend into full 
classes
URL: https://github.com/apache/flink/pull/6556
 
 
   ## What is the purpose of the change
   
   This PR is part 2 in the refactoring/decomposition of 
`RocksDBKeyedStateBackend`. All snapshotting algorithms are extracted from the 
backend file into their own full classes.
   In a second step, the PR also introduces `AsyncSnapshotCallable` as a 
general strategy for asynchronous snapshots that outlines resource 
de/allocation, closing, and cancellation.
   
   ## Verifying this change
   
   This change is already covered by existing tests, and I added 
`AsyncSnapshotCallableTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] walterddr commented on a change in pull request #6508: [Flink-10079] [table] Automatically register sink table from external catalogs

2018-08-14 Thread GitBox
walterddr commented on a change in pull request #6508: [Flink-10079] [table] 
Automatically register sink table from external catalogs 
URL: https://github.com/apache/flink/pull/6508#discussion_r210052280
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##
 @@ -750,7 +751,28 @@ abstract class TableEnvironment(val config: TableConfig) {
 if (null == sinkTableName) throw TableException("Name of TableSink must not be null.")
 if (sinkTableName.isEmpty) throw TableException("Name of TableSink must not be empty.")
 if (!isRegistered(sinkTableName)) {
-  throw TableException(s"No table was registered under the name $sinkTableName.")
+  // try resolving and registering sink table from registered external catalogs
+  try {
+    val paths = sinkTableName.split("\\.")
+    if (paths.length > 1) {
 
 Review comment:
   I am not exactly sure, but this seems problematic to me. Is there a 
guarantee that the first element of the path will always be the `catalog` 
name? This also depends on whether the path is required to be fully qualified 
or not, right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9999) Add ISNUMERIC supported in Table API/SQL

2018-08-14 Thread Rong Rong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580197#comment-16580197
 ] 

Rong Rong commented on FLINK-9999:
--

I do not have access to the newest SQL 2016 standard, but is {{IS_NUMERIC}} in 
it? Multiple DBMSs seem to implement it in different ways; some take more than 
just String/VARCHAR types as input.

> Add ISNUMERIC supported in Table API/SQL
> 
>
> Key: FLINK-9999
> URL: https://issues.apache.org/jira/browse/FLINK-9999
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The ISNUMERIC function is used to verify that an expression is a valid numeric type.
> documentation : 
> https://docs.microsoft.com/en-us/sql/t-sql/functions/isnumeric-transact-sql?view=sql-server-2017



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10079) Automatically register sink table from external catalogs

2018-08-14 Thread Rong Rong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580192#comment-16580192
 ] 

Rong Rong commented on FLINK-10079:
---

I agree we should either make the documentation more consistent, or support 
sink functionality so that tables from an `ExternalCatalog` also work with 
`INSERT INTO`. However, I think this can be easily achieved using the unified 
TableSourceSink configuration in FLINK-8866. [~twalthr] do you have any 
suggestions on this ticket?

> Automatically register sink table from external catalogs
> 
>
> Key: FLINK-10079
> URL: https://issues.apache.org/jira/browse/FLINK-10079
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Assignee: Jun Zhang
>Priority: Major
> Fix For: 1.7.0
>
>
> In the documentation, there is a description:
> {quote}Once registered in a TableEnvironment, all tables defined in a 
> ExternalCatalog can be accessed from Table API or SQL queries by specifying 
> their full path, such as catalog.database.table.
> {quote}
> Currently, this is true only for source tables. For a sink table (specified 
> in the Table API or SQL), the user has to explicitly register it even though 
> it is defined in a registered ExternalCatalog; otherwise a "No table was 
> registered under the name XXX" TableException is thrown.
> It would be better to keep source tables and sink tables consistent, and 
> users would enjoy a more convenient approach to inserting into sink tables.
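
For reference, a sketch of the intended usage with the 1.6-era Table API 
(assuming a {{tableEnv}} and an {{externalCatalog}} already exist; the 
sink-side resolution is what this ticket would add):

{code:java}
// Reading through the full catalog path already works:
tableEnv.registerExternalCatalog("cat", externalCatalog);
Table source = tableEnv.scan("cat", "db", "sourceTable");

// Writing through the full path is what this ticket asks for; today it
// throws: "No table was registered under the name cat.db.sinkTable".
source.insertInto("cat.db.sinkTable");
{code}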



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10145) Add replace supported in TableAPI and SQL

2018-08-14 Thread Fabian Hueske (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske reassigned FLINK-10145:
-

Assignee: Guibo Pan

> Add replace supported in TableAPI and SQL
> -
>
> Key: FLINK-10145
> URL: https://issues.apache.org/jira/browse/FLINK-10145
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Guibo Pan
>Assignee: Guibo Pan
>Priority: Major
>
> replace is a useful function for Strings. 
> for example:
> {code:java}
> select replace("Hello World", "World", "Flink") // return "Hello Flink"
> select replace("ababab", "abab", "z") // return "zab"
> {code}
> It is supported as a UDF in Hive; for more details please see [1]
> [1]: 
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-StringFunctions



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10101) Mesos web ui url is missing.

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580143#comment-16580143
 ] 

ASF GitHub Bot commented on FLINK-10101:


GJL commented on a change in pull request #6522: [FLINK-10101][mesos] Add web 
ui url for mesos.
URL: https://github.com/apache/flink/pull/6522#discussion_r210040604
 
 

 ##
 File path: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 ##
 @@ -120,6 +120,10 @@
/** A local actor system for using the helper actors. */
private final ActorSystem actorSystem;
 
+   /** Web url for to show in mesos page.*/
 
 Review comment:
   Nitpicking here but there should be a space between the period and the 
asterisk for consistency:
   ```
/** Web url for to show in mesos page. */
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Mesos web ui url is missing.
> 
>
> Key: FLINK-10101
> URL: https://issues.apache.org/jira/browse/FLINK-10101
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 1.5.0, 1.5.1, 1.5.2
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Major
>  Labels: pull-request-available
>
> Mesos web ui url is missing in the new deployment mode.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] GJL commented on a change in pull request #6522: [FLINK-10101][mesos] Add web ui url for mesos.

2018-08-14 Thread GitBox
GJL commented on a change in pull request #6522: [FLINK-10101][mesos] Add web 
ui url for mesos.
URL: https://github.com/apache/flink/pull/6522#discussion_r210040604
 
 

 ##
 File path: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 ##
 @@ -120,6 +120,10 @@
/** A local actor system for using the helper actors. */
private final ActorSystem actorSystem;
 
+   /** Web url for to show in mesos page.*/
 
 Review comment:
   Nitpicking here but there should be a space between the period and the 
asterisk for consistency:
   ```
/** Web url for to show in mesos page. */
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10101) Mesos web ui url is missing.

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580142#comment-16580142
 ] 

ASF GitHub Bot commented on FLINK-10101:


GJL commented on a change in pull request #6522: [FLINK-10101][mesos] Add web 
ui url for mesos.
URL: https://github.com/apache/flink/pull/6522#discussion_r210036352
 
 

 ##
 File path: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 ##
 @@ -185,6 +190,8 @@ public MesosResourceManager(
this.workersInNew = new HashMap<>(8);
this.workersInLaunch = new HashMap<>(8);
this.workersBeingReturned = new HashMap<>(8);
+
+   this.webUiUrl = webUiUrl;
 
 Review comment:
   I would move this line below this assignment:
   ```
   this.taskManagerContainerSpec = 
Preconditions.checkNotNull(taskManagerContainerSpec);
   ```
   so that the assignments of all constructor arguments are grouped. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Mesos web ui url is missing.
> 
>
> Key: FLINK-10101
> URL: https://issues.apache.org/jira/browse/FLINK-10101
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 1.5.0, 1.5.1, 1.5.2
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Major
>  Labels: pull-request-available
>
> Mesos web ui url is missing in the new deployment mode.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10101) Mesos web ui url is missing.

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580141#comment-16580141
 ] 

ASF GitHub Bot commented on FLINK-10101:


GJL commented on a change in pull request #6522: [FLINK-10101][mesos] Add web 
ui url for mesos.
URL: https://github.com/apache/flink/pull/6522#discussion_r210037022
 
 

 ##
 File path: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 ##
 @@ -237,6 +244,10 @@ protected void initialize() throws 
ResourceManagerException {
Protos.FrameworkInfo.Builder frameworkInfo = 
mesosConfig.frameworkInfo()
.clone()
.setCheckpoint(true);
+   if (webUiUrl != null) {
+   frameworkInfo = frameworkInfo.setWebuiUrl(webUiUrl);
 
 Review comment:
   It is not necessary to reassign `frameworkInfo` because the method has 
side-effects on the builder. The return type is only a builder to enable method 
chaining. In fact, `frameworkInfo` can be declared `final`. 
   ```
   if (webUiUrl != null) {
   frameworkInfo.setWebuiUrl(webUiUrl);
   }
   ```
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Mesos web ui url is missing.
> 
>
> Key: FLINK-10101
> URL: https://issues.apache.org/jira/browse/FLINK-10101
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 1.5.0, 1.5.1, 1.5.2
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Major
>  Labels: pull-request-available
>
> Mesos web ui url is missing in the new deployment mode.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] GJL commented on a change in pull request #6522: [FLINK-10101][mesos] Add web ui url for mesos.

2018-08-14 Thread GitBox
GJL commented on a change in pull request #6522: [FLINK-10101][mesos] Add web 
ui url for mesos.
URL: https://github.com/apache/flink/pull/6522#discussion_r210037022
 
 

 ##
 File path: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 ##
 @@ -237,6 +244,10 @@ protected void initialize() throws 
ResourceManagerException {
Protos.FrameworkInfo.Builder frameworkInfo = 
mesosConfig.frameworkInfo()
.clone()
.setCheckpoint(true);
+   if (webUiUrl != null) {
+   frameworkInfo = frameworkInfo.setWebuiUrl(webUiUrl);
 
 Review comment:
   It is not necessary to reassign `frameworkInfo` because the method has 
side-effects on the builder. The return type is only a builder to enable method 
chaining. In fact, `frameworkInfo` can be declared `final`. 
   ```
   if (webUiUrl != null) {
   frameworkInfo.setWebuiUrl(webUiUrl);
   }
   ```
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on a change in pull request #6522: [FLINK-10101][mesos] Add web ui url for mesos.

2018-08-14 Thread GitBox
GJL commented on a change in pull request #6522: [FLINK-10101][mesos] Add web 
ui url for mesos.
URL: https://github.com/apache/flink/pull/6522#discussion_r210036352
 
 

 ##
 File path: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 ##
 @@ -185,6 +190,8 @@ public MesosResourceManager(
this.workersInNew = new HashMap<>(8);
this.workersInLaunch = new HashMap<>(8);
this.workersBeingReturned = new HashMap<>(8);
+
+   this.webUiUrl = webUiUrl;
 
 Review comment:
   I would move this line below this assignment:
   ```
   this.taskManagerContainerSpec = 
Preconditions.checkNotNull(taskManagerContainerSpec);
   ```
   so that the assignments of all constructor arguments are grouped. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-10019) Fix Composite getResultType of UDF cannot be chained with other operators

2018-08-14 Thread Rong Rong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578793#comment-16578793
 ] 

Rong Rong edited comment on FLINK-10019 at 8/14/18 5:39 PM:


Dug a little deeper in Calcite and found that in:
https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/InferTypes.java#L68

Seems like if the return type is inferred as a record, i.e. *{{.isStruct() == 
true}}*, "it must have the same number of fields as the number of operands", 
which is clearly not the case here, since the expression *{{AS(func(a), 
"myRow")}}* only passes *{{func(a)}}* for type inference, but not the alias 
*{{"myRow"}}*.


was (Author: walterddr):
Dug a little deeper in Calcite and found that in:
https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/InferTypes.java#L68

Seems like if the return type is inferred as a record, or {{isStruct()}}, "it 
must have the same number of fields as the number of operands." which clearly 
is not the case here since the following expression: **{{AS(func(a), 
"myRow")}}** only passes over the **{{func(a)}}** for type inference, but not 
the alias **{{"myRow"}}**

> Fix Composite getResultType of UDF cannot be chained with other operators
> -
>
> Key: FLINK-10019
> URL: https://issues.apache.org/jira/browse/FLINK-10019
> Project: Flink
>  Issue Type: Bug
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Explicitly returning a CompositeType from {{udf.getResultType}} results in 
> failures in some chained operators.
> For example: consider a simple UDF,
> {code:scala}
> object Func extends ScalarFunction {
>   def eval(row: Row): Row = {
> row
>   }
>   override def getParameterTypes(signature: Array[Class[_]]): 
> Array[TypeInformation[_]] =
> Array(Types.ROW(Types.INT))
>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
> Types.ROW(Types.INT)
> }
> {code}
> This should work perfectly since it's just a simple pass through, however
> {code:scala}
>   @Test
>   def testRowType(): Unit = {
> val data = List(
>   Row.of(Row.of(12.asInstanceOf[Integer]), "1")
> )
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val stream = env.fromCollection(data)(Types.ROW(Types.ROW(Types.INT), 
> Types.STRING))
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val table = stream.toTable(tEnv, 'a, 'b)
> tEnv.registerFunction("func", Func)
> tEnv.registerTable("t", table)
> // This works perfectly
> val result1 = tEnv.sqlQuery("SELECT func(a) FROM t").toAppendStream[Row]
> result1.addSink(new StreamITCase.StringSink[Row])
> // This throws exception
> val result2 = tEnv.sqlQuery("SELECT func(a) as myRow FROM 
> t").toAppendStream[Row]
> result2.addSink(new StreamITCase.StringSink[Row])
> env.execute()
>   }
> {code}
> Exception code:
> {code:java}
> java.lang.IndexOutOfBoundsException: index (1) must be less than size (1)
>   at 
> com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:310)
>   at 
> com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:293)
>   at 
> com.google.common.collect.SingletonImmutableList.get(SingletonImmutableList.java:41)
>   at 
> org.apache.calcite.sql.type.InferTypes$2.inferOperandTypes(InferTypes.java:83)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1777)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:459)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandStar(SqlValidatorImpl.java:349)
> ...
> {code}
> This is due to the fact that Calcite inferOperandTypes does not expect to 
> infer a struct RelDataType.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] GJL commented on a change in pull request #6550: [FLINK-10137][YARN] Log completed containers.

2018-08-14 Thread GitBox
GJL commented on a change in pull request #6550: [FLINK-10137][YARN] Log 
completed containers.
URL: https://github.com/apache/flink/pull/6550#discussion_r210040202
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -323,9 +323,10 @@ public float getProgress() {
}
 
@Override
-	public void onContainersCompleted(final List<ContainerStatus> list) {
+	public void onContainersCompleted(final List<ContainerStatus> statuses) {
		runAsync(() -> {
-			for (final ContainerStatus containerStatus : list) {
+			log.debug("YARN ResourceManager reported the following containers completed: {}.", statuses);
 
 Review comment:
   That depends on the `List` implementation. At least 
`java.util.AbstractCollection` has a meaningful string representation. The List 
provided by Hadoop 2.8 as well.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10137) YARN: Log completed Containers

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580127#comment-16580127
 ] 

ASF GitHub Bot commented on FLINK-10137:


GJL commented on a change in pull request #6550: [FLINK-10137][YARN] Log 
completed containers.
URL: https://github.com/apache/flink/pull/6550#discussion_r210040202
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -323,9 +323,10 @@ public float getProgress() {
}
 
@Override
-	public void onContainersCompleted(final List<ContainerStatus> list) {
+	public void onContainersCompleted(final List<ContainerStatus> statuses) {
		runAsync(() -> {
-			for (final ContainerStatus containerStatus : list) {
+			log.debug("YARN ResourceManager reported the following containers completed: {}.", statuses);
 
 Review comment:
   That depends on the `List` implementation. At least 
`java.util.AbstractCollection` has a meaningful string representation. The List 
provided by Hadoop 2.8 as well.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> YARN: Log completed Containers
> --
>
> Key: FLINK-10137
> URL: https://issues.apache.org/jira/browse/FLINK-10137
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, ResourceManager, YARN
>Affects Versions: 1.5.2, 1.6.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Major
>  Labels: pull-request-available
>
> Currently the Flink logs do not reveal why a YARN container completed. 
> {{YarnResourceManager}} should log the {{ContainerStatus}} when the YARN 
> ResourceManager reports containers to be completed. 
> *Acceptance Criteria*
> * {{YarnResourceManager#onContainersCompleted(List<ContainerStatus>)}} logs 
> completed containers.
> * {{ResourceManager#closeTaskManagerConnection(ResourceID, Exception)}} 
> should always log the message in the exception.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10137) YARN: Log completed Containers

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580126#comment-16580126
 ] 

ASF GitHub Bot commented on FLINK-10137:


GJL commented on a change in pull request #6550: [FLINK-10137][YARN] Log 
completed containers.
URL: https://github.com/apache/flink/pull/6550#discussion_r210040202
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -323,9 +323,10 @@ public float getProgress() {
}
 
@Override
-	public void onContainersCompleted(final List<ContainerStatus> list) {
+	public void onContainersCompleted(final List<ContainerStatus> statuses) {
		runAsync(() -> {
-			for (final ContainerStatus containerStatus : list) {
+			log.debug("YARN ResourceManager reported the following containers completed: {}.", statuses);
 
 Review comment:
   That depends on the `List` implementation. At least 
`java.util.AbstractCollection` has a meaningful string representation. The List 
returned by Hadoop 2.8 as well.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> YARN: Log completed Containers
> --
>
> Key: FLINK-10137
> URL: https://issues.apache.org/jira/browse/FLINK-10137
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, ResourceManager, YARN
>Affects Versions: 1.5.2, 1.6.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Major
>  Labels: pull-request-available
>
> Currently the Flink logs do not reveal why a YARN container completed. 
> {{YarnResourceManager}} should log the {{ContainerStatus}} when the YARN 
> ResourceManager reports containers to be completed. 
> *Acceptance Criteria*
> * {{YarnResourceManager#onContainersCompleted(List<ContainerStatus>)}} logs 
> completed containers.
> * {{ResourceManager#closeTaskManagerConnection(ResourceID, Exception)}} 
> should always log the message in the exception.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] GJL commented on a change in pull request #6550: [FLINK-10137][YARN] Log completed containers.

2018-08-14 Thread GitBox
GJL commented on a change in pull request #6550: [FLINK-10137][YARN] Log 
completed containers.
URL: https://github.com/apache/flink/pull/6550#discussion_r210040202
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -323,9 +323,10 @@ public float getProgress() {
}
 
@Override
-   public void onContainersCompleted(final List<ContainerStatus> list) {
+   public void onContainersCompleted(final List<ContainerStatus> statuses) {
runAsync(() -> {
-   for (final ContainerStatus containerStatus : list) {
+   log.debug("YARN ResourceManager reported the following containers completed: {}.", statuses);
 
 Review comment:
   That depends on the `List` implementation. At least 
`java.util.AbstractCollection` has a meaningful string representation. The List 
returned by Hadoop 2.8 does as well.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-10145) Add replace supported in TableAPI and SQL

2018-08-14 Thread Guibo Pan (JIRA)
Guibo Pan created FLINK-10145:
-

 Summary: Add replace supported in TableAPI and SQL
 Key: FLINK-10145
 URL: https://issues.apache.org/jira/browse/FLINK-10145
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: Guibo Pan


replace is a useful function for Strings. 

For example:
{code:java}
select replace("Hello World", "World", "Flink") // returns "Hello Flink"
select replace("ababab", "abab", "z") // returns "zab"
{code}
It is supported as a UDF in Hive; for more details, please see [1].

[1]: 
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-StringFunctions
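
Assuming the proposed semantics follow {{java.lang.String#replace}} (all non-overlapping occurrences, scanning left to right), a minimal sketch:
{code:java}
public class ReplaceDemo {
    // Assumed semantics: replace every non-overlapping occurrence of
    // `search` in `str` with `replacement`, scanning left to right.
    static String replace(String str, String search, String replacement) {
        return str.replace(search, replacement);
    }

    public static void main(String[] args) {
        System.out.println(replace("Hello World", "World", "Flink")); // Hello Flink
        System.out.println(replace("ababab", "abab", "z"));           // zab
    }
}
{code}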



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9101) HAQueryableStateRocksDBBackendITCase failed on travis

2018-08-14 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-9101:
-
Fix Version/s: (was: 1.6.0)
   1.7.0
   1.6.1

> HAQueryableStateRocksDBBackendITCase failed on travis
> -
>
> Key: FLINK-9101
> URL: https://issues.apache.org/jira/browse/FLINK-9101
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.6.1, 1.7.0
>
>
> The test deadlocks on travis.
> https://travis-ci.org/zentol/flink/jobs/358894950



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-9101) HAQueryableStateRocksDBBackendITCase failed on travis

2018-08-14 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reopened FLINK-9101:
--

It failed again: https://api.travis-ci.org/v3/job/415961618/log.txt

> HAQueryableStateRocksDBBackendITCase failed on travis
> -
>
> Key: FLINK-9101
> URL: https://issues.apache.org/jira/browse/FLINK-9101
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.6.1, 1.7.0
>
>
> The test deadlocks on travis.
> https://travis-ci.org/zentol/flink/jobs/358894950



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10137) YARN: Log completed Containers

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580017#comment-16580017
 ] 

ASF GitHub Bot commented on FLINK-10137:


yanghua commented on a change in pull request #6550: [FLINK-10137][YARN] Log 
completed containers.
URL: https://github.com/apache/flink/pull/6550#discussion_r210015723
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -323,9 +323,10 @@ public float getProgress() {
}
 
@Override
-   public void onContainersCompleted(final List<ContainerStatus> list) {
+   public void onContainersCompleted(final List<ContainerStatus> statuses) {
runAsync(() -> {
-   for (final ContainerStatus containerStatus : list) {
+   log.debug("YARN ResourceManager reported the following containers completed: {}.", statuses);
 
 Review comment:
   This will call the list's toString method, right? Is the format suitable?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> YARN: Log completed Containers
> --
>
> Key: FLINK-10137
> URL: https://issues.apache.org/jira/browse/FLINK-10137
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, ResourceManager, YARN
>Affects Versions: 1.5.2, 1.6.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Major
>  Labels: pull-request-available
>
> Currently the Flink logs do not reveal why a YARN container completed. 
> {{YarnResourceManager}} should log the {{ContainerStatus}} when the YARN 
> ResourceManager reports containers to be completed. 
> *Acceptance Criteria*
> * {{YarnResourceManager#onContainersCompleted(List<ContainerStatus>)}} logs 
> completed containers.
> * {{ResourceManager#closeTaskManagerConnection(ResourceID, Exception)}} 
> should always log the message in the exception.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on a change in pull request #6550: [FLINK-10137][YARN] Log completed containers.

2018-08-14 Thread GitBox
yanghua commented on a change in pull request #6550: [FLINK-10137][YARN] Log 
completed containers.
URL: https://github.com/apache/flink/pull/6550#discussion_r210015723
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -323,9 +323,10 @@ public float getProgress() {
}
 
@Override
-   public void onContainersCompleted(final List<ContainerStatus> list) {
+   public void onContainersCompleted(final List<ContainerStatus> statuses) {
runAsync(() -> {
-   for (final ContainerStatus containerStatus : list) {
+   log.debug("YARN ResourceManager reported the following containers completed: {}.", statuses);
 
 Review comment:
   This will call the list's toString method, right? Is the format suitable?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10144) No unused import "import akka.actor.ActorRef" in class "org.apache.flink.runtime.messages.TaskManagerMessages"

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580010#comment-16580010
 ] 

ASF GitHub Bot commented on FLINK-10144:


yanghua commented on issue #6554: [FLINK-10144]delete unused import 
akka.actor.ActorRef in TaskManagerMessages.scala
URL: https://github.com/apache/flink/pull/6554#issuecomment-412928276
 
 
   the change is OK, but is it worth opening a PR for such a small fix? cc 
@zentol 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> No unused import "import akka.actor.ActorRef" in class 
> "org.apache.flink.runtime.messages.TaskManagerMessages"
> --
>
> Key: FLINK-10144
> URL: https://issues.apache.org/jira/browse/FLINK-10144
> Project: Flink
>  Issue Type: Improvement
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
>
>  
> {code:java}
> package org.apache.flink.runtime.messages
> import java.util.UUID
> import akka.actor.ActorRef
> import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
> import org.apache.flink.runtime.instance.InstanceID
> /**
>  * Miscellaneous actor messages exchanged with the TaskManager.
>  */
> object TaskManagerMessages {
> // ...
> }
> {code}
>  
> The import "akka.actor.ActorRef" in TaskManagerMessages is never used, so we 
> can delete it. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6554: [FLINK-10144]delete unused import akka.actor.ActorRef in TaskManagerMessages.scala

2018-08-14 Thread GitBox
yanghua commented on issue #6554: [FLINK-10144]delete unused import 
akka.actor.ActorRef in TaskManagerMessages.scala
URL: https://github.com/apache/flink/pull/6554#issuecomment-412928276
 
 
   the change is OK, but is it worth opening a PR for such a small fix? cc 
@zentol 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580001#comment-16580001
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method 
to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-412926404
 
 
   cc @pnowojski updated


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a string to the print method to identify output for DataStream
> --
>
> Key: FLINK-9850
> URL: https://issues.apache.org/jira/browse/FLINK-9850
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The output of the print method of {{DataSet}} allows the user to supply a 
> String to identify the output (see 
> [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But 
> {{DataStream}} doesn't support this yet. It is valuable to add this feature for 
> {{DataStream}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-14 Thread GitBox
yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method 
to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-412926404
 
 
   cc @pnowojski updated


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability

2018-08-14 Thread Jonathan Miles (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579950#comment-16579950
 ] 

Jonathan Miles commented on FLINK-9061:
---

Is the PR still being considered for merging? We seem to be running into the 
throttling issue with S3 and Flink 1.4.2 and a large stateful job.

I'll come back with more information. I only just started the investigation and 
came across this ticket.
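
For reference, a hypothetical sketch of the path-rewrite idea described in the issue below; the helper name and the hash choice are illustrative only:
{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class EntropyPath {
    // Prefix the checkpoint path with a hash of the original key so that
    // writes spread across S3 partitions.
    static String addEntropy(String path) throws Exception {
        String[] parts = path.split("://", 2);
        String scheme = parts[0];                 // "s3"
        String rest = parts[1];                   // "bucket/flink/checkpoints"
        int slash = rest.indexOf('/');
        String bucket = rest.substring(0, slash); // "bucket"
        String key = rest.substring(slash + 1);   // "flink/checkpoints"

        byte[] digest = MessageDigest.getInstance("MD5")
            .digest(key.getBytes(StandardCharsets.UTF_8));
        // A few hex chars of entropy are enough for partitioning purposes.
        String hash = String.format("%02x%02x", digest[0], digest[1]);
        return scheme + "://" + bucket + "/" + hash + "/" + key;
    }

    public static void main(String[] args) throws Exception {
        // Prints something like "s3://bucket/5c6f/flink/checkpoints".
        System.out.println(addEntropy("s3://bucket/flink/checkpoints"));
    }
}
{code}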

> add entropy to s3 path for better scalability
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jamie Grier
>Assignee: Indrajit Roychoudhury
>Priority: Critical
>  Labels: pull-request-available
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-08-14 Thread Hequn Cheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng updated FLINK-8577:
---
Description: 
The API will look like:
{code:java}
DataStream[(String, Long, Int)] input = ???
// upsert with key
Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
// upsert without key -> single row table
Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c){code}

A simple design 
[doc|https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing]
 about this subtask.

  was:
The API will look like:
{code:java}
DataStream[(String, Long, Int)] input = ???
// upsert with key
Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
// upsert without key -> single row table
Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c){code}


> Implement proctime DataStream to Table upsert conversion.
> -
>
> Key: FLINK-8577
> URL: https://issues.apache.org/jira/browse/FLINK-8577
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> The API will look like:
> {code:java}
> DataStream[(String, Long, Int)] input = ???
> // upsert with key
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
> // upsert without key -> single row table
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c){code}
> A simple design 
> [doc|https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing]
>  about this subtask.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8545) Implement upsert stream table source

2018-08-14 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579899#comment-16579899
 ] 

Hequn Cheng edited comment on FLINK-8545 at 8/14/18 3:20 PM:
-

[~pnowojski]
Great to have your response. The {{UpsertStreamTable}} and 
{{AppendStreamTable}} are internal classes which are used during 
{{registerTableInternal}}. 

A flag is a good choice, but it is difficult to handle the DataStream type. When 
upserting from a stream, the input type is always a tuple type (see FLINK-8577). To 
solve the type problem, I created the {{UpsertStreamTable}} to ingest a datastream 
of type Tuple2. 

Considering {{UpsertToRetractOperator}}, I think it should be an {{UpsertNode}} 
and it may be translated into an {{UpsertWithRetractNode}} by the current 
RetractionRules. 

Thanks again for your suggestions. I have posted a simple design doc just now 
in [FLINK-8577|https://issues.apache.org/jira/browse/FLINK-8577]. 


was (Author: hequn8128):
[~pnowojski]
Great to have your response. The {{UpsertStreamTable}} and 
{{AppendStreamTable}} are internal classes which are used during 
{{registerTableInternal}}. 

A flag is a good choice, but it is difficult to handle the DataStream type. When 
upserting from a stream, the input type is always a tuple type (see FLINK-8577). To 
solve the type problem, I created the {{UpsertStreamTable}} to ingest a datastream 
of type Tuple2. 

Considering {{UpsertToRetractOperator}}, I think it should be an {{UpsertNode}} 
and it may be translated into an {{UpsertWithRetractNode}} by the current 
RetractionRules. 

Thanks again for your suggestions. I will post a design doc ASAP. 

> Implement upsert stream table source 
> -
>
> Key: FLINK-8545
> URL: https://issues.apache.org/jira/browse/FLINK-8545
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> As more and more users are eager to ingest data in upsert mode in Flink 
> SQL/Table API, it is valuable to enable a table source with upsert mode. I will 
> provide a design doc later and we can have more discussions. Any suggestions 
> are warmly welcomed!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-08-14 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579945#comment-16579945
 ] 

Hequn Cheng commented on FLINK-8577:


A simple design 
[doc|https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing]
 about this subtask. Any suggestions are welcomed!

> Implement proctime DataStream to Table upsert conversion.
> -
>
> Key: FLINK-8577
> URL: https://issues.apache.org/jira/browse/FLINK-8577
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> The API will look like:
> {code:java}
> DataStream[(String, Long, Int)] input = ???
> // upsert with key
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
> // upsert without key -> single row table
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579907#comment-16579907
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-412900643
 
 
   One piece of possibly bad news: I have run the benchmarks defined in 
https://github.com/dataArtisans/flink-benchmarks on this branch, and quite a lot 
of them show a performance regression. The worst was 
`StreamNetworkThroughputBenchmarkExecutor` with `1,100ms` params, a regression 
of ~18%.
   
   Could you run those benchmarks locally and confirm if that's the case or not?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high-cost operation, so we can get good 
> benefits by performing the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.
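
A hypothetical sketch of the serialize-once idea above, with ByteBuffer standing in for Flink's intermediate serialization buffer and BufferBuilder:
{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SerializeOnceSketch {
    public static void main(String[] args) {
        String record = "some record";
        int[] selectedChannels = {0, 2, 3};

        // 1. Serialize the record exactly once into an intermediate buffer.
        ByteBuffer serialized =
            ByteBuffer.wrap(record.getBytes(StandardCharsets.UTF_8));

        // 2. Copy the serialized bytes into one buffer per selected channel,
        //    instead of re-serializing once per channel.
        List<ByteBuffer> channelBuffers = new ArrayList<>();
        for (int channel : selectedChannels) {
            ByteBuffer copy = ByteBuffer.allocate(serialized.remaining());
            copy.put(serialized.duplicate());
            copy.flip();
            channelBuffers.add(copy); // destined for `channel`
        }
        System.out.println(channelBuffers.size() + " copies of one serialization");
    }
}
{code}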



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on issue #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-14 Thread GitBox
pnowojski commented on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-412900643
 
 
   One piece of possibly bad news: I have run the benchmarks defined in 
https://github.com/dataArtisans/flink-benchmarks on this branch, and quite a lot 
of them show a performance regression. The worst was 
`StreamNetworkThroughputBenchmarkExecutor` with `1,100ms` params, a regression 
of ~18%.
   
   Could you run those benchmarks locally and confirm if that's the case or not?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-8545) Implement upsert stream table source

2018-08-14 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579899#comment-16579899
 ] 

Hequn Cheng commented on FLINK-8545:


[~pnowojski]
Great to have your response. The {{UpsertStreamTable}} and 
{{AppendStreamTable}} are internal classes which are used during 
{{registerTableInternal}}. 

A flag is a good choice, but it is difficult to handle the DataStream type. When 
upserting from a stream, the input type is always a tuple type (see FLINK-8577). To 
solve the type problem, I created the {{UpsertStreamTable}} to ingest a datastream 
of type Tuple2. 

Considering {{UpsertToRetractOperator}}, I think it should be an {{UpsertNode}} 
and it may be translated into an {{UpsertWithRetractNode}} by the current 
RetractionRules. 

Thanks again for your suggestions. I will post a design doc ASAP. 
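
For illustration, a minimal sketch (not the proposed Flink API) of the upsert-by-key semantics under discussion, where later records with the same key overwrite earlier ones:
{code:java}
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UpsertSemanticsDemo {
    public static void main(String[] args) {
        List<SimpleEntry<String, Integer>> upsertStream = Arrays.asList(
            new SimpleEntry<>("a", 1),
            new SimpleEntry<>("b", 2),
            new SimpleEntry<>("a", 3)); // upsert: replaces ("a", 1)

        // The resulting "table", keyed by the declared key field.
        Map<String, Integer> table = new HashMap<>();
        for (SimpleEntry<String, Integer> record : upsertStream) {
            table.put(record.getKey(), record.getValue());
        }
        System.out.println(table); // {a=3, b=2}
    }
}
{code}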

> Implement upsert stream table source 
> -
>
> Key: FLINK-8545
> URL: https://issues.apache.org/jira/browse/FLINK-8545
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> As more and more users are eager to ingest data in upsert mode in Flink 
> SQL/Table API, it is valuable to enable a table source with upsert mode. I will 
> provide a design doc later and we can have more discussions. Any suggestions 
> are warmly welcomed!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9831) Too many open files for RocksDB

2018-08-14 Thread Sayat Satybaldiyev (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579888#comment-16579888
 ] 

Sayat Satybaldiyev commented on FLINK-9831:
---

I've set up PredefinedOptions.SPINNING_DISK_OPTIMIZED for the Flink RocksDB backend 
and didn't get this exception then. Though it's quite interesting that the default 
RocksDB options don't respect the OS limit settings.
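
For reference, a sketch of that workaround (API names as of the Flink 1.5.x RocksDB backend; please verify against the version in use):
{code:java}
import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class RocksDbOptionsExample {
    public static RocksDBStateBackend configure(String checkpointPath) throws Exception {
        RocksDBStateBackend backend = new RocksDBStateBackend(checkpointPath);
        // Predefined option set tuned for spinning disks.
        backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
        // Alternatively, cap RocksDB's own open-file usage explicitly.
        backend.setOptions(new OptionsFactory() {
            @Override
            public DBOptions createDBOptions(DBOptions currentOptions) {
                return currentOptions.setMaxOpenFiles(1024);
            }

            @Override
            public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
                return currentOptions;
            }
        });
        return backend;
    }
}
{code}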

> Too many open files for RocksDB
> ---
>
> Key: FLINK-9831
> URL: https://issues.apache.org/jira/browse/FLINK-9831
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sayat Satybaldiyev
>Priority: Major
> Attachments: flink_open_files.txt
>
>
> While running only one Flink job, which is backed by RocksDB with 
> checkpointing to HDFS, we encounter an exception that the TM cannot access the SST 
> file because the process has too many open files. However, we have already 
> increased the soft/hard file limit on the machine.
> Number open files for TM on the machine:
>  
> {code:java}
> lsof -p 23301|wc -l
> 8241{code}
>  
> Instance limits
>  
> {code:java}
> ulimit -a
> core file size (blocks, -c) 0
> data seg size (kbytes, -d) unlimited
> scheduling priority (-e) 0
> file size (blocks, -f) unlimited
> pending signals (-i) 256726
> max locked memory (kbytes, -l) 64
> max memory size (kbytes, -m) unlimited
> open files (-n) 1048576
> pipe size (512 bytes, -p) 8
> POSIX message queues (bytes, -q) 819200
> real-time priority (-r) 0
> stack size (kbytes, -s) 8192
> cpu time (seconds, -t) unlimited
> max user processes (-u) 128000
> virtual memory (kbytes, -v) unlimited
> file locks (-x) unlimited
>  
> {code}
>  
> [^flink_open_files.txt]
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for 
> KeyedCoProcessOperator_98a16ed3228ec4a08acd8d78420516a1_(1/1) from any of the 
> 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
>   ... 5 more
> Caused by: java.io.FileNotFoundException: 
> /tmp/flink-io-3da06c9e-f619-44c9-b95f-54ee9b1a084a/job_b3ecbdc0eb2dc2dfbf5532ec1fcef9da_op_KeyedCoProcessOperator_98a16ed3228ec4a08acd8d78420516a1__1_1__uuid_c4b82a7e-8a04-4704-9e0b-393c3243cef2/3701639a-bacd-4861-99d8-5f3d112e88d6/16.sst
>  (Too many open files)
>   at java.io.FileOutputStream.open0(Native Method)
>   at java.io.FileOutputStream.open(FileOutputStream.java:270)
>   at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
>   at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
>   at 
> org.apache.flink.core.fs.local.LocalDataOutputStream.<init>(LocalDataOutputStream.java:47)
>   at 
> org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:275)
>   at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:121)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.copyStateDataHandleData(RocksDBKeyedStateBackend.java:1008)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllDataFromStateHandles(RocksDBKeyedStateBackend.java:988)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllStateDataToDirectory(RocksDBKeyedStateBackend.java:973)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:758)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:732)
>   at 
> 

[jira] [Commented] (FLINK-9899) Add more metrics to the Kinesis source connector

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579867#comment-16579867
 ] 

ASF GitHub Bot commented on FLINK-9899:
---

glaksh100 commented on issue #6409: [FLINK-9899][Kinesis Connector] Add 
comprehensive per-shard metrics to ShardConsumer
URL: https://github.com/apache/flink/pull/6409#issuecomment-412888671
 
 
   @zentol @tzulitai Can you take a look at this and merge if it looks good?
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add more metrics to the Kinesis source connector
> 
>
> Key: FLINK-9899
> URL: https://issues.apache.org/jira/browse/FLINK-9899
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.4.2, 1.5.1
>Reporter: Lakshmi Rao
>Assignee: Lakshmi Rao
>Priority: Major
>  Labels: pull-request-available
>
> Currently, only sparse metrics are available for the Kinesis Connector. Using 
> the 
> [ShardMetricsReporter|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java]
> , add more stats. For example:
> - sleepTimeMillis 
> - maxNumberOfRecordsPerFetch
> - numberOfAggregatedRecordsPerFetch
> - numberOfDeaggregatedRecordsPerFetch
> - bytesPerFetch
> - averageRecordSizeBytes
> - runLoopTimeNanos
> - loopFrequencyHz
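
A generic sketch of how such per-shard gauges can be registered on a Flink MetricGroup; the field and metric names here are illustrative, not the actual ShardMetricsReporter API:
{code:java}
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

public class ShardMetricsSketch {
    private volatile long bytesPerFetch;
    private volatile double loopFrequencyHz;

    // Register one gauge per stat on the shard's metric group.
    public void register(MetricGroup shardMetricGroup) {
        shardMetricGroup.gauge("bytesPerFetch", (Gauge<Long>) () -> bytesPerFetch);
        shardMetricGroup.gauge("loopFrequencyHz", (Gauge<Double>) () -> loopFrequencyHz);
    }

    // Called from the shard consumer's run loop after each fetch.
    public void update(long bytes, double frequencyHz) {
        this.bytesPerFetch = bytes;
        this.loopFrequencyHz = frequencyHz;
    }
}
{code}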



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] glaksh100 commented on issue #6409: [FLINK-9899][Kinesis Connector] Add comprehensive per-shard metrics to ShardConsumer

2018-08-14 Thread GitBox
glaksh100 commented on issue #6409: [FLINK-9899][Kinesis Connector] Add 
comprehensive per-shard metrics to ShardConsumer
URL: https://github.com/apache/flink/pull/6409#issuecomment-412888671
 
 
   @zentol @tzulitai Can you take a look at this and merge if it looks good?
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10022) Add metrics for input/output buffers

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579855#comment-16579855
 ] 

ASF GitHub Bot commented on FLINK-10022:


zentol commented on a change in pull request #6551: 
[FLINK-10022][network][metrics] add metrics for input/output buffers
URL: https://github.com/apache/flink/pull/6551#discussion_r209965405
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java
 ##
 @@ -82,6 +82,7 @@ public IOMetricsInfo(
this.bytesReadComplete = bytesReadComplete;
this.bytesWritten = bytesWritten;
this.bytesWrittenComplete = bytesWrittenComplete;
+
 
 Review comment:
   revert


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add metrics for input/output buffers
> 
>
> Key: FLINK-10022
> URL: https://issues.apache.org/jira/browse/FLINK-10022
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics, Network
>Affects Versions: 1.5.2, 1.6.0, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we provide metrics for the input and output queue lengths, records 
> and bytes. For diagnosing network issues, it would also be nice to know the 
> number of buffers because we would then know their fill rate.
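
A back-of-the-envelope example of how the proposed buffer counts combine with the existing byte counts (numbers illustrative):
{code:java}
public class BufferFillRate {
    public static void main(String[] args) {
        long numBytesOut = 6_291_456L;  // existing bytes metric
        long numBuffersOut = 256L;      // proposed buffers metric
        int bufferSize = 32 * 1024;     // network buffer size (32 KiB default)

        // Fill rate = bytes actually written / capacity of the buffers used.
        double avgFill = (double) numBytesOut / (numBuffersOut * bufferSize);
        System.out.printf("average buffer fill rate: %.1f%%%n", avgFill * 100);
    }
}
{code}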



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10022) Add metrics for input/output buffers

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579857#comment-16579857
 ] 

ASF GitHub Bot commented on FLINK-10022:


zentol commented on a change in pull request #6551: 
[FLINK-10022][network][metrics] add metrics for input/output buffers
URL: https://github.com/apache/flink/pull/6551#discussion_r209965354
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
 ##
 @@ -179,11 +179,12 @@ public void writeIOMetricsAsJson(JsonGenerator gen) 
throws IOException {
 
gen.writeObjectFieldStart("metrics");
 
-   Long numBytesIn = this.numBytesInLocal + this.numBytesInRemote;
 
 Review comment:
   revert changes in this file


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add metrics for input/output buffers
> 
>
> Key: FLINK-10022
> URL: https://issues.apache.org/jira/browse/FLINK-10022
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics, Network
>Affects Versions: 1.5.2, 1.6.0, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we provide metrics for the input and output queue lengths, records 
> and bytes. For diagnosing network issues, it would also be nice to know the 
> number of buffers because we would then know their fill rate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10022) Add metrics for input/output buffers

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579856#comment-16579856
 ] 

ASF GitHub Bot commented on FLINK-10022:


zentol commented on a change in pull request #6551: 
[FLINK-10022][network][metrics] add metrics for input/output buffers
URL: https://github.com/apache/flink/pull/6551#discussion_r209965189
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
 ##
 @@ -48,6 +48,7 @@ public IOMetrics(Meter recordsIn, Meter recordsOut, Meter 
bytesLocalIn, Meter by
this.numRecordsInPerSecond = recordsIn.getRate();
this.numRecordsOut = recordsOut.getCount();
this.numRecordsOutPerSecond = recordsOut.getRate();
+
 
 Review comment:
   revert


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add metrics for input/output buffers
> 
>
> Key: FLINK-10022
> URL: https://issues.apache.org/jira/browse/FLINK-10022
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics, Network
>Affects Versions: 1.5.2, 1.6.0, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we provide metrics for the input and output queue lengths, records 
> and bytes. For diagnosing network issues, it would also be nice to know the 
> number of buffers because we would then know their fill rate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #6551: [FLINK-10022][network][metrics] add metrics for input/output buffers

2018-08-14 Thread GitBox
zentol commented on a change in pull request #6551: 
[FLINK-10022][network][metrics] add metrics for input/output buffers
URL: https://github.com/apache/flink/pull/6551#discussion_r209965354
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java
 ##
 @@ -179,11 +179,12 @@ public void writeIOMetricsAsJson(JsonGenerator gen) 
throws IOException {
 
gen.writeObjectFieldStart("metrics");
 
-   Long numBytesIn = this.numBytesInLocal + this.numBytesInRemote;
 
 Review comment:
   revert changes in this file


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6551: [FLINK-10022][network][metrics] add metrics for input/output buffers

2018-08-14 Thread GitBox
zentol commented on a change in pull request #6551: 
[FLINK-10022][network][metrics] add metrics for input/output buffers
URL: https://github.com/apache/flink/pull/6551#discussion_r209965189
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
 ##
 @@ -48,6 +48,7 @@ public IOMetrics(Meter recordsIn, Meter recordsOut, Meter 
bytesLocalIn, Meter by
this.numRecordsInPerSecond = recordsIn.getRate();
this.numRecordsOut = recordsOut.getCount();
this.numRecordsOutPerSecond = recordsOut.getRate();
+
 
 Review comment:
   revert


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6551: [FLINK-10022][network][metrics] add metrics for input/output buffers

2018-08-14 Thread GitBox
zentol commented on a change in pull request #6551: 
[FLINK-10022][network][metrics] add metrics for input/output buffers
URL: https://github.com/apache/flink/pull/6551#discussion_r209965405
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java
 ##
 @@ -82,6 +82,7 @@ public IOMetricsInfo(
this.bytesReadComplete = bytesReadComplete;
this.bytesWritten = bytesWritten;
this.bytesWrittenComplete = bytesWrittenComplete;
+
 
 Review comment:
   revert


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10144) No unused import "import akka.actor.ActorRef" in class "org.apache.flink.runtime.messages.TaskManagerMessages"

2018-08-14 Thread chengjie.wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579840#comment-16579840
 ] 

chengjie.wu commented on FLINK-10144:
-

[~Zentol],[~lsy] I have opened a new pull request for this issue. 
[https://github.com/apache/flink/pull/6554], thanks.

 

> No unused import "import akka.actor.ActorRef" in class 
> "org.apache.flink.runtime.messages.TaskManagerMessages"
> --
>
> Key: FLINK-10144
> URL: https://issues.apache.org/jira/browse/FLINK-10144
> Project: Flink
>  Issue Type: Improvement
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
>
>  
> {code:java}
> package org.apache.flink.runtime.messages
> import java.util.UUID
> import akka.actor.ActorRef
> import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
> import org.apache.flink.runtime.instance.InstanceID
> /**
>  * Miscellaneous actor messages exchanged with the TaskManager.
>  */
> object TaskManagerMessages {
> // ...
> }
> {code}
>  
> The import "akka.actor.ActorRef" in TaskManagerMessages is never used, so we 
> can delete it. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8532) RebalancePartitioner should use Random value for its first partition

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579836#comment-16579836
 ] 

ASF GitHub Bot commented on FLINK-8532:
---

Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify 
RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#issuecomment-412878869
 
 
   @bowenli86 I agree that the test case should be more robust, and I have updated 
it.
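
A simplified sketch of the idea (not the actual RebalancePartitioner code): start the round robin from a random channel so that parallel senders do not all hit the same downstream subtask first:
{code:java}
import java.util.concurrent.ThreadLocalRandom;

public class RandomStartRoundRobin {
    private final int numberOfChannels;
    private int nextChannel;

    RandomStartRoundRobin(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
        // Random initial partition instead of the same constant in every task.
        this.nextChannel = ThreadLocalRandom.current().nextInt(numberOfChannels);
    }

    int selectChannel() {
        nextChannel = (nextChannel + 1) % numberOfChannels;
        return nextChannel;
    }
}
{code}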


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> RebalancePartitioner should use Random value for its first partition
> 
>
> Key: FLINK-8532
> URL: https://issues.apache.org/jira/browse/FLINK-8532
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Yuta Morisawa
>Priority: Minor
>  Labels: pull-request-available
>
> In some conditions, RebalancePartitioner doesn't balance data correctly 
> because it uses the same value for selecting the next operator.
> RebalancePartitioner initializes its partition id to the same value in 
> every thread, so it balances data overall, but at any given moment the amount of 
> data in each operator is skewed.
> In particular, when the data rate of the upstream operators is equal, the data skew 
> becomes severe.
>  
>  
> Example:
> Consider a simple operator chain.
> -> map1 -> rebalance -> map2 ->
> Each map operator (map1, map2) contains three subtasks (subtasks 1-3 and 4-6, 
> respectively).
> map1          map2
>  st1              st4
>  st2              st5
>  st3              st6
>  
> At the beginning, every subtask in map1 sends data to st4 in map2 because 
> they all use the same initial partition id.
> The next time map1 receives data, st1, st2, and st3 all send to st5, because each 
> incremented its partition id after processing the previous record.
> In my environment, it takes twice as long to process data with 
> RebalancePartitioner as with other partitioners (rescale, keyBy).
>  
> To solve this problem, in my opinion, RebalancePartitioner should use its own 
> operator id for the initial value.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition

2018-08-14 Thread GitBox
Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify 
RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#issuecomment-412878869
 
 
   @bowenli86 I agree that the test case should be more robust, and I have updated 
it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10142) Reduce synchronization overhead for credit notifications

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579833#comment-16579833
 ] 

ASF GitHub Bot commented on FLINK-10142:


NicoK opened a new pull request #6555: [FLINK-10142][network] reduce locking 
around credit notification
URL: https://github.com/apache/flink/pull/6555
 
 
   ## What is the purpose of the change
   
   When credit-based flow control was introduced, we also added some checks and 
optimisations for uncommon code paths that make common code paths unnecessarily 
more expensive, e.g. checking whether a channel was released before forwarding 
a credit notification to Netty. Such checks would have to be confirmed by the 
Netty thread anyway and thus only add additional load for something that 
happens only once (per channel). This PR removes these additional checks.
   
   Please note that this PR builds upon #6553.
   
   ## Brief change log
   
   - from `PartitionRequestClient`: remove checking whether we have already 
been disposed
   - from `RemoteInputChannel#notifyCreditAvailable()`: remove check for 
`isReleased` (checked when the notification arrives at Netty's thread)
   - from `RemoteInputChannel#notifyBufferAvailable()`: we must check inside 
`synchronized (bufferQueue)` anyway and there is no race condition with 
`releaseAllResources()` as previously mentioned there (the previous check also 
did not really prevent any race if it existed!)
   
   ## Verifying this change
   
   - covered by existing tests, such as `RemoteInputChannelTest` and in general 
everything using the network stack
   - added 
`RemoteInputChannelTest#testConcurrentNotifyBufferAvailableAndRelease`
   - check for reduced CPU load (together with #6553)
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **no** (per 
buffer)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **no**
 - If yes, how is the feature documented? **not applicable**
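
   A generic sketch of the check-inside-the-lock pattern referenced in the change log above (simplified, not the actual RemoteInputChannel code):
{code:java}
public class CheckInsideLockSketch {
    private final Object bufferQueue = new Object();
    private boolean released;

    void notifyBufferAvailable(Object buffer) {
        // No unsynchronized isReleased pre-check here: it could race with
        // releaseAllResources() and would have to be re-validated anyway.
        synchronized (bufferQueue) {
            if (released) {
                return; // authoritative check, under the lock
            }
            // ... enqueue the buffer ...
        }
    }

    void releaseAllResources() {
        synchronized (bufferQueue) {
            released = true;
            // ... recycle queued buffers ...
        }
    }
}
{code}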
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reduce synchronization overhead for credit notifications
> 
>
> Key: FLINK-10142
> URL: https://issues.apache.org/jira/browse/FLINK-10142
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.2, 1.6.0, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> When credit-based flow control was introduced, we also added some checks and 
> optimisations for uncommon code paths that make common code paths 
> unnecessarily more expensive, e.g. checking whether a channel was released 
> before forwarding a credit notification to Netty. Such checks would have to 
> be confirmed by the Netty thread anyway and thus only add additional load for 
> something that happens only once (per channel).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10142) Reduce synchronization overhead for credit notifications

2018-08-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10142:
---
Labels: pull-request-available  (was: )

> Reduce synchronization overhead for credit notifications
> 
>
> Key: FLINK-10142
> URL: https://issues.apache.org/jira/browse/FLINK-10142
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.2, 1.6.0, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> When credit-based flow control was introduced, we also added some checks and 
> optimisations for uncommon code paths that make common code paths 
> unnecessarily more expensive, e.g. checking whether a channel was released 
> before forwarding a credit notification to Netty. Such checks would have to 
> be confirmed by the Netty thread anyway and thus only add additional load for 
> something that happens only once (per channel).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] NicoK opened a new pull request #6555: [FLINK-10142][network] reduce locking around credit notification

2018-08-14 Thread GitBox
NicoK opened a new pull request #6555: [FLINK-10142][network] reduce locking 
around credit notification
URL: https://github.com/apache/flink/pull/6555
 
 
   ## What is the purpose of the change
   
   When credit-based flow control was introduced, we also added some checks and 
optimisations for uncommon code paths that make common code paths unnecessarily 
more expensive, e.g. checking whether a channel was released before forwarding 
a credit notification to Netty. Such checks would have to be confirmed by the 
Netty thread anyway and thus only add additional load for something that 
happens only once (per channel). This PR removes these additional checks.
   
   Please note that this PR builds upon #6553.
   
   ## Brief change log
   
   - from `PartitionRequestClient`: remove checking whether we have already 
been disposed
   - from `RemoteInputChannel#notifyCreditAvailable()`: remove check for 
`isReleased` (checked when the notification arrives at Netty's thread)
   - from `RemoteInputChannel#notifyBufferAvailable()`: we must check inside 
`synchronized (bufferQueue)` anyway and there is no race condition with 
`releaseAllResources()` as previously mentioned there (the previous check also 
did not really prevent any race if it existed!)
   
   ## Verifying this change
   
   - covered by existing tests, such as `RemoteInputChannelTest` and in general 
everything using the network stack
   - added 
`RemoteInputChannelTest#testConcurrentNotifyBufferAvailableAndRelease`
   - check for reduced CPU load (together with #6553)
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **no** (per 
buffer)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **no**
 - If yes, how is the feature documented? **not applicable**
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10144) No unused import "import akka.actor.ActorRef" in class "org.apache.flink.runtime.messages.TaskManagerMessages"

2018-08-14 Thread dalongliu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579809#comment-16579809
 ] 

dalongliu commented on FLINK-10144:
---

[~Zentol] can you assign this issue to [~wucj]? He wants to contribute to 
Flink, so let's give him an opportunity. Thanks very much!

> No unused import "import akka.actor.ActorRef" in class 
> "org.apache.flink.runtime.messages.TaskManagerMessages"
> --
>
> Key: FLINK-10144
> URL: https://issues.apache.org/jira/browse/FLINK-10144
> Project: Flink
>  Issue Type: Improvement
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
>
>  
> {code:java}
> package org.apache.flink.runtime.messages
> import java.util.UUID
> import akka.actor.ActorRef
> import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
> import org.apache.flink.runtime.instance.InstanceID
> /**
>  * Miscellaneous actor messages exchanged with the TaskManager.
>  */
> object TaskManagerMessages {
> // ...
> }
> {code}
>  
> The import "akka.actor.ActorRef" in TaskManagerMessages is never used, so we 
> can delete it. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10144) No unused import "import akka.actor.ActorRef" in class "org.apache.flink.runtime.messages.TaskManagerMessages"

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579794#comment-16579794
 ] 

ASF GitHub Bot commented on FLINK-10144:


wchengjie opened a new pull request #6554: [FLINK-10144]delete unused import 
akka.actor.ActorRef in TaskManagerMessages.scala
URL: https://github.com/apache/flink/pull/6554
 
 
   ## What is the purpose of the change
   
   *This pull request aims to delete an unused import, `import 
akka.actor.ActorRef`, in 
org.apache.flink.runtime.messages.TaskManagerMessages.scala*
   
   
   ## Brief change log
   
   *delete unused `import akka.actor.ActorRef` in 
`org.apache.flink.runtime.messages.TaskManagerMessages.scala`*
   
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
*org.apache.flink.test.recovery.TaskManagerFailureRecoveryITCase.testRestartWithFailingTaskManager()*.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> No unused import "import akka.actor.ActorRef" in class 
> "org.apache.flink.runtime.messages.TaskManagerMessages"
> --
>
> Key: FLINK-10144
> URL: https://issues.apache.org/jira/browse/FLINK-10144
> Project: Flink
>  Issue Type: Improvement
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
>
>  
> {code:java}
> package org.apache.flink.runtime.messages
> import java.util.UUID
> import akka.actor.ActorRef
> import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
> import org.apache.flink.runtime.instance.InstanceID
> /**
>  * Miscellaneous actor messages exchanged with the TaskManager.
>  */
> object TaskManagerMessages {
> // ...
> }
> {code}
>  
> The import "akka.actor.ActorRef" in TaskManagerMessages is never used, so we 
> can delete it. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10144) No unused import "import akka.actor.ActorRef" in class "org.apache.flink.runtime.messages.TaskManagerMessages"

2018-08-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10144:
---
Labels: pull-request-available  (was: )

> No unused import "import akka.actor.ActorRef" in class 
> "org.apache.flink.runtime.messages.TaskManagerMessages"
> --
>
> Key: FLINK-10144
> URL: https://issues.apache.org/jira/browse/FLINK-10144
> Project: Flink
>  Issue Type: Improvement
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
>
>  
> {code:java}
> package org.apache.flink.runtime.messages
> import java.util.UUID
> import akka.actor.ActorRef
> import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
> import org.apache.flink.runtime.instance.InstanceID
> /**
>  * Miscellaneous actor messages exchanged with the TaskManager.
>  */
> object TaskManagerMessages {
> // ...
> }
> {code}
>  
> The import "akka.actor.ActorRef" in TaskManagerMessages is never used, so we 
> can delete it. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] wchengjie opened a new pull request #6554: [FLINK-10144]delete unused import akka.actor.ActorRef in TaskManagerMessages.scala

2018-08-14 Thread GitBox
wchengjie opened a new pull request #6554: [FLINK-10144]delete unused import 
akka.actor.ActorRef in TaskManagerMessages.scala
URL: https://github.com/apache/flink/pull/6554
 
 
   ## What is the purpose of the change
   
   *This pull request aims to delete a useless import, `import 
akka.actor.ActorRef`, from 
`org.apache.flink.runtime.messages.TaskManagerMessages.scala`.*
   
   
   ## Brief change log
   
   *Delete the unused `import akka.actor.ActorRef` from 
`org.apache.flink.runtime.messages.TaskManagerMessages.scala`.*
   
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
*org.apache.flink.test.recovery.TaskManagerFailureRecoveryITCase.testRestartWithFailingTaskManager()*.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579783#comment-16579783
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski edited a comment on issue #6367: [FLINK-9850] Add a string to the 
print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-412868964
 
 
   Yes exactly. More precisely as I wrote in one of the previous comments, this 
`TaskOutputWriter` (`TaskStandardOutputWriter`?) would deduplicate logic from 
those methods:
   
   - open
   - writeRecord/invoke
   - toString
   - close (those probably should be dropped because of NPE.)
   
   Because their logic is identical between `PrintSinkFunction` and 
`PrintingOutputFormat` while their implementation is already inconsistent.
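
A minimal sketch of what such a shared writer could look like, assuming the 
name `TaskOutputWriter` and the prefix behavior discussed in this thread 
(illustrative only, not an actual Flink class):

{code:java}
import java.io.PrintStream;

// Illustrative: the shared logic PrintSinkFunction and
// PrintingOutputFormat could both delegate to.
public class TaskOutputWriter<IN> {

	private final String sinkIdentifier; // user-supplied label, may be null
	private final boolean stdErr;

	private transient PrintStream stream;
	private transient String prefix;

	public TaskOutputWriter(String sinkIdentifier, boolean stdErr) {
		this.sinkIdentifier = sinkIdentifier;
		this.stdErr = stdErr;
	}

	/** Shared "open" logic: pick the stream and build the output prefix. */
	public void open(int subtaskIndex, int numParallelSubtasks) {
		stream = stdErr ? System.err : System.out;
		String id = sinkIdentifier == null ? "" : sinkIdentifier + ":";
		prefix = numParallelSubtasks > 1
			? id + (subtaskIndex + 1) + "> "
			: id + "> ";
	}

	/** Shared "writeRecord"/"invoke" logic. */
	public void write(IN record) {
		stream.println(prefix + record);
	}

	@Override
	public String toString() {
		return "Print to " + (stdErr ? "System.err" : "System.out");
	}
}
{code}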


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a string to the print method to identify output for DataStream
> --
>
> Key: FLINK-9850
> URL: https://issues.apache.org/jira/browse/FLINK-9850
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The output of the print method of {{DataSet}} allows the user to supply a 
> String to identify the output (see 
> [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But 
> {{DataStream}} doesn't support this yet. It is valuable to add this feature 
> for {{DataStream}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579782#comment-16579782
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on issue #6367: [FLINK-9850] Add a string to the print 
method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-412868964
 
 
   Yes exactly. More precisely as I wrote in one of the previous comments, this 
`TaskOutputWriter` (`TaskStandardOutputWriter`?) would deduplicate logic from 
those methods:
   
   - open
   - writeRecord/invoke
   - toString
   - close (those probably should be dropped because of NPE.)
   
   Because their logic is identical between `PrintSinkFunction` and 
`PrintingOutputFormat` while their implementation is already inconsistent. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a string to the print method to identify output for DataStream
> --
>
> Key: FLINK-9850
> URL: https://issues.apache.org/jira/browse/FLINK-9850
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The output of the print method of {{DataSet}} allows the user to supply a 
> String to identify the output (see 
> [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But 
> {{DataStream}} doesn't support this yet. It is valuable to add this feature 
> for {{DataStream}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski edited a comment on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-14 Thread GitBox
pnowojski edited a comment on issue #6367: [FLINK-9850] Add a string to the 
print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-412868964
 
 
   Yes exactly. More precisely as I wrote in one of the previous comments, this 
`TaskOutputWriter` (`TaskStandardOutputWriter`?) would deduplicate logic from 
those methods:
   
   - open
   - writeRecord/invoke
   - toString
   - close (those probably should be dropped because of NPE.)
   
   Because their logic is identical between `PrintSinkFunction` and 
`PrintingOutputFormat` while their implementation is already inconsistent.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-14 Thread GitBox
pnowojski commented on issue #6367: [FLINK-9850] Add a string to the print 
method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-412868964
 
 
   Yes exactly. More precisely as I wrote in one of the previous comments, this 
`TaskOutputWriter` (`TaskStandardOutputWriter`?) would deduplicate logic from 
those methods:
   
   - open
   - writeRecord/invoke
   - toString
   - close (those probably should be dropped because of NPE.)
   
   Because their logic is identical between `PrintSinkFunction` and 
`PrintingOutputFormat` while their implementation is already inconsistent. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10119) JsonRowDeserializationSchema deserialize kafka message

2018-08-14 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579779#comment-16579779
 ] 

buptljy commented on FLINK-10119:
-

[~twalthr] Okay, I am going to add two optional properties.
1. ignore-when-error: ignore the malformed line if this is configured to be true.
2. additional-error-field: put the error message into an additional field and 
set the other fields to null.
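
A minimal sketch of the first option, assuming a hypothetical 
`ignoreWhenError` flag and Jackson's `ObjectMapper` (the property names above 
are still up for discussion):

{code:java}
import java.io.IOException;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Illustrative: swallow malformed records instead of failing the job.
public class TolerantJsonDeserializer {

	private final ObjectMapper objectMapper = new ObjectMapper();
	private final boolean ignoreWhenError; // hypothetical flag

	public TolerantJsonDeserializer(boolean ignoreWhenError) {
		this.ignoreWhenError = ignoreWhenError;
	}

	/** Returns null for malformed input when ignoreWhenError is set. */
	public JsonNode deserialize(byte[] message) throws IOException {
		try {
			return objectMapper.readTree(message);
		} catch (IOException e) {
			if (ignoreWhenError) {
				return null; // caller treats null as "skip this record"
			}
			throw new IOException("Failed to deserialize JSON object.", e);
		}
	}
}
{code}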

> JsonRowDeserializationSchema deserialize kafka message
> --
>
> Key: FLINK-10119
> URL: https://issues.apache.org/jira/browse/FLINK-10119
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.1
> Environment: none
>Reporter: sean.miao
>Assignee: buptljy
>Priority: Major
>
> Recently we have been using Kafka010JsonTableSource to process Kafka's JSON 
> messages. We turned on checkpointing and an auto-restart strategy.
> We found that as soon as the format of a message is not JSON, the job can no 
> longer be pulled up. Of course, this guarantees exactly-once or 
> at-least-once processing, but the resulting application is unavailable, 
> which has a big impact on us.
> The code is:
> class: JsonRowDeserializationSchema
> function:
> @Override
> public Row deserialize(byte[] message) throws IOException {
>   try {
>     final JsonNode root = objectMapper.readTree(message);
>     return convertRow(root, (RowTypeInfo) typeInfo);
>   } catch (Throwable t) {
>     throw new IOException("Failed to deserialize JSON object.", t);
>   }
> }
> Now I have changed it to:
> public Row deserialize(byte[] message) throws IOException {
>   try {
>     JsonNode root = this.objectMapper.readTree(message);
>     return this.convertRow(root, (RowTypeInfo) this.typeInfo);
>   } catch (Throwable var4) {
>     message = this.objectMapper.writeValueAsBytes("{}");
>     JsonNode root = this.objectMapper.readTree(message);
>     return this.convertRow(root, (RowTypeInfo) this.typeInfo);
>   }
> }
>  
> I think that data format errors are inevitable during network transmission, 
> so can we add a new column to the table for data in the wrong format, like 
> Spark SQL does?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10022) Add metrics for input/output buffers

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579760#comment-16579760
 ] 

ASF GitHub Bot commented on FLINK-10022:


NicoK commented on a change in pull request #6551: 
[FLINK-10022][network][metrics] add metrics for input/output buffers
URL: https://github.com/apache/flink/pull/6551#discussion_r209937737
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
 ##
 @@ -43,38 +43,74 @@
protected double numBytesInRemotePerSecond;
protected double numBytesOutPerSecond;
 
-   public IOMetrics(Meter recordsIn, Meter recordsOut, Meter bytesLocalIn, 
Meter bytesRemoteIn, Meter bytesOut) {
+   protected long numBuffersInLocal;
+   protected long numBuffersInRemote;
+   protected long numBuffersOut;
+
+   protected double numBuffersInLocalPerSecond;
+   protected double numBuffersInRemotePerSecond;
+   protected double numBuffersOutPerSecond;
+
+   public IOMetrics(
 
 Review comment:
   that's what I meant - done...hopefully ;)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add metrics for input/output buffers
> 
>
> Key: FLINK-10022
> URL: https://issues.apache.org/jira/browse/FLINK-10022
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics, Network
>Affects Versions: 1.5.2, 1.6.0, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we provide metrics for the input and output queue lengths, records 
> and bytes. For diagnosing network issues, it would also be nice to know the 
> number of buffers because we would then know their fill rate.
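
To make the fill-rate point concrete: dividing the bytes meter by the buffers 
meter gives the average bytes per buffer, which compared against the segment 
size yields the fill rate. A minimal sketch with made-up numbers, assuming 
the default 32 KiB memory segment size (all names here are illustrative, not 
the actual metric identifiers):

{code:java}
public class BufferFillRate {
	public static void main(String[] args) {
		double numBytesOutPerSecond = 26_214_400.0;  // assumed: 25 MiB/s
		double numBuffersOutPerSecond = 1_000.0;     // assumed: 1000 buffers/s
		double segmentSize = 32 * 1024;              // assumed default: 32 KiB

		double avgBytesPerBuffer = numBytesOutPerSecond / numBuffersOutPerSecond;
		System.out.println(avgBytesPerBuffer / segmentSize); // ~0.8, i.e. 80% full
	}
}
{code}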



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] NicoK commented on a change in pull request #6551: [FLINK-10022][network][metrics] add metrics for input/output buffers

2018-08-14 Thread GitBox
NicoK commented on a change in pull request #6551: 
[FLINK-10022][network][metrics] add metrics for input/output buffers
URL: https://github.com/apache/flink/pull/6551#discussion_r209937737
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
 ##
 @@ -43,38 +43,74 @@
protected double numBytesInRemotePerSecond;
protected double numBytesOutPerSecond;
 
-   public IOMetrics(Meter recordsIn, Meter recordsOut, Meter bytesLocalIn, 
Meter bytesRemoteIn, Meter bytesOut) {
+   protected long numBuffersInLocal;
+   protected long numBuffersInRemote;
+   protected long numBuffersOut;
+
+   protected double numBuffersInLocalPerSecond;
+   protected double numBuffersInRemotePerSecond;
+   protected double numBuffersOutPerSecond;
+
+   public IOMetrics(
 
 Review comment:
   that's what I meant - done...hopefully ;)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10022) Add metrics for input/output buffers

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579758#comment-16579758
 ] 

ASF GitHub Bot commented on FLINK-10022:


NicoK commented on a change in pull request #6551: 
[FLINK-10022][network][metrics] add metrics for input/output buffers
URL: https://github.com/apache/flink/pull/6551#discussion_r209937737
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
 ##
 @@ -43,38 +43,74 @@
protected double numBytesInRemotePerSecond;
protected double numBytesOutPerSecond;
 
-   public IOMetrics(Meter recordsIn, Meter recordsOut, Meter bytesLocalIn, 
Meter bytesRemoteIn, Meter bytesOut) {
+   protected long numBuffersInLocal;
+   protected long numBuffersInRemote;
+   protected long numBuffersOut;
+
+   protected double numBuffersInLocalPerSecond;
+   protected double numBuffersInRemotePerSecond;
+   protected double numBuffersOutPerSecond;
+
+   public IOMetrics(
 
 Review comment:
   that's what I meant - done


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add metrics for input/output buffers
> 
>
> Key: FLINK-10022
> URL: https://issues.apache.org/jira/browse/FLINK-10022
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics, Network
>Affects Versions: 1.5.2, 1.6.0, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we provide metrics for the input and output queue lengths, records 
> and bytes. For diagnosing network issues, it would also be nice to know the 
> number of buffers because we would then know their fill rate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] NicoK commented on a change in pull request #6551: [FLINK-10022][network][metrics] add metrics for input/output buffers

2018-08-14 Thread GitBox
NicoK commented on a change in pull request #6551: 
[FLINK-10022][network][metrics] add metrics for input/output buffers
URL: https://github.com/apache/flink/pull/6551#discussion_r209937737
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
 ##
 @@ -43,38 +43,74 @@
protected double numBytesInRemotePerSecond;
protected double numBytesOutPerSecond;
 
-   public IOMetrics(Meter recordsIn, Meter recordsOut, Meter bytesLocalIn, 
Meter bytesRemoteIn, Meter bytesOut) {
+   protected long numBuffersInLocal;
+   protected long numBuffersInRemote;
+   protected long numBuffersOut;
+
+   protected double numBuffersInLocalPerSecond;
+   protected double numBuffersInRemotePerSecond;
+   protected double numBuffersOutPerSecond;
+
+   public IOMetrics(
 
 Review comment:
   that's what I meant - done


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-10144) No unused import "import akka.actor.ActorRef" in class "org.apache.flink.runtime.messages.TaskManagerMessages"

2018-08-14 Thread dalongliu (JIRA)
dalongliu created FLINK-10144:
-

 Summary: No unused import "import akka.actor.ActorRef" in class 
"org.apache.flink.runtime.messages.TaskManagerMessages"
 Key: FLINK-10144
 URL: https://issues.apache.org/jira/browse/FLINK-10144
 Project: Flink
  Issue Type: Improvement
Reporter: dalongliu
Assignee: dalongliu


 
{code:java}
package org.apache.flink.runtime.messages

import java.util.UUID

import akka.actor.ActorRef
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
import org.apache.flink.runtime.instance.InstanceID

/**
 * Miscellaneous actor messages exchanged with the TaskManager.
 */
object TaskManagerMessages {
/./
}
{code}
 

The import "akka.actor.ActorRef" in TaskManagerMessages is never used,so we can 
delete it. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8290) Modify clientId to groupId in flink-connector-kafka-0.8

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579743#comment-16579743
 ] 

ASF GitHub Bot commented on FLINK-8290:
---

azagrebin commented on issue #5304: [FLINK-8290]Modify clientId to groupId in 
flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#issuecomment-412856931
 
 
   LGTM  


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Modify clientId to groupId in flink-connector-kafka-0.8
> ---
>
> Key: FLINK-8290
> URL: https://issues.apache.org/jira/browse/FLINK-8290
> Project: Flink
>  Issue Type: Improvement
>Reporter: xymaqingxiang
>Assignee: xymaqingxiang
>Priority: Major
>  Labels: pull-request-available
>
> Now the clientId that consumes all the topics is a constant 
> ("flink-kafka-consumer-legacy-" + broker.id()), which makes it hard for us 
> to read Kafka's logs, so I recommend that it be changed to the groupId.
> We can modify the SimpleConsumerThread.java file, as shown below:
> {code:java}
> private final String clientId;
> ...
> this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" 
> + broker.id());
> ...
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] azagrebin commented on issue #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8

2018-08-14 Thread GitBox
azagrebin commented on issue #5304: [FLINK-8290]Modify clientId to groupId in 
flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#issuecomment-412856931
 
 
   LGTM  


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10141) Reduce lock contention introduced with 1.5

2018-08-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10141:
---
Labels: pull-request-available  (was: )

> Reduce lock contention introduced with 1.5
> --
>
> Key: FLINK-10141
> URL: https://issues.apache.org/jira/browse/FLINK-10141
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.2, 1.6.0, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> With the changes around introducing credit-based flow control as well as the 
> low latency changes, unfortunately, we also introduced some lock contention 
> on {{RemoteInputChannel#bufferQueue}} and 
> {{RemoteInputChannel#receivedBuffers}} as well as asking for queue sizes when 
> the only thing we need is whether it is empty or not.
> This was observed as a high idle CPU load with no events in the stream but 
> only watermarks (every 500ms) and many slots on a single machine.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10141) Reduce lock contention introduced with 1.5

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579739#comment-16579739
 ] 

ASF GitHub Bot commented on FLINK-10141:


NicoK opened a new pull request #6553: [FLINK-10141][network] optimisations 
reducing lock contention
URL: https://github.com/apache/flink/pull/6553
 
 
   ## What is the purpose of the change
   
   With the changes around introducing credit-based flow control as well as the 
low latency changes, unfortunately, we also introduced some lock contention on 
`RemoteInputChannel#bufferQueue` and `RemoteInputChannel#receivedBuffers`. 
Additionally, we were asking for queue sizes when the only thing we need is 
whether it is empty or not.
   
   As a result, we saw high CPU load during "idle" stream processing jobs with 
no events in the stream but only watermarks (every 500ms) and many slots on a 
single machine.
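   
   The core pattern behind the first two bullets in the change log below is 
to decide inside the lock and notify outside of it. A minimal sketch with 
assumed names (`notifyCreditAvailable()` is the callback mentioned above; the 
rest is illustrative, not the actual `RemoteInputChannel` code):
   
{code:java}
import java.util.ArrayDeque;

// Illustrative: move the notification out of the critical section.
public class CreditNotifier {

	private final Object bufferQueueLock = new Object();
	private final ArrayDeque<Object> bufferQueue = new ArrayDeque<>();

	public void recycle(Object buffer) {
		boolean needsNotification;
		synchronized (bufferQueueLock) {
			// Only the state change happens under the lock.
			bufferQueue.add(buffer);
			needsNotification = bufferQueue.size() == 1;
		}
		if (needsNotification) {
			notifyCreditAvailable(); // runs without holding bufferQueueLock
		}
	}

	private void notifyCreditAvailable() {
		// In Flink this would notify the network stack; stubbed out here.
	}
}
{code}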
   
   ## Brief change log
   
   - move `notifyCreditAvailable()` out of the lock around 
`RemoteInputChannel#bufferQueue`
   - move `notifyChannelNonEmpty()` out of the lock around 
`RemoteInputChannel#receivedBuffers`
   - replace `RemoteInputChannel#receivedBuffers.size()` with 
`receivedBuffers.isEmpty()` when this is the only thing needed
   - replace `SingleInputGate#inputChannelsWithData.size()` with 
`inputChannelsWithData.isEmpty()` when this is the only thing needed
   - minor code style improvement in `CreditBasedPartitionRequestClientHandler` 
to improve readability
   
   ## Verifying this change
   
   - This change is already covered by existing tests, such as 
`RemoteInputChannelTest`, `SingleInputGateTest`,..., and everything using the 
network stack.
   - Manually verified that less CPU is used with a `SocketWindowWordCount` 
with some added shuffles, no input elements, only watermarks every 500ms with 4 
TMs, 10 slots each on a single (laptop) machine.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **no** (per 
buffer)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **no**
 - If yes, how is the feature documented? **JavaDocs**
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reduce lock contention introduced with 1.5
> --
>
> Key: FLINK-10141
> URL: https://issues.apache.org/jira/browse/FLINK-10141
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.2, 1.6.0, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> With the changes around introducing credit-based flow control as well as the 
> low latency changes, unfortunately, we also introduced some lock contention 
> on {{RemoteInputChannel#bufferQueue}} and 
> {{RemoteInputChannel#receivedBuffers}} as well as asking for queue sizes when 
> the only thing we need is whether it is empty or not.
> This was observed as a high idle CPU load with no events in the stream but 
> only watermarks (every 500ms) and many slots on a single machine.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] NicoK opened a new pull request #6553: [FLINK-10141][network] optimisations reducing lock contention

2018-08-14 Thread GitBox
NicoK opened a new pull request #6553: [FLINK-10141][network] optimisations 
reducing lock contention
URL: https://github.com/apache/flink/pull/6553
 
 
   ## What is the purpose of the change
   
   With the changes around introducing credit-based flow control as well as the 
low latency changes, unfortunately, we also introduced some lock contention on 
`RemoteInputChannel#bufferQueue` and `RemoteInputChannel#receivedBuffers`. 
Additionally, we were asking for queue sizes when the only thing we need is 
whether it is empty or not.
   
   As a result, we saw high CPU load during "idle" stream processing jobs with 
no events in the stream but only watermarks (every 500ms) and many slots on a 
single machine.
   
   ## Brief change log
   
   - move `notifyCreditAvailable()` out of the lock around 
`RemoteInputChannel#bufferQueue`
   - move `notifyChannelNonEmpty()` out of the lock around 
`RemoteInputChannel#receivedBuffers`
   - replace `RemoteInputChannel#receivedBuffers.size()` with 
`receivedBuffers.isEmpty()` when this is the only thing needed
   - replace `SingleInputGate#inputChannelsWithData.size()` with 
`inputChannelsWithData.isEmpty()` when this is the only thing needed
   - minor code style improvement in `CreditBasedPartitionRequestClientHandler` 
to improve readability
   
   ## Verifying this change
   
   - This change is already covered by existing tests, such as 
`RemoteInputChannelTest`, `SingleInputGateTest`,..., and everything using the 
network stack.
   - Manually verified that less CPU is used with a `SocketWindowWordCount` 
with some added shuffles, no input elements, only watermarks every 500ms with 4 
TMs, 10 slots each on a single (laptop) machine.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **no** (per 
buffer)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **no**
 - If yes, how is the feature documented? **JavaDocs**
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10022) Add metrics for input/output buffers

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579737#comment-16579737
 ] 

ASF GitHub Bot commented on FLINK-10022:


zentol commented on a change in pull request #6551: 
[FLINK-10022][network][metrics] add metrics for input/output buffers
URL: https://github.com/apache/flink/pull/6551#discussion_r209932565
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
 ##
 @@ -43,38 +43,74 @@
protected double numBytesInRemotePerSecond;
protected double numBytesOutPerSecond;
 
-   public IOMetrics(Meter recordsIn, Meter recordsOut, Meter bytesLocalIn, 
Meter bytesRemoteIn, Meter bytesOut) {
+   protected long numBuffersInLocal;
+   protected long numBuffersInRemote;
+   protected long numBuffersOut;
+
+   protected double numBuffersInLocalPerSecond;
+   protected double numBuffersInRemotePerSecond;
+   protected double numBuffersOutPerSecond;
+
+   public IOMetrics(
 
 Review comment:
   it should be fine to remove them from this class, and revert the changes to 
the various handlers.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add metrics for input/output buffers
> 
>
> Key: FLINK-10022
> URL: https://issues.apache.org/jira/browse/FLINK-10022
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics, Network
>Affects Versions: 1.5.2, 1.6.0, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we provide metrics for the input and output queue lengths, records 
> and bytes. For diagnosing network issues, it would also be nice to know the 
> number of buffers because we would then know their fill rate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #6551: [FLINK-10022][network][metrics] add metrics for input/output buffers

2018-08-14 Thread GitBox
zentol commented on a change in pull request #6551: 
[FLINK-10022][network][metrics] add metrics for input/output buffers
URL: https://github.com/apache/flink/pull/6551#discussion_r209932565
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
 ##
 @@ -43,38 +43,74 @@
protected double numBytesInRemotePerSecond;
protected double numBytesOutPerSecond;
 
-   public IOMetrics(Meter recordsIn, Meter recordsOut, Meter bytesLocalIn, 
Meter bytesRemoteIn, Meter bytesOut) {
+   protected long numBuffersInLocal;
+   protected long numBuffersInRemote;
+   protected long numBuffersOut;
+
+   protected double numBuffersInLocalPerSecond;
+   protected double numBuffersInRemotePerSecond;
+   protected double numBuffersOutPerSecond;
+
+   public IOMetrics(
 
 Review comment:
   it should be fine to remove them from this class, and revert the changes to 
the various handlers.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579732#comment-16579732
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method 
to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-412854540
 
 
   @pnowojski I fixed some of the issues you just mentioned and will refactor 
`PrintSinkFunction` and `PrintingOutputFormat`. Do you mean I should create a 
new class named `TaskOutputWriter` and extract the shared logic into it?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a string to the print method to identify output for DataStream
> --
>
> Key: FLINK-9850
> URL: https://issues.apache.org/jira/browse/FLINK-9850
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The output of the print method of {{DataSet}} allows the user to supply a 
> String to identify the output (see 
> [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But 
> {{DataStream}} doesn't support this yet. It is valuable to add this feature 
> for {{DataStream}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-14 Thread GitBox
yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method 
to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-412854540
 
 
   @pnowojski I fixed some of the issues you just mentioned and will refactor 
`PrintSinkFunction` and `PrintingOutputFormat`. Do you mean I should create a 
new class named `TaskOutputWriter` and extract the shared logic into it?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-6810) Add a set of built-in scalar functions to Table API & SQL

2018-08-14 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-6810:

Description: 
In this JIRA, we will create some sub-tasks for adding specific scalar 
functions such as mathematical-function {{LOG}}, date-functions
 {{DATEADD}}, string-functions {{LPAD}}, etc.

*How to contribute a built-in scalar function*
Thank you very much for contributing a built-in function. In order to make sure 
your contributions are in a good direction, it is recommended to read the 
following instructions.
 # Investigate the behavior of the function that you are going to contribute in 
major DBMSs. This is very important since we have to understand the exact 
semantics of the function.
 # It is recommended to add function for both SQL and table-API (Java and 
Scala).
 # For every scalar function, add corresponding docs which should include a 
SQL, a Java and a Scala version in {{./docs/dev/table/functions.md}}. Make sure 
your description of the function is accurate. Please do not simply copy 
documentation from other projects, especially if the projects are not Apache 
licensed.
 # Take overflow, NullPointerException and other exceptions into consideration.
 # Add unit tests for every new function and its supported APIs. Have a look at 
{{ScalarFunctionsTest}}, {{SqlExpressionTest}}, 
{{ScalaFunctionsValidationTest}}, etc. for how to implement function tests.
 !how to add a scalar function.png! 
Welcome anybody to add the sub-task about standard database scalar function.

Note: Usually, adding a runtime function to {{ScalarFunctions.scala}} is 
sufficient. However, sometimes it makes sense to implement a {{CallGenerator}} 
for {{FunctionGenerator}} to leverage object reuse. E.g., {{HashCalcCallGen}} 
creates a {{MessageDigest}} only once for {{SHA2}} if the parameters are 
literals.

  was:
In this JIRA, we will create some sub-tasks for adding specific scalar 
functions such as mathematical-function {{LOG}}, date-functions
 {{DATEADD}}, string-functions {{LPAD}}, etc.

*How to contribute a built-in scalar function*
Thank you very much for contributing a built-in function. In order to make sure 
your contributions are in a good direction, it is recommended to read the 
following instructions.
 # Investigate the behavior of the function that you are going to contribute in 
major DBMSs. This is very important since we have to understand the exact 
semantics of the function.
 # It is recommended to add function for both SQL and table-API (Java and 
Scala).
 # For every scalar function, add corresponding docs which should include a 
SQL, a Java and a Scala version in {{./docs/dev/table/functions.md}}. Make sure 
your description of the function is accurate. Please do not simply copy 
documentation from other projects, especially if the projects are not Apache 
licensed.
 # Take overflow, NullPointerException and other exceptions into consideration.
 # Add unit tests for every new function and its supported APIs. Have a look at 
{{ScalarFunctionsTest}}, {{SqlExpressionTest}}, 
{{ScalaFunctionsValidationTest}}, etc. for how to implement function tests.
 !how to add a scalar function.png! 
Welcome anybody to add the sub-task about standard database scalar function.

Note: Usually, adding a runtime function to {{ScalarFunctions.scala}} is 
sufficient. However, sometimes it makes sense to implement a {{CallGenerator}} 
for {{FunctionGenerator}} to leverage object reuse. E.g., {{HashCalcCallGen}} 
creates a {{MessageDigest}} only once if the parameters are literals.
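
As an illustration of the object-reuse note in the description above (the 
class below is illustrative, not {{HashCalcCallGen}} itself): when the 
algorithm argument is a literal, the expensive {{MessageDigest}} can be 
created once and reused per record.

{code:java}
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Illustrative: create the MessageDigest once up front, reuse per record.
public class Sha2WithReuse {

	private final MessageDigest digest;

	public Sha2WithReuse(int bitLength) throws NoSuchAlgorithmException {
		// Valid SHA2 bit lengths: 224, 256, 384, 512.
		this.digest = MessageDigest.getInstance("SHA-" + bitLength);
	}

	public byte[] hash(byte[] input) {
		return digest.digest(input); // digest() resets the instance afterwards
	}
}
{code}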


> Add a set of built-in scalar functions to Table API & SQL
> -
>
> Key: FLINK-6810
> URL: https://issues.apache.org/jira/browse/FLINK-6810
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>  Labels: starter
> Attachments: how to add a scalar function.png
>
>
> In this JIRA, we will create some sub-tasks for adding specific scalar 
> functions such as mathematical-function {{LOG}}, date-functions
>  {{DATEADD}}, string-functions {{LPAD}}, etc.
> *How to contribute a built-in scalar function*
> Thank you very much for contributing a built-in function. In order to make 
> sure your contributions are in a good direction, it is recommended to read 
> the following instructions.
>  # Investigate the behavior of the function that you are going to contribute 
> in major DBMSs. This is very important since we have to understand the exact 
> semantics of the function.
>  # It is recommended to add function for both SQL and table-API (Java and 
> Scala).
>  # For every scalar function, add corresponding docs which should include a 
> SQL, a Java and a Scala version in 

[jira] [Updated] (FLINK-6810) Add a set of built-in scalar functions to Table API & SQL

2018-08-14 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-6810:

Description: 
In this JIRA, we will create some sub-tasks for adding specific scalar 
functions such as mathematical-function {{LOG}}, date-functions
 {{DATEADD}}, string-functions {{LPAD}}, etc.

*How to contribute a built-in scalar function*
Thank you very much for contributing a built-in function. In order to make sure 
your contributions are in a good direction, it is recommended to read the 
following instructions.
 # Investigate the behavior of the function that you are going to contribute in 
major DBMSs. This is very important since we have to understand the exact 
semantics of the function.
 # It is recommended to add function for both SQL and table-API (Java and 
Scala).
 # For every scalar function, add corresponding docs which should include a 
SQL, a Java and a Scala version in {{./docs/dev/table/functions.md}}. Make sure 
your description of the function is accurate. Please do not simply copy 
documentation from other projects, especially if the projects are not Apache 
licensed.
 # Take overflow, NullPointerException and other exceptions into consideration.
 # Add unit tests for every new function and its supported APIs. Have a look at 
{{ScalarFunctionsTest}}, {{SqlExpressionTest}}, 
{{ScalaFunctionsValidationTest}}, etc. for how to implement function tests.
 !how to add a scalar function.png! 
Welcome anybody to add the sub-task about standard database scalar function.

Note: Usually, adding a runtime function to {{ScalarFunctions.scala}} is 
sufficient. However, sometimes it makes sense to implement a {{CallGenerator}} 
for {{FunctionGenerator}} to leverage object reuse. E.g., {{HashCalcCallGen}} 
creates a {{MessageDigest}} only once if the parameters are literals.

  was:
In this JIRA, we will create some sub-tasks for adding specific scalar 
functions such as mathematical-function {{LOG}}, date-functions
 {{DATEADD}}, string-functions {{LPAD}}, etc.

*How to contribute a built-in scalar function*
Thank you very much for contributing a built-in function. In order to make sure 
your contributions are in a good direction, it is recommended to read the 
following instructions.
 # Investigate the behavior of the function that you are going to contribute in 
major DBMSs. This is very important since we have to understand the exact 
semantics of the function.
 # It is recommended to add function for both SQL and table-API (Java and 
Scala).
 # For every scalar function, add corresponding docs which should include a 
SQL, a Java and a Scala version in {{./docs/dev/table/functions.md}}. Make sure 
your description of the function is accurate. Please do not simply copy 
documentation from other projects, especially if the projects are not Apache 
licensed.
 # Take overflow, NullPointerException and other exceptions into consideration.
 # Add unit tests for every new function and its supported APIs. Have a look at 
{{ScalarFunctionsTest}}, {{SqlExpressionTest}}, 
{{ScalaFunctionsValidationTest}}, etc. for how to implement function tests.
 !how to add a scalar function.png! 
Welcome anybody to add the sub-task about standard database scalar function.


> Add a set of built-in scalar functions to Table API & SQL
> -
>
> Key: FLINK-6810
> URL: https://issues.apache.org/jira/browse/FLINK-6810
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>  Labels: starter
> Attachments: how to add a scalar function.png
>
>
> In this JIRA, we will create some sub-tasks for adding specific scalar 
> functions such as mathematical-function {{LOG}}, date-functions
>  {{DATEADD}}, string-functions {{LPAD}}, etc.
> *How to contribute a built-in scalar function*
> Thank you very much for contributing a built-in function. In order to make 
> sure your contributions are in a good direction, it is recommended to read 
> the following instructions.
>  # Investigate the behavior of the function that you are going to contribute 
> in major DBMSs. This is very important since we have to understand the exact 
> semantics of the function.
>  # It is recommended to add function for both SQL and table-API (Java and 
> Scala).
>  # For every scalar function, add corresponding docs which should include a 
> SQL, a Java and a Scala version in {{./docs/dev/table/functions.md}}. Make 
> sure your description of the function is accurate. Please do not simply copy 
> documentation from other projects, especially if the projects are not Apache 
> licensed.
>  # Take overflow, NullPointerException and other exceptions into 
> consideration.
>  # Add unit tests 

[jira] [Commented] (FLINK-8545) Implement upsert stream table source

2018-08-14 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579726#comment-16579726
 ] 

Piotr Nowojski commented on FLINK-8545:
---

This `AppendStreamTable` would still carry deletes (the `change` flag from 
`CRow`), so wouldn't that be a confusing name? Also, do we have to expose in 
the API ({{UpsertStreamTable}}) how the updates are encoded? Shouldn't those 
just be traits of the underlying stream?

I mean that a stream created from an upsert source could just have a 
{{primary_key}} trait and maybe a flag that it is {{upsert}}. If such a 
stream is incompatible with the required input of some operator, we would 
place an {{UpsertToRetractOperator}} there that changes the upsert stream 
back into a retraction stream (see the sketch after this list). Such an 
operator would need to hold the whole table in state (maybe pruning on 
watermarks), but such a construct would be able to support upsert sources in 
all queries. It would also make it easier to optimize queries in the future. 
For example:
 # Reorder {{UpsertToRetractOperator}} with filters
 # Reorder {{UpsertToRetractOperator}} with projections
 # Remove {{UpsertToRetractOperator}} if the following operator can handle 
upserts (like {{UpsertDataSink}}, or joins)
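
A minimal sketch of the {{UpsertToRetractOperator}} idea under the 
assumptions above (plain Java, keeping the per-key table in a map; a real 
operator would use keyed state and watermark-based pruning):

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative: turn an upsert stream into a retraction stream.
public class UpsertToRetract<K, R> {

	/** Latest row per primary key; a real operator keeps this in state. */
	private final Map<K, R> lastRowPerKey = new HashMap<>();

	/** change == false plays the role of CRow's retraction flag. */
	public static final class Change<R> {
		public final boolean change;
		public final R row;

		Change(boolean change, R row) {
			this.change = change;
			this.row = row;
		}
	}

	public List<Change<R>> onUpsert(K key, R newRow) {
		List<Change<R>> out = new ArrayList<>();
		R old = lastRowPerKey.put(key, newRow);
		if (old != null) {
			out.add(new Change<>(false, old)); // retract the previous version
		}
		out.add(new Change<>(true, newRow));   // emit the new version
		return out;
	}

	public List<Change<R>> onDelete(K key) {
		List<Change<R>> out = new ArrayList<>();
		R old = lastRowPerKey.remove(key);
		if (old != null) {
			out.add(new Change<>(false, old)); // retract on delete
		}
		return out;
	}
}
{code}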

> Implement upsert stream table source 
> -
>
> Key: FLINK-8545
> URL: https://issues.apache.org/jira/browse/FLINK-8545
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> As more and more users are eager to ingest data in upsert mode in Flink 
> SQL/Table API, it is valuable to support a table source with upsert mode. I 
> will provide a design doc later and we can have more discussions. Any 
> suggestions are warmly welcomed!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-6810) Add a set of built-in scalar functions to Table API & SQL

2018-08-14 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-6810:

Summary: Add a set of built-in scalar functions to Table API & SQL  (was: 
Add Some built-in Scalar Function supported)

> Add a set of built-in scalar functions to Table API & SQL
> -
>
> Key: FLINK-6810
> URL: https://issues.apache.org/jira/browse/FLINK-6810
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>  Labels: starter
> Attachments: how to add a scalar function.png
>
>
> In this JIRA, we will create some sub-tasks for adding specific scalar 
> functions such as mathematical-function {{LOG}}, date-functions
>  {{DATEADD}}, string-functions {{LPAD}}, etc.
> *How to contribute a built-in scalar function*
> Thank you very much for contributing a built-in function. In order to make 
> sure your contributions are in a good direction, it is recommended to read 
> the following instructions.
>  # Investigate the behavior of the function that you are going to contribute 
> in major DBMSs. This is very important since we have to understand the exact 
> semantics of the function.
>  # It is recommended to add function for both SQL and table-API (Java and 
> Scala).
>  # For every scalar function, add corresponding docs which should include a 
> SQL, a Java and a Scala version in {{./docs/dev/table/functions.md}}. Make 
> sure your description of the function is accurate. Please do not simply copy 
> documentation from other projects, especially if the projects are not Apache 
> licensed.
>  # Take overflow, NullPointerException and other exceptions into 
> consideration.
>  # Add unit tests for every new function and its supported APIs. Have a look 
> at {{ScalarFunctionsTest}}, {{SqlExpressionTest}}, 
> {{ScalaFunctionsValidationTest}}, etc. for how to implement function tests.
>  !how to add a scalar function.png! 
> Welcome anybody to add the sub-task about standard database scalar function.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10130) How to define two hdfs name-node IPs in flink-conf.yaml file

2018-08-14 Thread Paul Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579710#comment-16579710
 ] 

Paul Lin commented on FLINK-10130:
--

It could be solved by  [HDFS name 
services|https://hadoop.apache.org/docs/r2.7.0/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html],
 and I think it might be better to leave it to the HDFS client. FYI.
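
For reference, a sketch of that approach, assuming a logical nameservice 
named "mycluster" (hosts, ports, and paths are placeholders): flink-conf.yaml 
then references the nameservice instead of a single name-node IP, and 
failover between the two NameNodes is handled by the HDFS client.

{code}
# flink-conf.yaml (illustrative): point at the logical nameservice
state.checkpoints.dir: hdfs://mycluster/flinkdatastorage
state.backend.fs.checkpointdir: hdfs://mycluster/flinkdatastorage
high-availability.zookeeper.storageDir: hdfs://mycluster/flink/recovery

# hdfs-site.xml on the Flink hosts (illustrative): both NameNodes behind
# the "mycluster" nameservice
#   dfs.nameservices = mycluster
#   dfs.ha.namenodes.mycluster = nn1,nn2
#   dfs.namenode.rpc-address.mycluster.nn1 = namenode1:9001
#   dfs.namenode.rpc-address.mycluster.nn2 = namenode2:9001
#   dfs.client.failover.proxy.provider.mycluster =
#     org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
{code}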

> How to define two hdfs name-node IPs in flink-conf.yaml file
> 
>
> Key: FLINK-10130
> URL: https://issues.apache.org/jira/browse/FLINK-10130
> Project: Flink
>  Issue Type: Bug
>Reporter: Keshav Lodhi
>Priority: Blocker
> Attachments: docker-entrypoints.sh
>
>
> Hi Team,
> Here is what we are looking for:
>  * We have a dockerized Flink HA cluster (3 ZooKeepers, 2 JobManagers, 3 
> TaskManagers).
>  * We are using HDFS from Flink to store some data. The problem we are 
> facing is that we are not able to pass two name-node IPs in the config.
>  * These are the config parameters to which we want to add two name-node IPs:
>  #       "state.checkpoints.dir: hdfs://X.X.X.X:9001/flinkdatastorage"
>  #       "state.backend.fs.checkpointdir: 
> hdfs://X.X.X.X:9001/flinkdatastorage"
>  #       "high-availability.zookeeper.storageDir: 
> hdfs://X.X.X.X:9001/flink/recovery"
> Currently we are passing only one name-node IP.
> Please advise. 
> I have attached the sample *docker-entrypoint.sh* file: 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7205) Add UUID supported in TableAPI/SQL

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579706#comment-16579706
 ] 

ASF GitHub Bot commented on FLINK-7205:
---

asfgit closed pull request #6381: [FLINK-7205] [table] Add UUID supported in 
SQL and TableApi
URL: https://github.com/apache/flink/pull/6381
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 148f30727d9..4d2cbbadc0a 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -1664,6 +1664,17 @@ RAND_INTEGER(seed integer, bound integer)
 

 
+
+ 
+   {% highlight text %}
+UUID()
+{% endhighlight %}
+ 
+
+  Returns a uuid string like '3d3c68f7-f608-473f-b60c-b0c44ad4cc4e' 
according to RFC 4122.
+
+   
+
 
   
 {% highlight text %}
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index b702dddcde5..a78543bd6c3 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -2322,6 +2322,17 @@ randInteger(seed integer, bound integer)
 

 
+
+ 
+   {% highlight java %}
+UUID()
+{% endhighlight %}
+ 
+
+  Returns a uuid string like '3d3c68f7-f608-473f-b60c-b0c44ad4cc4e' 
according to RFC 4122.
+
+   
+   
 
  
{% highlight java %}
@@ -3910,6 +3921,17 @@ randInteger(seed integer, bound integer)
 

 
+
+ 
+   {% highlight scala %}
+UUID()
+{% endhighlight %}
+ 
+
+  Returns a uuid string like '3d3c68f7-f608-473f-b60c-b0c44ad4cc4e' 
according to RFC 4122.
+
+   
+   
 
  
{% highlight scala %}
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index c7c805f6743..23ac30d3d8b 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -1247,4 +1247,17 @@ object concat_ws {
   }
 }
 
+/**
+  * Returns the uuid according to RFC 4122.
+  */
+object uuid {
+
+  /**
+* Returns the uuid according to RFC 4122.
+*/
+  def apply(): Expression = {
+UUID()
+  }
+}
+
 // scalastyle:on object.name
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index 74b69d6afcc..16beea9149d 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -158,6 +158,12 @@ object FunctionGenerator {
 STRING_TYPE_INFO,
 BuiltInMethods.TOBASE64)
 
+  addSqlFunction(
+UUID,
+Seq(),
+new UUIDCallGen()
+  )
+
   // 
--
   // Arithmetic functions
   // 
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/UUIDCallGen.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/UUIDCallGen.scala
new file mode 100644
index 000..537a19857c9
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/UUIDCallGen.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO
+import 
org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
+import 

[GitHub] asfgit closed pull request #6381: [FLINK-7205] [table] Add UUID supported in SQL and TableApi

2018-08-14 Thread GitBox
asfgit closed pull request #6381: [FLINK-7205] [table] Add UUID supported in 
SQL and TableApi
URL: https://github.com/apache/flink/pull/6381
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 148f30727d9..4d2cbbadc0a 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -1664,6 +1664,17 @@ RAND_INTEGER(seed integer, bound integer)
 

 
+
+ 
+   {% highlight text %}
+UUID()
+{% endhighlight %}
+ 
+
+  Returns a uuid string like '3d3c68f7-f608-473f-b60c-b0c44ad4cc4e' 
according to RFC 4122.
+
+   
+
 
   
 {% highlight text %}
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index b702dddcde5..a78543bd6c3 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -2322,6 +2322,17 @@ randInteger(seed integer, bound integer)
 

 
+
+ 
+   {% highlight java %}
+UUID()
+{% endhighlight %}
+ 
+
+  Returns a uuid string like '3d3c68f7-f608-473f-b60c-b0c44ad4cc4e' 
according to RFC 4122.
+
+   
+   
 
  
{% highlight java %}
@@ -3910,6 +3921,17 @@ randInteger(seed integer, bound integer)
 

 
+
+ 
+   {% highlight scala %}
+UUID()
+{% endhighlight %}
+ 
+
+  Returns a uuid string like '3d3c68f7-f608-473f-b60c-b0c44ad4cc4e' 
according to RFC 4122.
+
+   
+   
 
  
{% highlight scala %}
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index c7c805f6743..23ac30d3d8b 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -1247,4 +1247,17 @@ object concat_ws {
   }
 }
 
+/**
+  * Returns the uuid according to RFC 4122.
+  */
+object uuid {
+
+  /**
+* Returns the uuid according to RFC 4122.
+*/
+  def apply(): Expression = {
+UUID()
+  }
+}
+
 // scalastyle:on object.name
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index 74b69d6afcc..16beea9149d 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -158,6 +158,12 @@ object FunctionGenerator {
 STRING_TYPE_INFO,
 BuiltInMethods.TOBASE64)
 
+  addSqlFunction(
+    UUID,
+    Seq(),
+    new UUIDCallGen()
+  )
+
  // ----------------------------------------------------------------------
  // Arithmetic functions
  // ----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/UUIDCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/UUIDCallGen.scala
new file mode 100644
index 000..537a19857c9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/UUIDCallGen.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO
+import org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}
+
+/**
+  * Generates a UUID function call.
+  */
+class UUIDCallGen extends CallGenerator {
+
+  override def generate(codeGenerator: CodeGenerator,
+operands: 
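
The archive truncates the new UUIDCallGen.scala just as the generate method
begins. For reference, a plausible completion under the imports shown above
would look roughly like the following; the randomUUID() body is an inference
from generateCallIfArgsNotNull and STRING_TYPE_INFO, not a verbatim copy of
the merged file:

{code}
class UUIDCallGen extends CallGenerator {

  override def generate(
      codeGenerator: CodeGenerator,
      operands: Seq[GeneratedExpression])
    : GeneratedExpression = {
    // UUID() takes no operands, so the sequence is expected to be empty; the
    // helper adds null checking consistent with the code generator's settings.
    generateCallIfArgsNotNull(codeGenerator.nullCheck, STRING_TYPE_INFO, operands) {
      _ => s"java.util.UUID.randomUUID().toString()"
    }
  }
}
{code}

With the PR applied, the function is exposed as UUID() in SQL and as uuid()
in the Scala expression DSL added above.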

[jira] [Resolved] (FLINK-7205) Add UUID supported in TableAPI/SQL

2018-08-14 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-7205.
-
   Resolution: Fixed
Fix Version/s: 1.7.0

Fixed in 1.7.0: 913b0413882939c30da4ad4df0cabc84dfe69ea0

> Add UUID supported in TableAPI/SQL
> --
>
> Key: FLINK-7205
> URL: https://issues.apache.org/jira/browse/FLINK-7205
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> UUID() returns a value that conforms to UUID version 1 as described in RFC 
> 4122. The value is a 128-bit number represented as a utf8 string of five 
> hexadecimal numbers in aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee format:
> The first three numbers are generated from the low, middle, and high parts of 
> a timestamp. The high part also includes the UUID version number.
> The fourth number preserves temporal uniqueness in case the timestamp value 
> loses monotonicity (for example, due to daylight saving time).
> The fifth number is an IEEE 802 node number that provides spatial uniqueness. 
> A random number is substituted if the latter is not available (for example, 
> because the host device has no Ethernet card, or it is unknown how to find 
> the hardware address of an interface on the host operating system). In this 
> case, spatial uniqueness cannot be guaranteed. Nevertheless, a collision 
> should have very low probability.
> See: [RFC 4122: 
> http://www.ietf.org/rfc/rfc4122.txt|http://www.ietf.org/rfc/rfc4122.txt]
> See detailed semantics:
>MySql: 
> [https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_uuid|https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_uuid]
> Any feedback is welcome :-).
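
As a side note on the semantics quoted above: java.util.UUID.randomUUID(), the
most natural JVM building block for such a function, produces a version 4
(random) UUID rather than the time-based version 1 layout described here. A
quick standalone check of the version and variant fields:

{code}
import java.util.UUID

object UuidVersionCheck {
  def main(args: Array[String]): Unit = {
    val u = UUID.randomUUID()
    // version() returns 4 for randomUUID(); a time-based RFC 4122 UUID reports 1.
    println(s"uuid=$u version=${u.version()} variant=${u.variant()}")
  }
}
{code}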



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9013) Document yarn.containers.vcores only being effective when adapting YARN config

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579701#comment-16579701
 ] 

ASF GitHub Bot commented on FLINK-9013:
---

dawidwys commented on a change in pull request #6294: [FLINK-9013][docs] 
Document yarn.containers.vcores only being effective whe…
URL: https://github.com/apache/flink/pull/6294#discussion_r209925707
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
 ##
 @@ -63,9 +65,14 @@
 */
 	public static final ConfigOption<Integer> VCORES =
 		key("yarn.containers.vcores")
-			.defaultValue(-1)
-			.withDescription("The number of virtual cores (vcores) per YARN container. By default, the number of vcores" +
-				" is set to the number of slots per TaskManager, if set, or to 1, otherwise.");
+			.defaultValue(-1)
+			.withDescription(Description.builder().text(
+				"The number of virtual cores (vcores) per YARN container. By default, the number of vcores" +
+				" is set to the number of slots per TaskManager, if set, or to 1, otherwise. In order for this" +
+				" parameter to be used your cluster must have CPU scheduling enabled. You can do this by setting" +
+				" the %s.",
+				code("org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler"))
 
 Review comment:
   @GJL thanks for your comments. The original reason for introducing the 
DescriptionBuilder was to avoid embedding HTML or markdown in the description, 
so that we can format it differently in case we want to print it in an 
exception description. 
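
The rationale reads well in miniature: keep the description as structured data
and render it per output target. An illustrative sketch of the pattern (the
names below are made up, not Flink's actual Description API):

{code}
sealed trait Fragment
final case class Text(s: String) extends Fragment
final case class Code(s: String) extends Fragment

final case class Desc(fragments: Fragment*) {
  // One structured source, two renderings.
  def toHtml: String = fragments.map {
    case Text(s) => s
    case Code(s) => s"<code>$s</code>"
  }.mkString

  def toPlain: String = fragments.map {
    case Text(s) => s
    case Code(s) => s"'$s'"
  }.mkString
}

object DescDemo extends App {
  val d = Desc(Text("Requires the "), Code("FairScheduler"), Text("."))
  println(d.toHtml)  // Requires the <code>FairScheduler</code>.
  println(d.toPlain) // Requires the 'FairScheduler'.
}
{code}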


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Document yarn.containers.vcores only being effective when adapting YARN config
> --
>
> Key: FLINK-9013
> URL: https://issues.apache.org/jira/browse/FLINK-9013
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, YARN
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.3
>
>
> Even after specifying {{yarn.containers.vcores}} and having Flink request 
> such a container from YARN, the YARN scheduler may not take the vcore count 
> into account at all and return a container with 1 vcore.
> The YARN configuration needs to be adapted to take the vcores into account, 
> e.g. by setting the {{FairScheduler}} in {{yarn-site.xml}}:
> {code}
> <property>
>   <name>yarn.resourcemanager.scheduler.class</name>
>   <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
> </property>
> {code}
> This fact should be documented, at least in the documentation of the 
> {{yarn.containers.vcores}} configuration parameter.
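
The Flink-side half of the setup is just the vcore request in flink-conf.yaml;
the value below is an arbitrary example:

{code}
# flink-conf.yaml
yarn.containers.vcores: 4
{code}

Without the scheduler change above, YARN's default scheduler configuration
ignores the requested vcores and hands out containers with 1 vcore, which is
exactly what this ticket asks to document.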



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dawidwys commented on a change in pull request #6294: [FLINK-9013][docs] Document yarn.containers.vcores only being effective whe…

2018-08-14 Thread GitBox
dawidwys commented on a change in pull request #6294: [FLINK-9013][docs] 
Document yarn.containers.vcores only being effective whe…
URL: https://github.com/apache/flink/pull/6294#discussion_r209925707
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
 ##
 @@ -63,9 +65,14 @@
 */
 	public static final ConfigOption<Integer> VCORES =
 		key("yarn.containers.vcores")
-			.defaultValue(-1)
-			.withDescription("The number of virtual cores (vcores) per YARN container. By default, the number of vcores" +
-				" is set to the number of slots per TaskManager, if set, or to 1, otherwise.");
+			.defaultValue(-1)
+			.withDescription(Description.builder().text(
+				"The number of virtual cores (vcores) per YARN container. By default, the number of vcores" +
+				" is set to the number of slots per TaskManager, if set, or to 1, otherwise. In order for this" +
+				" parameter to be used your cluster must have CPU scheduling enabled. You can do this by setting" +
+				" the %s.",
+				code("org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler"))
 
 Review comment:
   @GJL thanks for your comments. The original reason for introducing the 
DescriptionBuilder was to avoid embedding HTML or markdown in the description, 
so that we can format it differently in case we want to print it in an 
exception description. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10022) Add metrics for input/output buffers

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579695#comment-16579695
 ] 

ASF GitHub Bot commented on FLINK-10022:


NicoK commented on a change in pull request #6551: 
[FLINK-10022][network][metrics] add metrics for input/output buffers
URL: https://github.com/apache/flink/pull/6551#discussion_r209923936
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
 ##
 @@ -104,7 +130,6 @@ public Meter getNumBytesInRemoteRateMeter() {
public Meter getNumBytesOutRateMeter() {
return numBytesOutRate;
}
-
 
 Review comment:
   don't know either...reverting that...


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add metrics for input/output buffers
> 
>
> Key: FLINK-10022
> URL: https://issues.apache.org/jira/browse/FLINK-10022
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics, Network
>Affects Versions: 1.5.2, 1.6.0, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we provide metrics for the input and output queue lengths, records 
> and bytes. For diagnosing network issues, it would also be nice to know the 
> number of buffers because we would then know their fill rate.
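
To make the fill-rate point concrete: once byte and buffer counters both
exist, the average fill per buffer is a simple division. A hedged sketch (the
counter names and the 32 KiB default segment size are assumptions, not this
PR's API):

{code}
object BufferFillRate {

  // Flink's default network buffer size is 32 KiB (taskmanager.memory.segment-size).
  private val BufferSizeBytes = 32 * 1024.0

  /** Average fill as a fraction in [0, 1]; 0 if no buffers were sent. */
  def averageFill(numBytesOut: Long, numBuffersOut: Long): Double =
    if (numBuffersOut == 0) 0.0
    else numBytesOut / (numBuffersOut * BufferSizeBytes)

  def main(args: Array[String]): Unit = {
    // e.g. 1 GiB sent in 40000 buffers -> ~0.82 average fill
    println(averageFill(1L << 30, 40000))
  }
}
{code}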



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] NicoK commented on a change in pull request #6551: [FLINK-10022][network][metrics] add metrics for input/output buffers

2018-08-14 Thread GitBox
NicoK commented on a change in pull request #6551: 
[FLINK-10022][network][metrics] add metrics for input/output buffers
URL: https://github.com/apache/flink/pull/6551#discussion_r209923936
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
 ##
 @@ -104,7 +130,6 @@ public Meter getNumBytesInRemoteRateMeter() {
public Meter getNumBytesOutRateMeter() {
return numBytesOutRate;
}
-
 
 Review comment:
   don't know either...reverting that...


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

