[jira] [Comment Edited] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580624#comment-16580624 ]

Thomas Weise edited comment on FLINK-10074 at 8/15/18 4:09 AM:
---
[~till.rohrmann] it is probably too difficult to retain the count in the JM failure case, but how about making it correct for other failures? When the JM fails - which is a very small probability - we would reset / not retain the count. The majority of failures that we are concerned with are transient issues in TMs where subtasks just get redeployed, much fewer cases TM machine failures. For all these we could have a more accurate behavior by retaining the count and failing right away on the next checkpoint failure.

was (Author: thw):
[~till.rohrmann] it is probably unreasonably hard to retain the count in the JM failure case, but how about making it correct for other failures? When the JM fails - which is a very small probability - we would reset / not retain the count. The majority of failures that we are concerned with are transient issues in TMs where subtasks just get redeployed, much fewer cases TM machine failures. For all these we could have a more accurate behavior by retaining the count and failing right away on the next checkpoint failure.

> Allowable number of checkpoint failures
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Reporter: Thomas Weise
> Assignee: vinoyang
> Priority: Major
>
> For intermittent checkpoint failures it is desirable to have a mechanism to avoid restarts. If, for example, a transient S3 error prevents checkpoint completion, the next checkpoint may very well succeed. The user may wish to not incur the expense of a restart under such a scenario, and this could be expressed with a failure threshold (number of subsequent checkpoint failures), possibly combined with a list of exceptions to tolerate.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580699#comment-16580699 ]

Thomas Weise commented on FLINK-10074:
--
[~yanghua] this is what I meant also, I updated my comment for clarity.
[jira] [Comment Edited] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580624#comment-16580624 ]

Thomas Weise edited comment on FLINK-10074 at 8/15/18 4:07 AM:
---
[~till.rohrmann] it is probably unreasonably hard to retain the count in the JM failure case, but how about making it correct for other failures? When the JM fails - which is a very small probability - we would reset / not retain the count. The majority of failures that we are concerned with are transient issues in TMs where subtasks just get redeployed, much fewer cases TM machine failures. For all these we could have a more accurate behavior by retaining the count and failing right away on the next checkpoint failure.

was (Author: thw):
[~till.rohrmann] it is probably unreasonably hard to do for the JM failure case, but how about making it correct with best effort? Only when the JM fails, we would not retain the count, which is a very small probability. The majority of failures are transient issues in TMs where subtasks just get redeployed, much fewer cases TM machine failures. For all these we could have a more accurate behavior by retaining the count and failing right away on the next checkpoint failure.
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580687#comment-16580687 ]

vinoyang commented on FLINK-10074:
--
[~thw] I personally prefer that once the JM fails over, the counter is reset. I don't think it's necessary to introduce too much complexity for this. If we needed to maintain a global counter across JM processes, we would have to use a third-party component such as ZooKeeper. I think it is appropriate to maintain this counter for the life of a JM process. Once the JM fails over, the job is restored (re-deployed and run), and it is reasonable to reset the counter for a new runtime environment.
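The behavior discussed in this thread - tolerate up to N consecutive checkpoint failures, reset the counter on success (and, per the comments above, also on JM failover), and restart only once the threshold is exceeded - can be sketched as follows. The class and method names are hypothetical illustrations, not Flink's actual API.

```java
// Hypothetical sketch of a tolerable-checkpoint-failure counter, as proposed
// in this thread. Not Flink's actual implementation.
class CheckpointFailureTracker {
    private final int tolerableFailures;
    private int consecutiveFailures = 0;

    CheckpointFailureTracker(int tolerableFailures) {
        this.tolerableFailures = tolerableFailures;
    }

    /** Called on checkpoint success or JM failover: the count is not retained. */
    void reset() {
        consecutiveFailures = 0;
    }

    /** Called on checkpoint failure; returns true if the job should restart. */
    boolean onFailure() {
        return ++consecutiveFailures > tolerableFailures;
    }
}
```

With a threshold of 2, the first two consecutive failures are absorbed and the third triggers a restart; any success in between starts the count over.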
[jira] [Commented] (FLINK-8532) RebalancePartitioner should use Random value for its first partition
[ https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580677#comment-16580677 ]

ASF GitHub Bot commented on FLINK-8532:
---
Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#issuecomment-413082655

cc @tillrohrmann could you help review this?

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> RebalancePartitioner should use Random value for its first partition
>
> Key: FLINK-8532
> URL: https://issues.apache.org/jira/browse/FLINK-8532
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Reporter: Yuta Morisawa
> Priority: Minor
> Labels: pull-request-available
>
> In some conditions, RebalancePartitioner doesn't balance data correctly because it uses the same value for selecting the next operator. RebalancePartitioner initializes its partition id with the same value in every thread, so on average it balances data, but at any given moment the amount of data in each operator is skewed. In particular, when the data rates of the upstream operators are equal, the skew becomes severe.
>
> Example: consider a simple operator chain:
> -> map1 -> rebalance -> map2 ->
> Each map operator contains three subtasks (st1, st2, st3 in map1; st4, st5, st6 in map2).
>
> map1 map2
> st1  st4
> st2  st5
> st3  st6
>
> At the beginning, every subtask in map1 sends data to st4 in map2, because they all use the same initial partition id. The next time map1 receives data, st1, st2, and st3 all send it to st5, because each increments its partition id after processing the previous data.
> In my environment, it takes twice as long to process data with RebalancePartitioner as with other partitioners (rescale, keyBy).
>
> To solve this problem, in my opinion, RebalancePartitioner should use its own operator id as the initial value.
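The fix under review - start the round-robin from a random channel instead of channel 0, so parallel senders do not all hit the same first subtask - can be sketched like this. This is a standalone simplification, not Flink's actual `ChannelSelector` implementation.

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch of round-robin channel selection with a random starting point,
// as proposed for RebalancePartitioner. Simplified standalone illustration.
class RandomStartRebalance {
    // -1 means "not yet initialized"; the first call picks a random channel.
    private int nextChannel = -1;

    int selectChannel(int numberOfChannels) {
        if (nextChannel < 0) {
            // random starting point, chosen once per sender instance
            nextChannel = ThreadLocalRandom.current().nextInt(numberOfChannels);
        } else {
            nextChannel = (nextChannel + 1) % numberOfChannels;
        }
        return nextChannel;
    }
}
```

Because each sender starts at an independent random channel, the initial burst described in the issue is spread across subtasks instead of all landing on st4.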
[jira] [Commented] (FLINK-9999) Add ISNUMERIC supported in Table API/SQL
[ https://issues.apache.org/jira/browse/FLINK-9999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580627#comment-16580627 ]

vinoyang commented on FLINK-9999:
-
[~walterddr] I don't know if this is a SQL-standard scalar function, but there are databases that provide it. I don't know if the functions in Flink Table/SQL are all SQL-standard functions; it seems that some are not, such as "similar to". Where can I see which standard functions there are? I think it's reasonable to add some very common functions. I am going to try to make this function support more types. [~twalthr]

> Add ISNUMERIC supported in Table API/SQL
>
> Key: FLINK-9999
> URL: https://issues.apache.org/jira/browse/FLINK-9999
> Project: Flink
> Issue Type: Sub-task
> Components: Table API SQL
> Reporter: vinoyang
> Assignee: vinoyang
> Priority: Major
> Labels: pull-request-available
>
> The ISNUMERIC function is used to verify that an expression is a valid numeric type.
> Documentation: https://docs.microsoft.com/en-us/sql/t-sql/functions/isnumeric-transact-sql?view=sql-server-2017
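A minimal sketch of the eval logic such a function might use, assuming BigDecimal-based parsing. This only loosely mirrors SQL Server's ISNUMERIC semantics (which also accepts currency symbols and other forms), and the class name is illustrative, not part of any Flink API.

```java
import java.math.BigDecimal;

// Hypothetical eval logic for an ISNUMERIC-style scalar function: returns
// true if the input parses as a decimal number. Loosely mirrors SQL Server's
// ISNUMERIC; a real implementation would decide how to treat currency
// symbols, hex literals, etc.
class IsNumeric {
    static boolean eval(String s) {
        if (s == null || s.trim().isEmpty()) {
            return false;
        }
        try {
            new BigDecimal(s.trim());
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }
}
```

Supporting more input types, as discussed above, would amount to adding overloaded `eval` methods for numeric arguments that trivially return true.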
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580624#comment-16580624 ]

Thomas Weise commented on FLINK-10074:
--
[~till.rohrmann] it is probably unreasonably hard to do for the JM failure case, but how about making it correct with best effort? Only when the JM fails, we would not retain the count, which is a very small probability. The majority of failures are transient issues in TMs where subtasks just get redeployed, much fewer cases TM machine failures. For all these we could have a more accurate behavior by retaining the count and failing right away on the next checkpoint failure.
[jira] [Assigned] (FLINK-8953) Resolve unresolved field references in FieldComputer expressions
[ https://issues.apache.org/jira/browse/FLINK-8953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Renjie Liu reassigned FLINK-8953:
-
Assignee: (was: Renjie Liu)

> Resolve unresolved field references in FieldComputer expressions
>
> Key: FLINK-8953
> URL: https://issues.apache.org/jira/browse/FLINK-8953
> Project: Flink
> Issue Type: Improvement
> Components: Table API SQL
> Reporter: Timo Walther
> Priority: Major
>
> When implementing the {{FieldComputer.getExpression}} method, it is not possible to use API classes but only internal expression case classes. It would be great to also define timestamp extractors like:
> {code}
> def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
>   // 'x.cast(Types.LONG)
>   // ExpressionParser.parseExpression("x.cast(LONG)")
> }
> {code}
> An even better solution would be to provide different `getExpression()` methods that an implementor can override. The general goal should be to define this as naturally as possible. In the future we should also support SQL:
> {code}
> def getJavaExpression(fieldAccesses: Array[ResolvedFieldReference]): String = {
>   "x.cast(LONG)"
> }
> def getSQLExpression(fieldAccesses: Array[ResolvedFieldReference]): String = {
>   "CAST(x AS LONG)"
> }
> {code}
> The final design is still up for discussion. These are just ideas.
[jira] [Commented] (FLINK-10101) Mesos web ui url is missing.
[ https://issues.apache.org/jira/browse/FLINK-10101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580531#comment-16580531 ]

ASF GitHub Bot commented on FLINK-10101:
--
liurenjie1024 commented on issue #6522: [FLINK-10101][mesos] Add web ui url for mesos.
URL: https://github.com/apache/flink/pull/6522#issuecomment-413045382

@GJL I've fixed your comments.

> Mesos web ui url is missing.
>
> Key: FLINK-10101
> URL: https://issues.apache.org/jira/browse/FLINK-10101
> Project: Flink
> Issue Type: Bug
> Components: Mesos
> Affects Versions: 1.5.0, 1.5.1, 1.5.2
> Reporter: Renjie Liu
> Assignee: Renjie Liu
> Priority: Major
> Labels: pull-request-available
>
> The Mesos web UI URL is missing in the new deployment mode.
[jira] [Commented] (FLINK-9488) Create common entry point for master and workers
[ https://issues.apache.org/jira/browse/FLINK-9488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580468#comment-16580468 ]

Prithvi Raj commented on FLINK-9488:
-
I'm not sure whether this is the right place to ask, but how does one make use of this change when running Flink on K8s? Are there docs available somewhere?

> Create common entry point for master and workers
>
> Key: FLINK-9488
> URL: https://issues.apache.org/jira/browse/FLINK-9488
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0
>
> To make the container setup easier, we should provide a single cluster entry point which uses leader election to become either the master or a worker which runs the {{TaskManager}}.
[jira] [Commented] (FLINK-10022) Add metrics for input/output buffers
[ https://issues.apache.org/jira/browse/FLINK-10022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580343#comment-16580343 ]

ASF GitHub Bot commented on FLINK-10022:
--
NicoK closed pull request #6551: [FLINK-10022][network][metrics] add metrics for input/output buffers
URL: https://github.com/apache/flink/pull/6551

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance. As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index bf0aee1ed42..bac40f00066 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1217,6 +1217,27 @@ Thus, in order to infer the metric identifier:
       The number of bytes this task reads from a remote source per second.
       Meter
+      Task
+      numBuffersInLocal
+      The total number of network buffers this task has read from a local source.
+      Counter
+
+      numBuffersInLocalPerSecond
+      The number of network buffers this task reads from a local source per second.
+      Meter
+
+      numBuffersInRemote
+      The total number of network buffers this task has read from a remote source.
+      Counter
+
+      numBuffersInRemotePerSecond
+      The number of network buffers this task reads from a remote source per second.
+      Meter
       numBytesOut
       The total number of bytes this task has emitted.
@@ -1227,6 +1248,16 @@ Thus, in order to infer the metric identifier:
       The number of bytes this task emits per second.
       Meter
+
+      numBuffersOut
+      The total number of network buffers this task has emitted.
+      Counter
+
+      numBuffersOutPerSecond
+      The number of network buffers this task emits per second.
+      Meter
+
       Task/Operator
       numRecordsIn

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index e3a8e49316f..970795c0564 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -71,6 +71,8 @@
 	private Counter numBytesOut = new SimpleCounter();
 
+	private Counter numBuffersOut = new SimpleCounter();
+
 	public RecordWriter(ResultPartitionWriter writer) {
 		this(writer, new RoundRobinChannelSelector());
 	}
@@ -184,6 +186,7 @@ public void clearBuffers() {
 	 */
 	public void setMetricGroup(TaskIOMetricGroup metrics) {
 		numBytesOut = metrics.getNumBytesOutCounter();
+		numBuffersOut = metrics.getNumBuffersOutCounter();
 	}
 
 	/**
@@ -200,6 +203,7 @@ private boolean tryFinishCurrentBufferBuilder(int targetChannel, RecordSerialize
 		bufferBuilders[targetChannel] = Optional.empty();
 
 		numBytesOut.inc(bufferBuilder.finish());
+		numBuffersOut.inc();
 		serializer.clear();
 		return true;
 	}

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 3ce58661281..2a7cedf6949 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -64,6 +64,8 @@
 	protected final Counter numBytesIn;
 
+	protected final Counter numBuffersIn;
+
 	/** The current backoff (in ms) */
 	private int currentBackoff;
 
@@ -73,7 +75,8 @@ protected InputChannel(
 			ResultPartitionID partitionId,
 			int initialBackoff,
 			int maxBackoff,
-			Counter numBytesIn) {
+			Counter numBytesIn,
+			Counter numBuffersIn) {
 
 		checkArgument(channelIndex >= 0);
@@ -91,6 +94,7 @@ protected InputChannel(
 		this.currentBackoff = initial == 0 ? -1 : 0;
 
 		this.numBytesIn = numBytesIn;
+		this.numBuffersIn = numBuffersIn;
 	}
 
 	//
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
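The docs hunk above distinguishes Counter metrics (total buffers) from Meter metrics (buffers per second). A minimal illustration of that relationship: the per-second value is a rate derived from counter deltas over a time window. These are simplified stand-ins, not Flink's actual metric classes.

```java
// Simplified stand-ins for the Counter/Meter pair in the metrics above.
class SimpleCounter {
    private long count = 0;
    void inc() { count++; }
    long getCount() { return count; }
}

class PerSecondMeter {
    private final SimpleCounter counter;
    private long lastCount = 0;

    PerSecondMeter(SimpleCounter counter) { this.counter = counter; }

    /** Called once per elapsed second; returns the rate over that second. */
    long updateAndGetRate() {
        long current = counter.getCount();
        long rate = current - lastCount;
        lastCount = current;
        return rate;
    }
}
```

This is why the PR only increments counters (`numBuffersOut.inc()`) on the hot path; the per-second meters can be computed from them outside the record-processing loop.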
[jira] [Updated] (FLINK-10042) Extract snapshot algorithms from inner classes into full classes
[ https://issues.apache.org/jira/browse/FLINK-10042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-10042:
---
Labels: pull-request-available (was: )

> Extract snapshot algorithms from inner classes into full classes
>
> Key: FLINK-10042
> URL: https://issues.apache.org/jira/browse/FLINK-10042
> Project: Flink
> Issue Type: Sub-task
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Labels: pull-request-available
[jira] [Commented] (FLINK-10042) Extract snapshot algorithms from inner classes into full classes
[ https://issues.apache.org/jira/browse/FLINK-10042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580284#comment-16580284 ]

ASF GitHub Bot commented on FLINK-10042:
--
StefanRRichter commented on issue #6556: [FLINK-10042][state] Extract snapshot algorithms from inner classes of RocksDBKeyedStateBackend into full classes
URL: https://github.com/apache/flink/pull/6556#issuecomment-412980594

CC @azagrebin
[jira] [Commented] (FLINK-10042) Extract snapshot algorithms from inner classes into full classes
[ https://issues.apache.org/jira/browse/FLINK-10042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580283#comment-16580283 ]

ASF GitHub Bot commented on FLINK-10042:
--
StefanRRichter opened a new pull request #6556: [FLINK-10042][state] Extract snapshot algorithms from inner classes of RocksDBKeyedStateBackend into full classes
URL: https://github.com/apache/flink/pull/6556

## What is the purpose of the change

This PR is part 2 in the refactoring/decomposition of `RocksDBKeyedStateBackend`. All snapshotting algorithms are extracted from the backend file into their own full classes. In a second step, the PR also introduces `AsyncSnapshotCallable` as a general strategy for asynchronous snapshots that outlines resource de/allocation, closing, and cancellation.

## Verifying this change

This change is already covered by existing tests, and I added `AsyncSnapshotCallableTest`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
[GitHub] walterddr commented on a change in pull request #6508: [Flink-10079] [table] Automatically register sink table from external catalogs
walterddr commented on a change in pull request #6508: [Flink-10079] [table] Automatically register sink table from external catalogs
URL: https://github.com/apache/flink/pull/6508#discussion_r210052280

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala

@@ -750,7 +751,28 @@ abstract class TableEnvironment(val config: TableConfig) {
     if (null == sinkTableName) throw TableException("Name of TableSink must not be null.")
     if (sinkTableName.isEmpty) throw TableException("Name of TableSink must not be empty.")
     if (!isRegistered(sinkTableName)) {
-      throw TableException(s"No table was registered under the name $sinkTableName.")
+      // try resolving and registering sink table from registered external catalogs
+      try {
+        val paths = sinkTableName.split("\\.")
+        if (paths.length > 1) {

Review comment:
I am not exactly sure, but this seems problematic to me. Is there a guarantee that the first element of the path will always be the `catalog` name? This also depends on whether the path is required to be fully qualified, right?
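The review question can be made concrete with a tiny sketch of the path-splitting logic under discussion. This illustrates the assumption being questioned (that the first path element is a catalog name), and is not the actual TableEnvironment code; the original is Scala, Java is used here for consistency with the other sketches.

```java
// Illustrative parsing of a qualified table path of the assumed form
// catalog.database.table. Nothing here guarantees that parts[0] really is a
// catalog name unless paths are required to be fully qualified - exactly the
// point raised in the review.
class TablePathParser {
    static String[] parse(String name) {
        String[] parts = name.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException(
                "not a qualified path (expected at least catalog.table): " + name);
        }
        return parts;
    }
}
```

A table name that itself contains dots, or a partially qualified path, would be silently misread as a catalog reference, which is why the length check alone is insufficient.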
[jira] [Commented] (FLINK-9999) Add ISNUMERIC supported in Table API/SQL
[ https://issues.apache.org/jira/browse/FLINK-9999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580197#comment-16580197 ] Rong Rong commented on FLINK-9999: -- I do not have access to the newest SQL 2016 standard, but is {{IS_NUMERIC}} in it? It seems multiple DBMSs utilize it in different ways; some take more than just String/VARCHAR types as input. > Add ISNUMERIC supported in Table API/SQL > > > Key: FLINK-9999 > URL: https://issues.apache.org/jira/browse/FLINK-9999 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The ISNUMERIC function is used to verify that an expression is a valid numeric type. > documentation : > https://docs.microsoft.com/en-us/sql/t-sql/functions/isnumeric-transact-sql?view=sql-server-2017 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
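As the comment notes, DBMSs disagree on ISNUMERIC semantics. One possible interpretation, sketched here in Java purely as an assumption (a string that parses as a decimal number), not the SQL-standard or SQL Server definition:

```java
public class IsNumericDemo {
    // Hedged sketch of one possible ISNUMERIC semantics: the input parses as
    // a decimal number. Real DBMSs differ on what they accept (currency signs,
    // non-string inputs, etc.), so this is an illustration only.
    public static boolean isNumeric(String s) {
        if (s == null || s.isEmpty()) {
            return false;
        }
        try {
            Double.parseDouble(s);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isNumeric("12.5")); // true
        System.out.println(isNumeric("abc"));  // false
    }
}
```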
[jira] [Commented] (FLINK-10079) Automatically register sink table from external catalogs
[ https://issues.apache.org/jira/browse/FLINK-10079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580192#comment-16580192 ] Rong Rong commented on FLINK-10079: --- I agree we should either make the documentation more consistent, or support `INSERT INTO` for sink tables defined in an `ExternalCatalog`. However, I think this can be easily achieved using the unified TableSourceSink Configuration in FLINK-8866. [~twalthr] do you have any suggestions on this ticket? > Automatically register sink table from external catalogs > > > Key: FLINK-10079 > URL: https://issues.apache.org/jira/browse/FLINK-10079 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.6.0 >Reporter: Jun Zhang >Assignee: Jun Zhang >Priority: Major > Fix For: 1.7.0 > > > In the documentation, there is a description: > {quote}Once registered in a TableEnvironment, all tables defined in a > ExternalCatalog can be accessed from Table API or SQL queries by specifying > their full path, such as catalog.database.table. > {quote} > Currently, this is true only for source tables. For sink tables (specified in > the Table API or SQL), the users have to explicitly register them even though > they are defined in a registered ExternalCatalog; otherwise a "No table was > registered under the name XXX" TableException is thrown. > It would be better to keep source tables and sink tables consistent, and > users would enjoy a more convenient approach to inserting into sink tables. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10145) Add replace supported in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-10145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske reassigned FLINK-10145: - Assignee: Guibo Pan > Add replace supported in TableAPI and SQL > - > > Key: FLINK-10145 > URL: https://issues.apache.org/jira/browse/FLINK-10145 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Guibo Pan >Assignee: Guibo Pan >Priority: Major > > replace is a useful function for Strings. > For example: > {code:java} > select replace("Hello World", "World", "Flink") // return "Hello Flink" > select replace("ababab", "abab", "z") // return "zab" > {code} > It is supported as a UDF in Hive; for more details please see [1] > [1]: > https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-StringFunctions -- This message was sent by Atlassian JIRA (v7.6.3#76005)
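The replace semantics implied by the ticket's examples (every non-overlapping occurrence substituted, scanning left to right) match Java's built-in `String.replace`, which can serve as a quick sanity check of the expected results:

```java
public class ReplaceDemo {
    // Java's String.replace substitutes each non-overlapping occurrence of the
    // literal target, left to right -- matching the ticket's examples.
    public static String replace(String input, String search, String replacement) {
        return input.replace(search, replacement);
    }

    public static void main(String[] args) {
        System.out.println(replace("Hello World", "World", "Flink")); // Hello Flink
        // First "abab" is consumed at index 0, leaving only "ab" (no second match):
        System.out.println(replace("ababab", "abab", "z"));           // zab
    }
}
```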
[jira] [Commented] (FLINK-10101) Mesos web ui url is missing.
[ https://issues.apache.org/jira/browse/FLINK-10101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580143#comment-16580143 ] ASF GitHub Bot commented on FLINK-10101: GJL commented on a change in pull request #6522: [FLINK-10101][mesos] Add web ui url for mesos. URL: https://github.com/apache/flink/pull/6522#discussion_r210040604 ## File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ## @@ -120,6 +120,10 @@ /** A local actor system for using the helper actors. */ private final ActorSystem actorSystem; + /** Web url for to show in mesos page.*/ Review comment: Nitpicking here but there should be a space between the period and the asterisk for consistency: ``` /** Web url for to show in mesos page. */ ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Mesos web ui url is missing. > > > Key: FLINK-10101 > URL: https://issues.apache.org/jira/browse/FLINK-10101 > Project: Flink > Issue Type: Bug > Components: Mesos >Affects Versions: 1.5.0, 1.5.1, 1.5.2 >Reporter: Renjie Liu >Assignee: Renjie Liu >Priority: Major > Labels: pull-request-available > > Mesos web ui url is missing in new deploy mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] GJL commented on a change in pull request #6522: [FLINK-10101][mesos] Add web ui url for mesos.
GJL commented on a change in pull request #6522: [FLINK-10101][mesos] Add web ui url for mesos. URL: https://github.com/apache/flink/pull/6522#discussion_r210040604 ## File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ## @@ -120,6 +120,10 @@ /** A local actor system for using the helper actors. */ private final ActorSystem actorSystem; + /** Web url for to show in mesos page.*/ Review comment: Nitpicking here but there should be a space between the period and the asterisk for consistency: ``` /** Web url for to show in mesos page. */ ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10101) Mesos web ui url is missing.
[ https://issues.apache.org/jira/browse/FLINK-10101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580142#comment-16580142 ] ASF GitHub Bot commented on FLINK-10101: GJL commented on a change in pull request #6522: [FLINK-10101][mesos] Add web ui url for mesos. URL: https://github.com/apache/flink/pull/6522#discussion_r210036352 ## File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ## @@ -185,6 +190,8 @@ public MesosResourceManager( this.workersInNew = new HashMap<>(8); this.workersInLaunch = new HashMap<>(8); this.workersBeingReturned = new HashMap<>(8); + + this.webUiUrl = webUiUrl; Review comment: I would move this line below this assignment: ``` this.taskManagerContainerSpec = Preconditions.checkNotNull(taskManagerContainerSpec); ``` so that the assignments of all constructor arguments are grouped. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Mesos web ui url is missing. > > > Key: FLINK-10101 > URL: https://issues.apache.org/jira/browse/FLINK-10101 > Project: Flink > Issue Type: Bug > Components: Mesos >Affects Versions: 1.5.0, 1.5.1, 1.5.2 >Reporter: Renjie Liu >Assignee: Renjie Liu >Priority: Major > Labels: pull-request-available > > Mesos web ui url is missing in new deploy mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10101) Mesos web ui url is missing.
[ https://issues.apache.org/jira/browse/FLINK-10101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580141#comment-16580141 ] ASF GitHub Bot commented on FLINK-10101: GJL commented on a change in pull request #6522: [FLINK-10101][mesos] Add web ui url for mesos. URL: https://github.com/apache/flink/pull/6522#discussion_r210037022 ## File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ## @@ -237,6 +244,10 @@ protected void initialize() throws ResourceManagerException { Protos.FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo() .clone() .setCheckpoint(true); + if (webUiUrl != null) { + frameworkInfo = frameworkInfo.setWebuiUrl(webUiUrl); Review comment: It is not needed to reassign `frameworkInfo` because the method has side-effects on the builder. The return type is only a builder to enable method chaining. In fact, `frameworkInfo` can be declared `final`. ``` if (webUiUrl != null) { frameworkInfo.setWebuiUrl(webUiUrl); } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Mesos web ui url is missing. > > > Key: FLINK-10101 > URL: https://issues.apache.org/jira/browse/FLINK-10101 > Project: Flink > Issue Type: Bug > Components: Mesos >Affects Versions: 1.5.0, 1.5.1, 1.5.2 >Reporter: Renjie Liu >Assignee: Renjie Liu >Priority: Major > Labels: pull-request-available > > Mesos web ui url is missing in new deploy mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] GJL commented on a change in pull request #6522: [FLINK-10101][mesos] Add web ui url for mesos.
GJL commented on a change in pull request #6522: [FLINK-10101][mesos] Add web ui url for mesos. URL: https://github.com/apache/flink/pull/6522#discussion_r210037022 ## File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ## @@ -237,6 +244,10 @@ protected void initialize() throws ResourceManagerException { Protos.FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo() .clone() .setCheckpoint(true); + if (webUiUrl != null) { + frameworkInfo = frameworkInfo.setWebuiUrl(webUiUrl); Review comment: It is not needed to reassign `frameworkInfo` because the method has side-effects on the builder. The return type is only a builder to enable method chaining. In fact, `frameworkInfo` can be declared `final`. ``` if (webUiUrl != null) { frameworkInfo.setWebuiUrl(webUiUrl); } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
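The reviewer's point about the builder pattern can be illustrated with a minimal, hypothetical Java builder (not the actual `Protos.FrameworkInfo.Builder` API): the setter mutates the receiver and returns `this` only to enable chaining, so reassigning the variable is redundant and it can be declared `final`.

```java
public class BuilderDemo {
    // Hypothetical builder mirroring the shape under review.
    public static final class InfoBuilder {
        private String webUiUrl;

        public InfoBuilder setWebuiUrl(String url) {
            this.webUiUrl = url; // side effect on the builder itself
            return this;         // returned only to allow method chaining
        }

        public String getWebUiUrl() {
            return webUiUrl;
        }
    }

    public static void main(String[] args) {
        final InfoBuilder frameworkInfo = new InfoBuilder(); // can even be final
        frameworkInfo.setWebuiUrl("http://host:8081");       // no reassignment needed
        System.out.println(frameworkInfo.getWebUiUrl());
    }
}
```

Because the setter returns the same instance, `frameworkInfo = frameworkInfo.setWebuiUrl(...)` and a bare `frameworkInfo.setWebuiUrl(...)` leave the builder in an identical state.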
[GitHub] GJL commented on a change in pull request #6522: [FLINK-10101][mesos] Add web ui url for mesos.
GJL commented on a change in pull request #6522: [FLINK-10101][mesos] Add web ui url for mesos. URL: https://github.com/apache/flink/pull/6522#discussion_r210036352 ## File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ## @@ -185,6 +190,8 @@ public MesosResourceManager( this.workersInNew = new HashMap<>(8); this.workersInLaunch = new HashMap<>(8); this.workersBeingReturned = new HashMap<>(8); + + this.webUiUrl = webUiUrl; Review comment: I would move this line below this assignment: ``` this.taskManagerContainerSpec = Preconditions.checkNotNull(taskManagerContainerSpec); ``` so that the assignments of all constructor arguments are grouped. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Comment Edited] (FLINK-10019) Fix Composite getResultType of UDF cannot be chained with other operators
[ https://issues.apache.org/jira/browse/FLINK-10019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578793#comment-16578793 ] Rong Rong edited comment on FLINK-10019 at 8/14/18 5:39 PM: Dug a little deeper in Calcite and found that in: https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/InferTypes.java#L68 Seems like if the return type is inferred as a record, or *{{.isStruct() == true}}*, "it must have the same number of fields as the number of operands." which clearly is not the case here since the following expression: *{{AS(func(a), "myRow")}}* only passes over the *{{func(a)}}* for type inference, but not the alias *{{"myRow"}}* was (Author: walterddr): Dug a little deeper in Calcite and found that in: https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/InferTypes.java#L68 Seems like if the return type is inferred as a record, or {{isStruct()}}, "it must have the same number of fields as the number of operands." which clearly is not the case here since the following expression: **{{AS(func(a), "myRow")}}** only passes over the **{{func(a)}}** for type inference, but not the alias **{{"myRow"}}** > Fix Composite getResultType of UDF cannot be chained with other operators > - > > Key: FLINK-10019 > URL: https://issues.apache.org/jira/browse/FLINK-10019 > Project: Flink > Issue Type: Bug >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > If explicitly return a CompositeType in {{udf.getResultType}}, will result in > some failures in chained operators. 
> For example: consider a simple UDF, > {code:scala} > object Func extends ScalarFunction { > def eval(row: Row): Row = { > row > } > override def getParameterTypes(signature: Array[Class[_]]): > Array[TypeInformation[_]] = > Array(Types.ROW(Types.INT)) > override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = > Types.ROW(Types.INT) > } > {code} > This should work perfectly since it's just a simple pass through, however > {code:scala} > @Test > def testRowType(): Unit = { > val data = List( > Row.of(Row.of(12.asInstanceOf[Integer]), "1") > ) > val env = StreamExecutionEnvironment.getExecutionEnvironment > val stream = env.fromCollection(data)(Types.ROW(Types.ROW(Types.INT), > Types.STRING)) > val tEnv = TableEnvironment.getTableEnvironment(env) > val table = stream.toTable(tEnv, 'a, 'b) > tEnv.registerFunction("func", Func) > tEnv.registerTable("t", table) > // This works perfectly > val result1 = tEnv.sqlQuery("SELECT func(a) FROM t").toAppendStream[Row] > result1.addSink(new StreamITCase.StringSink[Row]) > // This throws exception > val result2 = tEnv.sqlQuery("SELECT func(a) as myRow FROM > t").toAppendStream[Row] > result2.addSink(new StreamITCase.StringSink[Row]) > env.execute() > } > {code} > Exception code: > {code:java} > java.lang.IndexOutOfBoundsException: index (1) must be less than size (1) > at > com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:310) > at > com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:293) > at > com.google.common.collect.SingletonImmutableList.get(SingletonImmutableList.java:41) > at > org.apache.calcite.sql.type.InferTypes$2.inferOperandTypes(InferTypes.java:83) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1777) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:459) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.expandStar(SqlValidatorImpl.java:349) > ... 
> {code} > This is due to the fact that Calcite inferOperandTypes does not expect to > infer a struct RelDataType. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] GJL commented on a change in pull request #6550: [FLINK-10137][YARN] Log completed containers.
GJL commented on a change in pull request #6550: [FLINK-10137][YARN] Log completed containers. URL: https://github.com/apache/flink/pull/6550#discussion_r210040202 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -323,9 +323,10 @@ public float getProgress() { } @Override - public void onContainersCompleted(final List list) { + public void onContainersCompleted(final List statuses) { runAsync(() -> { - for (final ContainerStatus containerStatus : list) { + log.debug("YARN ResourceManager reported the following containers completed: {}.", statuses); Review comment: That depends on the `List` implementation. At least `java.util.AbstractCollection` has a meaningful string representation. The List provided by Hadoop 2.8 as well. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10137) YARN: Log completed Containers
[ https://issues.apache.org/jira/browse/FLINK-10137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580127#comment-16580127 ] ASF GitHub Bot commented on FLINK-10137: GJL commented on a change in pull request #6550: [FLINK-10137][YARN] Log completed containers. URL: https://github.com/apache/flink/pull/6550#discussion_r210040202 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -323,9 +323,10 @@ public float getProgress() { } @Override - public void onContainersCompleted(final List list) { + public void onContainersCompleted(final List statuses) { runAsync(() -> { - for (final ContainerStatus containerStatus : list) { + log.debug("YARN ResourceManager reported the following containers completed: {}.", statuses); Review comment: That depends on the `List` implementation. At least `java.util.AbstractCollection` has a meaningful string representation. The List provided by Hadoop 2.8 as well. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > YARN: Log completed Containers > -- > > Key: FLINK-10137 > URL: https://issues.apache.org/jira/browse/FLINK-10137 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, ResourceManager, YARN >Affects Versions: 1.5.2, 1.6.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Major > Labels: pull-request-available > > Currently the Flink logs do not reveal why a YARN container completed. > {{YarnResourceManager}} should log the {{ContainerStatus}} when the YARN > ResourceManager reports containers to be completed. > *Acceptance Criteria* > * {{YarnResourceManager#onContainersCompleted(List)}} logs > completed containers. > * {{ResourceManager#closeTaskManagerConnection(ResourceID, Exception)}} > should always log the message in the exception. 
> -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10137) YARN: Log completed Containers
[ https://issues.apache.org/jira/browse/FLINK-10137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580126#comment-16580126 ] ASF GitHub Bot commented on FLINK-10137: GJL commented on a change in pull request #6550: [FLINK-10137][YARN] Log completed containers. URL: https://github.com/apache/flink/pull/6550#discussion_r210040202 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -323,9 +323,10 @@ public float getProgress() { } @Override - public void onContainersCompleted(final List list) { + public void onContainersCompleted(final List statuses) { runAsync(() -> { - for (final ContainerStatus containerStatus : list) { + log.debug("YARN ResourceManager reported the following containers completed: {}.", statuses); Review comment: That depends on the `List` implementation. At least `java.util.AbstractCollection` has a meaningful string representation. The List returned by Hadoop 2.8 as well. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > YARN: Log completed Containers > -- > > Key: FLINK-10137 > URL: https://issues.apache.org/jira/browse/FLINK-10137 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, ResourceManager, YARN >Affects Versions: 1.5.2, 1.6.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Major > Labels: pull-request-available > > Currently the Flink logs do not reveal why a YARN container completed. > {{YarnResourceManager}} should log the {{ContainerStatus}} when the YARN > ResourceManager reports containers to be completed. > *Acceptance Criteria* > * {{YarnResourceManager#onContainersCompleted(List)}} logs > completed containers. > * {{ResourceManager#closeTaskManagerConnection(ResourceID, Exception)}} > should always log the message in the exception. 
> -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] GJL commented on a change in pull request #6550: [FLINK-10137][YARN] Log completed containers.
GJL commented on a change in pull request #6550: [FLINK-10137][YARN] Log completed containers. URL: https://github.com/apache/flink/pull/6550#discussion_r210040202 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -323,9 +323,10 @@ public float getProgress() { } @Override - public void onContainersCompleted(final List list) { + public void onContainersCompleted(final List statuses) { runAsync(() -> { - for (final ContainerStatus containerStatus : list) { + log.debug("YARN ResourceManager reported the following containers completed: {}.", statuses); Review comment: That depends on the `List` implementation. At least `java.util.AbstractCollection` has a meaningful string representation. The List returned by Hadoop 2.8 as well. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
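The reviewers' exchange about the `{}` placeholder comes down to `List#toString`. A quick check of what `java.util.AbstractCollection#toString` produces (the container status strings below are made up for illustration, not real YARN output):

```java
import java.util.Arrays;
import java.util.List;

public class ListToStringDemo {
    public static void main(String[] args) {
        // AbstractCollection#toString renders elements comma-separated inside
        // brackets, which is what the {} placeholder in the log call prints.
        List<String> statuses = Arrays.asList("container_01: exit 0", "container_02: exit 143");
        System.out.println("Completed containers: " + statuses);
        // prints: Completed containers: [container_01: exit 0, container_02: exit 143]
    }
}
```

This is why the format is "suitable" as long as the concrete `List` implementation inherits that rendering, which is the caveat GJL raises.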
[jira] [Created] (FLINK-10145) Add replace supported in TableAPI and SQL
Guibo Pan created FLINK-10145: - Summary: Add replace supported in TableAPI and SQL Key: FLINK-10145 URL: https://issues.apache.org/jira/browse/FLINK-10145 Project: Flink Issue Type: Sub-task Components: Table API SQL Reporter: Guibo Pan replace is an useful function for String. for example: {code:java} select replace("Hello World", "World", "Flink") // return "Hello Flink" select replace("ababab", "abab", "z") // return "zab" {code} It is supported as a UDF in Hive, more details please see[1] [1]: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-StringFunctions -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9101) HAQueryableStateRocksDBBackendITCase failed on travis
[ https://issues.apache.org/jira/browse/FLINK-9101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-9101: - Fix Version/s: (was: 1.6.0) 1.7.0 1.6.1 > HAQueryableStateRocksDBBackendITCase failed on travis > - > > Key: FLINK-9101 > URL: https://issues.apache.org/jira/browse/FLINK-9101 > Project: Flink > Issue Type: Bug > Components: Queryable State >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Priority: Blocker > Labels: test-stability > Fix For: 1.6.1, 1.7.0 > > > The test deadlocks on travis. > https://travis-ci.org/zentol/flink/jobs/358894950 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-9101) HAQueryableStateRocksDBBackendITCase failed on travis
[ https://issues.apache.org/jira/browse/FLINK-9101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reopened FLINK-9101: -- It failed again: https://api.travis-ci.org/v3/job/415961618/log.txt > HAQueryableStateRocksDBBackendITCase failed on travis > - > > Key: FLINK-9101 > URL: https://issues.apache.org/jira/browse/FLINK-9101 > Project: Flink > Issue Type: Bug > Components: Queryable State >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Priority: Blocker > Labels: test-stability > Fix For: 1.6.1, 1.7.0 > > > The test deadlocks on travis. > https://travis-ci.org/zentol/flink/jobs/358894950 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10137) YARN: Log completed Containers
[ https://issues.apache.org/jira/browse/FLINK-10137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580017#comment-16580017 ] ASF GitHub Bot commented on FLINK-10137: yanghua commented on a change in pull request #6550: [FLINK-10137][YARN] Log completed containers. URL: https://github.com/apache/flink/pull/6550#discussion_r210015723 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -323,9 +323,10 @@ public float getProgress() { } @Override - public void onContainersCompleted(final List list) { + public void onContainersCompleted(final List statuses) { runAsync(() -> { - for (final ContainerStatus containerStatus : list) { + log.debug("YARN ResourceManager reported the following containers completed: {}.", statuses); Review comment: Will this call the list's toString method? Is the resulting format suitable? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > YARN: Log completed Containers > -- > > Key: FLINK-10137 > URL: https://issues.apache.org/jira/browse/FLINK-10137 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, ResourceManager, YARN >Affects Versions: 1.5.2, 1.6.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Major > Labels: pull-request-available > > Currently the Flink logs do not reveal why a YARN container completed. > {{YarnResourceManager}} should log the {{ContainerStatus}} when the YARN > ResourceManager reports containers to be completed. > *Acceptance Criteria* > * {{YarnResourceManager#onContainersCompleted(List)}} logs > completed containers. > * {{ResourceManager#closeTaskManagerConnection(ResourceID, Exception)}} > should always log the message in the exception. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] yanghua commented on a change in pull request #6550: [FLINK-10137][YARN] Log completed containers.
yanghua commented on a change in pull request #6550: [FLINK-10137][YARN] Log completed containers. URL: https://github.com/apache/flink/pull/6550#discussion_r210015723 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -323,9 +323,10 @@ public float getProgress() { } @Override - public void onContainersCompleted(final List list) { + public void onContainersCompleted(final List statuses) { runAsync(() -> { - for (final ContainerStatus containerStatus : list) { + log.debug("YARN ResourceManager reported the following containers completed: {}.", statuses); Review comment: Will this call the list's toString method? Is the resulting format suitable? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10144) No unused import "import akka.actor.ActorRef" in class "org.apache.flink.runtime.messages.TaskManagerMessages"
[ https://issues.apache.org/jira/browse/FLINK-10144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580010#comment-16580010 ] ASF GitHub Bot commented on FLINK-10144: yanghua commented on issue #6554: [FLINK-10144]delete unused import akka.actor.ActorRef in TaskManagerMessages.scala URL: https://github.com/apache/flink/pull/6554#issuecomment-412928276 The change is OK, but is it worth opening a PR for such a little fix? cc @zentol This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > No unused import "import akka.actor.ActorRef" in class > "org.apache.flink.runtime.messages.TaskManagerMessages" > -- > > Key: FLINK-10144 > URL: https://issues.apache.org/jira/browse/FLINK-10144 > Project: Flink > Issue Type: Improvement >Reporter: dalongliu >Assignee: dalongliu >Priority: Major > Labels: pull-request-available > > > {code:java} > package org.apache.flink.runtime.messages > import java.util.UUID > import akka.actor.ActorRef > import org.apache.flink.runtime.accumulators.AccumulatorSnapshot > import org.apache.flink.runtime.instance.InstanceID > /** > * Miscellaneous actor messages exchanged with the TaskManager. > */ > object TaskManagerMessages { > /./ > } > {code} > > The import "akka.actor.ActorRef" in TaskManagerMessages is never used, so we > can delete it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] yanghua commented on issue #6554: [FLINK-10144]delete unused import akka.actor.ActorRef in TaskManagerMessages.scala
yanghua commented on issue #6554: [FLINK-10144]delete unused import akka.actor.ActorRef in TaskManagerMessages.scala URL: https://github.com/apache/flink/pull/6554#issuecomment-412928276 The change is OK, but is it worth opening a PR for such a little fix? cc @zentol This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580001#comment-16580001 ] ASF GitHub Bot commented on FLINK-9850: --- yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#issuecomment-412926404 cc @pnowojski updated This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a string to the print method to identify output for DataStream > -- > > Key: FLINK-9850 > URL: https://issues.apache.org/jira/browse/FLINK-9850 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Hequn Cheng >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The print method of {{DataSet}} allows the user to supply a > String to identify the output (see > [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But > {{DataStream}} doesn't support this yet. It is valuable to add this feature for > {{DataStream}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream
yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#issuecomment-412926404 cc @pnowojski updated This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579950#comment-16579950 ] Jonathan Miles commented on FLINK-9061: --- Is the PR still being considered for merging? We seem to be running into the throttling issue with S3 and Flink 1.4.2 and a large stateful job. I'll come back with more information. I only just started the investigation and came across this ticket. > add entropy to s3 path for better scalability > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Bug > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Jamie Grier >Assignee: Indrajit Roychoudhury >Priority: Critical > Labels: pull-request-available > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
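The hash-prefix rewrite proposed in the description can be sketched as follows. This is a hypothetical illustration, not Flink's actual entropy-injection code: the `rewrite` helper, the use of MD5, and the 4-character prefix length are all assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class EntropyPrefix {
    // Rewrites s3://bucket/key to s3://bucket/<hash>/key, where <hash> is a
    // short digest of the original key. Checkpoint writes then spread across
    // S3's keyspace instead of sharing one hot prefix.
    static String rewrite(String path) {
        String rest = path.substring("s3://".length());
        int slash = rest.indexOf('/');
        String bucket = rest.substring(0, slash);
        String key = rest.substring(slash + 1);
        try {
            byte[] d = MessageDigest.getInstance("MD5")
                    .digest(key.getBytes(StandardCharsets.UTF_8));
            // keep the first two digest bytes -> four hex characters of entropy
            String hash = String.format("%02x%02x", d[0], d[1]);
            return "s3://" + bucket + "/" + hash + "/" + key;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 unavailable", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(rewrite("s3://bucket/flink/checkpoints"));
    }
}
```

Because the hash is derived deterministically from the original path, a reader can apply the same rewrite to locate the checkpoint again; the prefix only has to be stable, not random per write.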
[jira] [Updated] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng updated FLINK-8577: --- Description: The API will look like:
{code:java}
DataStream[(String, Long, Int)] input = ???
// upsert with key
Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
// upsert without key -> single row table
Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c)
{code}
A simple design [doc|https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing] about this subtask. was: The API will look like:
{code:java}
DataStream[(String, Long, Int)] input = ???
// upsert with key
Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
// upsert without key -> single row table
Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c)
{code}
> Implement proctime DataStream to Table upsert conversion. > - > > Key: FLINK-8577 > URL: https://issues.apache.org/jira/browse/FLINK-8577 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > The API will look like:
> {code:java}
> DataStream[(String, Long, Int)] input = ???
> // upsert with key
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
> // upsert without key -> single row table
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c)
> {code}
> A simple design > [doc|https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing] > about this subtask. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-8545) Implement upsert stream table source
[ https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579899#comment-16579899 ] Hequn Cheng edited comment on FLINK-8545 at 8/14/18 3:20 PM: - [~pnowojski] Great to have your response. The {{UpsertStreamTable}} and {{AppendStreamTable}} are internal classes which are used during {{registerTableInternal}}. Flag is a good choice, but it is difficult to handle the DataStream type. When upsert from stream, the input type is always tuple type(see FLINK-8577). To solve the type problem, I create the {{UpsertStreamTable}} to ingest datastream with type of tuple2. Considering {{UpsertToRetractOperator}}, I think it should be an {{UpsertNode}} and it may be translated into an {{UpsertWithRetractNode}} by the current RetractionRules. Thanks again for your suggestions. I have posted a simple design doc just now in [FLINK-8577|https://issues.apache.org/jira/browse/FLINK-8577]. was (Author: hequn8128): [~pnowojski] Great to have your response. The {{UpsertStreamTable}} and {{AppendStreamTable}} are internal classes which are used during {{registerTableInternal}}. Flag is a good choice, but it is difficult to handle the DataStream type. When upsert from stream, the input type is always tuple type(see FLINK-8577). To solve the type problem, I create the {{UpsertStreamTable}} to ingest datastream with type of tuple2. Considering {{UpsertToRetractOperator}}, I think it should be an {{UpsertNode}} and it may be translated into an {{UpsertWithRetractNode}} by the current RetractionRules. Thanks again for your suggestions. I will post a design doc ASAP. 
> Implement upsert stream table source > - > > Key: FLINK-8545 > URL: https://issues.apache.org/jira/browse/FLINK-8545 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > As more and more users are eager for ingesting data with upsert mode in flink > sql/table-api, it is valuable to enable table source with upsert mode. I will > provide a design doc later and we can have more discussions. Any suggestions > are warmly welcomed ! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579945#comment-16579945 ] Hequn Cheng commented on FLINK-8577: A simple design [doc|https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing] about this subtask. Any suggestions are welcome! > Implement proctime DataStream to Table upsert conversion. > - > > Key: FLINK-8577 > URL: https://issues.apache.org/jira/browse/FLINK-8577 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > The API will look like:
> {code:java}
> DataStream[(String, Long, Int)] input = ???
> // upsert with key
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
> // upsert without key -> single row table
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c)
> {code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579907#comment-16579907 ] ASF GitHub Bot commented on FLINK-9913: --- pnowojski commented on issue #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#issuecomment-412900643 One possible bad news. I have run benchmarks as defined in https://github.com/dataArtisans/flink-benchmarks on this branch and quite a lot of them have shown performance regression. The worst was `StreamNetworkThroughputBenchmarkExecutor` with `1,100ms` params - regression ~18%. Could you run those benchmarks locally and confirm if that's the case or not? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve output serialization only once in RecordWriter > -- > > Key: FLINK-9913 > URL: https://issues.apache.org/jira/browse/FLINK-9913 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.6.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently the {{RecordWriter}} emits output into multi channels via > {{ChannelSelector}} or broadcasts output to all channels directly. Each > channel has a separate {{RecordSerializer}} for serializing outputs, that > means the output will be serialized as many times as the number of selected > channels. > As we know, data serialization is a high cost operation, so we can get good > benefits by improving the serialization only once. > I would suggest the following changes for realizing it. > # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the > channels. 
> # The output is serialized into the intermediate data buffer only once for > different channels. > # The intermediate serialization results are copied into different > {{BufferBuilder}}s for different channels. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
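The three steps above boil down to "serialize once, then copy per channel". A minimal sketch of that idea, using plain byte arrays as stand-ins for the real {{RecordSerializer}}/{{BufferBuilder}} machinery (all names here are invented for illustration):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SerializeOncePerRecord {
    // Serialize the record a single time, then hand an independent copy of
    // the bytes to each selected channel (steps 2 and 3 of the proposal).
    // Copying bytes is far cheaper than re-running serialization per channel.
    static List<byte[]> emit(String record, int numChannels) {
        byte[] serializedOnce = record.getBytes(StandardCharsets.UTF_8);
        List<byte[]> perChannelBuffers = new ArrayList<>();
        for (int channel = 0; channel < numChannels; channel++) {
            perChannelBuffers.add(serializedOnce.clone());
        }
        return perChannelBuffers;
    }

    public static void main(String[] args) {
        // broadcast one record to four channels: one serialization, four copies
        System.out.println(emit("record", 4).size());
    }
}
```

Whether the extra copy step itself stays cheap enough on hot paths is exactly what the benchmarks discussed in the comment above are probing.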
[jira] [Commented] (FLINK-8545) Implement upsert stream table source
[ https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579899#comment-16579899 ] Hequn Cheng commented on FLINK-8545: [~pnowojski] Great to have your response. The {{UpsertStreamTable}} and {{AppendStreamTable}} are internal classes which are used during {{registerTableInternal}}. Flag is a good choice, but it is difficult to handle the DataStream type. When upsert from stream, the input type is always tuple type(see FLINK-8577). To solve the type problem, I create the {{UpsertStreamTable}} to ingest datastream with type of tuple2. Considering {{UpsertToRetractOperator}}, I think it should be an {{UpsertNode}} and it may be translated into an {{UpsertWithRetractNode}} by the current RetractionRules. Thanks again for your suggestions. I will post a design doc ASAP. > Implement upsert stream table source > - > > Key: FLINK-8545 > URL: https://issues.apache.org/jira/browse/FLINK-8545 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > As more and more users are eager for ingesting data with upsert mode in flink > sql/table-api, it is valuable to enable table source with upsert mode. I will > provide a design doc later and we can have more discussions. Any suggestions > are warmly welcomed ! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9831) Too many open files for RocksDB
[ https://issues.apache.org/jira/browse/FLINK-9831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579888#comment-16579888 ] Sayat Satybaldiyev commented on FLINK-9831: --- I've set up PredefinedOptions.SPINNING_DISK_OPTIMIZED for the Flink RocksDB backend and I didn't get this exception then. Though, it's quite interesting that the default RocksDB options don't take the OS limit settings into account. > Too many open files for RocksDB > --- > > Key: FLINK-9831 > URL: https://issues.apache.org/jira/browse/FLINK-9831 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sayat Satybaldiyev >Priority: Major > Attachments: flink_open_files.txt > > > While running only one Flink job, which is backed by RocksDB with > checkpointing to HDFS, we encounter an exception that the TM cannot access an SST > file because the process has too many open files. However, we have already > increased the file soft/hard limit on the machine. > Number of open files for the TM on the machine: > > {code:java} > lsof -p 23301|wc -l > 8241{code} > > Instance limits: > > {code:java} > ulimit -a > core file size (blocks, -c) 0 > data seg size (kbytes, -d) unlimited > scheduling priority (-e) 0 > file size (blocks, -f) unlimited > pending signals (-i) 256726 > max locked memory (kbytes, -l) 64 > max memory size (kbytes, -m) unlimited > open files (-n) 1048576 > pipe size (512 bytes, -p) 8 > POSIX message queues (bytes, -q) 819200 > real-time priority (-r) 0 > stack size (kbytes, -s) 8192 > cpu time (seconds, -t) unlimited > max user processes (-u) 128000 > virtual memory (kbytes, -v) unlimited > file locks (-x) unlimited > > {code} > > [^flink_open_files.txt] > java.lang.Exception: Exception while creating StreamOperatorStateContext. 
> at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > state backend for > KeyedCoProcessOperator_98a16ed3228ec4a08acd8d78420516a1_(1/1) from any of the > 1 provided restore options. > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132) > ... 
5 more > Caused by: java.io.FileNotFoundException: > /tmp/flink-io-3da06c9e-f619-44c9-b95f-54ee9b1a084a/job_b3ecbdc0eb2dc2dfbf5532ec1fcef9da_op_KeyedCoProcessOperator_98a16ed3228ec4a08acd8d78420516a1__1_1__uuid_c4b82a7e-8a04-4704-9e0b-393c3243cef2/3701639a-bacd-4861-99d8-5f3d112e88d6/16.sst > (Too many open files) > at java.io.FileOutputStream.open0(Native Method) > at java.io.FileOutputStream.open(FileOutputStream.java:270) > at java.io.FileOutputStream.(FileOutputStream.java:213) > at java.io.FileOutputStream.(FileOutputStream.java:162) > at > org.apache.flink.core.fs.local.LocalDataOutputStream.(LocalDataOutputStream.java:47) > at > org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:275) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:121) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.copyStateDataHandleData(RocksDBKeyedStateBackend.java:1008) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllDataFromStateHandles(RocksDBKeyedStateBackend.java:988) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllStateDataToDirectory(RocksDBKeyedStateBackend.java:973) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:758) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:732) > at >
[jira] [Commented] (FLINK-9899) Add more metrics to the Kinesis source connector
[ https://issues.apache.org/jira/browse/FLINK-9899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579867#comment-16579867 ] ASF GitHub Bot commented on FLINK-9899: --- glaksh100 commented on issue #6409: [FLINK-9899][Kinesis Connector] Add comprehensive per-shard metrics to ShardConsumer URL: https://github.com/apache/flink/pull/6409#issuecomment-412888671 @zentol @tzulitai Can you take a look at this and merge if it looks good? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add more metrics to the Kinesis source connector > > > Key: FLINK-9899 > URL: https://issues.apache.org/jira/browse/FLINK-9899 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.4.2, 1.5.1 >Reporter: Lakshmi Rao >Assignee: Lakshmi Rao >Priority: Major > Labels: pull-request-available > > Currently there are sparse metrics available for the Kinesis Connector. Using > the > [ShardMetricsReporter|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java] > add more stats. For example: > - sleepTimeMillis > - maxNumberOfRecordsPerFetch > - numberOfAggregatedRecordsPerFetch > - numberOfDeaggregatedRecordsPerFetch > - bytesPerFetch > - averageRecordSizeBytes > - runLoopTimeNanos > - loopFrequencyHz -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10022) Add metrics for input/output buffers
[ https://issues.apache.org/jira/browse/FLINK-10022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579855#comment-16579855 ] ASF GitHub Bot commented on FLINK-10022: zentol commented on a change in pull request #6551: [FLINK-10022][network][metrics] add metrics for input/output buffers URL: https://github.com/apache/flink/pull/6551#discussion_r209965405 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java ## @@ -82,6 +82,7 @@ public IOMetricsInfo( this.bytesReadComplete = bytesReadComplete; this.bytesWritten = bytesWritten; this.bytesWrittenComplete = bytesWrittenComplete; + Review comment: revert This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add metrics for input/output buffers > > > Key: FLINK-10022 > URL: https://issues.apache.org/jira/browse/FLINK-10022 > Project: Flink > Issue Type: Improvement > Components: Metrics, Network >Affects Versions: 1.5.2, 1.6.0, 1.7.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > Currently, we provide metrics for the input and output queue lengths, records > and bytes. For diagnosing network issues, it would also be nice to know the > number of buffers because we would then know their fill rate. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10022) Add metrics for input/output buffers
[ https://issues.apache.org/jira/browse/FLINK-10022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579857#comment-16579857 ] ASF GitHub Bot commented on FLINK-10022: zentol commented on a change in pull request #6551: [FLINK-10022][network][metrics] add metrics for input/output buffers URL: https://github.com/apache/flink/pull/6551#discussion_r209965354 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java ## @@ -179,11 +179,12 @@ public void writeIOMetricsAsJson(JsonGenerator gen) throws IOException { gen.writeObjectFieldStart("metrics"); - Long numBytesIn = this.numBytesInLocal + this.numBytesInRemote; Review comment: revert changes in this file This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add metrics for input/output buffers > > > Key: FLINK-10022 > URL: https://issues.apache.org/jira/browse/FLINK-10022 > Project: Flink > Issue Type: Improvement > Components: Metrics, Network >Affects Versions: 1.5.2, 1.6.0, 1.7.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > Currently, we provide metrics for the input and output queue lengths, records > and bytes. For diagnosing network issues, it would also be nice to know the > number of buffers because we would then know their fill rate. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10022) Add metrics for input/output buffers
[ https://issues.apache.org/jira/browse/FLINK-10022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579856#comment-16579856 ] ASF GitHub Bot commented on FLINK-10022: zentol commented on a change in pull request #6551: [FLINK-10022][network][metrics] add metrics for input/output buffers URL: https://github.com/apache/flink/pull/6551#discussion_r209965189 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java ## @@ -48,6 +48,7 @@ public IOMetrics(Meter recordsIn, Meter recordsOut, Meter bytesLocalIn, Meter by this.numRecordsInPerSecond = recordsIn.getRate(); this.numRecordsOut = recordsOut.getCount(); this.numRecordsOutPerSecond = recordsOut.getRate(); + Review comment: revert This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add metrics for input/output buffers > > > Key: FLINK-10022 > URL: https://issues.apache.org/jira/browse/FLINK-10022 > Project: Flink > Issue Type: Improvement > Components: Metrics, Network >Affects Versions: 1.5.2, 1.6.0, 1.7.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > Currently, we provide metrics for the input and output queue lengths, records > and bytes. For diagnosing network issues, it would also be nice to know the > number of buffers because we would then know their fill rate. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10144) No unused import "import akka.actor.ActorRef" in class "org.apache.flink.runtime.messages.TaskManagerMessages"
[ https://issues.apache.org/jira/browse/FLINK-10144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579840#comment-16579840 ] chengjie.wu commented on FLINK-10144: - [~Zentol],[~lsy] I have opened a new pull request for this issue. [https://github.com/apache/flink/pull/6554], thanks. > No unused import "import akka.actor.ActorRef" in class > "org.apache.flink.runtime.messages.TaskManagerMessages" > -- > > Key: FLINK-10144 > URL: https://issues.apache.org/jira/browse/FLINK-10144 > Project: Flink > Issue Type: Improvement >Reporter: dalongliu >Assignee: dalongliu >Priority: Major > Labels: pull-request-available > > > {code:java} > package org.apache.flink.runtime.messages > import java.util.UUID > import akka.actor.ActorRef > import org.apache.flink.runtime.accumulators.AccumulatorSnapshot > import org.apache.flink.runtime.instance.InstanceID > /** > * Miscellaneous actor messages exchanged with the TaskManager. > */ > object TaskManagerMessages { > /./ > } > {code} > > The import "akka.actor.ActorRef" in TaskManagerMessages is never used,so we > can delete it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8532) RebalancePartitioner should use Random value for its first partition
[ https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579836#comment-16579836 ] ASF GitHub Bot commented on FLINK-8532: --- Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition URL: https://github.com/apache/flink/pull/6544#issuecomment-412878869 @bowenli86 I agree that the test case should be more robust, and I have updated it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > RebalancePartitioner should use Random value for its first partition > > > Key: FLINK-8532 > URL: https://issues.apache.org/jira/browse/FLINK-8532 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Yuta Morisawa >Priority: Minor > Labels: pull-request-available > > In some conditions, RebalancePartitioner doesn't balance data correctly > because it uses the same initial value for selecting the next operator. > RebalancePartitioner initializes its partition id with the same value in > every thread, so it does balance data overall, but at any one moment the amount > of data in each operator is skewed. > Particularly, when the data rates of the upstream operators are equal, the data skew > becomes severe. > > > Example: > Consider a simple operator chain. > -> map1 -> rebalance -> map2 -> > Each map operator (map1, map2) contains three subtasks (subtasks 1-3 and 4-6). > map1 map2 > st1 st4 > st2 st5 > st3 st6 > > At the beginning, every subtask in map1 sends data to st4 in map2 because > they all use the same initial partition id. > The next time map1 receives data, st1, st2 and st3 all send it to st5 because > each incremented its partition id after processing the previous data. 
> In my environment, processing data takes twice as long with > RebalancePartitioner as with other partitioners (rescale, keyBy). > > To solve this problem, in my opinion, RebalancePartitioner should use its own > operator id for the initial value. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
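The proposal in the description, starting the round-robin at a random offset instead of a fixed one, can be sketched like this. It is an illustrative sketch only: the class and field names are invented, not the actual RebalancePartitioner code.

```java
import java.util.concurrent.ThreadLocalRandom;

public class RandomStartRebalance {
    private int nextChannel = -1; // -1 = not yet initialized

    // Each parallel sender picks its own random starting channel, so the
    // senders' round-robin sequences are offset from one another instead of
    // all hitting the same downstream subtask first.
    int selectChannel(int numberOfChannels) {
        if (nextChannel < 0) {
            nextChannel = ThreadLocalRandom.current().nextInt(numberOfChannels);
        } else {
            nextChannel = (nextChannel + 1) % numberOfChannels;
        }
        return nextChannel;
    }
}
```

Over any window of numberOfChannels successive records, each channel is still selected exactly once, so long-run balance is unchanged; only the initial skew across parallel senders goes away.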
[jira] [Commented] (FLINK-10142) Reduce synchronization overhead for credit notifications
[ https://issues.apache.org/jira/browse/FLINK-10142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579833#comment-16579833 ] ASF GitHub Bot commented on FLINK-10142: NicoK opened a new pull request #6555: [FLINK-10142][network] reduce locking around credit notification URL: https://github.com/apache/flink/pull/6555 ## What is the purpose of the change When credit-based flow control was introduced, we also added some checks and optimisations for uncommon code paths that make common code paths unnecessarily more expensive, e.g. checking whether a channel was released before forwarding a credit notification to Netty. Such checks would have to be confirmed by the Netty thread anyway and thus only add additional load for something that happens only once (per channel). This PR removes these additional checks. Please note that this PR builds upon #6553. ## Brief change log - from `PartitionRequestClient`: remove checking whether we have already been disposed - from `RemoteInputChannel#notifyCreditAvailable()`: remove check for `isReleased` (checked when the notification arrives at Netty's thread) - from `RemoteInputChannel#notifyBufferAvailable()`: we must check inside `synchronized (bufferQueue)` anyway and there is no race condition with `releaseAllResources()` as previously mentioned there (the previous check also did not really prevent any race if it existed!) 
## Verifying this change - covered by existing tests, such as `RemoteInputChannelTest` and in general everything using the network stack - added `RemoteInputChannelTest#testConcurrentNotifyBufferAvailableAndRelease` - check for reduced CPU load (together with #6553) ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** (per buffer) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? **not applicable** This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Reduce synchronization overhead for credit notifications > > > Key: FLINK-10142 > URL: https://issues.apache.org/jira/browse/FLINK-10142 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.2, 1.6.0, 1.7.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > When credit-based flow control was introduced, we also added some checks and > optimisations for uncommon code paths that make common code paths > unnecessarily more expensive, e.g. checking whether a channel was released > before forwarding a credit notification to Netty. Such checks would have to > be confirmed by the Netty thread anyway and thus only add additional load for > something that happens only once (per channel). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
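The rationale behind removing those checks (the consuming thread must re-verify the released state anyway, so a caller-side pre-check only adds cost to the hot path) can be sketched roughly as follows; all class and method names here are hypothetical stand-ins for Flink's network-stack classes, not the actual API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the FLINK-10142 pattern: the caller forwards a credit
// notification unconditionally, and the single authoritative released-check
// happens where the notification is consumed (in Flink, on the Netty thread).
public class CreditNotificationSketch {

    static class Channel {
        volatile boolean released = false;
        int unannouncedCredit = 0;
    }

    static final List<String> handledEvents = new ArrayList<>();

    // Hot path: no released-check here; the consumer must repeat it anyway.
    static void notifyCreditAvailable(Channel ch) {
        ch.unannouncedCredit++;
        handleOnNettyThread(ch); // in Flink this would be an async hand-off
    }

    // Consumer side: the one place that checks the (rare) released state.
    static void handleOnNettyThread(Channel ch) {
        if (ch.released) {
            handledEvents.add("dropped"); // happens at most once per channel
            return;
        }
        handledEvents.add("announced:" + ch.unannouncedCredit);
        ch.unannouncedCredit = 0;
    }

    public static void main(String[] args) {
        Channel ch = new Channel();
        notifyCreditAvailable(ch);
        ch.released = true;
        notifyCreditAvailable(ch);
        System.out.println(handledEvents); // prints [announced:1, dropped]
    }
}
```

The trade-off: the uncommon release case pays one extra hand-off, while the common per-credit path avoids a check entirely.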
[jira] [Updated] (FLINK-10142) Reduce synchronization overhead for credit notifications
[ https://issues.apache.org/jira/browse/FLINK-10142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10142: --- Labels: pull-request-available (was: ) > Reduce synchronization overhead for credit notifications > > > Key: FLINK-10142 > URL: https://issues.apache.org/jira/browse/FLINK-10142 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.2, 1.6.0, 1.7.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > When credit-based flow control was introduced, we also added some checks and > optimisations for uncommon code paths that make common code paths > unnecessarily more expensive, e.g. checking whether a channel was released > before forwarding a credit notification to Netty. Such checks would have to > be confirmed by the Netty thread anyway and thus only add additional load for > something that happens only once (per channel). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10144) No unused import "import akka.actor.ActorRef" in class "org.apache.flink.runtime.messages.TaskManagerMessages"
[ https://issues.apache.org/jira/browse/FLINK-10144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579809#comment-16579809 ] dalongliu commented on FLINK-10144: --- [~Zentol] can you assign this issue to [~wucj]? He wants to contribute to Flink, so let's give him an opportunity. Thanks very much! > No unused import "import akka.actor.ActorRef" in class > "org.apache.flink.runtime.messages.TaskManagerMessages" > -- > > Key: FLINK-10144 > URL: https://issues.apache.org/jira/browse/FLINK-10144 > Project: Flink > Issue Type: Improvement >Reporter: dalongliu >Assignee: dalongliu >Priority: Major > Labels: pull-request-available > > > {code:java} > package org.apache.flink.runtime.messages > import java.util.UUID > import akka.actor.ActorRef > import org.apache.flink.runtime.accumulators.AccumulatorSnapshot > import org.apache.flink.runtime.instance.InstanceID > /** > * Miscellaneous actor messages exchanged with the TaskManager. > */ > object TaskManagerMessages { > /./ > } > {code} > > The import "akka.actor.ActorRef" in TaskManagerMessages is never used, so we > can delete it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10144) No unused import "import akka.actor.ActorRef" in class "org.apache.flink.runtime.messages.TaskManagerMessages"
[ https://issues.apache.org/jira/browse/FLINK-10144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579794#comment-16579794 ] ASF GitHub Bot commented on FLINK-10144: wchengjie opened a new pull request #6554: [FLINK-10144]delete unused import akka.actor.ActorRef in TaskManagerMessages.scala URL: https://github.com/apache/flink/pull/6554 ## What is the purpose of the change *This pull request aims to delete a useless import `import akka.actor.ActorRef` in org.apache.flink.runtime.messages.TaskManagerMessages.scala* ## Brief change log *delete unused `import akka.actor.ActorRef` in `org.apache.flink.runtime.messages.TaskManagerMessages.scala`* ## Verifying this change This change is already covered by existing tests, such as *org.apache.flink.test.recovery.TaskManagerFailureRecoveryITCase.testRestartWithFailingTaskManager()*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > No unused import "import akka.actor.ActorRef" in class > "org.apache.flink.runtime.messages.TaskManagerMessages" > -- > > Key: FLINK-10144 > URL: https://issues.apache.org/jira/browse/FLINK-10144 > Project: Flink > Issue Type: Improvement >Reporter: dalongliu >Assignee: dalongliu >Priority: Major > Labels: pull-request-available > > > {code:java} > package org.apache.flink.runtime.messages > import java.util.UUID > import akka.actor.ActorRef > import org.apache.flink.runtime.accumulators.AccumulatorSnapshot > import org.apache.flink.runtime.instance.InstanceID > /** > * Miscellaneous actor messages exchanged with the TaskManager. > */ > object TaskManagerMessages { > /./ > } > {code} > > The import "akka.actor.ActorRef" in TaskManagerMessages is never used,so we > can delete it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10144) No unused import "import akka.actor.ActorRef" in class "org.apache.flink.runtime.messages.TaskManagerMessages"
[ https://issues.apache.org/jira/browse/FLINK-10144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10144: --- Labels: pull-request-available (was: ) > No unused import "import akka.actor.ActorRef" in class > "org.apache.flink.runtime.messages.TaskManagerMessages" > -- > > Key: FLINK-10144 > URL: https://issues.apache.org/jira/browse/FLINK-10144 > Project: Flink > Issue Type: Improvement >Reporter: dalongliu >Assignee: dalongliu >Priority: Major > Labels: pull-request-available > > > {code:java} > package org.apache.flink.runtime.messages > import java.util.UUID > import akka.actor.ActorRef > import org.apache.flink.runtime.accumulators.AccumulatorSnapshot > import org.apache.flink.runtime.instance.InstanceID > /** > * Miscellaneous actor messages exchanged with the TaskManager. > */ > object TaskManagerMessages { > /./ > } > {code} > > The import "akka.actor.ActorRef" in TaskManagerMessages is never used,so we > can delete it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] wchengjie opened a new pull request #6554: [FLINK-10144]delete unused import akka.actor.ActorRef in TaskManagerMessages.scala
wchengjie opened a new pull request #6554: [FLINK-10144]delete unused import akka.actor.ActorRef in TaskManagerMessages.scala URL: https://github.com/apache/flink/pull/6554 ## What is the purpose of the change *This pull request aim to delete an useless import `import akka.actor.ActorRef` in org.apache.flink.runtime.messages.TaskManagerMessages.scala* ## Brief change log *delete unused `import akka.actor.ActorRef` in `org.apache.flink.runtime.messages.TaskManagerMessages.scala`* ## Verifying this change This change is already covered by existing tests, such as *org.apache.flink.test.recovery.TaskManagerFailureRecoveryITCase.testRestartWithFailingTaskManager()*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579783#comment-16579783 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski edited a comment on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#issuecomment-412868964 Yes exactly. More precisely as I wrote in one of the previous comments, this `TaskOutputWriter` (`TaskStandardOutputWriter`?) would deduplicate logic from those methods: - open - writeRecord/invoke - toString - close (those probably should be dropped because of NPE.) Because their logic is identical between `PrintSinkFunction` and `PrintingOutputFormat` while their implementation is already inconsistent. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a string to the print method to identify output for DataStream > -- > > Key: FLINK-9850 > URL: https://issues.apache.org/jira/browse/FLINK-9850 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Hequn Cheng >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The output of the print method of {{DataSet}} allows the user to supply a > String to identify the output (see > [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But > {{DataStream}} doesn't support this yet. It is valuable to add this feature for > {{DataStream}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579782#comment-16579782 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#issuecomment-412868964 Yes exactly. More precisely as I wrote in one of the previous comments, this `TaskOutputWriter` (`TaskStandardOutputWriter`?) would deduplicate logic from those methods: - open - writeRecord/invoke - toString - close (those probably should be dropped because of NPE.) Because their logic is identical between `PrintSinkFunction` and `PrintingOutputFormat` while their implementation is already inconsistent. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a string to the print method to identify output for DataStream > -- > > Key: FLINK-9850 > URL: https://issues.apache.org/jira/browse/FLINK-9850 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Hequn Cheng >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The output of the print method of {{DataSet}} allows the user to supply a > String to identify the output (see > [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But > {{DataStream}} doesn't support this yet. It is valuable to add this feature for > {{DataStream}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
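The deduplication pnowojski describes could look roughly like the following; `TaskOutputWriter` is only the name floated in the review, and every detail below (prefix format, constructor shape) is an illustrative assumption, not Flink's actual implementation:

```java
import java.io.PrintStream;

// Hedged sketch of a shared writer owning the open/write/toString logic that
// PrintSinkFunction and PrintingOutputFormat currently duplicate.
public class TaskOutputWriter<IN> {
    private final String sinkIdentifier; // user-supplied prefix, may be empty
    private final boolean stdErr;
    private transient PrintStream stream;
    private transient String completedPrefix;

    public TaskOutputWriter(String sinkIdentifier, boolean stdErr) {
        this.sinkIdentifier = sinkIdentifier == null ? "" : sinkIdentifier;
        this.stdErr = stdErr;
    }

    // open(): resolve the stream once and precompute the per-subtask prefix,
    // e.g. "myId:3> " for the third subtask of a parallel sink.
    public void open(int subtaskIndex, int numSubtasks) {
        stream = stdErr ? System.err : System.out;
        StringBuilder prefix = new StringBuilder(sinkIdentifier);
        if (numSubtasks > 1) {
            if (prefix.length() > 0) {
                prefix.append(":");
            }
            prefix.append(subtaskIndex + 1);
        }
        if (prefix.length() > 0) {
            prefix.append("> ");
        }
        completedPrefix = prefix.toString();
    }

    // writeRecord/invoke would both delegate here.
    public void write(IN record) {
        stream.println(completedPrefix + record);
    }

    String prefix() {
        return completedPrefix;
    }

    @Override
    public String toString() {
        return "Print to " + (stdErr ? "System.err" : "System.out");
    }
}
```

Both the sink function and the output format would then hold one of these instead of re-implementing the prefix and stream handling inconsistently.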
[jira] [Commented] (FLINK-10119) JsonRowDeserializationSchema deserialize kafka message
[ https://issues.apache.org/jira/browse/FLINK-10119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579779#comment-16579779 ] buptljy commented on FLINK-10119: - [~twalthr] Okay, I am going to add two optional properties. 1. ignore-when-error: ignore the erroneous line if this is configured to be true. 2. additional-error-field: add the error message to an additional field and set the other fields to null. > JsonRowDeserializationSchema deserialize kafka message > -- > > Key: FLINK-10119 > URL: https://issues.apache.org/jira/browse/FLINK-10119 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.5.1 > Environment: none >Reporter: sean.miao >Assignee: buptljy >Priority: Major > > Recently, we have been using Kafka010JsonTableSource to process Kafka's JSON > messages. We turned on checkpointing and an auto-restart strategy. > We found that any message that is not valid JSON prevents the job from being > pulled up again. Of course, this ensures exactly-once or at-least-once > processing, but it leaves the application unavailable, which has a greater > impact on us. 
> the code is:
> class: JsonRowDeserializationSchema
> function:
> {code:java}
> @Override
> public Row deserialize(byte[] message) throws IOException {
>     try {
>         final JsonNode root = objectMapper.readTree(message);
>         return convertRow(root, (RowTypeInfo) typeInfo);
>     } catch (Throwable t) {
>         throw new IOException("Failed to deserialize JSON object.", t);
>     }
> }
> {code}
> now, I changed it to:
> {code:java}
> public Row deserialize(byte[] message) throws IOException {
>     try {
>         JsonNode root = this.objectMapper.readTree(message);
>         return this.convertRow(root, (RowTypeInfo) this.typeInfo);
>     } catch (Throwable var4) {
>         message = this.objectMapper.writeValueAsBytes("{}");
>         JsonNode root = this.objectMapper.readTree(message);
>         return this.convertRow(root, (RowTypeInfo) this.typeInfo);
>     }
> }
> {code}
>
> I think that data format errors are inevitable during network transmission, so can we add a new column to the table for the wrong data format, like Spark SQL does?
> -- This message was sent by Atlassian JIRA (v7.6.3#76005)
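The two failure-handling modes buptljy proposes can be sketched generically. The snippet below uses a stand-in parser (`Integer.parseInt`) instead of Jackson so it stays self-contained, and the `FailureMode`/`deserialize` names are purely illustrative, not Flink or proposed API:

```java
import java.util.Optional;

// Sketch of the proposed behaviours: fail the job (current), drop the bad
// record ("ignore-when-error"), or emit an error-field row
// ("additional-error-field"). A row is modelled as [payload, errorMessage].
public class TolerantDeserializerSketch {

    enum FailureMode { FAIL, IGNORE, ERROR_FIELD }

    static Optional<Object[]> deserialize(String message, FailureMode mode) {
        try {
            int value = Integer.parseInt(message); // real code: objectMapper.readTree(...)
            return Optional.of(new Object[] { value, null });
        } catch (Exception e) {
            switch (mode) {
                case IGNORE:       // drop the record instead of failing the job
                    return Optional.empty();
                case ERROR_FIELD:  // null payload, error message in the extra field
                    return Optional.of(new Object[] { null, e.getMessage() });
                default:           // current behaviour: the job restarts
                    throw new RuntimeException("Failed to deserialize.", e);
            }
        }
    }
}
```

Returning `Optional.empty()` (rather than null or an exception) gives the caller a clean way to skip the record without any special-case handling.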
[jira] [Commented] (FLINK-10022) Add metrics for input/output buffers
[ https://issues.apache.org/jira/browse/FLINK-10022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579760#comment-16579760 ] ASF GitHub Bot commented on FLINK-10022: NicoK commented on a change in pull request #6551: [FLINK-10022][network][metrics] add metrics for input/output buffers URL: https://github.com/apache/flink/pull/6551#discussion_r209937737
## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
@@ -43,38 +43,74 @@
     protected double numBytesInRemotePerSecond;
     protected double numBytesOutPerSecond;
-    public IOMetrics(Meter recordsIn, Meter recordsOut, Meter bytesLocalIn, Meter bytesRemoteIn, Meter bytesOut) {
+    protected long numBuffersInLocal;
+    protected long numBuffersInRemote;
+    protected long numBuffersOut;
+
+    protected double numBuffersInLocalPerSecond;
+    protected double numBuffersInRemotePerSecond;
+    protected double numBuffersOutPerSecond;
+
+    public IOMetrics(
Review comment: that's what I meant - done...hopefully ;)
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add metrics for input/output buffers > > > Key: FLINK-10022 > URL: https://issues.apache.org/jira/browse/FLINK-10022 > Project: Flink > Issue Type: Improvement > Components: Metrics, Network >Affects Versions: 1.5.2, 1.6.0, 1.7.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > Currently, we provide metrics for the input and output queue lengths, records > and bytes. For diagnosing network issues, it would also be nice to know the > number of buffers because we would then know their fill rate. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10022) Add metrics for input/output buffers
[ https://issues.apache.org/jira/browse/FLINK-10022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579758#comment-16579758 ] ASF GitHub Bot commented on FLINK-10022: NicoK commented on a change in pull request #6551: [FLINK-10022][network][metrics] add metrics for input/output buffers URL: https://github.com/apache/flink/pull/6551#discussion_r209937737
## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
@@ -43,38 +43,74 @@
     protected double numBytesInRemotePerSecond;
     protected double numBytesOutPerSecond;
-    public IOMetrics(Meter recordsIn, Meter recordsOut, Meter bytesLocalIn, Meter bytesRemoteIn, Meter bytesOut) {
+    protected long numBuffersInLocal;
+    protected long numBuffersInRemote;
+    protected long numBuffersOut;
+
+    protected double numBuffersInLocalPerSecond;
+    protected double numBuffersInRemotePerSecond;
+    protected double numBuffersOutPerSecond;
+
+    public IOMetrics(
Review comment: that's what I meant - done
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add metrics for input/output buffers > > > Key: FLINK-10022 > URL: https://issues.apache.org/jira/browse/FLINK-10022 > Project: Flink > Issue Type: Improvement > Components: Metrics, Network >Affects Versions: 1.5.2, 1.6.0, 1.7.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > Currently, we provide metrics for the input and output queue lengths, records > and bytes. For diagnosing network issues, it would also be nice to know the > number of buffers because we would then know their fill rate. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
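Conceptually, the new buffer counters let the "fill rate" mentioned in the ticket be derived from the byte and buffer counts. The sketch below stubs `Meter` minimally and is only an illustration of that idea, not Flink's actual `IOMetrics` class:

```java
// Hedged sketch: alongside byte counters/rates, keep buffer counters/rates so
// the average buffer fill can be computed. Flink's real Meter interface lives
// in org.apache.flink.metrics; this stub only mirrors the two calls used here.
public class IOMetricsSketch {

    interface Meter {
        long getCount();
        double getRate();
    }

    static Meter meter(final long count, final double rate) {
        return new Meter() {
            public long getCount() { return count; }
            public double getRate() { return rate; }
        };
    }

    final long numBytesInLocal;
    final long numBuffersInLocal;
    final double numBuffersInLocalPerSecond;

    IOMetricsSketch(Meter bytesLocalIn, Meter buffersLocalIn) {
        // Snapshot counts and rates, as the diff above does for its meters.
        this.numBytesInLocal = bytesLocalIn.getCount();
        this.numBuffersInLocal = buffersLocalIn.getCount();
        this.numBuffersInLocalPerSecond = buffersLocalIn.getRate();
    }

    // With both byte and buffer counts available, fill rate is derived data.
    double averageBufferFillBytes() {
        return numBuffersInLocal == 0 ? 0.0 : (double) numBytesInLocal / numBuffersInLocal;
    }
}
```

For example, 32768 bytes over 16 buffers indicates an average fill of 2048 bytes per buffer.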
[jira] [Created] (FLINK-10144) No unused import "import akka.actor.ActorRef" in class "org.apache.flink.runtime.messages.TaskManagerMessages"
dalongliu created FLINK-10144: - Summary: No unused import "import akka.actor.ActorRef" in class "org.apache.flink.runtime.messages.TaskManagerMessages" Key: FLINK-10144 URL: https://issues.apache.org/jira/browse/FLINK-10144 Project: Flink Issue Type: Improvement Reporter: dalongliu Assignee: dalongliu
{code:java}
package org.apache.flink.runtime.messages

import java.util.UUID

import akka.actor.ActorRef
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
import org.apache.flink.runtime.instance.InstanceID

/**
 * Miscellaneous actor messages exchanged with the TaskManager.
 */
object TaskManagerMessages {
  /./
}
{code}
The import "akka.actor.ActorRef" in TaskManagerMessages is never used, so we can delete it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8290) Modify clientId to groupId in flink-connector-kafka-0.8
[ https://issues.apache.org/jira/browse/FLINK-8290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579743#comment-16579743 ] ASF GitHub Bot commented on FLINK-8290: --- azagrebin commented on issue #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8 URL: https://github.com/apache/flink/pull/5304#issuecomment-412856931 LGTM This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Modify clientId to groupId in flink-connector-kafka-0.8 > --- > > Key: FLINK-8290 > URL: https://issues.apache.org/jira/browse/FLINK-8290 > Project: Flink > Issue Type: Improvement >Reporter: xymaqingxiang >Assignee: xymaqingxiang >Priority: Major > Labels: pull-request-available > > Now the clientId used for consuming all topics is a > constant ("flink-kafka-consumer-legacy-" + broker.id()), and it is not easy > for us to inspect Kafka's logs, so I recommend that it be changed to the groupId. > We can modify the SimpleConsumerThread.java file, as shown below: > {code:java} > private final String clientId; > ... > this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" > + broker.id()); > ... > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10141) Reduce lock contention introduced with 1.5
[ https://issues.apache.org/jira/browse/FLINK-10141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10141: --- Labels: pull-request-available (was: ) > Reduce lock contention introduced with 1.5 > -- > > Key: FLINK-10141 > URL: https://issues.apache.org/jira/browse/FLINK-10141 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.2, 1.6.0, 1.7.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > With the changes around introducing credit-based flow control as well as the > low latency changes, unfortunately, we also introduced some lock contention > on {{RemoteInputChannel#bufferQueue}} and > {{RemoteInputChannel#receivedBuffers}} as well as asking for queue sizes when > the only thing we need is whether it is empty or not. > This was observed as a high idle CPU load with no events in the stream but > only watermarks (every 500ms) and many slots on a single machine. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10141) Reduce lock contention introduced with 1.5
[ https://issues.apache.org/jira/browse/FLINK-10141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579739#comment-16579739 ] ASF GitHub Bot commented on FLINK-10141: NicoK opened a new pull request #6553: [FLINK-10141][network] optimisations reducing lock contention
URL: https://github.com/apache/flink/pull/6553

## What is the purpose of the change

With the changes around introducing credit-based flow control as well as the low latency changes, unfortunately, we also introduced some lock contention on `RemoteInputChannel#bufferQueue` and `RemoteInputChannel#receivedBuffers`. Additionally, we were asking for queue sizes when the only thing we need is whether it is empty or not. As a result, we saw high CPU load during "idle" stream processing jobs with no events in the stream but only watermarks (every 500ms) and many slots on a single machine.

## Brief change log

- move `notifyCreditAvailable()` out of the lock around `RemoteInputChannel#bufferQueue`
- move `notifyChannelNonEmpty()` out of the lock around `RemoteInputChannel#receivedBuffers`
- replace `RemoteInputChannel#receivedBuffers.size()` with `receivedBuffers.isEmpty()` when this is the only thing needed
- replace `SingleInputGate#inputChannelsWithData.size()` with `inputChannelsWithData.isEmpty()` when this is the only thing needed
- minor code style improvement in `CreditBasedPartitionRequestClientHandler` to improve readability

## Verifying this change

- This change is already covered by existing tests, such as `RemoteInputChannelTest`, `SingleInputGateTest`, ..., and everything using the network stack.
- Manually verified that less CPU is used with a `SocketWindowWordCount` with some added shuffles, no input elements, only watermarks every 500ms with 4 TMs, 10 slots each on a single (laptop) machine.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no** (per buffer)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
- The S3 file system connector: **no**

## Documentation

- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **JavaDocs**

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Reduce lock contention introduced with 1.5 > -- > > Key: FLINK-10141 > URL: https://issues.apache.org/jira/browse/FLINK-10141 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.2, 1.6.0, 1.7.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > With the changes around introducing credit-based flow control as well as the > low latency changes, unfortunately, we also introduced some lock contention > on {{RemoteInputChannel#bufferQueue}} and > {{RemoteInputChannel#receivedBuffers}} as well as asking for queue sizes when > the only thing we need is whether it is empty or not. > This was observed as a high idle CPU load with no events in the stream but > only watermarks (every 500ms) and many slots on a single machine. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
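The locking pattern described in the change log above can be sketched in plain Java. This is a hedged illustration, not Flink's actual `RemoteInputChannel` code (the `BufferQueue` class and listener here are invented for the sketch): record the decision under the lock, release the lock, and only then fire the notification, and ask `isEmpty()` rather than `size()` when emptiness is all that matters.

```java
import java.util.ArrayDeque;

// Illustrative sketch of the PR's pattern; not Flink's real classes.
class BufferQueue {
    private final ArrayDeque<byte[]> buffers = new ArrayDeque<>();
    private final Object lock = new Object();
    private final Runnable nonEmptyListener;

    BufferQueue(Runnable nonEmptyListener) {
        this.nonEmptyListener = nonEmptyListener;
    }

    void add(byte[] buffer) {
        boolean wasEmpty;
        synchronized (lock) {
            // isEmpty() instead of size() == 0: we only need emptiness.
            wasEmpty = buffers.isEmpty();
            buffers.add(buffer);
        } // lock released here...
        if (wasEmpty) {
            // ...before notifying, so the listener never runs under the lock
            // and cannot contend with producers/consumers of the queue.
            nonEmptyListener.run();
        }
    }

    public static void main(String[] args) {
        BufferQueue q = new BufferQueue(() -> System.out.println("non-empty"));
        q.add(new byte[1]); // prints "non-empty"
        q.add(new byte[1]); // listener not fired again: queue was not empty
    }
}
```

Invoking callbacks outside the critical section is the standard way to shrink lock hold times; the callback may observe a slightly stale queue state, which is acceptable for a wake-up style notification like this.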
[jira] [Commented] (FLINK-10022) Add metrics for input/output buffers
[ https://issues.apache.org/jira/browse/FLINK-10022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579737#comment-16579737 ] ASF GitHub Bot commented on FLINK-10022: zentol commented on a change in pull request #6551: [FLINK-10022][network][metrics] add metrics for input/output buffers URL: https://github.com/apache/flink/pull/6551#discussion_r209932565 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java ## @@ -43,38 +43,74 @@ protected double numBytesInRemotePerSecond; protected double numBytesOutPerSecond; - public IOMetrics(Meter recordsIn, Meter recordsOut, Meter bytesLocalIn, Meter bytesRemoteIn, Meter bytesOut) { + protected long numBuffersInLocal; + protected long numBuffersInRemote; + protected long numBuffersOut; + + protected double numBuffersInLocalPerSecond; + protected double numBuffersInRemotePerSecond; + protected double numBuffersOutPerSecond; + + public IOMetrics( Review comment: it should be fine to remove them from this class, and revert the changes to the various handlers. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add metrics for input/output buffers > > > Key: FLINK-10022 > URL: https://issues.apache.org/jira/browse/FLINK-10022 > Project: Flink > Issue Type: Improvement > Components: Metrics, Network >Affects Versions: 1.5.2, 1.6.0, 1.7.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > Currently, we provide metrics for the input and output queue lengths, records > and bytes. For diagnosing network issues, it would also be nice to know the > number of buffers because we would then know their fill rate. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579732#comment-16579732 ] ASF GitHub Bot commented on FLINK-9850: --- yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#issuecomment-412854540 @pnowojski I fixed the issues you mentioned and will refactor `PrintSinkFunction` and `PrintingOutputFormat`. Do you mean I should create a new class named `TaskOutputWriter` and extract the shared logic into it? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a string to the print method to identify output for DataStream > -- > > Key: FLINK-9850 > URL: https://issues.apache.org/jira/browse/FLINK-9850 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Hequn Cheng >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The print method of {{DataSet}} allows the user to supply a > String to identify the output (see > [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]), but > {{DataStream}} doesn't support this yet. It would be valuable to add this feature for > {{DataStream}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-6810) Add a set of built-in scalar functions to Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-6810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-6810: Description: In this JIRA, we will create some sub-tasks for adding specific scalar functions such as mathematical-function {{LOG}}, date-functions {{DATEADD}}, string-functions {{LPAD}}, etc. *How to contribute a built-in scalar function* Thank you very much for contributing a built-in function. In order to make sure your contributions are in a good direction, it is recommended to read the following instructions. # Investigate the behavior of the function that you are going to contribute in major DBMSs. This is very important since we have to understand the exact semantics of the function. # It is recommended to add function for both SQL and table-API (Java and Scala). # For every scalar function, add corresponding docs which should include a SQL, a Java and a Scala version in {{./docs/dev/table/functions.md}}. Make sure your description of the function is accurate. Please do not simply copy documentation from other projects, especially if the projects are not Apache licensed. # Take overflow, NullPointerException and other exceptions into consideration. # Add unit tests for every new function and its supported APIs. Have a look at {{ScalarFunctionsTest}}, {{SqlExpressionTest}}, {{ScalaFunctionsValidationTest}}, etc. for how to implement function tests. !how to add a scalar function.png! Welcome anybody to add the sub-task about standard database scalar function. Note: Usually, adding a runtime function to {{ScalarFunctions.scala}} is sufficient. However, sometimes it makes sense to implement a {{CallGenerator}} for {{FunctionGenerator}} to leverage object reuse. E.g., {{HashCalcCallGen}} creates a {{MessageDigest}} only once for {{SHA2}} if the parameters are literals. 
was: In this JIRA, we will create some sub-tasks for adding specific scalar functions such as mathematical-function {{LOG}}, date-functions {{DATEADD}}, string-functions {{LPAD}}, etc. *How to contribute a built-in scalar function* Thank you very much for contributing a built-in function. In order to make sure your contributions are in a good direction, it is recommended to read the following instructions. # Investigate the behavior of the function that you are going to contribute in major DBMSs. This is very important since we have to understand the exact semantics of the function. # It is recommended to add function for both SQL and table-API (Java and Scala). # For every scalar function, add corresponding docs which should include a SQL, a Java and a Scala version in {{./docs/dev/table/functions.md}}. Make sure your description of the function is accurate. Please do not simply copy documentation from other projects, especially if the projects are not Apache licensed. # Take overflow, NullPointerException and other exceptions into consideration. # Add unit tests for every new function and its supported APIs. Have a look at {{ScalarFunctionsTest}}, {{SqlExpressionTest}}, {{ScalaFunctionsValidationTest}}, etc. for how to implement function tests. !how to add a scalar function.png! Welcome anybody to add the sub-task about standard database scalar function. Note: Usually, adding a runtime function to {{ScalarFunctions.scala}} is sufficient. However, sometimes it makes sense to implement a {{CallGenerator}} for {{FunctionGenerator}} to leverage object reuse. E.g., {{HashCalcCallGen}} creates a {{MessageDigest}} only once if the parameters are literals. 
> Add a set of built-in scalar functions to Table API & SQL > - > > Key: FLINK-6810 > URL: https://issues.apache.org/jira/browse/FLINK-6810 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Labels: starter > Attachments: how to add a scalar function.png > > > In this JIRA, we will create some sub-tasks for adding specific scalar > functions such as mathematical-function {{LOG}}, date-functions > {{DATEADD}}, string-functions {{LPAD}}, etc. > *How to contribute a built-in scalar function* > Thank you very much for contributing a built-in function. In order to make > sure your contributions are in a good direction, it is recommended to read > the following instructions. > # Investigate the behavior of the function that you are going to contribute > in major DBMSs. This is very important since we have to understand the exact > semantics of the function. > # It is recommended to add function for both SQL and table-API (Java and > Scala). > # For every scalar function, add corresponding docs which should include a > SQL, a Java and a Scala version in
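Step 1 of the contribution guide above — pinning down the exact semantics of a function in major DBMSs before implementing it — can be illustrated with a plain-Java reference model of {{LPAD}}. This is a hedged sketch, not Flink's implementation; in particular, the behavior for an empty pad string varies across DBMSs, so returning null for that case here is an assumption, and exactly the kind of edge case the guide asks contributors to investigate.

```java
// Reference model of LPAD semantics (illustrative, not Flink code):
// pad s on the left to length len using pad, truncating when len is
// smaller than the input length, and propagating null inputs.
class LpadSemantics {
    static String lpad(String s, int len, String pad) {
        if (s == null || pad == null || len < 0) {
            return null; // null in, null out (step 4 of the guide)
        }
        if (len <= s.length()) {
            return s.substring(0, len); // LPAD also truncates
        }
        if (pad.isEmpty()) {
            // Cannot pad with an empty string; DBMSs disagree here,
            // so this null is an explicit design assumption.
            return null;
        }
        StringBuilder padding = new StringBuilder();
        while (padding.length() < len - s.length()) {
            padding.append(pad);
        }
        padding.setLength(len - s.length()); // trim overshoot
        return padding.append(s).toString();
    }

    public static void main(String[] args) {
        System.out.println(lpad("hi", 5, "?"));  // prints "???hi"
        System.out.println(lpad("hi", 6, "ab")); // prints "ababhi"
        System.out.println(lpad("hi", 1, "?"));  // prints "h"
    }
}
```

A unit test covering exactly these cases (plus nulls and overflow-adjacent lengths) is what the guide's step 5 asks for in `ScalarFunctionsTest` and friends.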
[jira] [Updated] (FLINK-6810) Add a set of built-in scalar functions to Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-6810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-6810: Description: In this JIRA, we will create some sub-tasks for adding specific scalar functions such as mathematical-function {{LOG}}, date-functions {{DATEADD}}, string-functions {{LPAD}}, etc. *How to contribute a built-in scalar function* Thank you very much for contributing a built-in function. In order to make sure your contributions are in a good direction, it is recommended to read the following instructions. # Investigate the behavior of the function that you are going to contribute in major DBMSs. This is very important since we have to understand the exact semantics of the function. # It is recommended to add function for both SQL and table-API (Java and Scala). # For every scalar function, add corresponding docs which should include a SQL, a Java and a Scala version in {{./docs/dev/table/functions.md}}. Make sure your description of the function is accurate. Please do not simply copy documentation from other projects, especially if the projects are not Apache licensed. # Take overflow, NullPointerException and other exceptions into consideration. # Add unit tests for every new function and its supported APIs. Have a look at {{ScalarFunctionsTest}}, {{SqlExpressionTest}}, {{ScalaFunctionsValidationTest}}, etc. for how to implement function tests. !how to add a scalar function.png! Welcome anybody to add the sub-task about standard database scalar function. Note: Usually, adding a runtime function to {{ScalarFunctions.scala}} is sufficient. However, sometimes it makes sense to implement a {{CallGenerator}} for {{FunctionGenerator}} to leverage object reuse. E.g., {{HashCalcCallGen}} creates a {{MessageDigest}} only once if the parameters are literals. 
was: In this JIRA, we will create some sub-tasks for adding specific scalar functions such as mathematical-function {{LOG}}, date-functions {{DATEADD}}, string-functions {{LPAD}}, etc. *How to contribute a built-in scalar function* Thank you very much for contributing a built-in function. In order to make sure your contributions are in a good direction, it is recommended to read the following instructions. # Investigate the behavior of the function that you are going to contribute in major DBMSs. This is very important since we have to understand the exact semantics of the function. # It is recommended to add function for both SQL and table-API (Java and Scala). # For every scalar function, add corresponding docs which should include a SQL, a Java and a Scala version in {{./docs/dev/table/functions.md}}. Make sure your description of the function is accurate. Please do not simply copy documentation from other projects, especially if the projects are not Apache licensed. # Take overflow, NullPointerException and other exceptions into consideration. # Add unit tests for every new function and its supported APIs. Have a look at {{ScalarFunctionsTest}}, {{SqlExpressionTest}}, {{ScalaFunctionsValidationTest}}, etc. for how to implement function tests. !how to add a scalar function.png! Welcome anybody to add the sub-task about standard database scalar function. > Add a set of built-in scalar functions to Table API & SQL > - > > Key: FLINK-6810 > URL: https://issues.apache.org/jira/browse/FLINK-6810 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Labels: starter > Attachments: how to add a scalar function.png > > > In this JIRA, we will create some sub-tasks for adding specific scalar > functions such as mathematical-function {{LOG}}, date-functions > {{DATEADD}}, string-functions {{LPAD}}, etc. 
> *How to contribute a built-in scalar function* > Thank you very much for contributing a built-in function. In order to make > sure your contributions are in a good direction, it is recommended to read > the following instructions. > # Investigate the behavior of the function that you are going to contribute > in major DBMSs. This is very important since we have to understand the exact > semantics of the function. > # It is recommended to add function for both SQL and table-API (Java and > Scala). > # For every scalar function, add corresponding docs which should include a > SQL, a Java and a Scala version in {{./docs/dev/table/functions.md}}. Make > sure your description of the function is accurate. Please do not simply copy > documentation from other projects, especially if the projects are not Apache > licensed. > # Take overflow, NullPointerException and other exceptions into > consideration. > # Add unit tests
[jira] [Commented] (FLINK-8545) Implement upsert stream table source
[ https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579726#comment-16579726 ] Piotr Nowojski commented on FLINK-8545: --- This `AppendStreamTable` would still carry deletes (the `change` flag from `CRow`), so wouldn't that be a confusing name? Also, do we have to expose in the API ({{UpsertStreamTable}}) how the updates are encoded? Shouldn't those just be traits of the underlying stream? I mean that a stream created from an upsert source could simply have a {{primary_key}} trait and maybe a flag that it is {{upsert}}. If such a stream is incompatible with the required input of some operator, we would place an {{UpsertToRetractOperator}} there that changes the upsert stream back into a retraction stream. Such an operator would need to hold the whole table in state (maybe pruning on watermarks), but this construct would be able to support upsert sources in all queries. It would also make it easier to optimize queries in the future. For example: # Reorder {{UpsertToRetractOperator}} with filters # Reorder {{UpsertToRetractOperator}} with projections # Remove {{UpsertToRetractOperator}} if the following operator can handle upserts (like UpsertDataSink, or Joins) > Implement upsert stream table source > - > > Key: FLINK-8545 > URL: https://issues.apache.org/jira/browse/FLINK-8545 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > As more and more users are eager for ingesting data with upsert mode in flink > sql/table-api, it is valuable to enable table source with upsert mode. I will > provide a design doc later and we can have more discussions. Any suggestions > are warmly welcomed ! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
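The proposed {{UpsertToRetractOperator}} can be sketched as a minimal plain-Java model. Everything here is illustrative (a string key and row, a "+"/"-" change flag); it is not Flink's API, only the core idea: keep the latest row per key in state and translate each upsert into a retraction of the previous version plus an insertion of the new one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the upsert-to-retraction idea; not Flink code.
class UpsertToRetract {
    // The "whole table on state" the comment mentions: latest row per key.
    private final Map<String, String> state = new HashMap<>();

    /** Returns the retraction-stream records produced by one upsert. */
    List<String[]> process(String key, String row) {
        List<String[]> out = new ArrayList<>();
        String previous = state.put(key, row);
        if (previous != null) {
            out.add(new String[] {"-", previous}); // retract old version
        }
        out.add(new String[] {"+", row});          // emit new version
        return out;
    }

    public static void main(String[] args) {
        UpsertToRetract op = new UpsertToRetract();
        op.process("k1", "row-v1");                    // emits only +row-v1
        for (String[] rec : op.process("k1", "row-v2")) {
            System.out.println(rec[0] + rec[1]);       // "-row-v1", "+row-v2"
        }
    }
}
```

The optimizations listed in the comment fall out of this shape: a filter or projection commutes with the operator, and if the downstream operator understands upserts the state (and the operator) can be dropped entirely.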
[jira] [Updated] (FLINK-6810) Add a set of built-in scalar functions to Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-6810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-6810: Summary: Add a set of built-in scalar functions to Table API & SQL (was: Add Some built-in Scalar Function supported) > Add a set of built-in scalar functions to Table API & SQL > - > > Key: FLINK-6810 > URL: https://issues.apache.org/jira/browse/FLINK-6810 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Labels: starter > Attachments: how to add a scalar function.png > > > In this JIRA, we will create some sub-tasks for adding specific scalar > functions such as mathematical-function {{LOG}}, date-functions > {{DATEADD}}, string-functions {{LPAD}}, etc. > *How to contribute a built-in scalar function* > Thank you very much for contributing a built-in function. In order to make > sure your contributions are in a good direction, it is recommended to read > the following instructions. > # Investigate the behavior of the function that you are going to contribute > in major DBMSs. This is very important since we have to understand the exact > semantics of the function. > # It is recommended to add function for both SQL and table-API (Java and > Scala). > # For every scalar function, add corresponding docs which should include a > SQL, a Java and a Scala version in {{./docs/dev/table/functions.md}}. Make > sure your description of the function is accurate. Please do not simply copy > documentation from other projects, especially if the projects are not Apache > licensed. > # Take overflow, NullPointerException and other exceptions into > consideration. > # Add unit tests for every new function and its supported APIs. Have a look > at {{ScalarFunctionsTest}}, {{SqlExpressionTest}}, > {{ScalaFunctionsValidationTest}}, etc. for how to implement function tests. > !how to add a scalar function.png! 
> Welcome anybody to add the sub-task about standard database scalar function. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10130) How to define two hdfs name-node IPs in flink-conf.yaml file
[ https://issues.apache.org/jira/browse/FLINK-10130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579710#comment-16579710 ] Paul Lin commented on FLINK-10130: -- It could be solved by [HDFS name services|https://hadoop.apache.org/docs/r2.7.0/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html], and I think it might be better to leave it to the HDFS client. FYI. > How to define two hdfs name-node IPs in flink-conf.yaml file > > > Key: FLINK-10130 > URL: https://issues.apache.org/jira/browse/FLINK-10130 > Project: Flink > Issue Type: Bug >Reporter: Keshav Lodhi >Priority: Blocker > Attachments: docker-entrypoints.sh > > > Hi Team, > Here is, what we are looking for: > * We have flink HA dockerized cluster with (3 zookeepers, 2 job-managers, 3 > task-managers). > * We are using HDFS from the flink to store some data. The problem we are > facing is that, we are not able to pass 2 name-node IPs in config. > * These are the config parameters we want to add two name-node IPs > # "state.checkpoints.dir: hdfs://X.X.X.X:9001/flinkdatastorage" > # "state.backend.fs.checkpointdir: > hdfs://X.X.X.X:9001/flinkdatastorage" > # "high-availability.zookeeper.storageDir: > hdfs://X.X.X.X:9001/flink/recovery" > Currently we are passing only one name-node IP > Please advise. > I have attached the sample *docker-entrypoint.sh* file: > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
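The suggestion above can be sketched as a config fragment. All concrete names here (the logical nameservice {{mycluster}} and the paths) are hypothetical; the point is that Flink's HDFS URIs reference the nameservice rather than a single NameNode IP, and the HDFS client resolves it to the active NameNode via its own hdfs-site.xml (with {{dfs.nameservices}}, {{dfs.ha.namenodes.mycluster}}, the per-NameNode {{dfs.namenode.rpc-address.mycluster.*}} entries, and a {{dfs.client.failover.proxy.provider.mycluster}}).

```yaml
# flink-conf.yaml (sketch): point at the logical HDFS nameservice,
# not at one NameNode IP. "mycluster" is a hypothetical nameservice
# defined in the hdfs-site.xml on Flink's Hadoop classpath.
state.checkpoints.dir: hdfs://mycluster/flinkdatastorage
state.backend.fs.checkpointdir: hdfs://mycluster/flinkdatastorage
high-availability.zookeeper.storageDir: hdfs://mycluster/flink/recovery
```

With this setup, NameNode failover is handled transparently by the HDFS client, so no Flink-side support for multiple NameNode IPs is needed.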
[jira] [Commented] (FLINK-7205) Add UUID supported in TableAPI/SQL
[ https://issues.apache.org/jira/browse/FLINK-7205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579706#comment-16579706 ] ASF GitHub Bot commented on FLINK-7205: --- asfgit closed pull request #6381: [FLINK-7205] [table] Add UUID supported in SQL and TableApi URL: https://github.com/apache/flink/pull/6381 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md index 148f30727d9..4d2cbbadc0a 100644 --- a/docs/dev/table/sql.md +++ b/docs/dev/table/sql.md @@ -1664,6 +1664,17 @@ RAND_INTEGER(seed integer, bound integer) + + + {% highlight text %} +UUID() +{% endhighlight %} + + + Returns a uuid string like '3d3c68f7-f608-473f-b60c-b0c44ad4cc4e' according to RFC 4122. + + + {% highlight text %} diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md index b702dddcde5..a78543bd6c3 100644 --- a/docs/dev/table/tableApi.md +++ b/docs/dev/table/tableApi.md @@ -2322,6 +2322,17 @@ randInteger(seed integer, bound integer) + + + {% highlight java %} +UUID() +{% endhighlight %} + + + Returns a uuid string like '3d3c68f7-f608-473f-b60c-b0c44ad4cc4e' according to RFC 4122. + + + {% highlight java %} @@ -3910,6 +3921,17 @@ randInteger(seed integer, bound integer) + + + {% highlight scala %} +UUID() +{% endhighlight %} + + + Returns a uuid string like '3d3c68f7-f608-473f-b60c-b0c44ad4cc4e' according to RFC 4122. 
+ + + {% highlight scala %} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala index c7c805f6743..23ac30d3d8b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala @@ -1247,4 +1247,17 @@ object concat_ws { } } +/** + * Returns the uuid according to RFC 4122. + */ +object uuid { + + /** +* Returns the uuid according to RFC 4122. +*/ + def apply(): Expression = { +UUID() + } +} + // scalastyle:on object.name diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala index 74b69d6afcc..16beea9149d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala @@ -158,6 +158,12 @@ object FunctionGenerator { STRING_TYPE_INFO, BuiltInMethods.TOBASE64) + addSqlFunction( +UUID, +Seq(), +new UUIDCallGen() + ) + // -- // Arithmetic functions // -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/UUIDCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/UUIDCallGen.scala new file mode 100644 index 000..537a19857c9 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/UUIDCallGen.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.codegen.calls + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO +import org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull +import
[GitHub] asfgit closed pull request #6381: [FLINK-7205] [table] Add UUID supported in SQL and TableApi
asfgit closed pull request #6381: [FLINK-7205] [table] Add UUID supported in SQL and TableApi
URL: https://github.com/apache/flink/pull/6381

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 148f30727d9..4d2cbbadc0a 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -1664,6 +1664,17 @@ RAND_INTEGER(seed integer, bound integer)
+
+{% highlight text %}
+UUID()
+{% endhighlight %}
+
+Returns a uuid string like '3d3c68f7-f608-473f-b60c-b0c44ad4cc4e' according to RFC 4122.
+
 {% highlight text %}
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index b702dddcde5..a78543bd6c3 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -2322,6 +2322,17 @@ randInteger(seed integer, bound integer)
+
+{% highlight java %}
+UUID()
+{% endhighlight %}
+
+Returns a uuid string like '3d3c68f7-f608-473f-b60c-b0c44ad4cc4e' according to RFC 4122.
+
 {% highlight java %}
@@ -3910,6 +3921,17 @@ randInteger(seed integer, bound integer)
+
+{% highlight scala %}
+UUID()
+{% endhighlight %}
+
+Returns a uuid string like '3d3c68f7-f608-473f-b60c-b0c44ad4cc4e' according to RFC 4122.
+
 {% highlight scala %}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index c7c805f6743..23ac30d3d8b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -1247,4 +1247,17 @@ object concat_ws {
   }
 }

+/**
+  * Returns the uuid according to RFC 4122.
+  */
+object uuid {
+
+  /**
+    * Returns the uuid according to RFC 4122.
+    */
+  def apply(): Expression = {
+    UUID()
+  }
+}
+
 // scalastyle:on object.name
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index 74b69d6afcc..16beea9149d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -158,6 +158,12 @@ object FunctionGenerator {
     STRING_TYPE_INFO,
     BuiltInMethods.TOBASE64)

+  addSqlFunction(
+    UUID,
+    Seq(),
+    new UUIDCallGen()
+  )
+
   // --
   // Arithmetic functions
   // --
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/UUIDCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/UUIDCallGen.scala
new file mode 100644
index 000..537a19857c9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/UUIDCallGen.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO
+import org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}
+
+/**
+  * Generates a UUID function call.
+  */
+class UUIDCallGen extends CallGenerator {
+
+  override def generate(codeGenerator: CodeGenerator,
+    operands:
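For readers who just want the observable behavior of the new built-in: the string the `UUID()` function returns can be approximated with `java.util.UUID` from the JDK. This is a sketch, not the actual generated code — whether the generated call uses `UUID.randomUUID()` (a random, version-4 UUID) is an assumption here.

```java
import java.util.UUID;

public class UuidSketch {
    // Approximates what the Table API's UUID() built-in produces: an
    // RFC 4122 UUID rendered as the canonical 36-character string
    // (32 hex digits plus 4 dashes), e.g.
    // '3d3c68f7-f608-473f-b60c-b0c44ad4cc4e'.
    public static String uuid() {
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        String u = uuid();
        System.out.println(u);
        System.out.println(u.length()); // always 36
    }
}
```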
[jira] [Resolved] (FLINK-7205) Add UUID supported in TableAPI/SQL
[ https://issues.apache.org/jira/browse/FLINK-7205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther resolved FLINK-7205.
---------------------------------
       Resolution: Fixed
    Fix Version/s: 1.7.0

Fixed in 1.7.0: 913b0413882939c30da4ad4df0cabc84dfe69ea0

> Add UUID supported in TableAPI/SQL
> ----------------------------------
>
>                 Key: FLINK-7205
>                 URL: https://issues.apache.org/jira/browse/FLINK-7205
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.4.0
>            Reporter: sunjincheng
>            Assignee: buptljy
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
> UUID() returns a value that conforms to UUID version 1 as described in RFC
> 4122. The value is a 128-bit number represented as a utf8 string of five
> hexadecimal numbers in aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee format:
> The first three numbers are generated from the low, middle, and high parts of
> a timestamp. The high part also includes the UUID version number.
> The fourth number preserves temporal uniqueness in case the timestamp value
> loses monotonicity (for example, due to daylight saving time).
> The fifth number is an IEEE 802 node number that provides spatial uniqueness.
> A random number is substituted if the latter is not available (for example,
> because the host device has no Ethernet card, or it is unknown how to find
> the hardware address of an interface on the host operating system). In this
> case, spatial uniqueness cannot be guaranteed. Nevertheless, a collision
> should have very low probability.
> See RFC 4122: http://www.ietf.org/rfc/rfc4122.txt
> See detailed semantics in MySQL:
> https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_uuid
> Any feedback is welcome :-).

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
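The version and variant fields described in the issue can be inspected directly with `java.util.UUID`. Note that the sample string used in the merged docs actually encodes a version-4 (random) UUID, not the time-based version 1 the MySQL semantics above describe — a small sketch, assuming only the JDK:

```java
import java.util.UUID;

public class UuidFields {
    // Version is encoded in the first hex digit of the third group.
    public static int versionOf(String s) {
        return UUID.fromString(s).version();
    }

    // Variant is encoded in the top bits of the fourth group;
    // 2 means the RFC 4122 (IETF) variant.
    public static int variantOf(String s) {
        return UUID.fromString(s).variant();
    }

    public static void main(String[] args) {
        String sample = "3d3c68f7-f608-473f-b60c-b0c44ad4cc4e";
        System.out.println(versionOf(sample)); // 4 (random UUID)
        System.out.println(variantOf(sample)); // 2 (RFC 4122 variant)
    }
}
```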
[jira] [Commented] (FLINK-9013) Document yarn.containers.vcores only being effective when adapting YARN config
[ https://issues.apache.org/jira/browse/FLINK-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579701#comment-16579701 ]

ASF GitHub Bot commented on FLINK-9013:
---------------------------------------

dawidwys commented on a change in pull request #6294: [FLINK-9013][docs] Document yarn.containers.vcores only being effective whe…
URL: https://github.com/apache/flink/pull/6294#discussion_r209925707

## File path: flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
##
@@ -63,9 +65,14 @@
  */
 public static final ConfigOption VCORES = key("yarn.containers.vcores")
-    .defaultValue(-1)
-    .withDescription("The number of virtual cores (vcores) per YARN container. By default, the number of vcores" +
-        " is set to the number of slots per TaskManager, if set, or to 1, otherwise.");
+    .defaultValue(-1)
+    .withDescription(Description.builder().text(
+        "The number of virtual cores (vcores) per YARN container. By default, the number of vcores" +
+        " is set to the number of slots per TaskManager, if set, or to 1, otherwise. In order for this" +
+        " parameter to be used your cluster must have CPU scheduling enabled. You can do this by setting" +
+        " the %s.",
+        code("org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler"))

Review comment:
@GJL thanks for your comments. The original reason for introducing the DescriptionBuilder was not to embed html or markdown into the description, so that we can format it differently in case we want to print it in an exception description.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Document yarn.containers.vcores only being effective when adapting YARN config
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-9013
>                 URL: https://issues.apache.org/jira/browse/FLINK-9013
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation, YARN
>    Affects Versions: 1.5.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.5.3
>
> Even after specifying {{yarn.containers.vcores}} and having Flink request
> such a container from YARN, it may not take these into account at all and
> return a container with 1 vcore.
> The YARN configuration needs to be adapted to take the vcores into account,
> e.g. by setting the {{FairScheduler}} in {{yarn-site.xml}}:
> {code}
> <property>
>   <name>yarn.resourcemanager.scheduler.class</name>
>   <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
> </property>
> {code}
> This fact should be documented at least at the configuration parameter
> documentation of {{yarn.containers.vcores}}.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
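The defaulting rule being documented ("the number of vcores is set to the number of slots per TaskManager, if set, or to 1, otherwise", with -1 meaning "not configured") can be sketched as plain logic. The method and parameter names below are illustrative, not Flink's actual code:

```java
public class VcoresDefault {
    // Sketch of the documented default for yarn.containers.vcores.
    // configuredVcores == -1 (the option's default) means "not set".
    public static int effectiveVcores(int configuredVcores, int slotsPerTaskManager) {
        if (configuredVcores > 0) {
            return configuredVcores;          // explicit setting wins
        }
        return slotsPerTaskManager > 0
            ? slotsPerTaskManager             // fall back to slots per TM
            : 1;                              // last resort: 1 vcore
    }

    public static void main(String[] args) {
        System.out.println(effectiveVcores(-1, 4)); // 4
        System.out.println(effectiveVcores(2, 4));  // 2
        System.out.println(effectiveVcores(-1, 0)); // 1
    }
}
```

Even then, as the issue notes, YARN only honors the requested vcores when CPU scheduling (e.g. the FairScheduler) is enabled.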
[jira] [Commented] (FLINK-10022) Add metrics for input/output buffers
[ https://issues.apache.org/jira/browse/FLINK-10022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579695#comment-16579695 ]

ASF GitHub Bot commented on FLINK-10022:
----------------------------------------

NicoK commented on a change in pull request #6551: [FLINK-10022][network][metrics] add metrics for input/output buffers
URL: https://github.com/apache/flink/pull/6551#discussion_r209923936

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
##
@@ -104,7 +130,6 @@ public Meter getNumBytesInRemoteRateMeter() {
 public Meter getNumBytesOutRateMeter() {
   return numBytesOutRate;
 }
-

Review comment:
don't know either... reverting that...

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Add metrics for input/output buffers
> ------------------------------------
>
>                 Key: FLINK-10022
>                 URL: https://issues.apache.org/jira/browse/FLINK-10022
>             Project: Flink
>          Issue Type: Improvement
>          Components: Metrics, Network
>    Affects Versions: 1.5.2, 1.6.0, 1.7.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, we provide metrics for the input and output queue lengths, records
> and bytes. For diagnosing network issues, it would also be nice to know the
> number of buffers because we would then know their fill rate.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
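The fill rate the issue mentions is derivable once both byte and buffer counters exist: average fill = bytes transferred / (buffers transferred × buffer capacity). A minimal sketch — the 32 KiB segment size is an assumption (it is Flink's default network buffer size, but configurable), and the method names are illustrative, not the PR's API:

```java
public class BufferFillRate {
    // Average fraction of each network buffer that carried payload,
    // given the numBytesIn / numBuffersIn style counters the PR adds.
    public static double fillRate(long numBytes, long numBuffers, int segmentSizeBytes) {
        if (numBuffers == 0) {
            return 0.0; // nothing shipped yet; avoid division by zero
        }
        return (double) numBytes / (numBuffers * (double) segmentSizeBytes);
    }

    public static void main(String[] args) {
        int segmentSize = 32 * 1024; // assumed default network buffer size
        // 1 MiB of data spread over 100 buffers: each buffer was on
        // average about 32% full, hinting at underfilled buffers.
        System.out.println(fillRate(1024 * 1024, 100, segmentSize));
    }
}
```

A low fill rate with a high buffer count suggests buffers are flushed nearly empty (e.g. low-latency flushing), while a rate near 1.0 means buffers fill completely before being shipped.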