[GitHub] satybald commented on issue #6724: [FLINK-10380] Add precondition for assignToKeyGroup method

2018-09-21 Thread GitBox
satybald commented on issue #6724: [FLINK-10380] Add precondition for 
assignToKeyGroup method
URL: https://github.com/apache/flink/pull/6724#issuecomment-423427348
 
 
   @StephanEwen agreed, a try block doesn't incur any performance penalty. 
Another option that doesn't affect performance is to document in the keyBy() 
method that the key must not be null.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-21 Thread GitBox
twalthr commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r219407109
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -175,13 +173,41 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
 
 val rewrittenTemporalJoin = temporalJoin.copy(temporalJoin.getTraitSet, 
List(left, right))
 
-// Materialize all of the time attributes from the right side of temporal 
join
-val indicesToMaterialize =
-  (left.getRowType.getFieldCount until 
rewrittenTemporalJoin.getRowType.getFieldCount).toSet
+val indicesToMaterialize = 
gatherIndicesToMaterialize(rewrittenTemporalJoin, left, right)
 
 materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, 
indicesToMaterialize)
   }
 
+  private def gatherIndicesToMaterialize(
+  temporalJoin: Join,
+  left: RelNode,
+  right: RelNode)
+: Set[Int] = {
+
+// Materialize all of the time attributes from the right side of temporal 
join
+var indicesToMaterialize =
+  (left.getRowType.getFieldCount until 
temporalJoin.getRowType.getFieldCount).toSet
+
+if (!hasRowtimeAttribute(right.getRowType)) {
 
 Review comment:
   Did you also add a test for this?




[GitHub] twalthr commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-21 Thread GitBox
twalthr commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r219407507
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
 ##
 @@ -191,12 +174,10 @@ case class RowtimeAttribute(expr: Expression) extends 
TimeAttribute(expr) {
 ValidationSuccess
   case WindowReference(_, _) =>
 ValidationFailure("Reference to a rowtime or proctime window 
required.")
-  case TemporalTableReference(_, _) =>
-ValidationSuccess
   case any =>
 ValidationFailure(
-  s"The '.rowtime' expression can only be used for table definitions, 
windows, " +
-s" and temporal tables, while [${any}] was found.")
+  s"The '.rowtime' expression can only be used for table definitions, 
windows " +
+s"and temporal table definitions, while [$any] was found.")
 
 Review comment:
   I think table definitions are not supported anymore. So we can reset this 
message entirely, right?




[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623207#comment-16623207
 ] 

ASF GitHub Bot commented on FLINK-9567:
---

asfgit closed pull request #6669: [FLINK-9567][runtime][yarn] Fix the yarn 
container over allocation in…
URL: https://github.com/apache/flink/pull/6669
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index cf3588f6593..956e40fe61b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -334,7 +334,7 @@ public void onContainersCompleted(final 
List statuses) {
if (yarnWorkerNode != null) {
// Container completed 
unexpectedly ~> start a new one
final Container container = 
yarnWorkerNode.getContainer();
-   
internalRequestYarnContainer(container.getResource(), 
yarnWorkerNode.getContainer().getPriority());
+   
requestYarnContainer(container.getResource(), 
yarnWorkerNode.getContainer().getPriority());
}
// Eagerly close the connection with 
task manager.
closeTaskManagerConnection(resourceId, 
new Exception(containerStatus.getDiagnostics()));
@@ -443,17 +443,24 @@ private FinalApplicationStatus 
getYarnStatus(ApplicationStatus status) {
return new Tuple2<>(host, Integer.valueOf(port));
}
 
+   /**
+* Request new container if pending containers cannot satisfies pending 
slot requests.
+*/
private void requestYarnContainer(Resource resource, Priority priority) 
{
-   resourceManagerClient.addContainerRequest(new 
AMRMClient.ContainerRequest(resource, null, null, priority));
+   int pendingSlotRequests = getNumberPendingSlotRequests();
+   int pendingSlotAllocation = numPendingContainerRequests * 
numberOfTaskSlots;
+   if (pendingSlotRequests > pendingSlotAllocation) {
+   resourceManagerClient.addContainerRequest(new 
AMRMClient.ContainerRequest(resource, null, null, priority));
 
-   // make sure we transmit the request fast and receive fast news 
of granted allocations
-   
resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
+   // make sure we transmit the request fast and receive 
fast news of granted allocations
+   
resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
 
-   numPendingContainerRequests++;
+   numPendingContainerRequests++;
 
-   log.info("Requesting new TaskExecutor container with resources 
{}. Number pending requests {}.",
-   resource,
-   numPendingContainerRequests);
+   log.info("Requesting new TaskExecutor container with 
resources {}. Number pending requests {}.",
+   resource,
+   numPendingContainerRequests);
+   }
}
 
private ContainerLaunchContext createTaskExecutorLaunchContext(Resource 
resource, String containerId, String host)
@@ -510,15 +517,4 @@ private int generatePriority(ResourceProfile 
resourceProfile) {
return priority;
}
}
-
-   /**
-* Request new container if pending containers cannot satisfies pending 
slot requests.
-*/
-   private void internalRequestYarnContainer(Resource resource, Priority 
priority) {
-   int pendingSlotRequests = getNumberPendingSlotRequests();
-   int pendingSlotAllocation = numPendingContainerRequests * 
numberOfTaskSlots;
-   if (pendingSlotRequests > pendingSlotAllocation) {
-   requestYarnContainer(resource, priority);
-   }
-   }
 }
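
The guard that the diff above moves into `requestYarnContainer` can be looked at in isolation. This is a minimal sketch, assuming a hypothetical `ContainerRequestGuard` class that stands in for the YARN client; only the comparison between pending slot requests and already-requested slot capacity mirrors the patch.

```java
/** Hypothetical stand-in for the guard in requestYarnContainer; this is not Flink code. */
final class ContainerRequestGuard {
    private final int numberOfTaskSlots;      // slots offered by each container
    private int numPendingContainerRequests;  // containers requested but not yet granted

    ContainerRequestGuard(int numberOfTaskSlots) {
        this.numberOfTaskSlots = numberOfTaskSlots;
    }

    /** Requests a container only if the pending containers cannot cover the pending slot requests. */
    boolean requestIfNeeded(int pendingSlotRequests) {
        int pendingSlotAllocation = numPendingContainerRequests * numberOfTaskSlots;
        if (pendingSlotRequests > pendingSlotAllocation) {
            // The real method would also call the YARN client here; the sketch only counts.
            numPendingContainerRequests++;
            return true;
        }
        return false;
    }

    int getNumPendingContainerRequests() {
        return numPendingContainerRequests;
    }
}
```

With two slots per container and three pending slot requests, the first two calls request a container and a third call is skipped, because four slots are already on their way.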


 




> Flink does not release resource in Yarn Cluster mode
> 

[jira] [Commented] (FLINK-9713) Support versioned joins in planning phase

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623198#comment-16623198
 ] 

ASF GitHub Bot commented on FLINK-9713:
---

twalthr commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r219407109
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -175,13 +173,41 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
 
 val rewrittenTemporalJoin = temporalJoin.copy(temporalJoin.getTraitSet, 
List(left, right))
 
-// Materialize all of the time attributes from the right side of temporal 
join
-val indicesToMaterialize =
-  (left.getRowType.getFieldCount until 
rewrittenTemporalJoin.getRowType.getFieldCount).toSet
+val indicesToMaterialize = 
gatherIndicesToMaterialize(rewrittenTemporalJoin, left, right)
 
 materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, 
indicesToMaterialize)
   }
 
+  private def gatherIndicesToMaterialize(
+  temporalJoin: Join,
+  left: RelNode,
+  right: RelNode)
+: Set[Int] = {
+
+// Materialize all of the time attributes from the right side of temporal 
join
+var indicesToMaterialize =
+  (left.getRowType.getFieldCount until 
temporalJoin.getRowType.getFieldCount).toSet
+
+if (!hasRowtimeAttribute(right.getRowType)) {
 
 Review comment:
   Did you also add a test for this?




> Support versioned joins in planning phase
> -
>
> Key: FLINK-9713
> URL: https://issues.apache.org/jira/browse/FLINK-9713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should evaluate to valid plan with versioned joins plan node.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] asfgit closed pull request #6669: [FLINK-9567][runtime][yarn] Fix the yarn container over allocation in…

2018-09-21 Thread GitBox
asfgit closed pull request #6669: [FLINK-9567][runtime][yarn] Fix the yarn 
container over allocation in…
URL: https://github.com/apache/flink/pull/6669
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index cf3588f6593..956e40fe61b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -334,7 +334,7 @@ public void onContainersCompleted(final 
List statuses) {
if (yarnWorkerNode != null) {
// Container completed 
unexpectedly ~> start a new one
final Container container = 
yarnWorkerNode.getContainer();
-   
internalRequestYarnContainer(container.getResource(), 
yarnWorkerNode.getContainer().getPriority());
+   
requestYarnContainer(container.getResource(), 
yarnWorkerNode.getContainer().getPriority());
}
// Eagerly close the connection with 
task manager.
closeTaskManagerConnection(resourceId, 
new Exception(containerStatus.getDiagnostics()));
@@ -443,17 +443,24 @@ private FinalApplicationStatus 
getYarnStatus(ApplicationStatus status) {
return new Tuple2<>(host, Integer.valueOf(port));
}
 
+   /**
+* Request new container if pending containers cannot satisfies pending 
slot requests.
+*/
private void requestYarnContainer(Resource resource, Priority priority) 
{
-   resourceManagerClient.addContainerRequest(new 
AMRMClient.ContainerRequest(resource, null, null, priority));
+   int pendingSlotRequests = getNumberPendingSlotRequests();
+   int pendingSlotAllocation = numPendingContainerRequests * 
numberOfTaskSlots;
+   if (pendingSlotRequests > pendingSlotAllocation) {
+   resourceManagerClient.addContainerRequest(new 
AMRMClient.ContainerRequest(resource, null, null, priority));
 
-   // make sure we transmit the request fast and receive fast news 
of granted allocations
-   
resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
+   // make sure we transmit the request fast and receive 
fast news of granted allocations
+   
resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
 
-   numPendingContainerRequests++;
+   numPendingContainerRequests++;
 
-   log.info("Requesting new TaskExecutor container with resources 
{}. Number pending requests {}.",
-   resource,
-   numPendingContainerRequests);
+   log.info("Requesting new TaskExecutor container with 
resources {}. Number pending requests {}.",
+   resource,
+   numPendingContainerRequests);
+   }
}
 
private ContainerLaunchContext createTaskExecutorLaunchContext(Resource 
resource, String containerId, String host)
@@ -510,15 +517,4 @@ private int generatePriority(ResourceProfile 
resourceProfile) {
return priority;
}
}
-
-   /**
-* Request new container if pending containers cannot satisfies pending 
slot requests.
-*/
-   private void internalRequestYarnContainer(Resource resource, Priority 
priority) {
-   int pendingSlotRequests = getNumberPendingSlotRequests();
-   int pendingSlotAllocation = numPendingContainerRequests * 
numberOfTaskSlots;
-   if (pendingSlotRequests > pendingSlotAllocation) {
-   requestYarnContainer(resource, priority);
-   }
-   }
 }


 




[jira] [Commented] (FLINK-9713) Support versioned joins in planning phase

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623213#comment-16623213
 ] 

ASF GitHub Bot commented on FLINK-9713:
---

pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r219410591
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -175,13 +173,41 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
 
 val rewrittenTemporalJoin = temporalJoin.copy(temporalJoin.getTraitSet, 
List(left, right))
 
-// Materialize all of the time attributes from the right side of temporal 
join
-val indicesToMaterialize =
-  (left.getRowType.getFieldCount until 
rewrittenTemporalJoin.getRowType.getFieldCount).toSet
+val indicesToMaterialize = 
gatherIndicesToMaterialize(rewrittenTemporalJoin, left, right)
 
 materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, 
indicesToMaterialize)
   }
 
+  private def gatherIndicesToMaterialize(
 
 Review comment:
   Fixed, but IMO with the visitor pattern it is better to follow this layout:
   ```
   public visit(X);
   
   private doSthWith(X);
   
   private doSthElseWith(X);
   
   public visit(Y)
   
   private doSthWith(Y);
   
   private doSthElseWith(Y);
   ```
   




> Support versioned joins in planning phase
> -
>
> Key: FLINK-9713
> URL: https://issues.apache.org/jira/browse/FLINK-9713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should evaluate to valid plan with versioned joins plan node.





[GitHub] tillrohrmann commented on issue #6727: [FLINK-10371] Allow to enable SSL mutual authentication on REST endpoints by configuration

2018-09-21 Thread GitBox
tillrohrmann commented on issue #6727: [FLINK-10371] Allow to enable SSL mutual 
authentication on REST endpoints by configuration
URL: https://github.com/apache/flink/pull/6727#issuecomment-423449469
 
 
   Thanks for your contribution @kleingeist. The documentation needs to be 
updated in order to pass the tests. Please see `flink-docs/README.md` for 
further details.




[jira] [Commented] (FLINK-10371) Allow to enable SSL mutual authentication on REST endpoints by configuration

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623236#comment-16623236
 ] 

ASF GitHub Bot commented on FLINK-10371:


tillrohrmann commented on issue #6727: [FLINK-10371] Allow to enable SSL mutual 
authentication on REST endpoints by configuration
URL: https://github.com/apache/flink/pull/6727#issuecomment-423449469
 
 
   Thanks for your contribution @kleingeist. The documentation needs to be 
updated in order to pass the tests. Please see `flink-docs/README.md` for 
further details.




> Allow to enable SSL mutual authentication on REST endpoints by configuration
> 
>
> Key: FLINK-10371
> URL: https://issues.apache.org/jira/browse/FLINK-10371
> Project: Flink
>  Issue Type: Improvement
>  Components: Client, REST, Security
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Johannes Dillmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2
>
>
> With Flink 1.6 SSL mutual authentication was introduced for internal 
> connectivity in FLINK-9312. 
>  SSL support for external connectivity was also introduced in regard to 
> encryption of the connection and verification of the Flink REST endpoint from 
> the client side.
> But _mutual authentication between the REST endpoint and clients is not 
> supported yet_.
>  The [documentation suggests 
> |https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/security-ssl.html]
>  using a side car proxy to enable SSL mutual auth on the REST endpoint and 
> points out the advantages of using a feature rich proxy.
> While this is a good rationale, there are still important use cases for 
> supporting simple mutual authentication directly in Flink, mainly support 
> for using standard images in a containerized environment.
> There are tools that set up Flink jobs (for example on Kubernetes clusters) 
> and act as gateways to the Flink REST endpoint and the Flink web interface. 
> To prevent unauthorised access to Flink, the connectivity has to be secured. 
> As the tools act as gateways, it is easy to create a shared keystore 
> and truststore for mutual authentication and pass them to the Flink instances' 
> configurations.
> To enable SSL mutual authentication on REST endpoints, I suggest 
> adding the configuration parameter `security.ssl.rest.authentication-enabled`, 
> which defaults to `false`.
>  If it is set to `true`, the `SSLUtils` factories for creating the REST server 
> endpoint and the REST clients should set authentication to required and share 
> `security.ssl.rest.keystore` and `security.ssl.rest.truststore` to set up SSL 
> mutually authenticated connections.
>  
> I have a working prototype which I would gladly submit as a PR to get further 
> feedback. The changes to Flink are minimal and the default behaviour won't 
> change.
>  
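
The effect of the proposed flag can be sketched with the plain JDK TLS API. This is only an illustration under assumptions: `RestSslSketch` and its method are hypothetical names, the JDK default `SSLContext` stands in for one built from `security.ssl.rest.keystore` and `security.ssl.rest.truststore`, and none of this is Flink's actual `SSLUtils` code.

```java
import java.security.NoSuchAlgorithmException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

/** Hypothetical helper; shows how a boolean option could map to required client certificates. */
final class RestSslSketch {
    static SSLEngine createServerEngine(boolean authenticationEnabled) {
        try {
            // Assumption: a real setup would load the configured keystore and truststore instead.
            SSLEngine engine = SSLContext.getDefault().createSSLEngine();
            engine.setUseClientMode(false);
            // With mutual authentication enabled, the handshake fails unless the client presents a certificate.
            engine.setNeedClientAuth(authenticationEnabled);
            return engine;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("No default SSLContext available", e);
        }
    }
}
```

Keeping the default at `false` leaves the server engine in its current behaviour, which matches the statement that the default behaviour would not change.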





[jira] [Commented] (FLINK-10380) Check if key is not nul before assign to group in KeyedStream

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623107#comment-16623107
 ] 

ASF GitHub Bot commented on FLINK-10380:


satybald removed a comment on issue #6724: [FLINK-10380] Add precondition for 
assignToKeyGroup method
URL: https://github.com/apache/flink/pull/6724#issuecomment-423422608
 
 
   Another option would be to add a try/catch statement for the NPE. A try 
statement doesn't incur any performance cost unless an exception is thrown.




> Check if key is not nul before assign to group in KeyedStream
> -
>
> Key: FLINK-10380
> URL: https://issues.apache.org/jira/browse/FLINK-10380
> Project: Flink
>  Issue Type: Task
>Affects Versions: 1.6.0
>Reporter: Sayat Satybaldiyev
>Priority: Minor
>  Labels: pull-request-available
>
> If a user creates a KeyedStream and partitions by a key which might be null, 
> the Flink job throws a NullPointerException at runtime. However, the NPE that 
> Flink throws is hard to debug and understand, as it doesn't refer to a place 
> in the Flink job.
> *Suggestion:*
> Add a precondition that checks that the key is not null and throws a 
> descriptive error if it is null.
>  
> *Job Example*:
>  
> {code:java}
> DataStream stream = env.fromCollection(Arrays.asList("aaa", "bbb"))
>  .map(x -> (String)null)
>  .keyBy(x -> x);{code}
>  
>  
> An error that is thrown:
>  
> {code:java}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: 
> java.lang.RuntimeException
>  at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
>  at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>  at org.myorg.quickstart.BuggyKeyedStream.main(BuggyKeyedStream.java:61)
> Caused by: java.lang.RuntimeException
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)16:26:43,110
>  INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC 
> service.
> at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>  at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>  at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>  at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>  at 
> org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(KeyGroupRangeAssignment.java:59)
>  at 
> org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:48)
>  at 
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:63)
>  at 
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
>  at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> {code}
> ... 10 more
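
The suggested precondition could look roughly like the following. This is a minimal sketch, assuming a hypothetical `KeyGroupSketch.assignToKeyGroup` helper that only mimics the shape of the Flink method; the modulo hashing is a placeholder and not Flink's actual key-group assignment, and the error message is made up for illustration.

```java
import java.util.Objects;

/** Hypothetical helper; illustrates failing fast on a null key with a descriptive message. */
final class KeyGroupSketch {
    static int assignToKeyGroup(Object key, int maxParallelism) {
        // Throws NullPointerException with the given message when the key is null.
        Objects.requireNonNull(key, "Assigned key must not be null; check the KeySelector passed to keyBy()");
        // Placeholder hashing for illustration only.
        return Math.floorMod(key.hashCode(), maxParallelism);
    }
}
```

The check is a single null comparison per record, so the cost is small, and the message points the user at `keyBy()` rather than at an internal stack frame.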





[jira] [Commented] (FLINK-10385) Implement a waitUntilCondition utils

2018-09-21 Thread JIRA


[ 
https://issues.apache.org/jira/browse/FLINK-10385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623196#comment-16623196
 ] 

陈梓立 commented on FLINK-10385:
-

Maybe a waiting util based on {{sleep}} and polling would miss state changes 
that happen quickly. Is it reasonable to use {{CountDownLatch}} or 
{{OneShotLatch}} in this case? If so, that gives us a guideline and we can 
refactor the tests gradually.
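
A latch-based wait, as opposed to sleep-and-poll, can be sketched with the JDK's `CountDownLatch`. The class and method names below are hypothetical, and the worker thread merely stands in for whatever component would signal the state change in a real test.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical test helper; the waiter is notified of the event instead of polling for it. */
final class LatchWaitSketch {
    static boolean awaitSignal(long timeoutMillis) {
        CountDownLatch stateReached = new CountDownLatch(1);
        // Stand-in for the code under test: it signals the latch when the state is reached.
        Thread worker = new Thread(stateReached::countDown);
        worker.start();
        try {
            // Returns true if the signal arrived before the timeout, false otherwise.
            return stateReached.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because the latch stays open once counted down, a transition that happens before the waiter starts waiting is still observed, which is the case a polling loop can miss.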

> Implement a waitUntilCondition utils
> 
>
> Key: FLINK-10385
> URL: https://issues.apache.org/jira/browse/FLINK-10385
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: 陈梓立
>Priority: Major
> Fix For: 1.7.0
>
> Attachments: example.java
>
>
> Recently, while refining some tests, I noticed that it is a common requirement 
> to wait until a (stable) condition occurs.
> To achieve this, we have {{ExecutionGraphTestUtils#waitUntilJobStatus}} and 
> many others. Most of them can simply be abstracted as
> {code:java}
> public static void waitUntilCondition(SupplierWithException<Boolean, Throwable> conditionSupplier, Deadline deadline) throws Throwable {
>   while (deadline.hasTimeLeft()) {
>     if (conditionSupplier.get()) { return; }
>     Thread.sleep(Math.min(deadline.toMillis(), 500));
>   }
>   throw new IllegalStateException("...");
> }
> {code}
>  
> I propose to implement such a method to avoid having many utility methods 
> scattered around that achieve the same purpose.
>  Looking forward to your advice. If an existing code base or project has 
> already implemented this, I am glad to adopt it.
> cc [~Zentol]
>  
> PS: the attached file is some code I found that could satisfy this proposal.





[jira] [Commented] (FLINK-10260) Confusing log during TaskManager registration

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623206#comment-16623206
 ] 

ASF GitHub Bot commented on FLINK-10260:


asfgit closed pull request #6720: [FLINK-10260] Change log level to debug in 
ResourceManager in case of TaskExecutor reconnect
URL: https://github.com/apache/flink/pull/6720
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
index 242fbaa086a..15a3757ab35 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
@@ -118,7 +118,7 @@ Executor getExecutor() {
public void monitorTarget(ResourceID resourceID, HeartbeatTarget 
heartbeatTarget) {
if (!stopped) {
if (heartbeatTargets.containsKey(resourceID)) {
-   log.info("The target with resource ID {} is 
already been monitored.", resourceID);
+   log.debug("The target with resource ID {} is 
already been monitored.", resourceID);
} else {
HeartbeatManagerImpl.HeartbeatMonitor 
heartbeatMonitor = new HeartbeatManagerImpl.HeartbeatMonitor<>(
resourceID,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 39844839453..ac1181b1d1d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -694,7 +694,7 @@ private RegistrationResponse registerTaskExecutorInternal(
WorkerRegistration oldRegistration = 
taskExecutors.remove(taskExecutorResourceId);
if (oldRegistration != null) {
// TODO :: suggest old taskExecutor to stop itself
-   log.info("Replacing old registration of TaskExecutor 
{}.", taskExecutorResourceId);
+   log.debug("Replacing old registration of TaskExecutor 
{}.", taskExecutorResourceId);
 
// remove old task manager registration from slot 
manager

slotManager.unregisterTaskManager(oldRegistration.getInstanceID());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index d54d1434bf1..bab56609a1b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -324,7 +324,7 @@ public boolean unregisterSlotRequest(AllocationID 
allocationId) {
public void registerTaskManager(final TaskExecutorConnection 
taskExecutorConnection, SlotReport initialSlotReport) {
checkInit();
 
-   LOG.info("Registering TaskManager {} under {} at the 
SlotManager.", taskExecutorConnection.getResourceID(), 
taskExecutorConnection.getInstanceID());
+   LOG.debug("Registering TaskManager {} under {} at the 
SlotManager.", taskExecutorConnection.getResourceID(), 
taskExecutorConnection.getInstanceID());
 
// we identify task managers by their instance id
if 
(taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
@@ -366,7 +366,7 @@ public void registerTaskManager(final 
TaskExecutorConnection taskExecutorConnect
public boolean unregisterTaskManager(InstanceID instanceId) {
checkInit();
 
-   LOG.info("Unregister TaskManager {} from the SlotManager.", 
instanceId);
+   LOG.debug("Unregister TaskManager {} from the SlotManager.", 
instanceId);
 
TaskManagerRegistration taskManagerRegistration = 
taskManagerRegistrations.remove(instanceId);
 


 




> Confusing log during 

[GitHub] asfgit closed pull request #6720: [FLINK-10260] Change log level to debug in ResourceManager in case of TaskExecutor reconnect

2018-09-21 Thread GitBox
asfgit closed pull request #6720: [FLINK-10260] Change log level to debug in 
ResourceManager in case of TaskExecutor reconnect
URL: https://github.com/apache/flink/pull/6720
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
index 242fbaa086a..15a3757ab35 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
@@ -118,7 +118,7 @@ Executor getExecutor() {
public void monitorTarget(ResourceID resourceID, HeartbeatTarget 
heartbeatTarget) {
if (!stopped) {
if (heartbeatTargets.containsKey(resourceID)) {
-   log.info("The target with resource ID {} is 
already been monitored.", resourceID);
+   log.debug("The target with resource ID {} is 
already been monitored.", resourceID);
} else {
HeartbeatManagerImpl.HeartbeatMonitor 
heartbeatMonitor = new HeartbeatManagerImpl.HeartbeatMonitor<>(
resourceID,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 39844839453..ac1181b1d1d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -694,7 +694,7 @@ private RegistrationResponse registerTaskExecutorInternal(
WorkerRegistration oldRegistration = 
taskExecutors.remove(taskExecutorResourceId);
if (oldRegistration != null) {
// TODO :: suggest old taskExecutor to stop itself
-   log.info("Replacing old registration of TaskExecutor 
{}.", taskExecutorResourceId);
+   log.debug("Replacing old registration of TaskExecutor 
{}.", taskExecutorResourceId);
 
// remove old task manager registration from slot 
manager

slotManager.unregisterTaskManager(oldRegistration.getInstanceID());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index d54d1434bf1..bab56609a1b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -324,7 +324,7 @@ public boolean unregisterSlotRequest(AllocationID 
allocationId) {
public void registerTaskManager(final TaskExecutorConnection 
taskExecutorConnection, SlotReport initialSlotReport) {
checkInit();
 
-   LOG.info("Registering TaskManager {} under {} at the 
SlotManager.", taskExecutorConnection.getResourceID(), 
taskExecutorConnection.getInstanceID());
+   LOG.debug("Registering TaskManager {} under {} at the 
SlotManager.", taskExecutorConnection.getResourceID(), 
taskExecutorConnection.getInstanceID());
 
// we identify task managers by their instance id
if 
(taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
@@ -366,7 +366,7 @@ public void registerTaskManager(final 
TaskExecutorConnection taskExecutorConnect
public boolean unregisterTaskManager(InstanceID instanceId) {
checkInit();
 
-   LOG.info("Unregister TaskManager {} from the SlotManager.", 
instanceId);
+   LOG.debug("Unregister TaskManager {} from the SlotManager.", 
instanceId);
 
TaskManagerRegistration taskManagerRegistration = 
taskManagerRegistrations.remove(instanceId);
 


 




[jira] [Commented] (FLINK-10375) ExceptionInChainedStubException hides wrapped exception in cause

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623205#comment-16623205
 ] 

ASF GitHub Bot commented on FLINK-10375:


asfgit closed pull request #6719: [FLINK-10375] Add wrapped exception as cause 
in ExceptionInChainedStubException
URL: https://github.com/apache/flink/pull/6719
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
index 11b5bf033ee..e7cebc50120 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
@@ -33,7 +33,7 @@

 
public ExceptionInChainedStubException(String taskName, Exception 
wrappedException) {
-   super();
+   super("Exception in chained task '" + taskName + "'", 
exceptionUnwrap(wrappedException));
this.taskName = taskName;
this.exception = wrappedException;
}


 




> ExceptionInChainedStubException hides wrapped exception in cause
> 
>
> Key: FLINK-10375
> URL: https://issues.apache.org/jira/browse/FLINK-10375
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: Mike Pedersen
>Assignee: Mike Pedersen
>Priority: Minor
>  Labels: pull-request-available
>
> ExceptionInChainedStubException does not have the wrapped exception as the 
> cause. This creates generally unhelpful exception traces like this:
> {code:java}
> org.apache.beam.sdk.util.UserCodeException: 
> org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException
>   at 
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>   at 
> org.apache.beam.sdk.io.WriteFiles$ApplyShardingKeyFn$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
>   at 
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:185)
>   at 
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:149)
>   at 
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:66)
>   at 
> org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:120)
>   at 
> org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException
>   at 
> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:82)
>   at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>   at 
> org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction$DoFnOutputManager.output(FlinkDoFnFunction.java:149)
>   at 
> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:219)
>   at 
> org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:69)
>   at 
> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:517)
>   at 
> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:505)
>   at 
> org.apache.beam.sdk.io.WriteFiles$ApplyShardingKeyFn.processElement(WriteFiles.java:686)
> {code}
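
A minimal sketch of why passing the wrapped exception to the superclass constructor matters. The class and method names here (ChainedCauseDemo, WrappingException, rootCause) are hypothetical stand-ins, not Flink classes, and the exceptionUnwrap step from the actual patch is left out:

```java
public class ChainedCauseDemo {

    /** Hypothetical stand-in for an exception that wraps a failure from a chained task. */
    public static class WrappingException extends RuntimeException {
        private final String taskName;

        public WrappingException(String taskName, Exception wrapped) {
            // Handing the wrapped exception to super() registers it as the cause,
            // so it shows up as "Caused by: ..." in stack traces and via getCause().
            super("Exception in chained task '" + taskName + "'", wrapped);
            this.taskName = taskName;
        }

        public String getTaskName() {
            return taskName;
        }
    }

    /** Walks the cause chain and returns the innermost throwable. */
    public static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        Exception original = new IllegalStateException("disk full");
        WrappingException wrapped = new WrappingException("FlatMap", original);
        System.out.println(wrapped.getMessage());
        System.out.println(rootCause(wrapped).getMessage());
    }
}
```

With a no-argument super() call, getCause() would return null and the trace would stop at the wrapper, which is the unhelpful shape shown in the trace above.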



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on issue #6729: [FLINK-10386] [taskmanager] Remove legacy class TaskExecutionStateListener

2018-09-21 Thread GitBox
TisonKun commented on issue #6729: [FLINK-10386] [taskmanager] Remove legacy 
class TaskExecutionStateListener
URL: https://github.com/apache/flink/pull/6729#issuecomment-423436864
 
 
   Travis fails on `TaskTest` because it is based on the legacy code base. 
I have temporarily hacked the test to make it pass.
   Note that although it is a hack, the test coverage is kept: the failure 
messages are still checked by the preceding assertion.
   
   Porting `TaskTest` to FLIP-6 would be a follow-up pull request, to keep this 
one as clean as possible.




[jira] [Commented] (FLINK-9713) Support versioned joins in planning phase

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623190#comment-16623190
 ] 

ASF GitHub Bot commented on FLINK-9713:
---

twalthr commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r219405006
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -175,13 +173,41 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
 
 val rewrittenTemporalJoin = temporalJoin.copy(temporalJoin.getTraitSet, 
List(left, right))
 
-// Materialize all of the time attributes from the right side of temporal 
join
-val indicesToMaterialize =
-  (left.getRowType.getFieldCount until 
rewrittenTemporalJoin.getRowType.getFieldCount).toSet
+val indicesToMaterialize = 
gatherIndicesToMaterialize(rewrittenTemporalJoin, left, right)
 
 materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, 
indicesToMaterialize)
   }
 
+  private def gatherIndicesToMaterialize(
 
 Review comment:
   nit: put private methods below public methods




> Support versioned joins in planning phase
> -
>
> Key: FLINK-9713
> URL: https://issues.apache.org/jira/browse/FLINK-9713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should evaluate to a valid plan with a versioned joins plan node.





[jira] [Commented] (FLINK-10263) User-defined function with LITERAL paramters yields CompileException

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623191#comment-16623191
 ] 

ASF GitHub Bot commented on FLINK-10263:


dawidwys commented on a change in pull request #6725: [FLINK-10263] 
[sql-client] Fix classloader issues in SQL Client
URL: https://github.com/apache/flink/pull/6725#discussion_r219405018
 
 

 ##
 File path: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 ##
 @@ -183,6 +184,21 @@ public EnvironmentInstance createEnvironmentInstance() {
return tableSinks;
}
 
+   /**
+* Executes the given supplier using the execution context's 
classloader as thread classloader.
+*/
+   public <R> R wrapClassLoader(Supplier<R> supplier) {
+   final ClassLoader previousClassloader = 
Thread.currentThread().getContextClassLoader();
+   Thread.currentThread().setContextClassLoader(classLoader);
+   R returnValue;
+   try {
+   returnValue = supplier.get();
 
 Review comment:
   Can't we just return it here?
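
A minimal sketch of the shape this suggestion points at, assuming a static helper that takes the classloader as a parameter (ClassLoaderScope and its signature are hypothetical, not the SQL Client code). Returning directly from the try block is safe because the finally block still runs before the method exits:

```java
import java.util.function.Supplier;

public class ClassLoaderScope {

    /**
     * Runs the supplier with the given classloader as the thread context
     * classloader and restores the previous one afterwards.
     */
    public static <R> R wrapClassLoader(ClassLoader classLoader, Supplier<R> supplier) {
        final Thread thread = Thread.currentThread();
        final ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(classLoader);
        try {
            // No temporary variable needed: finally runs even on a direct return.
            return supplier.get();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        ClassLoader custom = new ClassLoader(ClassLoaderScope.class.getClassLoader()) { };
        boolean sawCustom = wrapClassLoader(
            custom, () -> Thread.currentThread().getContextClassLoader() == custom);
        System.out.println("supplier saw custom classloader: " + sawCustom);
    }
}
```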




> User-defined function with LITERAL paramters yields CompileException
> 
>
> Key: FLINK-10263
> URL: https://issues.apache.org/jira/browse/FLINK-10263
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.7.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> When using a user-defined scalar function only with literal parameters, a 
> {{CompileException}} is thrown. For example
> {code}
> SELECT myFunc(CAST(40.750444 AS FLOAT), CAST(-73.993475 AS FLOAT))
> public class MyFunc extends ScalarFunction {
>   public int eval(float lon, float lat) {
>   // do something
>   }
> }
> {code}
> results in 
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.codehaus.commons.compiler.CompileException: Line 5, Column 10: Cannot 
> determine simple type name "com"
> {code}
> The problem is probably caused by the expression reducer because it 
> disappears if a regular attribute is added to a parameter expression.





[jira] [Comment Edited] (FLINK-10385) Implement a waitUntilCondition utils

2018-09-21 Thread JIRA


[ 
https://issues.apache.org/jira/browse/FLINK-10385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623196#comment-16623196
 ] 

陈梓立 edited comment on FLINK-10385 at 9/21/18 7:36 AM:
--

A waiting utility based on {{sleep}} and polling might miss state changes that 
happen quickly. Would it be reasonable to use a {{CountDownLatch}} or 
{{OneShotLatch}} in this case? If so, that gives us a guideline and we can 
refactor the tests gradually.

That said, in some cases it might not be easy to inject a latch.
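
A minimal sketch of the latch-based alternative, using only java.util.concurrent.CountDownLatch (LatchWaitDemo and runAndAwait are hypothetical names). The test thread blocks until the worker signals, so a short-lived state cannot be missed between two polls:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchWaitDemo {

    /** Starts a worker and waits until it signals, or until the timeout expires. */
    public static boolean runAndAwait(long timeoutMillis) throws InterruptedException {
        final CountDownLatch reachedState = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            // ... the code under test would reach the interesting state here ...
            reachedState.countDown(); // signal exactly once
        });
        worker.start();

        // await() returns true if the latch was released before the timeout.
        boolean signalled = reachedState.await(timeoutMillis, TimeUnit.MILLISECONDS);
        worker.join();
        return signalled;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("worker signalled in time: " + runAndAwait(5_000));
    }
}
```

The cost is that the code under test needs a hook where the countDown() call can be injected, which is the limitation raised above.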


was (Author: tison):
Maybe a waiting utils based on {{sleep}} and test would lose state changes 
quite quickly. Is it reasonable to use {{CountDownLatch}} or {{OneShotLatch}} 
at this case? If so, then there is a guideline and we can refactor tests 
gradually.

> Implement a waitUntilCondition utils
> 
>
> Key: FLINK-10385
> URL: https://issues.apache.org/jira/browse/FLINK-10385
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: 陈梓立
>Priority: Major
> Fix For: 1.7.0
>
> Attachments: example.java
>
>
> Recently, while refining some tests, I noticed that it is a common requirement 
> to wait until a (stable) condition occurs.
> To achieve this, we have {{ExecutionGraphTestUtils#waitUntilJobStatus}} and 
> many similar helpers. Most of them can be abstracted simply as
> {code:java}
> public static void waitUntilCondition(SupplierWithException<Boolean, Throwable> conditionSupplier, Deadline deadline) throws Throwable {
>   while (deadline.hasTimeLeft()) {
>     if (conditionSupplier.get()) { return; }
>     // sleep for at most 500 ms, but never past the deadline
>     Thread.sleep(Math.min(deadline.timeLeft().toMillis(), 500));
>   }
>   throw new IllegalStateException("...");
> }
> {code}
>  
> I propose implementing such a method to avoid having many utility methods 
> scattered around that serve the same purpose.
>  Looking forward to your advice. If existing code or another project has already 
> implemented this, I would be glad to adopt it.
> cc [~Zentol]
>  
> PS: the attached file contains some code I found that could satisfy this proposal.
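
A self-contained sketch of the proposed utility, written against the JDK only so that it runs without Flink on the classpath. WaitUtils, CheckedCondition, and the retryInterval parameter are assumptions made for illustration; they stand in for Flink's SupplierWithException and Deadline and are not the attached example.java:

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class WaitUtils {

    /** Hypothetical stand-in for a supplier that may throw. */
    @FunctionalInterface
    public interface CheckedCondition {
        boolean get() throws Exception;
    }

    /** Polls the condition until it holds or the timeout expires. */
    public static void waitUntilCondition(
            CheckedCondition condition, Duration timeout, Duration retryInterval) throws Exception {
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.get()) {
                return;
            }
            long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new TimeoutException("Condition was not met within " + timeout);
            }
            // Sleep for the retry interval, but never past the deadline.
            long remainingMillis = Math.max(1L, remainingNanos / 1_000_000L);
            Thread.sleep(Math.min(retryInterval.toMillis(), remainingMillis));
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger polls = new AtomicInteger();
        waitUntilCondition(
            () -> polls.incrementAndGet() >= 3, Duration.ofSeconds(5), Duration.ofMillis(10));
        System.out.println("condition met after " + polls.get() + " polls");
    }
}
```

Throwing a TimeoutException rather than an IllegalStateException is a design choice of this sketch; either lets a test fail with a clear message when the condition never becomes true.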





[jira] [Commented] (FLINK-9713) Support versioned joins in planning phase

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623199#comment-16623199
 ] 

ASF GitHub Bot commented on FLINK-9713:
---

twalthr commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r219407507
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
 ##
 @@ -191,12 +174,10 @@ case class RowtimeAttribute(expr: Expression) extends 
TimeAttribute(expr) {
 ValidationSuccess
   case WindowReference(_, _) =>
 ValidationFailure("Reference to a rowtime or proctime window 
required.")
-  case TemporalTableReference(_, _) =>
-ValidationSuccess
   case any =>
 ValidationFailure(
-  s"The '.rowtime' expression can only be used for table definitions, 
windows, " +
-s" and temporal tables, while [${any}] was found.")
+  s"The '.rowtime' expression can only be used for table definitions, 
windows " +
+s"and temporal table definitions, while [$any] was found.")
 
 Review comment:
   I think table definitions are not supported anymore. So we can reset this 
message entirely, right?




> Support versioned joins in planning phase
> -
>
> Key: FLINK-9713
> URL: https://issues.apache.org/jira/browse/FLINK-9713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should evaluate to a valid plan with a versioned joins plan node.





[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-21 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r219411672
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
 ##
 @@ -191,12 +174,10 @@ case class RowtimeAttribute(expr: Expression) extends 
TimeAttribute(expr) {
 ValidationSuccess
   case WindowReference(_, _) =>
 ValidationFailure("Reference to a rowtime or proctime window 
required.")
-  case TemporalTableReference(_, _) =>
-ValidationSuccess
   case any =>
 ValidationFailure(
-  s"The '.rowtime' expression can only be used for table definitions, 
windows, " +
-s" and temporal tables, while [${any}] was found.")
+  s"The '.rowtime' expression can only be used for table definitions, 
windows " +
+s"and temporal table definitions, while [$any] was found.")
 
 Review comment:
   Yes, you are right :)




[jira] [Commented] (FLINK-9713) Support versioned joins in planning phase

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623216#comment-16623216
 ] 

ASF GitHub Bot commented on FLINK-9713:
---

pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r219411313
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -175,13 +173,41 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
 
 val rewrittenTemporalJoin = temporalJoin.copy(temporalJoin.getTraitSet, 
List(left, right))
 
-// Materialize all of the time attributes from the right side of temporal 
join
-val indicesToMaterialize =
-  (left.getRowType.getFieldCount until 
rewrittenTemporalJoin.getRowType.getFieldCount).toSet
+val indicesToMaterialize = 
gatherIndicesToMaterialize(rewrittenTemporalJoin, left, right)
 
 materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, 
indicesToMaterialize)
   }
 
+  private def gatherIndicesToMaterialize(
+  temporalJoin: Join,
+  left: RelNode,
+  right: RelNode)
+: Set[Int] = {
+
+// Materialize all of the time attributes from the right side of temporal 
join
+var indicesToMaterialize =
+  (left.getRowType.getFieldCount until 
temporalJoin.getRowType.getFieldCount).toSet
+
+if (!hasRowtimeAttribute(right.getRowType)) {
 
 Review comment:
   No, I forgot. Adding one now.




> Support versioned joins in planning phase
> -
>
> Key: FLINK-9713
> URL: https://issues.apache.org/jira/browse/FLINK-9713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should evaluate to a valid plan with a versioned joins plan node.





[jira] [Commented] (FLINK-9713) Support versioned joins in planning phase

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623217#comment-16623217
 ] 

ASF GitHub Bot commented on FLINK-9713:
---

pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r219411672
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
 ##
 @@ -191,12 +174,10 @@ case class RowtimeAttribute(expr: Expression) extends 
TimeAttribute(expr) {
 ValidationSuccess
   case WindowReference(_, _) =>
 ValidationFailure("Reference to a rowtime or proctime window 
required.")
-  case TemporalTableReference(_, _) =>
-ValidationSuccess
   case any =>
 ValidationFailure(
-  s"The '.rowtime' expression can only be used for table definitions, 
windows, " +
-s" and temporal tables, while [${any}] was found.")
+  s"The '.rowtime' expression can only be used for table definitions, 
windows " +
+s"and temporal table definitions, while [$any] was found.")
 
 Review comment:
   Yes, you are right :)




> Support versioned joins in planning phase
> -
>
> Key: FLINK-9713
> URL: https://issues.apache.org/jira/browse/FLINK-9713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should evaluate to a valid plan with a versioned joins plan node.





[jira] [Commented] (FLINK-10380) Check if key is not nul before assign to group in KeyedStream

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623106#comment-16623106
 ] 

ASF GitHub Bot commented on FLINK-10380:


satybald commented on issue #6724: [FLINK-10380] Add precondition for 
assignToKeyGroup method
URL: https://github.com/apache/flink/pull/6724#issuecomment-423422608
 
 
   Another option would be to add a try/catch statement for the NPE. A try 
statement doesn't incur any performance cost unless an exception is thrown.




> Check if key is not nul before assign to group in KeyedStream
> -
>
> Key: FLINK-10380
> URL: https://issues.apache.org/jira/browse/FLINK-10380
> Project: Flink
>  Issue Type: Task
>Affects Versions: 1.6.0
>Reporter: Sayat Satybaldiyev
>Priority: Minor
>  Labels: pull-request-available
>
> If a user creates a KeyedStream and partitions it by a key that might be null, 
> the Flink job throws a NullPointerException at runtime. However, the NPE that 
> Flink throws is hard to debug and understand, as it doesn't point to the 
> offending place in the Flink job.
> *Suggestion:*
> Add a precondition that checks that the key is not null and throws a 
> descriptive error if it is null.
>  
> *Job Example*:
>  
> {code:java}
> DataStream stream = env.fromCollection(Arrays.asList("aaa", "bbb"))
>  .map(x -> (String)null)
>  .keyBy(x -> x);{code}
>  
>  
> An error that is thrown:
>  
> {code:java}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: 
> java.lang.RuntimeException
>  at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
>  at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>  at org.myorg.quickstart.BuggyKeyedStream.main(BuggyKeyedStream.java:61)
> Caused by: java.lang.RuntimeException
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)16:26:43,110
>  INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC 
> service.
> at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>  at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>  at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>  at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>  at 
> org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(KeyGroupRangeAssignment.java:59)
>  at 
> org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:48)
>  at 
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:63)
>  at 
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
>  at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> {code}
> ... 10 more
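
A minimal sketch of the suggested precondition. KeyGroupSketch is a hypothetical class, the error message is made up for illustration, and Math.floorMod is only a stand-in for Flink's real key-group hashing, which is not reproduced here:

```java
import java.util.Objects;

public class KeyGroupSketch {

    /**
     * Assigns a key to a key group, failing fast with a descriptive message
     * when the key is null instead of failing on key.hashCode().
     */
    public static int assignToKeyGroup(Object key, int maxParallelism) {
        Objects.requireNonNull(key, "Assigned key must not be null!");
        // Stand-in for the real hash function: map the hash into [0, maxParallelism).
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    public static void main(String[] args) {
        System.out.println("key group for \"aaa\": " + assignToKeyGroup("aaa", 128));
        try {
            assignToKeyGroup(null, 128);
        } catch (NullPointerException e) {
            System.out.println("rejected null key: " + e.getMessage());
        }
    }
}
```

The exception is still a NullPointerException, but it now carries a message that names the actual problem, which is the point of the suggestion above.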





[GitHub] satybald commented on issue #6724: [FLINK-10380] Add precondition for assignToKeyGroup method

2018-09-21 Thread GitBox
satybald commented on issue #6724: [FLINK-10380] Add precondition for 
assignToKeyGroup method
URL: https://github.com/apache/flink/pull/6724#issuecomment-423422608
 
 
   Another option would be to add a try/catch statement for the NPE. A try 
statement doesn't incur any performance cost unless an exception is thrown.




[GitHub] satybald removed a comment on issue #6724: [FLINK-10380] Add precondition for assignToKeyGroup method

2018-09-21 Thread GitBox
satybald removed a comment on issue #6724: [FLINK-10380] Add precondition for 
assignToKeyGroup method
URL: https://github.com/apache/flink/pull/6724#issuecomment-423422608
 
 
   Another option would be to add a try/catch statement for the NPE. A try 
statement doesn't incur any performance cost unless an exception is thrown.




[jira] [Commented] (FLINK-10380) Check if key is not nul before assign to group in KeyedStream

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623132#comment-16623132
 ] 

ASF GitHub Bot commented on FLINK-10380:


satybald commented on issue #6724: [FLINK-10380] Add precondition for 
assignToKeyGroup method
URL: https://github.com/apache/flink/pull/6724#issuecomment-423427348
 
 
   @StephanEwen agreed, a try block doesn't incur any performance penalty. 
Another option that doesn't affect performance is to document in the keyBy() 
method that the key must not be null.




> Check if key is not nul before assign to group in KeyedStream
> -
>
> Key: FLINK-10380
> URL: https://issues.apache.org/jira/browse/FLINK-10380
> Project: Flink
>  Issue Type: Task
>Affects Versions: 1.6.0
>Reporter: Sayat Satybaldiyev
>Priority: Minor
>  Labels: pull-request-available
>
> If a user creates a KeyedStream and partitions it by a key that might be null, 
> the Flink job throws a NullPointerException at runtime. However, the NPE that 
> Flink throws is hard to debug and understand, as it doesn't point to the 
> offending place in the Flink job.
> *Suggestion:*
> Add a precondition that checks that the key is not null and throws a 
> descriptive error if it is null.
>  
> *Job Example*:
>  
> {code:java}
> DataStream stream = env.fromCollection(Arrays.asList("aaa", "bbb"))
>  .map(x -> (String)null)
>  .keyBy(x -> x);{code}
>  
>  
> An error that is thrown:
>  
> {code:java}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: 
> java.lang.RuntimeException
>  at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
>  at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>  at org.myorg.quickstart.BuggyKeyedStream.main(BuggyKeyedStream.java:61)
> Caused by: java.lang.RuntimeException
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> 16:26:43,110 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC service.
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>  at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>  at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>  at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>  at 
> org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(KeyGroupRangeAssignment.java:59)
>  at 
> org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:48)
>  at 
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:63)
>  at 
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
>  at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
>  at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> {code}
> ... 10 more
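
A minimal sketch of the precondition suggested in the issue, assuming a simplified stand-in for KeyGroupRangeAssignment.assignToKeyGroup. The class name KeyGroupSketch, the error message, and the plain hashCode-modulo assignment are illustrative assumptions, not Flink's actual implementation:

```java
import java.util.Objects;

/** Hypothetical stand-in for the key-group assignment; not Flink's real code. */
final class KeyGroupSketch {

    /**
     * Maps a key to a key group in [0, maxParallelism).
     * Fails fast with a descriptive message instead of a bare NPE.
     */
    static int assignToKeyGroup(Object key, int maxParallelism) {
        // Assumed message text; the point is that it names the likely cause.
        Objects.requireNonNull(
            key, "Assigned key must not be null. Check the KeySelector passed to keyBy().");
        // Simplification: plain hashCode modulo, where the real code applies its own hashing.
        return Math.floorMod(key.hashCode(), maxParallelism);
    }
}
```

Calling KeyGroupSketch.assignToKeyGroup(null, 128) would still raise a NullPointerException, but one whose message points at keyBy(). The check itself is a single reference comparison per record.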



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9713) Support versioned joins in planning phase

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623203#comment-16623203
 ] 

ASF GitHub Bot commented on FLINK-9713:
---

twalthr commented on issue #6299: [FLINK-9713][table][sql] Support processing 
time versioned joins
URL: https://github.com/apache/flink/pull/6299#issuecomment-423443446
 
 
   Thank you @pnowojski. I added my last 4 comments. Feel free to merge this PR 
after you have addressed them. +1 from my side.




> Support versioned joins in planning phase
> -
>
> Key: FLINK-9713
> URL: https://issues.apache.org/jira/browse/FLINK-9713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should evaluate to a valid plan with a versioned join plan node.





[GitHub] twalthr commented on issue #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-21 Thread GitBox
twalthr commented on issue #6299: [FLINK-9713][table][sql] Support processing 
time versioned joins
URL: https://github.com/apache/flink/pull/6299#issuecomment-423443446
 
 
   Thank you @pnowojski. I added my last 4 comments. Feel free to merge this PR 
after you have addressed them. +1 from my side.




[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-21 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r219410591
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -175,13 +173,41 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
 
 val rewrittenTemporalJoin = temporalJoin.copy(temporalJoin.getTraitSet, 
List(left, right))
 
-// Materialize all of the time attributes from the right side of temporal 
join
-val indicesToMaterialize =
-  (left.getRowType.getFieldCount until 
rewrittenTemporalJoin.getRowType.getFieldCount).toSet
+val indicesToMaterialize = 
gatherIndicesToMaterialize(rewrittenTemporalJoin, left, right)
 
 materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, 
indicesToMaterialize)
   }
 
+  private def gatherIndicesToMaterialize(
 
 Review comment:
   Fixed, but IMO with the visitor pattern it is better to follow this layout:
   ```
   public visit(X);
   
   private doSthWith(X);
   
   private doSthElseWith(X);
   
   public visit(Y);
   
   private doSthWith(Y);
   
   private doSthElseWith(Y);
   ```
   




[jira] [Updated] (FLINK-10384) Add Sinh math function supported in Table API and SQL

2018-09-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10384:
---
Labels: pull-request-available  (was: )

> Add Sinh math function supported in Table API and SQL
> -
>
> Key: FLINK-10384
> URL: https://issues.apache.org/jira/browse/FLINK-10384
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> Similar to FLINK-10340, which adds the Cosh math function.





[jira] [Commented] (FLINK-10384) Add Sinh math function supported in Table API and SQL

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623156#comment-16623156
 ] 

ASF GitHub Bot commented on FLINK-10384:


yanghua opened a new pull request #6730: [FLINK-10384] Add Sinh math function 
supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6730
 
 
   ## What is the purpose of the change
   
   *This pull request adds Sinh math function supported in Table API and SQL*
   
   
   ## Brief change log
   
 - *Adds Sinh math function supported in Table API and SQL*
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
*ScalarFunctionsTest#testSinh*.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / **docs** / 
JavaDocs / not documented)
   




> Add Sinh math function supported in Table API and SQL
> -
>
> Key: FLINK-10384
> URL: https://issues.apache.org/jira/browse/FLINK-10384
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> Similar to FLINK-10340, which adds the Cosh math function.





[GitHub] yanghua opened a new pull request #6730: [FLINK-10384] Add Sinh math function supported in Table API and SQL

2018-09-21 Thread GitBox
yanghua opened a new pull request #6730: [FLINK-10384] Add Sinh math function 
supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6730
 
 
   ## What is the purpose of the change
   
   *This pull request adds Sinh math function supported in Table API and SQL*
   
   
   ## Brief change log
   
 - *Adds Sinh math function supported in Table API and SQL*
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
*ScalarFunctionsTest#testSinh*.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / **docs** / 
JavaDocs / not documented)
   




[jira] [Commented] (FLINK-10386) Remove legacy class TaskExecutionStateListener

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623173#comment-16623173
 ] 

ASF GitHub Bot commented on FLINK-10386:


TisonKun edited a comment on issue #6729: [FLINK-10386] [taskmanager] Remove 
legacy class TaskExecutionStateListener
URL: https://github.com/apache/flink/pull/6729#issuecomment-423436864
 
 
   Travis fails on `TaskTest` because it is based on the legacy code base. I 
added a temporary hack to get it passing.
   Note that although it is a hack, the test coverage is kept -- terminate 
messages are checked by the previous assertion.
   
   A `TaskTest` based on FLIP-6 would be a follow-up pull request, to keep this 
one as clean as possible.




> Remove legacy class TaskExecutionStateListener
> --
>
> Key: FLINK-10386
> URL: https://issues.apache.org/jira/browse/FLINK-10386
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.7.0
>Reporter: 陈梓立
>Assignee: 陈梓立
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> After a discussion 
> [here|https://github.com/apache/flink/commit/0735b5b935b0c0757943e2d58047afcfb9949560#commitcomment-30584257]
>  with [~trohrm...@apache.org], I started to analyze the usage of 
> {{ActorGatewayTaskExecutionStateListener}} and {{TaskExecutionStateListener}}.
> In conclusion, we have abandoned the {{TaskExecutionStateListener}} strategy 
> and no component relies on it. Instead, we introduced {{TaskManagerActions}} 
> to take over the communication of {{Task}} with {{TaskManager}}. No one 
> except {{TaskManager}} should directly communicate with {{Task}}. So the 
> legacy class {{TaskExecutionStateListener}} can be safely removed.





[GitHub] Clarkkkkk closed pull request #6676: [FLINK-10247][Metrics] Run MetricQueryService in separate thread pool

2018-09-21 Thread GitBox
Clarkkkkk closed pull request #6676: [FLINK-10247][Metrics] Run 
MetricQueryService in separate thread pool
URL: https://github.com/apache/flink/pull/6676
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
index 8821e0d9f4a..c283893c249 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
@@ -40,6 +40,8 @@
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpSerializer;
 
@@ -69,54 +71,19 @@ public String filterCharacters(String input) {
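private final Map<Counter, Tuple2<QueryScopeInfo, String>> counters = 
new HashMap<>();
private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms 
= new HashMap<>();
private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new 
HashMap<>();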
+   private final ExecutorService threadpool = 
Executors.newSingleThreadExecutor();
 
@Override
public void postStop() {
serializer.close();
+   if (threadpool != null && !threadpool.isShutdown()) {
+   threadpool.shutdownNow();
+   }
}
 
@Override
public void onReceive(Object message) {
-   try {
-   if (message instanceof AddMetric) {
-   AddMetric added = (AddMetric) message;
-
-   String metricName = added.metricName;
-   Metric metric = added.metric;
-   AbstractMetricGroup group = added.group;
-
-   QueryScopeInfo info = 
group.getQueryServiceMetricInfo(FILTER);
-
-   if (metric instanceof Counter) {
-   counters.put((Counter) metric, new 
Tuple2<>(info, FILTER.filterCharacters(metricName)));
-   } else if (metric instanceof Gauge) {
-   gauges.put((Gauge) metric, new 
Tuple2<>(info, FILTER.filterCharacters(metricName)));
-   } else if (metric instanceof Histogram) {
-   histograms.put((Histogram) metric, new 
Tuple2<>(info, FILTER.filterCharacters(metricName)));
-   } else if (metric instanceof Meter) {
-   meters.put((Meter) metric, new 
Tuple2<>(info, FILTER.filterCharacters(metricName)));
-   }
-   } else if (message instanceof RemoveMetric) {
-   Metric metric = (((RemoveMetric) 
message).metric);
-   if (metric instanceof Counter) {
-   this.counters.remove(metric);
-   } else if (metric instanceof Gauge) {
-   this.gauges.remove(metric);
-   } else if (metric instanceof Histogram) {
-   this.histograms.remove(metric);
-   } else if (metric instanceof Meter) {
-   this.meters.remove(metric);
-   }
-   } else if (message instanceof CreateDump) {
-   
MetricDumpSerialization.MetricSerializationResult dump = 
serializer.serialize(counters, gauges, histograms, meters);
-   getSender().tell(dump, getSelf());
-   } else {
-   LOG.warn("MetricQueryServiceActor received an 
invalid message. " + message.toString());
-   getSender().tell(new Status.Failure(new 
IOException("MetricQueryServiceActor received an invalid message. " + 
message.toString())), getSelf());
-   }
-   } catch (Exception e) {
-   LOG.warn("An exception occurred while processing a 
message.", e);
-   }
+   threadpool.submit(new MetricMessageHandlerRunnable(message, 
gauges, counters, histograms, meters, getSender(), getSelf(), serializer));
}
 
/**
@@ -221,4 +188,75 @@ public static Object getCreateDump() {
private static class CreateDump implements Serializable {
private static final CreateDump INSTANCE = new CreateDump();
}
+
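
The idea behind the change, handing each incoming message to a dedicated single-thread executor and shutting that executor down in postStop(), can be sketched without Akka. MessageOffloadSketch and its methods are hypothetical names for illustration; only ExecutorService and Executors.newSingleThreadExecutor() are taken from the diff:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical, Akka-free sketch of offloading message handling to its own thread. */
final class MessageOffloadSketch {
    // A single worker thread processes messages in submission order.
    private final ExecutorService threadpool = Executors.newSingleThreadExecutor();
    // Only touched from the worker thread, so no extra locking is used here.
    private final List<Object> handled = new ArrayList<>();

    /** Returns immediately; the actual handling runs on the worker thread. */
    Future<?> onReceive(Object message) {
        return threadpool.submit(() -> { handled.add(message); });
    }

    /** Reads the count on the worker thread, after all earlier messages. */
    int handledCount() {
        Callable<Integer> task = () -> handled.size();
        try {
            return threadpool.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Mirrors the shutdown guard from the diff. */
    void postStop() {
        if (!threadpool.isShutdown()) {
            threadpool.shutdownNow();
        }
    }

    boolean isStopped() {
        return threadpool.isShutdown();
    }
}
```

Because the executor has exactly one thread, state that is only read and written inside submitted tasks is accessed sequentially. Whether that holds for the maps shared with the actor in the real patch depends on code that is cut off in this message.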

[jira] [Commented] (FLINK-10247) Run MetricQueryService in separate thread pool

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623195#comment-16623195
 ] 

ASF GitHub Bot commented on FLINK-10247:


Clarkkkkk closed pull request #6676: [FLINK-10247][Metrics] Run 
MetricQueryService in separate thread pool
URL: https://github.com/apache/flink/pull/6676
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
index 8821e0d9f4a..c283893c249 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
@@ -40,6 +40,8 @@
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpSerializer;
 
@@ -69,54 +71,19 @@ public String filterCharacters(String input) {
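private final Map<Counter, Tuple2<QueryScopeInfo, String>> counters = 
new HashMap<>();
private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms 
= new HashMap<>();
private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new 
HashMap<>();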
+   private final ExecutorService threadpool = 
Executors.newSingleThreadExecutor();
 
@Override
public void postStop() {
serializer.close();
+   if (threadpool != null && !threadpool.isShutdown()) {
+   threadpool.shutdownNow();
+   }
}
 
@Override
public void onReceive(Object message) {
-   try {
-   if (message instanceof AddMetric) {
-   AddMetric added = (AddMetric) message;
-
-   String metricName = added.metricName;
-   Metric metric = added.metric;
-   AbstractMetricGroup group = added.group;
-
-   QueryScopeInfo info = 
group.getQueryServiceMetricInfo(FILTER);
-
-   if (metric instanceof Counter) {
-   counters.put((Counter) metric, new 
Tuple2<>(info, FILTER.filterCharacters(metricName)));
-   } else if (metric instanceof Gauge) {
-   gauges.put((Gauge) metric, new 
Tuple2<>(info, FILTER.filterCharacters(metricName)));
-   } else if (metric instanceof Histogram) {
-   histograms.put((Histogram) metric, new 
Tuple2<>(info, FILTER.filterCharacters(metricName)));
-   } else if (metric instanceof Meter) {
-   meters.put((Meter) metric, new 
Tuple2<>(info, FILTER.filterCharacters(metricName)));
-   }
-   } else if (message instanceof RemoveMetric) {
-   Metric metric = (((RemoveMetric) 
message).metric);
-   if (metric instanceof Counter) {
-   this.counters.remove(metric);
-   } else if (metric instanceof Gauge) {
-   this.gauges.remove(metric);
-   } else if (metric instanceof Histogram) {
-   this.histograms.remove(metric);
-   } else if (metric instanceof Meter) {
-   this.meters.remove(metric);
-   }
-   } else if (message instanceof CreateDump) {
-   
MetricDumpSerialization.MetricSerializationResult dump = 
serializer.serialize(counters, gauges, histograms, meters);
-   getSender().tell(dump, getSelf());
-   } else {
-   LOG.warn("MetricQueryServiceActor received an 
invalid message. " + message.toString());
-   getSender().tell(new Status.Failure(new 
IOException("MetricQueryServiceActor received an invalid message. " + 
message.toString())), getSelf());
-   }
-   } catch (Exception e) {
-   LOG.warn("An exception occurred while processing a 
message.", e);
-   }
+   threadpool.submit(new MetricMessageHandlerRunnable(message, 
gauges, counters, histograms, meters, getSender(), getSelf(), 

[jira] [Commented] (FLINK-9235) Add Integration test for Flink-Yarn-Kerberos integration for flip-6

2018-09-21 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623209#comment-16623209
 ] 

Till Rohrmann commented on FLINK-9235:
--

What's the status of this issue [~suez1224] and [~aljoscha]?

> Add Integration test for Flink-Yarn-Kerberos integration for flip-6
> ---
>
> Key: FLINK-9235
> URL: https://issues.apache.org/jira/browse/FLINK-9235
> Project: Flink
>  Issue Type: Test
>Affects Versions: 1.5.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> We need to provide an integration test for flip-6 similar to 
> YARNSessionFIFOSecuredITCase for the legacy deployment mode.





[GitHub] pnowojski commented on issue #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-21 Thread GitBox
pnowojski commented on issue #6299: [FLINK-9713][table][sql] Support processing 
time versioned joins
URL: https://github.com/apache/flink/pull/6299#issuecomment-423448194
 
 
   Thanks for the review @twalthr :) I'm pushing latest update. Will merge it 
once it's green.




[jira] [Commented] (FLINK-9713) Support versioned joins in planning phase

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623219#comment-16623219
 ] 

ASF GitHub Bot commented on FLINK-9713:
---

pnowojski edited a comment on issue #6299: [FLINK-9713][table][sql] Support 
processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#issuecomment-423448194
 
 
   Thanks for the review @twalthr :) I'm pushing latest update (already 
squashed). Will merge it once it's green.




> Support versioned joins in planning phase
> -
>
> Key: FLINK-9713
> URL: https://issues.apache.org/jira/browse/FLINK-9713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should evaluate to a valid plan with a versioned join plan node.





[GitHub] pnowojski edited a comment on issue #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-21 Thread GitBox
pnowojski edited a comment on issue #6299: [FLINK-9713][table][sql] Support 
processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#issuecomment-423448194
 
 
   Thanks for the review @twalthr :) I'm pushing latest update (already 
squashed). Will merge it once it's green.




[jira] [Commented] (FLINK-9713) Support versioned joins in planning phase

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623218#comment-16623218
 ] 

ASF GitHub Bot commented on FLINK-9713:
---

pnowojski commented on issue #6299: [FLINK-9713][table][sql] Support processing 
time versioned joins
URL: https://github.com/apache/flink/pull/6299#issuecomment-423448194
 
 
   Thanks for the review @twalthr :) I'm pushing latest update. Will merge it 
once it's green.




> Support versioned joins in planning phase
> -
>
> Key: FLINK-9713
> URL: https://issues.apache.org/jira/browse/FLINK-9713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should evaluate to a valid plan with a versioned join plan node.





[jira] [Commented] (FLINK-10339) SpillReadBufferPool cannot use off-heap memory

2018-09-21 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623153#comment-16623153
 ] 

zhijiang commented on FLINK-10339:
--

Hey [~NicoK] and [~pnowojski], do you think we should make 
{{SpillReadBufferPool}} use direct memory like the current 
{{NetworkBufferPool}}, or are there any concerns about changing this?

 

> SpillReadBufferPool cannot use off-heap memory
> --
>
> Key: FLINK-10339
> URL: https://issues.apache.org/jira/browse/FLINK-10339
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> Currently, the {{NetworkBufferPool}} always uses off-heap memory to reduce 
> memory copies from the Flink {{Buffer}} to Netty's internal {{ByteBuf}} 
> during transport on the sender side.
>  
> But the {{SpillReadBufferPool}} in {{SpilledSubpartitionView}} still uses 
> heap memory for caching. We can make it off-heap by default, similar to 
> {{NetworkBufferPool}}, or decide the type by the current parameter 
> {{taskmanager.memory.off-heap}}.
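
To illustrate the heap versus off-heap distinction discussed in this issue, here is a minimal sketch using plain java.nio buffers rather than Flink's own buffer classes. BufferKindSketch, allocate, and the offHeap flag are hypothetical; the flag merely stands in for a setting such as the off-heap parameter mentioned in the description:

```java
import java.nio.ByteBuffer;

/** Hypothetical helper; Flink's buffer pools are not implemented this way. */
final class BufferKindSketch {

    /** Allocates a direct (off-heap) or a heap buffer of the given size. */
    static ByteBuffer allocate(int size, boolean offHeap) {
        return offHeap
            ? ByteBuffer.allocateDirect(size) // memory outside the Java heap
            : ByteBuffer.allocate(size);      // backed by a byte[] on the heap
    }
}
```

ByteBuffer.isDirect() reports which kind was returned. A direct buffer can generally be passed to native I/O without an intermediate copy out of the heap, which appears to be the kind of copy the description wants to avoid.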





[jira] [Commented] (FLINK-10386) Remove legacy class TaskExecutionStateListener

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623171#comment-16623171
 ] 

ASF GitHub Bot commented on FLINK-10386:


TisonKun commented on issue #6729: [FLINK-10386] [taskmanager] Remove legacy 
class TaskExecutionStateListener
URL: https://github.com/apache/flink/pull/6729#issuecomment-423436864
 
 
   Travis fails on `TaskTest` because it is based on the legacy code base. I 
added a temporary hack to get it passing.
   Note that although it is a hack, the test coverage is kept -- failure 
messages are checked by the previous assertion.
   
   A `TaskTest` based on FLIP-6 would be a follow-up pull request, to keep this 
one as clean as possible.




> Remove legacy class TaskExecutionStateListener
> --
>
> Key: FLINK-10386
> URL: https://issues.apache.org/jira/browse/FLINK-10386
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Affects Versions: 1.7.0
>Reporter: 陈梓立
>Assignee: 陈梓立
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> After a discussion 
> [here|https://github.com/apache/flink/commit/0735b5b935b0c0757943e2d58047afcfb9949560#commitcomment-30584257]
>  with [~trohrm...@apache.org], I started to analyze the usage of 
> {{ActorGatewayTaskExecutionStateListener}} and {{TaskExecutionStateListener}}.
> In conclusion, we have abandoned the {{TaskExecutionStateListener}} strategy 
> and no component relies on it. Instead, we introduced {{TaskManagerActions}} 
> to take over the communication of {{Task}} with {{TaskManager}}. No one 
> except {{TaskManager}} should directly communicate with {{Task}}. So the 
> legacy class {{TaskExecutionStateListener}} can be safely removed.





[GitHub] TisonKun edited a comment on issue #6729: [FLINK-10386] [taskmanager] Remove legacy class TaskExecutionStateListener

2018-09-21 Thread GitBox
TisonKun edited a comment on issue #6729: [FLINK-10386] [taskmanager] Remove 
legacy class TaskExecutionStateListener
URL: https://github.com/apache/flink/pull/6729#issuecomment-423436864
 
 
   Travis fails on `TaskTest` because it is based on the legacy code base. I 
added a temporary hack to get it passing.
   Note that although it is a hack, the test coverage is kept -- terminate 
messages are checked by the previous assertion.
   
   A `TaskTest` based on FLIP-6 would be a follow-up pull request, to keep this 
one as clean as possible.




[GitHub] twalthr commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-21 Thread GitBox
twalthr commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r219405006
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -175,13 +173,41 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
 
 val rewrittenTemporalJoin = temporalJoin.copy(temporalJoin.getTraitSet, 
List(left, right))
 
-// Materialize all of the time attributes from the right side of temporal 
join
-val indicesToMaterialize =
-  (left.getRowType.getFieldCount until 
rewrittenTemporalJoin.getRowType.getFieldCount).toSet
+val indicesToMaterialize = 
gatherIndicesToMaterialize(rewrittenTemporalJoin, left, right)
 
 materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, 
indicesToMaterialize)
   }
 
+  private def gatherIndicesToMaterialize(
 
 Review comment:
   nit: put private methods below public methods




[GitHub] dawidwys commented on a change in pull request #6725: [FLINK-10263] [sql-client] Fix classloader issues in SQL Client

2018-09-21 Thread GitBox
dawidwys commented on a change in pull request #6725: [FLINK-10263] 
[sql-client] Fix classloader issues in SQL Client
URL: https://github.com/apache/flink/pull/6725#discussion_r219405018
 
 

 ##
 File path: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 ##
 @@ -183,6 +184,21 @@ public EnvironmentInstance createEnvironmentInstance() {
return tableSinks;
}
 
+   /**
+* Executes the given supplier using the execution context's 
classloader as thread classloader.
+*/
+   public <R> R wrapClassLoader(Supplier<R> supplier) {
+   final ClassLoader previousClassloader = 
Thread.currentThread().getContextClassLoader();
+   Thread.currentThread().setContextClassLoader(classLoader);
+   R returnValue;
+   try {
+   returnValue = supplier.get();
 
 Review comment:
   Can't we just return it here?
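
   One way to read this suggestion, as a minimal sketch and not the code from the pull request: a `return` inside `try` still runs the `finally` block, so the previous classloader is restored without a temporary `returnValue` variable. The class name `ClassLoaderWrapping` and the explicit `classLoader` parameter are assumptions made to keep the example self-contained.

```java
import java.util.function.Supplier;

public class ClassLoaderWrapping {

    /**
     * Runs the supplier with the given classloader as the thread context
     * classloader and restores the previous one afterwards.
     * Hypothetical helper; the real method lives on ExecutionContext.
     */
    public static <R> R wrapClassLoader(ClassLoader classLoader, Supplier<R> supplier) {
        final Thread thread = Thread.currentThread();
        final ClassLoader previousClassLoader = thread.getContextClassLoader();
        thread.setContextClassLoader(classLoader);
        try {
            // Returning directly is safe: finally runs before the method exits.
            return supplier.get();
        } finally {
            thread.setContextClassLoader(previousClassLoader);
        }
    }
}
```

   The same restore step also runs if the supplier throws, which is the main reason to keep it in `finally`.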




[GitHub] asfgit closed pull request #6719: [FLINK-10375] Add wrapped exception as cause in ExceptionInChainedStubException

2018-09-21 Thread GitBox
asfgit closed pull request #6719: [FLINK-10375] Add wrapped exception as cause 
in ExceptionInChainedStubException
URL: https://github.com/apache/flink/pull/6719
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
index 11b5bf033ee..e7cebc50120 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
@@ -33,7 +33,7 @@

 
public ExceptionInChainedStubException(String taskName, Exception 
wrappedException) {
-   super();
+   super("Exception in chained task '" + taskName + "'", 
exceptionUnwrap(wrappedException));
this.taskName = taskName;
this.exception = wrappedException;
}
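
The effect of the change above can be illustrated with a small stand-alone sketch. `ChainedCauseSketch` and `ChainedTaskException` are hypothetical names, not Flink classes, and the sketch passes the wrapped exception straight through instead of calling the `exceptionUnwrap` helper used in the diff.

```java
public class ChainedCauseSketch {

    /** Hypothetical stand-in for an exception that wraps a failure from a chained task. */
    public static class ChainedTaskException extends RuntimeException {

        private final String taskName;

        public ChainedTaskException(String taskName, Exception wrappedException) {
            // Passing message and cause to super() makes both show up in stack traces
            // and lets callers reach the original failure via getCause().
            super("Exception in chained task '" + taskName + "'", wrappedException);
            this.taskName = taskName;
        }

        public String getTaskName() {
            return taskName;
        }
    }
}
```

With the plain `super()` call that the diff removes, `getMessage()` and `getCause()` would both return `null`.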


 




[jira] [Reopened] (FLINK-7942) NPE when apply FilterJoinRule

2018-09-21 Thread Fabian Hueske (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske reopened FLINK-7942:
--

> NPE when apply FilterJoinRule
> -
>
> Key: FLINK-7942
> URL: https://issues.apache.org/jira/browse/FLINK-7942
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: Timo Walther
>Priority: Major
> Fix For: 1.4.0, 1.5.0
>
>
> Test case *testFilterRule1* fails due to an NPE 
> {code}
> java.lang.RuntimeException: Error while applying rule 
> FilterJoinRule:FilterJoinRule:filter, args 
> [rel#148:LogicalFilter.NONE(input=rel#146:Subset#12.NONE,condition=>=(AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1,
>  
> AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1,
>  $3), 'c0')), 'c1'), 0)), 
> rel#145:LogicalJoin.NONE(left=rel#143:Subset#10.NONE,right=rel#144:Subset#11.NONE,condition==($0,
>  $2),joinType=left)]
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
>   at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
>   at 
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
>   at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:270)
>   at 
> org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:347)
>   at 
> org.apache.flink.table.utils.BatchTableTestUtil.verifyTable(TableTestBase.scala:186)
>   testFilterRule1(FilterRuleTest.scala:63)
> Caused by: java.lang.NullPointerException
>   at org.apache.calcite.plan.Strong.isNull(Strong.java:110)
>   at org.apache.calcite.plan.Strong.anyNull(Strong.java:166)
>   at org.apache.calcite.plan.Strong.isNull(Strong.java:114)
>   at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:99)
>   at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:84)
>   at org.apache.calcite.plan.RelOptUtil.simplifyJoin(RelOptUtil.java:2354)
>   at 
> org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:149)
>   at 
> org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:348)
>   at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
> {code}
> but *testFilterRule2*, which has the same query written in SQL, works.
> {code}
> class FilterRuleTest extends TableTestBase {
>   @Test
>   def testFilterRule1(): Unit = {
> val util = batchTestUtil()
> val t1 = util.addTable[(String, Int, Int)]('a, 'b, 'c)
> val t2 = util.addTable[(String, Int, Int)]('d, 'e, 'f)
> val results = t1
>   .leftOuterJoin(t2, 'b === 'e)
>   .select('c, Merger('c, 'f) as 'c0)
>   .select(Merger('c, 'c0) as 'c1)
>   .where('c1 >= 0)
> val expected = unaryNode(
>   "DataSetCalc",
>   binaryNode(
> "DataSetJoin",
> unaryNode(
>   "DataSetCalc",
>   batchTableNode(0),
>   term("select", "b", "c")
> ),
> unaryNode(
>   "DataSetCalc",
>   batchTableNode(1),
>   term("select", "e", "f")
> ),
> term("where", "=(b, e)"),
> term("join", "b", "c", "e", "f"),
> term("joinType", "LeftOuterJoin")
>   ),
>   term("select", "Merger$(c, Merger$(c, f)) AS c1"),
>   term("where", ">=(Merger$(c, Merger$(c, f)), 0)")
> )
> util.verifyTable(results, expected)
>   }
>   @Test
>   def testFilterRule2(): Unit = {
> val util = batchTestUtil()
> util.addTable[(String, Int, Int)]("T1", 'a, 'b, 'c)
> util.addTable[(String, Int, Int)]("T2", 'd, 'e, 'f)
> util.tableEnv.registerFunction("udf_test", Merger)
> val sql =
>   s"""
>  |select c1
>  |from (
>  |  select udf_test(c, c0) as c1
>  |  from (
>  |select c, udf_test(b, c) as c0
>  |  from
>  |  (select a, b, c
>  |from T1
>  |left outer join T2
>  |on T1.b = T2.e
>  |  ) tmp
>  |  ) tmp1
>  |) tmp2
>  |where c1 >= 0
>""".stripMargin
> val results = util.tableEnv.sqlQuery(sql)
> val expected = "DataSetCalc(select=[udf_test(c, udf_test(b, c)) AS c1]) 
> \n" +
>   "DataSetJoin(where=[=(b, e)], join=[b, c, e], 
> joinType=[LeftOuterJoin])\n" +
>   "DataSetCalc(select=[b, c], where=[>=(udf_test(c, udf_test(b, c)), 
> 0)])\n" +
>   "DataSetScan(table=[[_DataSetTable_0]])\n" +
>   "DataSetCalc(select=[e])\n" +
>   "DataSetScan(table=[[_DataSetTable_1]])"
> util.verifyTable(results, expected)
>   }
> }
> object Merger extends ScalarFunction {
> 

[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-09-21 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r219411313
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -175,13 +173,41 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
 
 val rewrittenTemporalJoin = temporalJoin.copy(temporalJoin.getTraitSet, 
List(left, right))
 
-// Materialize all of the time attributes from the right side of temporal 
join
-val indicesToMaterialize =
-  (left.getRowType.getFieldCount until 
rewrittenTemporalJoin.getRowType.getFieldCount).toSet
+val indicesToMaterialize = 
gatherIndicesToMaterialize(rewrittenTemporalJoin, left, right)
 
 materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, 
indicesToMaterialize)
   }
 
+  private def gatherIndicesToMaterialize(
+  temporalJoin: Join,
+  left: RelNode,
+  right: RelNode)
+: Set[Int] = {
+
+// Materialize all of the time attributes from the right side of temporal 
join
+var indicesToMaterialize =
+  (left.getRowType.getFieldCount until 
temporalJoin.getRowType.getFieldCount).toSet
+
+if (!hasRowtimeAttribute(right.getRowType)) {
 
 Review comment:
   No, I forgot. Adding one now.




[GitHub] tillrohrmann commented on a change in pull request #6727: [FLINK-10371] Allow to enable SSL mutual authentication on REST endpoints by configuration

2018-09-21 Thread GitBox
tillrohrmann commented on a change in pull request #6727: [FLINK-10371] Allow 
to enable SSL mutual authentication on REST endpoints by configuration
URL: https://github.com/apache/flink/pull/6727#discussion_r219416411
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
 ##
 @@ -275,6 +340,10 @@ public static SSLContext 
createRestClientSSLContext(Configuration config) throws
return null;
}
 
+   if (isRestSSLAuthenticationEnabled(config)) {
+   return createRestAuthenticationSSLContext(config);
+   }
+
 
 Review comment:
   The three methods `createRestClientSSLContext`, `createRestServerSSLContext` 
and `createRestAuthenticationSSLContext` seem to share a lot of common code. I 
would suggest refactoring them. E.g. one could have a 
   ```
   internalCreateRestSSLContext(configurationMode) {
       if (configurationMode & server) {
           keystore
       } else {
           null
       }

       if (configurationMode & client) {
           truststore
       } else {
           null
       }
   }
   ```
   
   with `ConfigurationMode = {clientSide = 1, serverSide = 2, mutual = 
clientSide | serverSide}`.
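
   The bit-flag idea in this comment could look roughly like the following. This is only a sketch of the suggestion under stated assumptions: `RestSslModeSketch`, `needsKeyStore`, and `needsTrustStore` are hypothetical names, and the mapping (server side loads the keystore, client side loads the truststore, mutual loads both) simply follows the pseudocode above.

```java
public class RestSslModeSketch {

    /** Hypothetical modes encoded as bit flags, as proposed in the review comment. */
    public enum ConfigurationMode {
        CLIENT_SIDE(1),
        SERVER_SIDE(2),
        MUTUAL(1 | 2);

        private final int bits;

        ConfigurationMode(int bits) {
            this.bits = bits;
        }

        /** True if this mode has at least one flag in common with the other mode. */
        public boolean includes(ConfigurationMode other) {
            return (bits & other.bits) != 0;
        }
    }

    /** The keystore is loaded whenever the server-side flag is set. */
    public static boolean needsKeyStore(ConfigurationMode mode) {
        return mode.includes(ConfigurationMode.SERVER_SIDE);
    }

    /** The truststore is loaded whenever the client-side flag is set. */
    public static boolean needsTrustStore(ConfigurationMode mode) {
        return mode.includes(ConfigurationMode.CLIENT_SIDE);
    }
}
```

   A single internal factory could then branch on these two checks instead of duplicating the keystore and truststore loading in three methods.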




[jira] [Commented] (FLINK-10371) Allow to enable SSL mutual authentication on REST endpoints by configuration

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623257#comment-16623257
 ] 

ASF GitHub Bot commented on FLINK-10371:


tillrohrmann commented on a change in pull request #6727: [FLINK-10371] Allow 
to enable SSL mutual authentication on REST endpoints by configuration
URL: https://github.com/apache/flink/pull/6727#discussion_r219416570
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
 ##
 @@ -78,6 +78,29 @@ public void checkEnableSSL() {
assertFalse(SSLUtils.isRestSSLEnabled(precedence));
}
 
+   /**
+* Tests whether activation of REST mutual SSL authentication evaluates 
the config flags correctly.
+*/
+   @SuppressWarnings("deprecation")
 
 Review comment:
   Which method is `deprecated`?




> Allow to enable SSL mutual authentication on REST endpoints by configuration
> 
>
> Key: FLINK-10371
> URL: https://issues.apache.org/jira/browse/FLINK-10371
> Project: Flink
>  Issue Type: Improvement
>  Components: Client, REST, Security
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Johannes Dillmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2
>
>
> With Flink 1.6, SSL mutual authentication was introduced for internal 
> connectivity in FLINK-9312. 
>  SSL support for external connectivity was also introduced, covering 
> encryption of the connection and verification of the Flink REST endpoint from 
> the client side.
> But _mutual authentication between the REST endpoint and clients is not 
> supported yet_.
>  The [documentation suggests 
> |https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/security-ssl.html]
>  using a sidecar proxy to enable SSL mutual auth on the REST endpoint and 
> points out the advantages of using a feature-rich proxy.
> While this is a good rationale, there are still important use cases for 
> supporting simple mutual authentication directly in Flink, mainly 
> using standard images in a containerized environment.
> There are tools that set up Flink jobs (for example on Kubernetes clusters) 
> and act as gateways to the Flink REST endpoint and the Flink web interface. 
> To prevent unauthorised access to Flink, the connectivity has to be secured. 
> As such a tool acts as a gateway, it is easy to create a shared keystore 
> and truststore for mutual authentication and pass them to the Flink instances' 
> configurations.
> To enable SSL mutual authentication on REST endpoints, I suggest 
> adding the configuration parameter `security.ssl.rest.authentication-enabled`, 
> which defaults to `false`.
>  If it is set to `true`, the `SSLUtils` factories for creating the REST server 
> endpoint and the REST clients should set authentication to required and share 
> `security.ssl.rest.keystore` and `security.ssl.rest.truststore` to set up 
> mutually authenticated SSL connections.
>  
> I have a working prototype which I would gladly submit as a PR to get further 
> feedback. The changes to Flink are minimal and the default behaviour won't 
> change.
>  
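
The flag semantics proposed in the description can be sketched as follows. This is a hedged illustration, not the code from the pull request: `RestAuthFlagSketch` is a hypothetical class that reads a plain `Map` instead of Flink's `Configuration`, and the assumption that mutual authentication only takes effect when REST SSL itself is enabled is mine, not stated in the issue.

```java
import java.util.Map;

public class RestAuthFlagSketch {

    static final String REST_SSL_ENABLED = "security.ssl.rest.enabled";
    // Key proposed in the issue; it defaults to false there.
    static final String REST_AUTH_ENABLED = "security.ssl.rest.authentication-enabled";

    /**
     * Returns true only if REST SSL is on and the mutual authentication flag is set.
     * Missing keys fall back to "false", so the default behaviour does not change.
     */
    public static boolean isRestSslAuthenticationEnabled(Map<String, String> config) {
        boolean sslEnabled =
                Boolean.parseBoolean(config.getOrDefault(REST_SSL_ENABLED, "false"));
        boolean authEnabled =
                Boolean.parseBoolean(config.getOrDefault(REST_AUTH_ENABLED, "false"));
        return sslEnabled && authEnabled;
    }
}
```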





[GitHub] tillrohrmann commented on a change in pull request #6727: [FLINK-10371] Allow to enable SSL mutual authentication on REST endpoints by configuration

2018-09-21 Thread GitBox
tillrohrmann commented on a change in pull request #6727: [FLINK-10371] Allow 
to enable SSL mutual authentication on REST endpoints by configuration
URL: https://github.com/apache/flink/pull/6727#discussion_r219417270
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerSSLAuthITCase.java
 ##
 @@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.DecoderException;
+
+import org.junit.Test;
+
+import javax.net.ssl.SSLHandshakeException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * This test validates that connections are failing when mutual auth is 
enabled but untrusted
+ * keys are used.
+ */
+public class RestServerSSLAuthITCase extends TestLogger {
+
+   private static final String KEY_STORE_FILE = 
RestServerSSLAuthITCase.class.getResource("/local127.keystore").getFile();
+   private static final String TRUST_STORE_FILE = 
RestServerSSLAuthITCase.class.getResource("/local127.truststore").getFile();
+   private static final String UNTRUSTED_KEY_STORE_FILE = 
RestServerSSLAuthITCase.class.getResource("/untrusted.keystore").getFile();
+
+   private static final Time timeout = Time.seconds(10L);
+
+   @Test
+   public void testConnectFailure() throws Exception {
+   RestClient restClient = null;
+   RestServerEndpoint serverEndpoint = null;
+
+   try {
+   final Configuration baseConfig = new Configuration();
+   baseConfig.setInteger(RestOptions.PORT, 0);
+   baseConfig.setString(RestOptions.ADDRESS, "localhost");
+   baseConfig.setBoolean(SecurityOptions.SSL_REST_ENABLED, 
true);
+   
baseConfig.setBoolean(SecurityOptions.SSL_REST_AUTHENTICATION_ENABLED, true);
+   baseConfig.setString(SecurityOptions.SSL_ALGORITHMS, 
"TLS_RSA_WITH_AES_128_CBC_SHA");
+
+   Configuration serverConfig = new 
Configuration(baseConfig);
+   
serverConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE, TRUST_STORE_FILE);
+   
serverConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, 
"password");
+   
serverConfig.setString(SecurityOptions.SSL_REST_KEYSTORE, KEY_STORE_FILE);
+   
serverConfig.setString(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, "password");
+   
serverConfig.setString(SecurityOptions.SSL_REST_KEY_PASSWORD, "password");
+
+   Configuration clientConfig = new 
Configuration(baseConfig);
+   
clientConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE, 
UNTRUSTED_KEY_STORE_FILE);
+   
clientConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, 
"password");
+   
clientConfig.setString(SecurityOptions.SSL_REST_KEYSTORE, KEY_STORE_FILE);
+   
clientConfig.setString(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, "password");
+   

[GitHub] tillrohrmann commented on a change in pull request #6727: [FLINK-10371] Allow to enable SSL mutual authentication on REST endpoints by configuration

2018-09-21 Thread GitBox
tillrohrmann commented on a change in pull request #6727: [FLINK-10371] Allow 
to enable SSL mutual authentication on REST endpoints by configuration
URL: https://github.com/apache/flink/pull/6727#discussion_r219416858
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerSSLAuthITCase.java
 ##
 @@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.DecoderException;
+
+import org.junit.Test;
+
+import javax.net.ssl.SSLHandshakeException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * This test validates that connections are failing when mutual auth is enabled but untrusted
+ * keys are used.
+ */
+public class RestServerSSLAuthITCase extends TestLogger {
 
 Review comment:
   Nice that the test case extends `TestLogger`  


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #6727: [FLINK-10371] Allow to enable SSL mutual authentication on REST endpoints by configuration

2018-09-21 Thread GitBox
tillrohrmann commented on a change in pull request #6727: [FLINK-10371] Allow 
to enable SSL mutual authentication on REST endpoints by configuration
URL: https://github.com/apache/flink/pull/6727#discussion_r219417533
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerSSLAuthITCase.java
 ##
 @@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.DecoderException;
+
+import org.junit.Test;
+
+import javax.net.ssl.SSLHandshakeException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * This test validates that connections are failing when mutual auth is enabled but untrusted
+ * keys are used.
+ */
+public class RestServerSSLAuthITCase extends TestLogger {
+
+   private static final String KEY_STORE_FILE = RestServerSSLAuthITCase.class.getResource("/local127.keystore").getFile();
+   private static final String TRUST_STORE_FILE = RestServerSSLAuthITCase.class.getResource("/local127.truststore").getFile();
+   private static final String UNTRUSTED_KEY_STORE_FILE = RestServerSSLAuthITCase.class.getResource("/untrusted.keystore").getFile();
+
+   private static final Time timeout = Time.seconds(10L);
+
+   @Test
+   public void testConnectFailure() throws Exception {
+       RestClient restClient = null;
+       RestServerEndpoint serverEndpoint = null;
+
+       try {
+           final Configuration baseConfig = new Configuration();
+           baseConfig.setInteger(RestOptions.PORT, 0);
+           baseConfig.setString(RestOptions.ADDRESS, "localhost");
+           baseConfig.setBoolean(SecurityOptions.SSL_REST_ENABLED, true);
+           baseConfig.setBoolean(SecurityOptions.SSL_REST_AUTHENTICATION_ENABLED, true);
+           baseConfig.setString(SecurityOptions.SSL_ALGORITHMS, "TLS_RSA_WITH_AES_128_CBC_SHA");
+
+           Configuration serverConfig = new Configuration(baseConfig);
+           serverConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE, TRUST_STORE_FILE);
+           serverConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, "password");
+           serverConfig.setString(SecurityOptions.SSL_REST_KEYSTORE, KEY_STORE_FILE);
+           serverConfig.setString(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, "password");
+           serverConfig.setString(SecurityOptions.SSL_REST_KEY_PASSWORD, "password");
+
+           Configuration clientConfig = new Configuration(baseConfig);
+           clientConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE, UNTRUSTED_KEY_STORE_FILE);
+           clientConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, "password");
+           clientConfig.setString(SecurityOptions.SSL_REST_KEYSTORE, KEY_STORE_FILE);
+           clientConfig.setString(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, "password");
+   

[jira] [Commented] (FLINK-10371) Allow to enable SSL mutual authentication on REST endpoints by configuration

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623254#comment-16623254
 ] 

ASF GitHub Bot commented on FLINK-10371:


tillrohrmann commented on a change in pull request #6727: [FLINK-10371] Allow 
to enable SSL mutual authentication on REST endpoints by configuration
URL: https://github.com/apache/flink/pull/6727#discussion_r219416858
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerSSLAuthITCase.java
 ##
 @@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.DecoderException;
+
+import org.junit.Test;
+
+import javax.net.ssl.SSLHandshakeException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * This test validates that connections are failing when mutual auth is enabled but untrusted
+ * keys are used.
+ */
+public class RestServerSSLAuthITCase extends TestLogger {
 
 Review comment:
   Nice that the test case extends `TestLogger`  




> Allow to enable SSL mutual authentication on REST endpoints by configuration
> 
>
> Key: FLINK-10371
> URL: https://issues.apache.org/jira/browse/FLINK-10371
> Project: Flink
>  Issue Type: Improvement
>  Components: Client, REST, Security
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Johannes Dillmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2
>
>
> With Flink 1.6 SSL mutual authentication was introduced for internal 
> connectivity in FLINK-9312. 
>  SSL support for external connectivity was also introduced in regard to 
> encryption of the connection and verification of the Flink REST endpoint from 
> the client side.
> But _mutual authentication between the REST endpoint and clients is not 
> supported yet_.
>  The [documentation suggests 
> |https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/security-ssl.html]
>  using a side car proxy to enable SSL mutual auth on the REST endpoint and 
> points out the advantages of using a feature rich proxy.
> While this is a good rationale, there are still important use cases for 
> supporting simple mutual authentication directly in Flink, mainly 
> using standard images in a containerized environment.
> There are tools used to set up Flink jobs (for example on Kubernetes clusters) 
> that act as gateways to the Flink REST endpoint and the Flink web interface. 
> To prevent unauthorised access to Flink, the connectivity has to be secured. 
> As the tools act as gateways, it is easy to create and pass a 

[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623253#comment-16623253
 ] 

ASF GitHub Bot commented on FLINK-10074:


yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint 
failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-423453821
 
 
   hi @tillrohrmann what do you think about the latest implementation?




> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish 
> not to incur the expense of a restart in such a scenario, and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  
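
The threshold idea described above can be sketched in a few lines of Java. This is only an illustration under stated assumptions: `CheckpointFailureTracker` and its methods are hypothetical names, not Flink API, and "subsequent failures" is read here as consecutive failures that are reset by any successful checkpoint.

```java
import java.util.Set;

/** Hypothetical helper (not Flink API): tracks consecutive checkpoint failures. */
final class CheckpointFailureTracker {
    private final int tolerableFailures;
    private final Set<Class<? extends Throwable>> toleratedTypes;
    private int consecutiveFailures;

    CheckpointFailureTracker(int tolerableFailures, Set<Class<? extends Throwable>> toleratedTypes) {
        if (tolerableFailures < 0) {
            throw new IllegalArgumentException("tolerableFailures must be >= 0");
        }
        this.tolerableFailures = tolerableFailures;
        this.toleratedTypes = toleratedTypes;
    }

    /** A completed checkpoint ends the current streak of failures. */
    synchronized void onCheckpointSuccess() {
        consecutiveFailures = 0;
    }

    /**
     * Records a failed checkpoint.
     *
     * @return true if the job should be failed (and restarted), false if the failure is tolerated
     */
    synchronized boolean onCheckpointFailure(Throwable cause) {
        boolean tolerated = toleratedTypes.stream().anyMatch(type -> type.isInstance(cause));
        if (!tolerated) {
            // Exceptions outside the tolerated list fail the job immediately.
            return true;
        }
        consecutiveFailures++;
        return consecutiveFailures > tolerableFailures;
    }
}
```

With a threshold of 2 and `IOException` as the only tolerated type, the first two consecutive `IOException` failures are tolerated and the third one requests a restart.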



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10344) Rethink SubmittedJobGraphListener

2018-09-21 Thread Biao Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-10344:
-
Attachment: (was: Tornado.pdf)

> Rethink SubmittedJobGraphListener
> -
>
> Key: FLINK-10344
> URL: https://issues.apache.org/jira/browse/FLINK-10344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Shimin Yang
>Priority: Major
> Fix For: 1.7.0
>
> Attachments: Tornado.pdf
>
>
> The {{SubmittedJobGraphListener}} in {{ZooKeeperSubmittedJobGraphStore}} can 
> return false positives. This is obviously problematic, because it causes the 
> subsequent recovery operation to fail. Ideally we would not require the 
> {{SubmittedJobGraphListener}}. One could, for example, periodically check 
> from the main thread whether there are new jobs. That way we would know which 
> jobs are currently running and which are being cleaned up. 
> Alternatively it is necessary to tolerate false positives :-(
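
A minimal sketch of the "periodically check from the main thread" alternative, assuming job ids can be listed from the store as plain strings. `JobGraphPoller` is a hypothetical name and not part of Flink; the real store and id types are different.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch (not Flink code): finds new jobs by diffing the store against known ids. */
final class JobGraphPoller {
    private final Set<String> knownJobIds = new HashSet<>();

    /**
     * Compares the ids currently in the store with the ids seen on the previous poll.
     *
     * @param idsInStore snapshot of the job ids currently present in the store
     * @return ids that are in the store but were not known before this call
     */
    Set<String> poll(Set<String> idsInStore) {
        Set<String> added = new HashSet<>(idsInStore);
        added.removeAll(knownJobIds);

        // Forget ids that disappeared from the store (jobs that were cleaned up).
        knownJobIds.retainAll(idsInStore);
        knownJobIds.addAll(added);
        return added;
    }
}
```

Because every poll works on a fresh snapshot, a stale notification cannot report a job that is no longer in the store. The price is the latency of one polling interval.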





[jira] [Commented] (FLINK-9891) Flink cluster is not shutdown in YARN mode when Flink client is stopped

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623270#comment-16623270
 ] 

ASF GitHub Bot commented on FLINK-9891:
---

azagrebin commented on a change in pull request #6718: [FLINK-9891] Add 
optional hook to shutdown cluster if a session was created in per-job mode in 
attached mode
URL: https://github.com/apache/flink/pull/6718#discussion_r219421119
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
 ##
 @@ -492,11 +505,10 @@ private static void printJobStatusMessages(List jobs) {
        jobsByState.entrySet().stream()
            .sorted(statusComparator)
            .map(Map.Entry::getValue).flatMap(List::stream).sorted(startTimeComparator)
-           .forEachOrdered(job -> {
-               System.out.println(dateFormat.format(new Date(job.getStartTime()))
-                   + " : " + job.getJobId() + " : " + job.getJobName()
-                   + " (" + job.getJobState() + ")");
-           });
+           .forEachOrdered(job ->
 
 Review comment:
   yes, redundant brackets in one-line lambda
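
For readers outside the PR, the point about redundant brackets can be illustrated with a small, self-contained Java sketch. The class and method names are made up for the illustration and are unrelated to `CliFrontend`.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: two equivalent ways to write a single-statement lambda. */
final class LambdaStyles {

    /** Block-bodied lambda: braces and a semicolon around one statement. */
    static List<String> withBraces(List<String> names) {
        List<String> out = new ArrayList<>();
        names.forEach(name -> {
            out.add(name.toUpperCase());
        });
        return out;
    }

    /** Expression-bodied lambda: same behaviour without the braces. */
    static List<String> withoutBraces(List<String> names) {
        List<String> out = new ArrayList<>();
        names.forEach(name -> out.add(name.toUpperCase()));
        return out;
    }
}
```

Both methods return the same list, so the choice is purely stylistic.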




> Flink cluster is not shutdown in YARN mode when Flink client is stopped
> ---
>
> Key: FLINK-9891
> URL: https://issues.apache.org/jira/browse/FLINK-9891
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0, 1.5.1
>Reporter: Sergey Krasovskiy
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> We are not using session mode and detached mode. The command to run Flink job 
> on YARN is:
> {code:java}
> /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -ytm 
> 2048 -j ./flink-quickstart-java-1.0-SNAPSHOT.jar -c org.test.WordCount
> {code}
> Flink CLI logs:
> {code:java}
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/hdp/2.4.2.10-1/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2018-07-18 12:47:03,747 INFO 
> org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service 
> address: http://hmaster-1.ipbl.rgcloud.net:8188/ws/v1/timeline/
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,248 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the 
> HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink 
> YARN Client needs one of these to be set to properly load the Hadoop 
> configuration for accessing YARN.
> 2018-07-18 12:47:04,409 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: 
> ClusterSpecification{masterMemoryMB=768, taskManagerMemoryMB=2048, 
> numberTaskManagers=1, slotsPerTaskManager=1}
> 2018-07-18 12:47:04,783 WARN 
> org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit 
> local reads feature cannot be used because libhadoop cannot be loaded.
> 2018-07-18 12:47:04,788 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration 
> directory 
> ('/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/conf')
>  contains both LOG4J and Logback configuration files. Please delete or rename 
> one of them.
> 2018-07-18 12:47:07,846 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application 
> master application_1531474158783_10814
> 2018-07-18 12:47:08,073 INFO 
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application 
> application_1531474158783_10814
> 2018-07-18 12:47:08,074 INFO 
> 

[jira] [Commented] (FLINK-10344) Rethink SubmittedJobGraphListener

2018-09-21 Thread Biao Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623273#comment-16623273
 ] 

Biao Liu commented on FLINK-10344:
--

[~dangdangdang] 
Oh, that's my fault. I was reading a paper and dragged the document into Chrome. 
I think that caused the attachment problem.
I have deleted the attachment. Sorry for that.

> Rethink SubmittedJobGraphListener
> -
>
> Key: FLINK-10344
> URL: https://issues.apache.org/jira/browse/FLINK-10344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Shimin Yang
>Priority: Major
> Fix For: 1.7.0
>
>
> The {{SubmittedJobGraphListener}} in {{ZooKeeperSubmittedJobGraphStore}} can 
> return false positives. This is obviously problematic, because it causes the 
> subsequent recovery operation to fail. Ideally we would not require the 
> {{SubmittedJobGraphListener}}. One could, for example, periodically check 
> from the main thread whether there are new jobs. That way we would know which 
> jobs are currently running and which are being cleaned up. 
> Alternatively it is necessary to tolerate false positives :-(





[jira] [Commented] (FLINK-9682) Add setDescription to execution environment and provide description field for the rest api

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623331#comment-16623331
 ] 

ASF GitHub Bot commented on FLINK-9682:
---

yanghua commented on issue #6266: [FLINK-9682] Add setDescription to execution 
environment and provide description field for the rest api
URL: https://github.com/apache/flink/pull/6266#issuecomment-423475109
 
 
   @dawidwys any review suggestion?




> Add setDescription to execution environment and provide description field for 
> the rest api
> --
>
> Key: FLINK-9682
> URL: https://issues.apache.org/jira/browse/FLINK-9682
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Elias Levy
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently you can provide a job name to {{execute}} in the execution 
> environment.  In an environment where many versions of a job may be executing, 
> such as a development or test environment, identifying which running job is 
> of a specific version via the UI can be difficult unless the version is 
> embedded into the job name given to {{execute}}.  But the job name is used 
> for other purposes, such as for namespacing metrics.  Thus, it is not ideal 
> to modify the job name, as that could require modifying metric dashboards and 
> monitors each time versions change.
> I propose a new method be added to the execution environment, 
> {{setDescription}}, that would allow a user to pass in an arbitrary 
> description that would be displayed in the dashboard, allowing users to 
> distinguish jobs.





[jira] [Updated] (FLINK-10312) Wrong / missing exception when submitting job

2018-09-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10312:
---
Labels: pull-request-available  (was: )

> Wrong / missing exception when submitting job
> -
>
> Key: FLINK-10312
> URL: https://issues.apache.org/jira/browse/FLINK-10312
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.5.2, 1.6.0
>Reporter: Stephan Ewen
>Assignee: Andrey Zagrebin
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
> Attachments: lmerge-TR.pdf
>
>
> h3. Problem
> When submitting a job that cannot be created / initialized on the JobManager, 
> there is no proper error message. The exception says *"Could not retrieve the 
> execution result. (JobID: 5a7165e1260c6316fa11d2760bd3d49f)"*
> h3. Steps to Reproduce
> Create a streaming job, set a state backend with a non existing file system 
> scheme.
> h3. Full Stack Trace
> {code}
> Submitting a job where instantiation on the JM fails yields this, which seems 
> like a major regression from seeing the actual exception:
> org.apache.flink.client.program.ProgramInvocationException: Could not 
> retrieve the execution result. (JobID: 5a7165e1260c6316fa11d2760bd3d49f)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:260)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
>   at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
>   at 
> com.dataartisans.streamledger.examples.simpletrade.SimpleTradeExample.main(SimpleTradeExample.java:98)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426)
>   at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804)
>   at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$16(CliFrontend.java:1120)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
> submit JobGraph.
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$25(RestClusterClient.java:379)
>   at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>   at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>   at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$32(FutureUtils.java:213)
>   at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>   at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>   at 
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>   at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
>   at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> 

[GitHub] yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint failures

2018-09-21 Thread GitBox
yanghua commented on issue #6567: [FLINK-10074] Allowable number of checkpoint 
failures
URL: https://github.com/apache/flink/pull/6567#issuecomment-423453821
 
 
   hi @tillrohrmann what do you think about the latest implementation?




[GitHub] tillrohrmann commented on issue #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader

2018-09-21 Thread GitBox
tillrohrmann commented on issue #6678: [FLINK-10255] Only react to 
onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#issuecomment-423455756
 
 
   I think it should not be possible to have two async recovery operations 
ongoing since either of them will have to wait for the other to complete. That 
was the idea of the fix.
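
One way to picture "either of them will have to wait for the other to complete" is to chain each new asynchronous operation onto the previous one. The sketch below is an assumption about the general technique, not the code of the PR; `SerializedRecovery` and `enqueue` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical sketch (not the Dispatcher's code): runs async operations strictly one after another. */
final class SerializedRecovery {
    private CompletableFuture<Void> lastOperation = CompletableFuture.completedFuture(null);

    /**
     * Queues an asynchronous operation. It is started only after the previously
     * queued operation has completed, whether normally or exceptionally.
     */
    synchronized CompletableFuture<Void> enqueue(Supplier<CompletableFuture<Void>> operation) {
        lastOperation = lastOperation
            // Swallow a failure of the previous operation so the chain keeps going.
            .<Void>handle((ignored, failure) -> null)
            // Start the next operation only now.
            .thenCompose(ignored -> operation.get());
        return lastOperation;
    }
}
```

With this shape a second recovery cannot start while the first is still in flight, because its supplier is only invoked from the completion of the first future.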




[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623264#comment-16623264
 ] 

ASF GitHub Bot commented on FLINK-10255:


tillrohrmann commented on issue #6678: [FLINK-10255] Only react to 
onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#issuecomment-423455756
 
 
   I think it should not be possible to have two async recovery operations 
ongoing since either of them will have to wait for the other to complete. That 
was the idea of the fix.




> Standby Dispatcher locks submitted JobGraphs
> 
>
> Key: FLINK-10255
> URL: https://issues.apache.org/jira/browse/FLINK-10255
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.1, 1.7.0, 1.5.4
>
>
> Currently, standby {{Dispatchers}} lock submitted {{JobGraphs}} which are 
> added to the {{SubmittedJobGraphStore}} if HA mode is enabled. Locking the 
> {{JobGraphs}} can prevent their cleanup leaving the system in an inconsistent 
> state.
> The problem is that we recover the newly added {{JobGraph}} in the 
> {{SubmittedJobGraphListener#onAddedJobGraph}} callback, which is also called 
> if we don't have the leadership. Recovering the {{JobGraph}} currently locks 
> it. If the {{Dispatcher}} is not the leader, we won't start that job after 
> its recovery. However, we also don't release the {{JobGraph}}, leaving it 
> locked.
> There are two possible solutions to the problem. Either we check whether we 
> are the leader before recovering jobs, or we say that recovering jobs does not 
> lock them and we lock them only if we can submit the recovered job. The latter 
> approach has the advantage that it follows a code path quite similar to an 
> initial job submission. Moreover, jobs are currently also recovered at other 
> places. In all these places we currently would need to release the 
> {{JobGraphs}} if we cannot submit the recovered {{JobGraph}} (e.g. 
> {{Dispatcher#grantLeadership}}).
> An extension of the first solution could be to stop the 
> {{SubmittedJobGraphStore}} while the {{Dispatcher}} is not the leader. Then 
> we would have to make sure that no concurrent callback from the 
> {{SubmittedJobGraphStore#SubmittedJobGraphListener}} can be executed after 
> revoking leadership from the {{Dispatcher}}.
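
The first solution (check leadership before recovering) can be sketched as follows. This is a simplified stand-in, assuming job ids are plain strings; `LeaderAwareListener` and its methods are hypothetical names rather than Flink's `Dispatcher` API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a dispatcher (not Flink code): reacts to added job graphs only as leader. */
final class LeaderAwareListener {
    private boolean leader;
    private final List<String> recovered = new ArrayList<>();

    synchronized void grantLeadership() {
        leader = true;
    }

    synchronized void revokeLeadership() {
        leader = false;
    }

    /** Callback invoked whenever a job graph is added to the store. */
    synchronized void onAddedJobGraph(String jobId) {
        if (!leader) {
            // A standby ignores the signal, so it never recovers (and never locks) the job graph.
            return;
        }
        recovered.add(jobId);
    }

    /** Returns a copy of the ids recovered so far. */
    synchronized List<String> recoveredJobs() {
        return new ArrayList<>(recovered);
    }
}
```

Making the callback and the leadership changes synchronize on the same monitor is one simple way to ensure that no callback runs concurrently with a revocation. The issue leaves the actual mechanism open.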





[GitHub] twalthr commented on issue #6725: [FLINK-10263] [sql-client] Fix classloader issues in SQL Client

2018-09-21 Thread GitBox
twalthr commented on issue #6725: [FLINK-10263] [sql-client] Fix classloader 
issues in SQL Client
URL: https://github.com/apache/flink/pull/6725#issuecomment-423486904
 
 
   Thank you @dawidwys. I simplified the code there.




[jira] [Commented] (FLINK-10354) Savepoints should be counted as retained checkpoints

2018-09-21 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623403#comment-16623403
 ] 

Till Rohrmann commented on FLINK-10354:
---

This looks like a duplicate of FLINK-9983?

> Savepoints should be counted as retained checkpoints
> 
>
> Key: FLINK-10354
> URL: https://issues.apache.org/jira/browse/FLINK-10354
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> This task is about reverting [FLINK-6328].
> The problem is that you can get incorrect results with exactly-once sinks if 
> there is a failure after taking a savepoint but before taking the next 
> checkpoint because the savepoint will also have manifested side effects to 
> the sink.





[GitHub] TisonKun commented on issue #6721: [FLINK-10376] [tests] Replace mkdirTolerateExisting by thread safe Files#createDirectories

2018-09-21 Thread GitBox
TisonKun commented on issue #6721: [FLINK-10376] [tests] Replace 
mkdirTolerateExisting by thread safe Files#createDirectories
URL: https://github.com/apache/flink/pull/6721#issuecomment-423456515
 
 
   Travis is green and the unstable test is theoretically hardened. 
@tillrohrmann  




[jira] [Updated] (FLINK-10344) Rethink SubmittedJobGraphListener

2018-09-21 Thread Biao Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-10344:
-
Attachment: (was: Tornado.pdf)

> Rethink SubmittedJobGraphListener
> -
>
> Key: FLINK-10344
> URL: https://issues.apache.org/jira/browse/FLINK-10344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Shimin Yang
>Priority: Major
> Fix For: 1.7.0
>
>
> The {{SubmittedJobGraphListener}} in {{ZooKeeperSubmittedJobGraphStore}} can 
> return false positives. This is obviously problematic, because it causes the 
> subsequent recovery operation to fail. Ideally we would not require the 
> {{SubmittedJobGraphListener}}. One could, for example, periodically check 
> from the main thread whether there are new jobs. That way we would know which 
> jobs are currently running and which are being cleaned up. 
> Alternatively it is necessary to tolerate false positives :-(
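
The polling alternative described above can be sketched as a plain set difference. This is only an illustration: the class `JobGraphPoller`, the method `findNewJobs`, and the string job IDs are hypothetical and not part of Flink. It assumes the main thread already knows which jobs are running and which are being cleaned up.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper for illustration; not a Flink class. */
final class JobGraphPoller {

    /**
     * Returns the job IDs that are in the store but are neither running
     * nor currently being cleaned up, i.e. the jobs that still need recovery.
     */
    static Set<String> findNewJobs(
            Set<String> runningJobs, Set<String> jobsBeingCleanedUp, Set<String> storedJobs) {
        Set<String> newJobs = new TreeSet<>(storedJobs);
        newJobs.removeAll(runningJobs);
        newJobs.removeAll(jobsBeingCleanedUp);
        return newJobs;
    }

    public static void main(String[] args) {
        Set<String> running = new HashSet<>(Arrays.asList("job-a"));
        Set<String> cleaningUp = new HashSet<>(Arrays.asList("job-b"));
        Set<String> stored = new HashSet<>(Arrays.asList("job-a", "job-b", "job-c"));
        System.out.println(findNewJobs(running, cleaningUp, stored)); // prints [job-c]
    }
}
```

Because the check runs against state owned by the main thread, a job that is still being cleaned up is never reported as new, which is the false positive the listener can produce.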





[jira] [Comment Edited] (FLINK-10312) Wrong / missing exception when submitting job

2018-09-21 Thread Andrey Zagrebin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623266#comment-16623266
 ] 

Andrey Zagrebin edited comment on FLINK-10312 at 9/21/18 8:42 AM:
--

PR suggests the following embedding of server side failure:
{code:java}

{code}
Example:
{code:java}
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve 
the execution result. (JobID: ed85deec64effb201fa00401e2ead30b)
 at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:260)
 at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
 at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
 at 
org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:89)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
 at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
 at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
 at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:805)
 at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:281)
 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
 at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1045)
 at 
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1121)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
 at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1121)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
 at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:379)
 at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
 at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
 at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
 at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
 at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
 at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
complete the operation. Exception is not retryable.
 at 
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
 at 
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
 at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
 at 
java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
 ... 12 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
Could not complete the operation. Exception is not retryable.
 at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:215)
 ... 9 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Job 
submission failed.,
(JobManagerRunner.java:176)
 at 
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1049)
 at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:304)
 at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
 ... 7 more
Caused by: java.lang.RuntimeException: Failed to start 

[jira] [Commented] (FLINK-10363) S3 FileSystem factory prints secrets into logs

2018-09-21 Thread Steve Loughran (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623330#comment-16623330
 ] 

Steve Loughran commented on FLINK-10363:


See WHIRR-642 for the same issue; it's easy to do. For that one I had to 
google every Whirr log entry and notify at least two people that they'd 
accidentally shared their secrets. Luckily that was the era before bitcoin 
miners scanned the internet for AWS keys.

> S3 FileSystem factory prints secrets into logs
> --
>
> Key: FLINK-10363
> URL: https://issues.apache.org/jira/browse/FLINK-10363
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Critical
> Fix For: 1.7.0, 1.6.2
>
>
> The file system factory logs all values it applies from the flink 
> configuration.
> That frequently includes access keys, which should not leak into logs.
> The loader should only log the keys, not the values.
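
A minimal sketch of "log the keys, not the values", using only the JDK. The class `ConfigKeyLogging`, the method `describeAppliedKeys`, and the sample entries are assumptions made for illustration; this is not the code of the Flink file system factory.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper for illustration; not a Flink class. */
final class ConfigKeyLogging {

    /** Renders only the keys of the applied entries; values never reach the output. */
    static String describeAppliedKeys(Map<String, String> applied) {
        StringJoiner joiner = new StringJoiner(", ", "[", "]");
        for (String key : applied.keySet()) {
            joiner.add(key);
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // Sample entries; the values stand in for real credentials.
        Map<String, String> applied = new LinkedHashMap<>();
        applied.put("s3.access-key", "EXAMPLE-ACCESS-KEY");
        applied.put("s3.secret-key", "EXAMPLE-SECRET");
        System.out.println("Applied configuration keys: " + describeAppliedKeys(applied));
    }
}
```

Building the log message from `keySet()` alone means a later change to the message format cannot accidentally include a secret value.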





[jira] [Commented] (FLINK-10312) Wrong / missing exception when submitting job

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623261#comment-16623261
 ] 

ASF GitHub Bot commented on FLINK-10312:


azagrebin opened a new pull request #6731: [FLINK-10312] Propagate exception 
from server to client in REST API
URL: https://github.com/apache/flink/pull/6731
 
 
   ## What is the purpose of the change
   
   Currently, if an exception occurs on the server side in a REST API handler, 
the client side gets an error response. The error response contains only an 
abstract message of the server-side exception in the error list and no details. 
This PR also adds a stringified version of the server-side exception's stack 
trace. This way the stack trace logged on the client side will also contain the 
server-side stack trace, with more details about the actual failure.
   
   ## Brief change log
   
 - pack the stringified version of the exception stack trace into the error 
list of ErrorResponseBody in AbstractRestHandler.processRestHandlerException
 - strip CompletionException in completeExceptionally case of 
FutureUtils.retryOperationWithDelay to avoid double logging of underlying 
exception
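
The two changes listed above can be illustrated with plain JDK calls. This is a sketch under assumptions, not the code in the PR: the class `ExceptionText` and both method names are hypothetical, and the real change lives in `AbstractRestHandler` and `FutureUtils`.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.CompletionException;

/** Hypothetical helper for illustration; not the PR's implementation. */
final class ExceptionText {

    /** Turns a throwable's full stack trace into a string, e.g. for an error list. */
    static String stringifyStackTrace(Throwable t) {
        StringWriter buffer = new StringWriter();
        try (PrintWriter writer = new PrintWriter(buffer)) {
            t.printStackTrace(writer);
        }
        return buffer.toString();
    }

    /** Unwraps CompletionException layers so the underlying failure is reported once. */
    static Throwable stripCompletionException(Throwable t) {
        Throwable current = t;
        while (current instanceof CompletionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        Throwable failure =
                new CompletionException(new IllegalStateException("Job submission failed."));
        Throwable root = stripCompletionException(failure);
        System.out.println(stringifyStackTrace(root));
    }
}
```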
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




> Wrong / missing exception when submitting job
> -
>
> Key: FLINK-10312
> URL: https://issues.apache.org/jira/browse/FLINK-10312
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.5.2, 1.6.0
>Reporter: Stephan Ewen
>Assignee: Andrey Zagrebin
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
> Attachments: lmerge-TR.pdf
>
>
> h3. Problem
> When submitting a job that cannot be created / initialized on the JobManager, 
> there is no proper error message. The exception says *"Could not retrieve the 
> execution result. (JobID: 5a7165e1260c6316fa11d2760bd3d49f)"*
> h3. Steps to Reproduce
> Create a streaming job, set a state backend with a non existing file system 
> scheme.
> h3. Full Stack Trace
> {code}
> Submitting a job where instantiation on the JM fails yields this, which seems 
> like a major regression from seeing the actual exception:
> org.apache.flink.client.program.ProgramInvocationException: Could not 
> retrieve the execution result. (JobID: 5a7165e1260c6316fa11d2760bd3d49f)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:260)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
>   at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
>   at 
> com.dataartisans.streamledger.examples.simpletrade.SimpleTradeExample.main(SimpleTradeExample.java:98)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426)
>   at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804)
>   at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280)
>   at 

[GitHub] azagrebin opened a new pull request #6731: [FLINK-10312] Propagate exception from server to client in REST API

2018-09-21 Thread GitBox
azagrebin opened a new pull request #6731: [FLINK-10312] Propagate exception 
from server to client in REST API
URL: https://github.com/apache/flink/pull/6731
 
 
   ## What is the purpose of the change
   
   Currently, if an exception occurs on the server side in a REST API handler, 
the client side gets an error response. The error response contains only an 
abstract message of the server-side exception in the error list and no details. 
This PR also adds a stringified version of the server-side exception's stack 
trace. This way the stack trace logged on the client side will also contain the 
server-side stack trace, with more details about the actual failure.
   
   ## Brief change log
   
 - pack the stringified version of the exception stack trace into the error 
list of ErrorResponseBody in AbstractRestHandler.processRestHandlerException
 - strip CompletionException in completeExceptionally case of 
FutureUtils.retryOperationWithDelay to avoid double logging of underlying 
exception
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




[jira] [Commented] (FLINK-10312) Wrong / missing exception when submitting job

2018-09-21 Thread Andrey Zagrebin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623266#comment-16623266
 ] 

Andrey Zagrebin commented on FLINK-10312:
-

PR suggests the following embedding of server side failure:
{code}
]
{code}
Example:
{code}
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve 
the execution result. (JobID: ed85deec64effb201fa00401e2ead30b)
 at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:260)
 at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
 at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
 at 
org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:89)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
 at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
 at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
 at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:805)
 at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:281)
 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
 at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1045)
 at 
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1121)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
 at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1121)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
 at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:379)
 at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
 at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
 at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
 at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
 at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
 at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
complete the operation. Exception is not retryable.
 at 
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
 at 
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
 at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
 at 
java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
 ... 12 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
Could not complete the operation. Exception is not retryable.
 at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:215)
 ... 9 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Job 
submission failed.,
(JobManagerRunner.java:176)
 at 
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1049)
 at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:304)
 at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
 ... 7 more
Caused by: java.lang.RuntimeException: Failed to start checkpoint ID counter: 
Incomplete HDFS URI, no host: 

[jira] [Commented] (FLINK-10376) BlobCacheCleanupTest.testPermanentBlobCleanup failed on Travis

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623265#comment-16623265
 ] 

ASF GitHub Bot commented on FLINK-10376:


TisonKun commented on issue #6721: [FLINK-10376] [tests] Replace 
mkdirTolerateExisting by thread safe Files#createDirectories
URL: https://github.com/apache/flink/pull/6721#issuecomment-423456515
 
 
   Travis is green and the unstable test is theoretically hardened. 
@tillrohrmann  




> BlobCacheCleanupTest.testPermanentBlobCleanup failed on Travis
> --
>
> Key: FLINK-10376
> URL: https://issues.apache.org/jira/browse/FLINK-10376
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: 陈梓立
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.7.0
>
>
> The {{BlobCacheCleanupTest.testPermanentBlobCleanup}} failed on Travis with 
> the following exception:
> {code}
> testPermanentBlobCleanup(org.apache.flink.runtime.blob.BlobCacheCleanupTest)  
> Time elapsed: 1.021 sec  <<< ERROR!
> java.io.IOException: Cannot create directory 
> '/tmp/junit6933344779576098111/junit6230481778643276963/blobStore-77f235ff-1721-4c30-9bec-db4004fe8859/job_9ca4b9530b367af6f554dad6458ca3ad'.
>   at 
> org.apache.flink.runtime.blob.BlobUtils.mkdirTolerateExisting(BlobUtils.java:214)
>   at 
> org.apache.flink.runtime.blob.BlobUtils.getStorageLocation(BlobUtils.java:237)
>   at 
> org.apache.flink.runtime.blob.PermanentBlobCache.getStorageLocation(PermanentBlobCache.java:222)
>   at 
> org.apache.flink.runtime.blob.BlobServerCleanupTest.checkFilesExist(BlobServerCleanupTest.java:213)
>   at 
> org.apache.flink.runtime.blob.BlobCacheCleanupTest.verifyJobCleanup(BlobCacheCleanupTest.java:432)
>   at 
> org.apache.flink.runtime.blob.BlobCacheCleanupTest.testPermanentBlobCleanup(BlobCacheCleanupTest.java:133)
> {code}
> https://api.travis-ci.org/v3/job/430910115/log.txt
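
For context on the fix proposed in the pull request, `java.nio.file.Files#createDirectories` does not throw when the directory already exists, so a concurrent creator cannot make the call fail the way the trace above shows. The sketch below only demonstrates that behaviour; the class `CreateDirectoriesDemo` and the directory names are hypothetical and this is not the code in `BlobUtils`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical demo for illustration; not the BlobUtils code. */
final class CreateDirectoriesDemo {

    /** Creates the same directory twice and reports whether it exists afterwards. */
    static boolean createTwice() {
        try {
            Path base = Files.createTempDirectory("blobStore-demo");
            Path jobDir = base.resolve("job_example");
            Files.createDirectories(jobDir);
            // The second call finds the directory already present and does not throw.
            Files.createDirectories(jobDir);
            return Files.isDirectory(jobDir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(createTwice());
    }
}
```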





[jira] [Resolved] (FLINK-10260) Confusing log during TaskManager registration

2018-09-21 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-10260.
---
Resolution: Fixed

Fixed via
1.7.0: 542e8cc22290984b4b2e32577430ceb82dd82fde
1.6.2: 23221f97f3462031fa5374803df1a1d57634ff8d
1.5.5: 8dee8625502a828a303b8ec4180ab00b7e794334

> Confusing log during TaskManager registration
> -
>
> Key: FLINK-10260
> URL: https://issues.apache.org/jira/browse/FLINK-10260
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Stephan Ewen
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> During startup, when TaskManagers register, I see a lot of confusing log 
> lines.
> The below case happened during startup of a cloud setup where TaskManagers 
> took a varying amount of time to start and might have started before the 
> JobManager
> {code}
> -- Logs begin at Thu 2018-08-30 14:51:58 UTC, end at Thu 2018-08-30 14:55:39 
> UTC. --
> Aug 30 14:52:52 flink-belgium-1 systemd[1]: Started flink-jobmanager.service.
> -- Subject: Unit flink-jobmanager.service has finished start-up
> -- Defined-By: systemd
> -- Support: http://www.ubuntu.com/support
> -- 
> -- Unit flink-jobmanager.service has finished starting up.
> -- 
> -- The start-up result is RESULT.
> Aug 30 14:52:52 flink-belgium-1 jobmanager.sh[5416]: used deprecated key 
> `jobmanager.heap.mb`, please replace with key `jobmanager.heap.size`
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: Starting 
> standalonesession as a console application on host flink-belgium-1.
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,221 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 
> 
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,222 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  
> Starting StandaloneSessionClusterEntrypoint (Version: 1.6.0, Rev:ff472b4, 
> Date:07.08.2018 @ 13:31:13 UTC)
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,222 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  OS 
> current user: flink
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,718 
> WARN  org.apache.hadoop.util.NativeCodeLoader   - Unable 
> to load native-hadoop library for your platform... using builtin-java classes 
> where applicable
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,847 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  
> Current Hadoop/Kerberos user: flink
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,848 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM: 
> OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.181-b13
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,848 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  
> Maximum heap size: 1963 MiBytes
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,848 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  
> JAVA_HOME: (not set)
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,853 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Hadoop 
> version: 2.8.3
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,853 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM 
> Options:
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,853 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 
> -Xms2048m
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,853 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 
> -Xmx2048m
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,853 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 
> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,853 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 
> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,854 
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  
> Program Arguments:
> Aug 30 14:52:53 flink-belgium-1 jobmanager.sh[5416]: 2018-08-30 14:52:53,854 
> 

[jira] [Resolved] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-09-21 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9567.
--
   Resolution: Fixed
Fix Version/s: (was: 1.5.1)
   (was: 1.6.0)
   1.5.5
   1.6.2
   1.7.0

Fixed via
1.7.0: 0cf776be4483e7f939de0a7f2b1fe3263a14c6fb
1.6.2: 806d3a424c0ad66f49282d0496599597d0f9f0c0
1.5.5: 69cae4f6daf6580ca83c452c7c3ad33489551a36

> Flink does not release resource in Yarn Cluster mode
> 
>
> Key: FLINK-9567
> URL: https://issues.apache.org/jira/browse/FLINK-9567
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, YARN
>Affects Versions: 1.5.0
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
> Attachments: FlinkYarnProblem, fulllog.txt
>
>
> After restarting the JobManager in Yarn cluster mode, Flink sometimes does 
> not release TaskManager containers. In the worst case, I had a job configured 
> with 5 TaskManagers that held more than 100 containers in the end. Although 
> the task did not fail, this affects other jobs in the Yarn cluster.
> In the first log I posted, the container with id 24 is the reason why Yarn 
> did not release resources. The container was killed before the restart, but 
> the *onContainerComplete* callback in *YarnResourceManager*, which should be 
> called by Yarn's *AMRMAsyncClient*, had not been received by then. 
> After the restart, as we can see in line 347 of the FlinkYarnProblem log, 
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
> Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has 
> failed, address is now gated for [50] ms. Reason: [Disassociated]
> Flink lost the connection to container 24, which is on the bd-r1hdp69 
> machine. When it tried to call *closeTaskManagerConnection* in 
> *onContainerComplete*, it had no connection to the TaskManager on container 
> 24, so it simply ignored the request to close the TaskManager connection.
> 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No 
> open TaskExecutor connection container_1528707394163_29461_02_24. 
> Ignoring close TaskExecutor connection.
> However, before calling *closeTaskManagerConnection*, it had already called 
> *requestYarnContainer*, which increased the *numPendingContainerRequests* 
> variable in *YarnResourceManager* by 1.
> Since returning excess containers is determined by the 
> *numPendingContainerRequests* variable in *YarnResourceManager*, Flink cannot 
> return this container although it is not required. Meanwhile, the restart 
> logic has already allocated enough containers for the TaskManagers, so Flink 
> holds the extra container for a long time for nothing. 
> In the full log, the job ended with 7 containers while only 3 were running 
> TaskManagers.
> PS: Another strange thing I found is that a request for a Yarn container 
> sometimes returns many more containers than requested. Is this a normal 
> scenario for AMRMAsyncClient?
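
The counter behaviour described above can be reproduced with a toy model. This is a simplified sketch and not the `YarnResourceManager` code: the class `PendingRequestModel` and its methods are hypothetical, and it assumes that an allocated container is kept whenever the pending-request counter is positive and returned as excess otherwise.

```java
/** Toy model for illustration; not the YarnResourceManager implementation. */
final class PendingRequestModel {

    private int numPendingContainerRequests;
    private int heldContainers;

    /** Models a container request: the pending counter goes up by one. */
    void requestContainer() {
        numPendingContainerRequests++;
    }

    /** Returns true if the allocated container is kept, false if it is returned as excess. */
    boolean onContainerAllocated() {
        if (numPendingContainerRequests > 0) {
            numPendingContainerRequests--;
            heldContainers++;
            return true;
        }
        return false;
    }

    int heldContainers() {
        return heldContainers;
    }

    public static void main(String[] args) {
        PendingRequestModel model = new PendingRequestModel();
        // The restart logic requests and receives the 3 containers the job needs.
        for (int i = 0; i < 3; i++) {
            model.requestContainer();
        }
        for (int i = 0; i < 3; i++) {
            model.onContainerAllocated();
        }
        // A late completion callback for an old container triggers one more request.
        model.requestContainer();
        model.onContainerAllocated();
        System.out.println(model.heldContainers()); // prints 4
    }
}
```

In this model the fourth container is never returned, because the only signal for "excess" is the counter, and the late request made the counter positive again.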





[jira] [Resolved] (FLINK-10375) ExceptionInChainedStubException hides wrapped exception in cause

2018-09-21 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-10375.
---
   Resolution: Fixed
Fix Version/s: 1.5.5
   1.6.2
   1.7.0

Fixed via
1.7.0: f1511a3602690f2c44d4feb78b8961e31d345326
1.6.2: 4529e377eb23155f53fe1c1dd86830098313bc4f
1.5.5: c94b125d1ae143d92cadc93d388703f58e723bae

> ExceptionInChainedStubException hides wrapped exception in cause
> 
>
> Key: FLINK-10375
> URL: https://issues.apache.org/jira/browse/FLINK-10375
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: Mike Pedersen
>Assignee: Mike Pedersen
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> ExceptionInChainedStubException does not have the wrapped exception as the 
> cause. This creates generally unhelpful exception traces like this:
> {code:java}
> org.apache.beam.sdk.util.UserCodeException: 
> org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException
>   at 
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>   at 
> org.apache.beam.sdk.io.WriteFiles$ApplyShardingKeyFn$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
>   at 
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:185)
>   at 
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:149)
>   at 
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:66)
>   at 
> org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:120)
>   at 
> org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException
>   at 
> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:82)
>   at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>   at 
> org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction$DoFnOutputManager.output(FlinkDoFnFunction.java:149)
>   at 
> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:219)
>   at 
> org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:69)
>   at 
> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:517)
>   at 
> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:505)
>   at 
> org.apache.beam.sdk.io.WriteFiles$ApplyShardingKeyFn.processElement(WriteFiles.java:686)
> {code}
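
The trace above stops at the wrapper because the wrapped exception is never registered as its cause. A minimal sketch of the difference, using hypothetical stand-in classes (`WrapperWithoutCause` and `WrapperWithCause` are illustrative names, not Flink classes, and this is not necessarily how the fix was implemented):

```java
import java.io.PrintWriter;
import java.io.StringWriter;

/** Hypothetical wrapper that keeps the wrapped exception but never sets it as the cause. */
class WrapperWithoutCause extends RuntimeException {
    private final Exception wrapped;

    WrapperWithoutCause(Exception wrapped) {
        super(); // no cause is passed on, so stack traces end at the wrapper
        this.wrapped = wrapped;
    }

    Exception getWrappedException() {
        return wrapped;
    }
}

/** Hypothetical wrapper that also registers the wrapped exception as the cause. */
class WrapperWithCause extends RuntimeException {
    WrapperWithCause(Exception wrapped) {
        super(wrapped); // the wrapped exception now shows up under "Caused by:"
    }

    Exception getWrappedException() {
        return (Exception) getCause();
    }
}

class CauseChainDemo {
    /** Renders the full stack trace, including any "Caused by:" sections. */
    static String render(Throwable t) {
        StringWriter out = new StringWriter();
        t.printStackTrace(new PrintWriter(out, true));
        return out.toString();
    }

    public static void main(String[] args) {
        Exception root = new IllegalStateException("user function failed");
        System.out.println(render(new WrapperWithoutCause(root)));
        System.out.println(render(new WrapperWithCause(root)));
    }
}
```

With the second variant, the root failure is printed as part of the trace instead of having to be fetched through a separate getter.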





[jira] [Created] (FLINK-10387) StateBackend create methods should return interface not abstract KeyedStateBackend classes

2018-09-21 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-10387:
--

 Summary: StateBackend create methods should return interface not 
abstract KeyedStateBackend classes
 Key: FLINK-10387
 URL: https://issues.apache.org/jira/browse/FLINK-10387
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.6.0
Reporter: Gyula Fora


Currently the createKeyedStateBackend(...) methods return 
AbstractKeyedStateBackend instead of an interface.

This makes it virtually impossible to write nice extensions to StateBackends 
that add additional functionality to existing backends while delegating other 
method calls.

It should be easy enough to add a new interface that extends everything that 
the AbstractKeyedStateBackend does and the method should return that.
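
A rough sketch of the idea, with hypothetical names throughout (`KeyedBackend`, `AbstractKeyedBackend`, `HeapBackend`, `CountingBackend` and `BackendFactory` are stand-ins, not Flink types): when the factory method returns an interface, a wrapper can add behavior and delegate the rest without extending the abstract class.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical interface exposing what callers need from a keyed backend. */
interface KeyedBackend<K> {
    void setCurrentKey(K key);
    K getCurrentKey();
    void put(String stateName, Object value);
    Object get(String stateName);
}

/** Hypothetical abstract base class, standing in for the existing abstract backend. */
abstract class AbstractKeyedBackend<K> implements KeyedBackend<K> {
    private K currentKey;

    @Override
    public void setCurrentKey(K key) {
        this.currentKey = key;
    }

    @Override
    public K getCurrentKey() {
        return currentKey;
    }
}

/** Simple in-memory backend built on the abstract base class. */
class HeapBackend<K> extends AbstractKeyedBackend<K> {
    private final Map<K, Map<String, Object>> state = new HashMap<>();

    @Override
    public void put(String stateName, Object value) {
        state.computeIfAbsent(getCurrentKey(), k -> new HashMap<>()).put(stateName, value);
    }

    @Override
    public Object get(String stateName) {
        Map<String, Object> perKey = state.get(getCurrentKey());
        return perKey == null ? null : perKey.get(stateName);
    }
}

/** Extension that counts reads and delegates every other call. */
class CountingBackend<K> implements KeyedBackend<K> {
    private final KeyedBackend<K> delegate;
    private int reads;

    CountingBackend(KeyedBackend<K> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void setCurrentKey(K key) {
        delegate.setCurrentKey(key);
    }

    @Override
    public K getCurrentKey() {
        return delegate.getCurrentKey();
    }

    @Override
    public void put(String stateName, Object value) {
        delegate.put(stateName, value);
    }

    @Override
    public Object get(String stateName) {
        reads++;
        return delegate.get(stateName);
    }

    int getReads() {
        return reads;
    }
}

class BackendFactory {
    /** Returns the interface type, so either a plain or a wrapped backend is allowed. */
    static <K> KeyedBackend<K> createKeyedBackend(boolean counting) {
        KeyedBackend<K> heap = new HeapBackend<>();
        if (counting) {
            return new CountingBackend<>(heap);
        }
        return heap;
    }
}
```

If `createKeyedBackend` returned the abstract class instead, `CountingBackend` would have to inherit all of its internals just to satisfy the return type.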





[jira] [Updated] (FLINK-10310) Cassandra Sink - Handling failing requests

2018-09-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10310:
---
Labels: pull-request-available  (was: )

> Cassandra Sink - Handling failing requests
> --
>
> Key: FLINK-10310
> URL: https://issues.apache.org/jira/browse/FLINK-10310
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Jayant Ameta
>Priority: Major
>  Labels: pull-request-available
>
> The Cassandra sink fails for any kind of error. For some exceptions (e.g. 
> WriteTimeoutException), ignoring the exception may be acceptable as well.
> Can we discuss having a FailureHandler along the lines of 
> ActionRequestFailureHandler?
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WriteTimeoutException-in-Cassandra-sink-kill-the-job-tp22706p22707.html





[jira] [Commented] (FLINK-10310) Cassandra Sink - Handling failing requests

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623422#comment-16623422
 ] 

ASF GitHub Bot commented on FLINK-10310:


wittyameta opened a new pull request #6732: [FLINK-10310] Cassandra Sink - 
Handling failing requests.
URL: https://github.com/apache/flink/pull/6732
 
 
   ## What is the purpose of the change
   
   This pull request provides support to optionally handle cassandra sink 
errors. A user may choose to ignore an exception instead of allowing the sink 
to fail. The handler is similar to `ActionRequestFailureHandler` in Elastic 
Sink.
   
   
   ## Brief change log
   
 - *Added interface `CassandraFailureHandler` and a no-op implementation 
`NoOpCassandraFailureHandler`*
 - *`CassandraSinkBase` has a new field `failureHandler` and updated 
constructor*
 - *`checkAsyncErrors` method in `CassandraSinkBase` calls the 
`failureHandler` instead of throwing an `IOException`*
 - *`CassandraSinkBuilder` has a new optional field `failureHandler` and 
updated setter. Uses no-op implementation as default*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Added test in class `CassandraSinkBaseTest` that validates that 
failureHandler is called and the error is ignored*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **yes**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **no**
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **yes**
 - If yes, how is the feature documented? **docs**
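
The handler mechanism described in the change log can be sketched roughly as follows. This is an illustration under assumptions, not the code from the pull request: `SinkFailureHandler`, `SketchSink`, `RethrowingHandler`, `IgnoreTimeoutHandler` and `FakeWriteTimeoutException` are hypothetical names, and the stand-in exception replaces the real driver class so that the example has no dependencies.

```java
import java.io.IOException;

/** Hypothetical handler interface, modeled on the description in the pull request. */
interface SinkFailureHandler {
    void onFailure(Throwable failure) throws IOException;
}

/** Stand-in for a driver timeout exception; not the real Cassandra class. */
class FakeWriteTimeoutException extends RuntimeException {
    FakeWriteTimeoutException(String message) {
        super(message);
    }
}

/** Default-style handler: every failure fails the sink. */
class RethrowingHandler implements SinkFailureHandler {
    @Override
    public void onFailure(Throwable failure) throws IOException {
        throw new IOException("Error while sending value.", failure);
    }
}

/** User-supplied handler: ignores timeouts and rethrows everything else. */
class IgnoreTimeoutHandler implements SinkFailureHandler {
    @Override
    public void onFailure(Throwable failure) throws IOException {
        if (failure instanceof FakeWriteTimeoutException) {
            return; // acceptable loss for this hypothetical use case
        }
        throw new IOException("Error while sending value.", failure);
    }
}

/** Hypothetical sink that hands asynchronous errors to the handler. */
class SketchSink {
    private final SinkFailureHandler failureHandler;
    private Throwable asyncError;

    SketchSink(SinkFailureHandler failureHandler) {
        this.failureHandler = failureHandler;
    }

    /** Called by the (imaginary) async callback when a write fails. */
    void reportAsyncError(Throwable error) {
        asyncError = error;
    }

    /** Lets the handler decide whether a recorded error fails the sink. */
    void checkAsyncErrors() throws IOException {
        Throwable error = asyncError;
        if (error != null) {
            asyncError = null;
            failureHandler.onFailure(error);
        }
    }
}

class FailureHandlerDemo {
    /** Returns true if the sink fails for the given error with the given handler. */
    static boolean sinkFailsOn(SinkFailureHandler handler, Throwable error) {
        SketchSink sink = new SketchSink(handler);
        sink.reportAsyncError(error);
        try {
            sink.checkAsyncErrors();
            return false;
        } catch (IOException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Throwable timeout = new FakeWriteTimeoutException("write timed out");
        System.out.println(sinkFailsOn(new RethrowingHandler(), timeout));
        System.out.println(sinkFailsOn(new IgnoreTimeoutHandler(), timeout));
    }
}
```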
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-8803) Mini Cluster Shutdown with HA unstable, causing test failures

2018-09-21 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623459#comment-16623459
 ] 

tison commented on FLINK-8803:
--

Is this a "won't fix", since it is all about {{FlinkMiniCluster}}, which is based on 
legacy mode?

> Mini Cluster Shutdown with HA unstable, causing test failures
> -
>
> Key: FLINK-8803
> URL: https://issues.apache.org/jira/browse/FLINK-8803
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Stephan Ewen
>Priority: Critical
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> When the {{FlinkMiniCluster}} is created for HA tests with ZooKeeper, the 
> shutdown is unstable.
> It looks like ZooKeeper may be shut down before the JobManager is shut down, 
> causing the shutdown procedure of the JobManager (specifically 
> {{ZooKeeperSubmittedJobGraphStore.removeJobGraph}}) to block until tests time 
> out.
> Full log: https://api.travis-ci.org/v3/job/346853707/log.txt
> Note that no ZK threads are alive any more, seems ZK is shut down already.
> Relevant Stack Traces:
> {code}
> "main" #1 prio=5 os_prio=0 tid=0x7f973800a800 nid=0x43b4 waiting on 
> condition [0x7f973eb0b000]
>java.lang.Thread.State: TIMED_WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x8966cf18> (a 
> scala.concurrent.impl.Promise$CompletionLatch)
>   at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
>   at 
> scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212)
>   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
>   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
>   at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
>   at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
>   at 
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>   at scala.concurrent.Await$.ready(package.scala:169)
>   at 
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.startInternalShutdown(FlinkMiniCluster.scala:469)
>   at 
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.stop(FlinkMiniCluster.scala:435)
>   at 
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.closeAsync(FlinkMiniCluster.scala:719)
>   at 
> org.apache.flink.test.util.MiniClusterResource.after(MiniClusterResource.java:104)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:50)
> ...
> {code}
> {code}
> "flink-akka.actor.default-dispatcher-2" #1012 prio=5 os_prio=0 
> tid=0x7f97394fa800 nid=0x3328 waiting on condition [0x7f971db29000]
>java.lang.Thread.State: TIMED_WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x87f82a70> (a 
> java.util.concurrent.CountDownLatch$Sync)
>   at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
>   at 
> org.apache.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.internalBlockUntilConnectedOrTimedOut(CuratorZookeeperClient.java:336)
>   at 
> org.apache.flink.shaded.curator.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
>   at 
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.DeleteBuilderImpl.pathInForeground(DeleteBuilderImpl.java:241)
>   at 
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:225)
>   at 
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:35)
>   at 
> org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.release(ZooKeeperStateHandleStore.java:478)
>   at 
> org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.releaseAndTryRemove(ZooKeeperStateHandleStore.java:435)
>   at 
> org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.releaseAndTryRemove(ZooKeeperStateHandleStore.java:405)
>   at 
> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.removeJobGraph(ZooKeeperSubmittedJobGraphStore.java:266)
>   - locked <0x807f4258> (a 

[jira] [Commented] (FLINK-10354) Savepoints should be counted as retained checkpoints

2018-09-21 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623465#comment-16623465
 ] 

Dawid Wysakowicz commented on FLINK-10354:
--

You're right, I hadn't seen the other one. I will close the other one, as this one 
already has an open PR.

> Savepoints should be counted as retained checkpoints
> 
>
> Key: FLINK-10354
> URL: https://issues.apache.org/jira/browse/FLINK-10354
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> This task is about reverting [FLINK-6328].
> The problem is that you can get incorrect results with exactly-once sinks if 
> there is a failure after taking a savepoint but before taking the next 
> checkpoint because the savepoint will also have manifested side effects to 
> the sink.





[jira] [Closed] (FLINK-9983) Savepoints should count as checkpoints when recovering

2018-09-21 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz closed FLINK-9983.
---
Resolution: Duplicate

Will be fixed in [FLINK-10354]

> Savepoints should count as checkpoints when recovering
> --
>
> Key: FLINK-9983
> URL: https://issues.apache.org/jira/browse/FLINK-9983
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Aljoscha Krettek
>Priority: Critical
> Fix For: 1.7.0
>
>
> If they are not used when recovering you can run into problems with 
> duplicate output data when a failure occurs after a savepoint was taken but 
> before the next checkpoint occurs.
> The fix, in the long run, is to differentiate between savepoints that have 
> side effects and those that don't. The former would be used for a 
> "stop-with-savepoint" scenario while the latter is for in-between savepoints. 
> This is harder to implement, so I vote for the easy fix described for now for 
> fixing the duplication problems.
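
The duplication scenario can be illustrated with a small sketch. Everything here is hypothetical (`Snapshot` and `RecoverySketch` are not Flink classes); it only models which snapshot recovery would pick when savepoints are or are not counted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical record of a completed snapshot. */
class Snapshot {
    final long id;
    final boolean savepoint;

    Snapshot(long id, boolean savepoint) {
        this.id = id;
        this.savepoint = savepoint;
    }
}

class RecoverySketch {
    /** Picks the latest completed snapshot, optionally skipping savepoints. */
    static Snapshot latest(List<Snapshot> completed, boolean countSavepoints) {
        Snapshot best = null;
        for (Snapshot s : completed) {
            if (s.savepoint && !countSavepoints) {
                continue;
            }
            if (best == null || s.id > best.id) {
                best = s;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<Snapshot> completed = new ArrayList<>();
        completed.add(new Snapshot(1, false)); // regular checkpoint
        completed.add(new Snapshot(2, true));  // savepoint; sink output up to here is already visible

        // A failure happens before checkpoint 3 completes.
        System.out.println("skip savepoints:  restore from " + latest(completed, false).id);
        System.out.println("count savepoints: restore from " + latest(completed, true).id);
    }
}
```

When savepoints are skipped, recovery falls back to snapshot 1 and replays the records between 1 and 2, whose output the sink has already emitted.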





[jira] [Commented] (FLINK-10379) Can not use Table Functions in Java Table API

2018-09-21 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623471#comment-16623471
 ] 

Piotr Nowojski commented on FLINK-10379:


Thanks [~hequn8128] :)

Probably we should first also think about how to solve this. Do you have some 
specific solution in mind?

 I didn't take part in the discussion/effort that resulted in removing the 
{{Table#join(String)}} method, but there were probably some reasons behind 
that. I heard that part of it was to switch to using implicit conversions of 
table functions to {{Table}} instances in Scala. Obviously that doesn't work in 
Java, but maybe we could provide something to explicitly apply arguments to a 
{{TableFunction}} instance and convert it to a {{Table}}? Something along the 
lines of:
 
{code:java}
// Register the function.
TableFunction split = tableEnv.registerFunction("split", new Split("#"));

myTable.join(split.apply("a").as("word, length"));
{code}
(But I'm pretty unfamiliar with the Table API, so take this with a grain of salt.)

What do you think?

 

> Can not use Table Functions in Java Table API
> -
>
> Key: FLINK-10379
> URL: https://issues.apache.org/jira/browse/FLINK-10379
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.1
>Reporter: Piotr Nowojski
>Assignee: Hequn Cheng
>Priority: Critical
>
> As stated in the 
> [documentation|https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#table-functions],
>  this is how table functions should be used in the Java Table API:
> {code:java}
> // Register the function.
> tableEnv.registerFunction("split", new Split("#"));
> myTable.join("split(a) as (word, length)");
> {code}
> However, {{Table.join(String)}} was removed some time ago, and now it is 
> impossible to use Table Functions in the Java Table API.





[GitHub] wittyameta opened a new pull request #6732: [FLINK-10310] Cassandra Sink - Handling failing requests.

2018-09-21 Thread GitBox
wittyameta opened a new pull request #6732: [FLINK-10310] Cassandra Sink - 
Handling failing requests.
URL: https://github.com/apache/flink/pull/6732
 
 


[jira] [Commented] (FLINK-10157) Allow `null` user values in map state with TTL

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623467#comment-16623467
 ] 

ASF GitHub Bot commented on FLINK-10157:


StefanRRichter commented on issue #6707: [FLINK-10157] [State TTL] Allow `null` 
user values in map state with TTL
URL: https://github.com/apache/flink/pull/6707#issuecomment-423507977
 
 
   Thanks @azagrebin ! Changes look good to me, merging.  




> Allow `null` user values in map state with TTL
> --
>
> Key: FLINK-10157
> URL: https://issues.apache.org/jira/browse/FLINK-10157
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
> Environment: Flink:1.6.0
> Scala:2.11
> JDK:1.8
>Reporter: chengjie.wu
>Assignee: Andrey Zagrebin
>Priority: Minor
>  Labels: pull-request-available
> Attachments: StateWithOutTtlTest.scala, StateWithTtlTest.scala
>
>
> Thanks for the StateTtl feature, this is exactly what I need now! But I found 
> an issue.
> In the previous version, or when StateTtl is not enabled, MapState allows 
> `null` values, which means that after
> {code:java}
> mapState.put("key", null){code}
> , then
> {code:java}
> mapState.contains("key"){code}
> will return *true*, but when StateTtl is enabled,
> {code:java}
> mapState.contains("key"){code}
> will return *false* (*the key has not expired*).
>  So I think the field `userValue` in 
> `org.apache.flink.runtime.state.ttl.TtlValue` should allow `null` values. A 
> null user state does not necessarily mean that the TtlValue should be null.
>  
> {code:java}
> /**
>  * This class wraps user value of state with TTL.
>  *
>  * @param <T> Type of the user value of state with TTL
>  */
> class TtlValue<T> implements Serializable {
>   private final T userValue;
>   private final long lastAccessTimestamp;
>
>   TtlValue(T userValue, long lastAccessTimestamp) {
>     Preconditions.checkNotNull(userValue);
>     this.userValue = userValue;
>     this.lastAccessTimestamp = lastAccessTimestamp;
>   }
>
>   T getUserValue() {
>     return userValue;
>   }
>
>   long getLastAccessTimestamp() {
>     return lastAccessTimestamp;
>   }
> }
> {code}
> Am I understanding right?
> This is my test class.
> [^StateWithTtlTest.scala] [^StateWithOutTtlTest.scala]
> ^Thanks!:)^
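
A minimal sketch of the behavior the reporter asks for, assuming a wrapper that tolerates a `null` user value. `NullableTtlValue` and `TtlMapSketch` are hypothetical names and this is not the change that was merged; the point is only that key presence can be decided from the wrapper and its timestamp rather than from the user value.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical TTL wrapper that accepts a null user value. */
class NullableTtlValue<T> {
    private final T userValue; // may be null
    private final long lastAccessTimestamp;

    NullableTtlValue(T userValue, long lastAccessTimestamp) {
        this.userValue = userValue;
        this.lastAccessTimestamp = lastAccessTimestamp;
    }

    T getUserValue() {
        return userValue;
    }

    long getLastAccessTimestamp() {
        return lastAccessTimestamp;
    }
}

/** Hypothetical map state with TTL; the caller passes the current time explicitly. */
class TtlMapSketch<K, V> {
    private final Map<K, NullableTtlValue<V>> entries = new HashMap<>();
    private final long ttlMillis;

    TtlMapSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void put(K key, V value, long now) {
        entries.put(key, new NullableTtlValue<>(value, now));
    }

    /** A key is present if a wrapper exists and has not expired, even if the value is null. */
    boolean contains(K key, long now) {
        NullableTtlValue<V> wrapped = entries.get(key);
        return wrapped != null && now - wrapped.getLastAccessTimestamp() < ttlMillis;
    }

    /** Returns the user value, or null if the key is absent, expired, or mapped to null. */
    V get(K key, long now) {
        return contains(key, now) ? entries.get(key).getUserValue() : null;
    }
}

class TtlMapDemo {
    public static void main(String[] args) {
        TtlMapSketch<String, String> state = new TtlMapSketch<>(1000L);
        state.put("key", null, 0L);
        System.out.println(state.contains("key", 500L));  // not yet expired
        System.out.println(state.contains("key", 1000L)); // expired
    }
}
```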





[jira] [Commented] (FLINK-10384) Add Sinh math function supported in Table API and SQL

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623521#comment-16623521
 ] 

ASF GitHub Bot commented on FLINK-10384:


yanghua commented on issue #6730: [FLINK-10384] Add Sinh math function 
supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6730#issuecomment-423516113
 
 
   cc @xccui 




> Add Sinh math function supported in Table API and SQL
> -
>
> Key: FLINK-10384
> URL: https://issues.apache.org/jira/browse/FLINK-10384
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> like FLINK-10340 for adding Cosh math function





[GitHub] pnowojski commented on issue #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-09-21 Thread GitBox
pnowojski commented on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-423499421
 
 
   @NicoK gave me his LGTM, so I'm merging this :) Thank you @zhijiangW for 
the contribution and the time spent on the feature/PR.




[GitHub] pnowojski closed pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-09-21 Thread GitBox
pnowojski closed pull request #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index 25d292771d0..6eebbbe88eb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -66,29 +66,33 @@ public boolean isFullBuffer() {
}
 
/**
-* Starts serializing and copying the given record to the target buffer
-* (if available).
+* Starts serializing the given record to an intermediate data buffer.
 *
 * @param record the record to serialize
-* @return how much information was written to the target buffer and
-* whether this buffer is full
 */
-   SerializationResult addRecord(T record) throws IOException;
+   void serializeRecord(T record) throws IOException;
 
/**
-* Sets a (next) target buffer to use and continues writing remaining 
data
-* to it until it is full.
+* Copies the intermediate data serialization buffer to the given 
target buffer.
 *
 * @param bufferBuilder the new target buffer to use
 * @return how much information was written to the target buffer and
 * whether this buffer is full
 */
-   SerializationResult continueWritingWithNextBufferBuilder(BufferBuilder 
bufferBuilder) throws IOException;
+   SerializationResult copyToBufferBuilder(BufferBuilder bufferBuilder);
+
+   /**
+* Checks to decrease the size of intermediate data serialization 
buffer after finishing the
+* whole serialization process including {@link 
#serializeRecord(IOReadableWritable)} and
+* {@link #copyToBufferBuilder(BufferBuilder)}.
+*/
+   void prune();
 
/**
-* Clear and release internal state.
+* Supports copying an intermediate data serialization buffer to 
multiple target buffers
+* by resetting its initial position before each copying.
 */
-   void clear();
+   void reset();
 
/**
 * @return true if has some serialized data pending copying to 
the result {@link BufferBuilder}.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index c4ab53f4b3a..ba2ed0133fd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -20,11 +20,8 @@
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataOutputSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -32,7 +29,7 @@
 /**
  * Record serializer which serializes the complete record to an intermediate
  * data serialization buffer and copies this buffer to target buffers
- * one-by-one using {@link 
#continueWritingWithNextBufferBuilder(BufferBuilder)}.
+ * one-by-one using {@link #copyToBufferBuilder(BufferBuilder)}.
  *
  * @param  The type of the records that are serialized.
  */
@@ -50,10 +47,6 @@
/** Intermediate buffer for length serialization. */
private final ByteBuffer lengthBuffer;
 
-   /** Current target {@link Buffer} of the serializer. */
-   @Nullable
-   private BufferBuilder targetBuffer;
-
public SpanningRecordSerializer() {
serializationBuffer = new DataOutputSerializer(128);
 
@@ -66,15 +59,12 @@ public SpanningRecordSerializer() {
}
 
/**
-* Serializes the complete record to an intermediate data serialization
-* buffer and starts copying it to the target buffer (if available).
+* Serializes the complete record to an intermediate data serialization 
buffer.
 *
 * @param record the record to serialize
-* @return how much information was written to the target buffer and
-* whether this 
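
The interface change in the diff above separates serializing a record from copying the serialized bytes into target buffers. A simplified sketch of that split, assuming plain `ByteBuffer` targets instead of Flink's `BufferBuilder`: `OnceSerializer`, `BroadcastSketch` and `copyTo` are hypothetical names, and only `serializeRecord` and `reset` mirror method names that appear in the diff.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical serializer: serialize once into an intermediate buffer, copy it many times. */
class OnceSerializer {
    private ByteBuffer intermediate = ByteBuffer.allocate(0);
    private int serializations;

    /** Writes a length prefix plus the payload into the intermediate buffer. */
    void serializeRecord(String record) {
        byte[] payload = record.getBytes(StandardCharsets.UTF_8);
        intermediate = ByteBuffer.allocate(4 + payload.length);
        intermediate.putInt(payload.length);
        intermediate.put(payload);
        intermediate.flip();
        serializations++;
    }

    /** Rewinds the intermediate buffer so it can be copied to another target. */
    void reset() {
        intermediate.rewind();
    }

    /** Copies as many bytes as fit; returns true once the whole record has been copied. */
    boolean copyTo(ByteBuffer target) {
        int n = Math.min(intermediate.remaining(), target.remaining());
        ByteBuffer slice = intermediate.duplicate();
        slice.limit(slice.position() + n);
        target.put(slice);
        intermediate.position(intermediate.position() + n);
        return !intermediate.hasRemaining();
    }

    int getSerializations() {
        return serializations;
    }
}

class BroadcastSketch {
    /** Serializes the record once and copies the bytes into one buffer per channel. */
    static List<ByteBuffer> broadcast(OnceSerializer serializer, String record, int channels) {
        serializer.serializeRecord(record);
        List<ByteBuffer> targets = new ArrayList<>();
        for (int i = 0; i < channels; i++) {
            serializer.reset();
            ByteBuffer target = ByteBuffer.allocate(64);
            serializer.copyTo(target);
            target.flip();
            targets.add(target);
        }
        return targets;
    }

    public static void main(String[] args) {
        OnceSerializer serializer = new OnceSerializer();
        List<ByteBuffer> out = broadcast(serializer, "hello", 3);
        System.out.println(out.size() + " copies, " + serializer.getSerializations() + " serialization(s)");
    }
}
```

The sketch leaves out records that span several target buffers; `copyTo` reports that case through its return value but the demo uses targets large enough to hold the whole record.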

[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623432#comment-16623432
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski closed pull request #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index 25d292771d0..6eebbbe88eb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -66,29 +66,33 @@ public boolean isFullBuffer() {
}
 
/**
-* Starts serializing and copying the given record to the target buffer
-* (if available).
+* Starts serializing the given record to an intermediate data buffer.
 *
 * @param record the record to serialize
-* @return how much information was written to the target buffer and
-* whether this buffer is full
 */
-   SerializationResult addRecord(T record) throws IOException;
+   void serializeRecord(T record) throws IOException;
 
/**
-* Sets a (next) target buffer to use and continues writing remaining 
data
-* to it until it is full.
+* Copies the intermediate data serialization buffer to the given 
target buffer.
 *
 * @param bufferBuilder the new target buffer to use
 * @return how much information was written to the target buffer and
 * whether this buffer is full
 */
-   SerializationResult continueWritingWithNextBufferBuilder(BufferBuilder 
bufferBuilder) throws IOException;
+   SerializationResult copyToBufferBuilder(BufferBuilder bufferBuilder);
+
+   /**
+* Checks to decrease the size of intermediate data serialization 
buffer after finishing the
+* whole serialization process including {@link 
#serializeRecord(IOReadableWritable)} and
+* {@link #copyToBufferBuilder(BufferBuilder)}.
+*/
+   void prune();
 
/**
-* Clear and release internal state.
+* Supports copying an intermediate data serialization buffer to 
multiple target buffers
+* by resetting its initial position before each copying.
 */
-   void clear();
+   void reset();
 
/**
 * @return true if has some serialized data pending copying to 
the result {@link BufferBuilder}.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index c4ab53f4b3a..ba2ed0133fd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -20,11 +20,8 @@
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataOutputSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -32,7 +29,7 @@
 /**
  * Record serializer which serializes the complete record to an intermediate
  * data serialization buffer and copies this buffer to target buffers
- * one-by-one using {@link 
#continueWritingWithNextBufferBuilder(BufferBuilder)}.
+ * one-by-one using {@link #copyToBufferBuilder(BufferBuilder)}.
  *
  * @param <T> The type of the records that are serialized.
  */
@@ -50,10 +47,6 @@
/** Intermediate buffer for length serialization. */
private final ByteBuffer lengthBuffer;
 
-   /** Current target {@link Buffer} of the serializer. */
-   @Nullable
-   private BufferBuilder targetBuffer;
-
public SpanningRecordSerializer() {
serializationBuffer = new DataOutputSerializer(128);
 
@@ -66,15 +59,12 @@ public SpanningRecordSerializer() {
}
 
/**
-* Serializes the complete record to an intermediate data serialization
-* buffer and starts copying it to the target buffer (if available).
+* Serializes 

[jira] [Resolved] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-21 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski resolved FLINK-9913.
---
Resolution: Fixed

merged commit 914dffb into apache:master

> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multiple channels via 
> {{ChannelSelector}} or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, which 
> means the output is serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high-cost operation, so we can get good 
> benefits by serializing each record only once.
> I would suggest the following changes to realize it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> all channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.
> An additional benefit of using a single serializer for all channels is a 
> potentially significant reduction in heap space overhead from fewer 
> intermediate serialization buffers (these buffers were pruned back to 128B 
> only once they grew over 5MiB!).
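
The three steps proposed in the issue can be sketched roughly as follows. This is 
only an illustration under stated assumptions, not the code merged for FLINK-9913: 
the class SerializeOnceSketch and its methods are hypothetical, plain 
java.nio.ByteBuffer stands in for both the intermediate serialization buffer and 
the per-channel BufferBuilder, and each target is assumed to be large enough to 
hold the whole record (the real serializer spans records across buffers).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not Flink's RecordWriter or RecordSerializer.
final class SerializeOnceSketch {

    /** Serializes a record once: a 4-byte length prefix followed by UTF-8 bytes. */
    static ByteBuffer serialize(String record) {
        byte[] payload = record.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(4 + payload.length);
        buffer.putInt(payload.length);
        buffer.put(payload);
        buffer.flip(); // switch to reading: position = 0, limit = bytes written
        return buffer;
    }

    /**
     * Copies the same serialized bytes into one target buffer per channel.
     * Assumes targetCapacity is at least the serialized size.
     */
    static List<ByteBuffer> copyToChannels(
            ByteBuffer serialized, int numChannels, int targetCapacity) {
        List<ByteBuffer> targets = new ArrayList<>();
        for (int channel = 0; channel < numChannels; channel++) {
            serialized.rewind(); // reset the read position before each copy
            ByteBuffer target = ByteBuffer.allocate(targetCapacity);
            target.put(serialized);
            target.flip();
            targets.add(target);
        }
        return targets;
    }
}
```

The point of the sketch is that serialize() runs once per record regardless of 
the number of channels; only the comparatively cheap byte copy is repeated.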



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9738) Provide a way to define Table Version Functions in Table API

2018-09-21 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-9738.
-
   Resolution: Fixed
Fix Version/s: 1.7.0

Merged as f2a67a1682249d83711030f4e55b824cb18336d7

> Provide a way to define Table Version Functions in Table API
> 
>
> Key: FLINK-9738
> URL: https://issues.apache.org/jira/browse/FLINK-9738
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
> Fix For: 1.7.0
>
>






[jira] [Assigned] (FLINK-9715) Support versioned joins with event time

2018-09-21 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-9715:
-

Assignee: Piotr Nowojski

> Support versioned joins with event time
> ---
>
> Key: FLINK-9715
> URL: https://issues.apache.org/jira/browse/FLINK-9715
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should work with event time
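
To make the intended lookup concrete, here is a minimal sketch of what 
Rates(o.rowtime) could mean, assuming a versioned join picks, per currency, the 
latest rate whose start time is not after the order's event time. The issue does 
not spell out these semantics, so treat that as an assumption. The class 
VersionedRatesSketch and its methods are hypothetical and unrelated to Flink's 
actual implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the lookup semantics; not Flink code.
final class VersionedRatesSketch {

    // currency -> (time from which the rate is valid -> rate)
    private final Map<String, TreeMap<Long, Double>> versions = new HashMap<>();

    void addRate(String currency, long validFrom, double rate) {
        versions.computeIfAbsent(currency, c -> new TreeMap<>()).put(validFrom, rate);
    }

    /** Returns the rate valid at the given time, or null if no version exists yet. */
    Double rateAt(String currency, long time) {
        TreeMap<Long, Double> history = versions.get(currency);
        if (history == null) {
            return null;
        }
        // floorEntry: the entry with the greatest key <= time
        Map.Entry<Long, Double> version = history.floorEntry(time);
        return version == null ? null : version.getValue();
    }

    /** Mirrors o.amount * r.rate for an order with the given event time. */
    Double convert(String currency, double amount, long rowtime) {
        Double rate = rateAt(currency, rowtime);
        return rate == null ? null : amount * rate;
    }
}
```

With event time, the version is chosen by the timestamp carried in the row 
(o.rowtime) rather than by the wall-clock time at which the row is processed.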





[jira] [Commented] (FLINK-10377) Remove precondition in TwoPhaseCommitSinkFunction.notifyCheckpointComplete

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623461#comment-16623461
 ] 

ASF GitHub Bot commented on FLINK-10377:


pnowojski commented on a change in pull request #6723: [FLINK-10377] Remove 
precondition in TwoPhaseCommitSinkFunction.notif…
URL: https://github.com/apache/flink/pull/6723#discussion_r219471764
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ##
 @@ -255,7 +255,6 @@ public final void notifyCheckpointComplete(long 
checkpointId) throws Exception {
//
 
 Review comment:
   Maybe you can ask the reporter of the bug to provide debug or at least info 
logs?




> Remove precondition in TwoPhaseCommitSinkFunction.notifyCheckpointComplete
> --
>
> Key: FLINK-10377
> URL: https://issues.apache.org/jira/browse/FLINK-10377
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> The precondition {{checkState(pendingTransactionIterator.hasNext(), 
> "checkpoint completed, but no transaction pending");}} in 
> {{TwoPhaseCommitSinkFunction.notifyCheckpointComplete()}} seems too strict, 
> because checkpoints can overtake checkpoints and will fail the precondition. 
> In this case the commit was already performed by the first notification and 
> subsumes the late checkpoint. I think the check can be removed.
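
The scenario described above can be sketched as follows. This is a simplified, 
hypothetical model (PendingCommitsSketch is not the TwoPhaseCommitSinkFunction 
source): transactions are plain strings keyed by checkpoint id, and a 
notification commits every pending transaction up to and including that id. A 
late notification for an older checkpoint then finds nothing pending, which is 
harmless and should not fail a precondition.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration; not the TwoPhaseCommitSinkFunction source.
final class PendingCommitsSketch {

    // checkpoint id -> transaction waiting to be committed
    private final TreeMap<Long, String> pendingTransactions = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();

    void preCommit(long checkpointId, String transaction) {
        pendingTransactions.put(checkpointId, transaction);
    }

    /** Commits all pending transactions with an id <= checkpointId, if any. */
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, String>> pending =
                pendingTransactions.headMap(checkpointId, true).entrySet().iterator();
        // No check that pending.hasNext(): an earlier notification for a newer
        // checkpoint may already have committed everything up to this id.
        while (pending.hasNext()) {
            committed.add(pending.next().getValue());
            pending.remove();
        }
    }

    List<String> committed() {
        return committed;
    }
}
```

For example, if the notification for checkpoint 2 arrives before the one for 
checkpoint 1, the first call commits both transactions and the second call is a 
no-op.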





[GitHub] pnowojski commented on a change in pull request #6723: [FLINK-10377] Remove precondition in TwoPhaseCommitSinkFunction.notif…

2018-09-21 Thread GitBox
pnowojski commented on a change in pull request #6723: [FLINK-10377] Remove 
precondition in TwoPhaseCommitSinkFunction.notif…
URL: https://github.com/apache/flink/pull/6723#discussion_r219471764
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ##
 @@ -255,7 +255,6 @@ public final void notifyCheckpointComplete(long 
checkpointId) throws Exception {
//
 
 Review comment:
   Maybe you can ask the reporter of the bug to provide debug or at least info 
logs?




[GitHub] StefanRRichter commented on issue #6707: [FLINK-10157] [State TTL] Allow `null` user values in map state with TTL

2018-09-21 Thread GitBox
StefanRRichter commented on issue #6707: [FLINK-10157] [State TTL] Allow `null` 
user values in map state with TTL
URL: https://github.com/apache/flink/pull/6707#issuecomment-423507977
 
 
   Thanks @azagrebin ! Changes look good to me, merging.  




[jira] [Commented] (FLINK-10320) Introduce JobMaster schedule micro-benchmark

2018-09-21 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623481#comment-16623481
 ] 

Piotr Nowojski commented on FLINK-10320:


[~till.rohrmann] might have a good point. [~Tison] could you provide profiler 
logs for both the JobManager and the TaskManager during this 10,000 parallelism 
scheduling issue? Maybe we could even narrow down the problematic component and 
write benchmarks targeting it specifically instead of the whole JobManager?

> Introduce JobMaster schedule micro-benchmark
> 
>
> Key: FLINK-10320
> URL: https://issues.apache.org/jira/browse/FLINK-10320
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: tison
>Assignee: tison
>Priority: Major
>
> Based on the {{org.apache.flink.streaming.runtime.io.benchmark}} code and the 
> repo [flink-benchmark|https://github.com/dataArtisans/flink-benchmarks], I 
> propose to introduce another micro-benchmark that focuses on {{JobMaster}} 
> scheduling performance.
> h3. Target
> Benchmark how long it takes from {{JobMaster}} startup (receiving the 
> {{JobGraph}} and initializing) until all tasks are RUNNING. Technically we use 
> a bounded stream and the TM finishes tasks as soon as they arrive, so the 
> interval we actually measure is until all tasks are FINISHED.
> h3. Case
> 1. JobGraph that covers EAGER + PIPELINED edges
> 2. JobGraph that covers LAZY_FROM_SOURCES + PIPELINED edges
> 3. JobGraph that covers LAZY_FROM_SOURCES + BLOCKING edges
> ps: maybe also benchmark the case where the source gets its input from 
> {{InputSplit}}?
> h3. Implement
> Based on the flink-benchmark repo, we ultimately run the benchmark using jmh, 
> so the whole test suite is split across two repos. The testing environment 
> could be located in the main repo, maybe under 
> flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/benchmark.
> To measure the performance of {{JobMaster}} scheduling, we need to simulate 
> an environment that:
> 1. has a real {{JobMaster}}
> 2. has a mock/testing {{ResourceManager}} that has infinite resources and 
> reacts immediately.
> 3. has one (or many?) mock/testing {{TaskExecutor}}s that deploy and finish 
> tasks immediately.
> [~trohrm...@apache.org] [~GJL] [~pnowojski] could you please review this 
> proposal to help clarify the goal and concrete details? Thanks in advance.
> Any suggestions are welcome.





[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623429#comment-16623429
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on issue #6417: [FLINK-9913][runtime] Improve output 
serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#issuecomment-423499421
 
 
   @NicoK gave me his LGTM, so I'm merging this :) Thank you @zhijiangW for 
the contribution and the time spent on the feature/PR.




> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multiple channels via 
> {{ChannelSelector}} or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, which 
> means the output is serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high-cost operation, so we can get good 
> benefits by serializing each record only once.
> I would suggest the following changes to realize it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> all channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.
> An additional benefit of using a single serializer for all channels is a 
> potentially significant reduction in heap space overhead from fewer 
> intermediate serialization buffers (these buffers were pruned back to 128B 
> only once they grew over 5MiB!).




