[GitHub] dianfu commented on issue #7286: [FLINK-8739] [table] Optimize DISTINCT aggregates to use the same distinct accumulator if possible

2019-01-09 Thread GitBox
dianfu commented on issue #7286: [FLINK-8739] [table] Optimize DISTINCT 
aggregates to use the same distinct accumulator if possible
URL: https://github.com/apache/flink/pull/7286#issuecomment-453002680
 
 
   @sunjincheng121  Thanks a lot for reviewing this PR. I have rebased the PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sunjincheng121 commented on issue #7437: [FLINK-11278] [docs] Add documentation for TableAPI&SQL in scala-shell

2019-01-09 Thread GitBox
sunjincheng121 commented on issue #7437: [FLINK-11278] [docs] Add documentation 
for TableAPI&SQL in scala-shell
URL: https://github.com/apache/flink/pull/7437#issuecomment-453000779
 
 
   Merging...


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10433) Stepwise creation of the ExecutionGraph sub-structures

2019-01-09 Thread shuai.xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739159#comment-16739159
 ] 

shuai.xu commented on FLINK-10433:
--

Like this proposal. Autoscaling is very useful. Furthermore, we could support 
automatically deciding the result partition type, as in FLINK-11299. And further 
in the future, we could support dynamic job graphs.

> Stepwise creation of the ExecutionGraph sub-structures
> --
>
> Key: FLINK-10433
> URL: https://issues.apache.org/jira/browse/FLINK-10433
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>
> In this step, we break the construction of the ExecutionGraph substructures 
> into multiple steps. When translating a JobGraph to an ExecutionGraph, we 
> would first create a “skeleton” ExecutionGraph that is only built up to the 
> ExecutionJobVertex level. This enables scheduling in two steps. First we can 
> compute the minimal and desired amount of resources and ask the SlotPool for 
> them. Only when the available resources match the minimal requirements would 
> the actual scheduling start. With this information, we can build the 
> ExecutionVertex and Execution objects with the right parallelism from the 
> start and avoid running out of resources during scheduling. This separation 
> will help us with the autoscaling use case, where we figure out the actual 
> parallelism after we have started scheduling.
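A purely hypothetical sketch of the two-step idea above (every type and method 
name here is invented for illustration and is not Flink API):

```java
// Stand-in for the skeleton graph built only to the ExecutionJobVertex level.
interface SkeletonGraph {
    int minimalSlots();              // minimal resources to run at all
    int desiredSlots();              // resources for the desired parallelism
    void expandTo(int parallelism);  // builds ExecutionVertex/Execution objects
}

final class TwoStepScheduler {
    /** Returns true iff actual scheduling could start with the granted slots. */
    static boolean schedule(SkeletonGraph skeleton, int grantedSlots) {
        if (grantedSlots < skeleton.minimalSlots()) {
            return false; // don't start: we would run out of resources mid-scheduling
        }
        // Expand with the parallelism we can actually serve, capped at the desired amount.
        skeleton.expandTo(Math.min(grantedSlots, skeleton.desiredSlots()));
        return true;
    }
}
```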



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zsq12138 opened a new pull request #7448: [hotfix][runtime] fix synchronous snapshots completed log

2019-01-09 Thread GitBox
zsq12138 opened a new pull request #7448: [hotfix][runtime] fix synchronous 
snapshots completed log
URL: https://github.com/apache/flink/pull/7448
 
 
   ## What is the purpose of the change
   This fixes a log output problem in the snapshot method of 
DefaultOperatorStateBackend.java: in the current version, if asynchronous 
snapshots are chosen, the log message for synchronous snapshots is also printed.
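   A hedged sketch of the kind of guard the fix implies (all names here are 
assumptions, not the actual Flink code):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SnapshotLogging {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotLogging.class);

    /** Print only the log line that matches the snapshot mode actually used. */
    static void logSnapshotDuration(boolean asynchronousSnapshots, long durationMillis) {
        if (asynchronousSnapshots) {
            LOG.debug("DefaultOperatorStateBackend snapshot (asynchronous part) took {} ms.", durationMillis);
        } else {
            LOG.debug("DefaultOperatorStateBackend snapshot (synchronous part) took {} ms.", durationMillis);
        }
    }
}
```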
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-11298) Scheduling job in the unit of concurrent groups

2019-01-09 Thread shuai.xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu reassigned FLINK-11298:


Assignee: shuai.xu

> Scheduling job in the unit of concurrent groups
> ---
>
> Key: FLINK-11298
> URL: https://issues.apache.org/jira/browse/FLINK-11298
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>
> Now Flink only supports two scheduling modes: scheduling all tasks EAGER for 
> streaming jobs and scheduling all tasks LAZY_FROM_SOURCE for batch jobs. This 
> is not flexible enough for the varying requirements of different jobs, such as 
> FLINK-10240. We propose a new ConcurrentSchedulingGroup-based scheduling 
> strategy, which first splits a job into several concurrent groups and then 
> schedules it in units of concurrent groups. This strategy will support not 
> only the existing EAGER and LAZY_FROM_SOURCE modes but also other situations 
> such as the Build/Probe in FLINK-10240.
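A purely hypothetical sketch of the grouping idea (no such API exists; all names 
are invented). EAGER degenerates to a single group holding the whole job, and 
LAZY_FROM_SOURCE to one group per stage:

```java
import java.util.ArrayList;
import java.util.List;

final class ConcurrentGroup {
    final List<Integer> vertexIds = new ArrayList<>(); // tasks deployed together as one unit
}

final class GroupSplitter {
    /** Splits topologically sorted vertices into fixed-size concurrent groups. */
    static List<ConcurrentGroup> split(List<Integer> sortedVertexIds, int groupSize) {
        List<ConcurrentGroup> groups = new ArrayList<>();
        for (int i = 0; i < sortedVertexIds.size(); i += groupSize) {
            ConcurrentGroup g = new ConcurrentGroup();
            g.vertexIds.addAll(sortedVertexIds.subList(
                    i, Math.min(i + groupSize, sortedVertexIds.size())));
            groups.add(g);
        }
        return groups;
    }
}
```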



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-11299) Decide the result partition type dynamically

2019-01-09 Thread shuai.xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu reassigned FLINK-11299:


Assignee: shuai.xu

> Decide the result partition type dynamically 
> -
>
> Key: FLINK-11299
> URL: https://issues.apache.org/jira/browse/FLINK-11299
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>
> Now, the result partition type is decided when compiling the JobGraph at the 
> client. That is to say, whether a task outputs its result to its consumers as 
> PIPELINED or BLOCKING is decided at the client and cannot be changed any more. 
> However, it is common in batch jobs that a task configured PIPELINED has 
> consumers that cannot be started due to a lack of resources, which currently 
> leads to a failover. So we propose to support deciding the result partition 
> type dynamically at runtime, according to the cluster resources and other 
> factors, to increase the success rate of batch jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11299) Decide the result partition type dynamically

2019-01-09 Thread shuai.xu (JIRA)
shuai.xu created FLINK-11299:


 Summary: Decide the result partition type dynamically 
 Key: FLINK-11299
 URL: https://issues.apache.org/jira/browse/FLINK-11299
 Project: Flink
  Issue Type: Improvement
  Components: JobManager
Reporter: shuai.xu


Now, the result partition type is decided when compiling the JobGraph at the 
client. That is to say, whether a task outputs its result to its consumers as 
PIPELINED or BLOCKING is decided at the client and cannot be changed any more. 
However, it is common in batch jobs that a task configured PIPELINED has 
consumers that cannot be started due to a lack of resources, which currently 
leads to a failover. So we propose to support deciding the result partition 
type dynamically at runtime, according to the cluster resources and other 
factors, to increase the success rate of batch jobs.
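An illustrative sketch of such a decision (ResultPartitionType with 
PIPELINED/BLOCKING is a real Flink enum, but the decide() helper and its inputs 
are invented here):

```java
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;

final class PartitionTypeDecider {
    /** Streams the data if all consumers can run concurrently; otherwise spills. */
    static ResultPartitionType decide(int slotsAvailableForConsumers, int consumerParallelism) {
        return slotsAvailableForConsumers >= consumerParallelism
                ? ResultPartitionType.PIPELINED  // producer and consumers run together
                : ResultPartitionType.BLOCKING;  // consumers may start later, after the producer
    }
}
```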



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] HuangZhenQiu commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager

2019-01-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager
URL: https://github.com/apache/flink/pull/7356#discussion_r246652721
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/MaximumFailedContainersException.java
 ##
 @@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.exceptions;
+
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+
+/**
+ * Exception for {@link ResourceManager} when it detects that the maximum 
number of failed containers has been reached.
+ */
+public class MaximumFailedContainersException extends ResourceManagerException 
{
 
 Review comment:
   Good suggestion.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] HuangZhenQiu commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager

2019-01-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager
URL: https://github.com/apache/flink/pull/7356#discussion_r246652065
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -825,6 +837,12 @@ protected void closeTaskManagerConnection(final 
ResourceID resourceID, final Exc

slotManager.unregisterTaskManager(workerRegistration.getInstanceID());
 

workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
+   failedContainerSoFar.getAndAdd(1);
+   if (failedContainerSoFar.intValue() >= 
maxFailedContainers) {
+   rejectAllPendingSlotRequests(new 
MaximumFailedContainersException(
 
 Review comment:
   No need. The other TMs can stay alive so that the restart strategy can decide 
whether to fail the whole job. For a per-job cluster, if the job fails, the 
cluster will terminate by itself.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10432) Introduce bulk/group-aware scheduling

2019-01-09 Thread shuai.xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739120#comment-16739120
 ] 

shuai.xu commented on FLINK-10432:
--

Hi [~srichter], it's really a good idea. I think we could benefit even more from 
it. With this work we could easily support a Concurrent Group based scheduling 
strategy to satisfy the flexible requirements of different jobs, such as 
FLINK-10240. The idea of Concurrent Group based scheduling is in FLINK-11298.

> Introduce bulk/group-aware scheduling
> -
>
> Key: FLINK-10432
> URL: https://issues.apache.org/jira/browse/FLINK-10432
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>
> In this step, we change our new Scheduler to support reasoning over a bulk of 
> Executions before interacting with the SlotPool instead of requesting slots 
> on a per-execution basis. This gives us the opportunity to group and schedule 
> tasks together as one unit and to apply scheduling algorithms that require a 
> more holistic view on the tasks at hand. For now, this step will probably 
> mainly address streaming / EAGER scheduling. After this step, we expect that 
> we can support the use-case of local recovery.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11298) Scheduling job in the unit of concurrent groups

2019-01-09 Thread shuai.xu (JIRA)
shuai.xu created FLINK-11298:


 Summary: Scheduling job in the unit of concurrent groups
 Key: FLINK-11298
 URL: https://issues.apache.org/jira/browse/FLINK-11298
 Project: Flink
  Issue Type: Improvement
  Components: JobManager
Reporter: shuai.xu


Now Flink only supports two scheduling modes: scheduling all tasks EAGER for 
streaming jobs and scheduling all tasks LAZY_FROM_SOURCE for batch jobs. This is 
not flexible enough for the varying requirements of different jobs, such as 
FLINK-10240. We propose a new ConcurrentSchedulingGroup-based scheduling 
strategy, which first splits a job into several concurrent groups and then 
schedules it in units of concurrent groups. This strategy will support not only 
the existing EAGER and LAZY_FROM_SOURCE modes but also other situations such as 
the Build/Probe in FLINK-10240.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11297) Add a doc link of jobmanager ha details

2019-01-09 Thread Yiqun Lin (JIRA)
Yiqun Lin created FLINK-11297:
-

 Summary: Add a doc link of jobmanager ha details
 Key: FLINK-11297
 URL: https://issues.apache.org/jira/browse/FLINK-11297
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.7.1
Reporter: Yiqun Lin


Currently the JobManager HA doc only tells us how to configure the HA settings; 
it does not mention further details. We can add a link to a more detailed page: 
[https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] HuangZhenQiu commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager

2019-01-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager
URL: https://github.com/apache/flink/pull/7356#discussion_r246646421
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -145,6 +147,12 @@
/** All registered listeners for status updates of the ResourceManager. 
*/
private ConcurrentMap 
infoMessageListeners;
 
+   /** The number of failed containers since the master became active. */
+   protected AtomicInteger failedContainerSoFar = new AtomicInteger(0);
+
+   /** Number of failed TaskManager containers before stopping the 
application. Default is  Integer.MAX_VALUE */
+   protected int maxFailedContainers = Integer.MAX_VALUE;
 
 Review comment:
   Agree.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11297) Add a doc link of jobmanager ha details

2019-01-09 Thread Yiqun Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739105#comment-16739105
 ] 

Yiqun Lin commented on FLINK-11297:
---

Could anyone help add me to the contributor role? I will take a try at this. 
Thanks.

> Add a doc link of jobmanager ha details
> ---
>
> Key: FLINK-11297
> URL: https://issues.apache.org/jira/browse/FLINK-11297
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.7.1
>Reporter: Yiqun Lin
>Priority: Minor
>
> Currently the JobManager HA doc only tells us how to configure the HA 
> settings; it does not mention further details. We can add a link to a more 
> detailed page: 
> [https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] 123avi commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according

2019-01-09 Thread GitBox
123avi commented on a change in pull request #7418: FLINK-11053 Documentation - 
update scala sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#discussion_r246645781
 
 

 ##
 File path: docs/dev/connectors/filesystem_sink.md
 ##
 @@ -117,11 +117,11 @@ input.addSink(sink);
 
 
 {% highlight scala %}
-val input: DataStream[Tuple2[IntWritable, Text]] = ...
+val input: DataStream[(IntWritable, Text)] = ???
 
-val sink = new BucketingSink[String]("/base/path")
-sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
-sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
+val sink = new BucketingSink[(IntWritable, Text)]("/base/path")
+sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
+sink.setWriter(new StringWriter[(IntWritable, Text)]())
 
 Review comment:
   @hequn8128 sounds good, thank you very much. I pushed the correction and 
emphasized the usage of `java.tuple.Tuple2` to avoid frustrating anyone who 
copy-pastes the example :) . 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] 123avi edited a comment on issue #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according

2019-01-09 Thread GitBox
123avi edited a comment on issue #7418: FLINK-11053 Documentation - update 
scala sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#issuecomment-452835988
 
 
   I understand, nice, but that will not be a pure Scala example; I think it’s
   more like a workaround. IMO if we do as you suggest, we should explicitly
   mention the usage of Java Tuple2.
   What do you think?
   
   On Wed, 9 Jan 2019 at 3:55 Hequn Cheng  wrote:
   
   > *@hequn8128* commented on this pull request.
   > --
   >
   > In docs/dev/connectors/filesystem_sink.md
   > :
   >
   > >
   > -val sink = new BucketingSink[String]("/base/path")
   > -sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
   > -sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
   > +val sink = new BucketingSink[(IntWritable, Text)]("/base/path")
   > +sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
   > +sink.setWriter(new StringWriter[(IntWritable, Text)]())
   >
   > @123avi  What I mean is don't use scala tuple.
   > Use java tuple even for the scala example.
   > val input: DataStream[Tuple2[A, B]] is different from val input:
   > DataStream[(A, B)]. org.apache.flink.api.java.tuple.Tuple2 is a class in
   > Flink.
   >
   > I wrote a sample code for you. Take a look at the code here
   > 
.
   > You can also try to run the test. It works well.
   >
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11256) Referencing StreamNode objects directly in StreamEdge causes the sizes of JobGraph and TDD to become unnecessarily large

2019-01-09 Thread Haibo Suen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739066#comment-16739066
 ] 

Haibo Suen commented on FLINK-11256:


Okay, we'll just do a small fix.

> Referencing StreamNode objects directly in StreamEdge causes the sizes of 
> JobGraph and TDD to become unnecessarily large
> 
>
> Key: FLINK-11256
> URL: https://issues.apache.org/jira/browse/FLINK-11256
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.7.0, 1.7.1
>Reporter: Haibo Suen
>Assignee: Haibo Suen
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When a job graph is generated from a StreamGraph, the StreamEdge(s) of the 
> stream graph are serialized into StreamConfig and stored in the job graph. 
> After that, the serialized bytes are included in the TDD and distributed to 
> the TMs. Because StreamEdge directly references StreamNode objects, including 
> sourceVertex and targetVertex, these objects are also written transitively 
> when serializing StreamEdge. But these StreamNode objects are not needed in 
> the JM and Task. For a large topology, this causes the JobGraph/TDD to become 
> much larger than actually needed, and makes an RPC timeout during transmission 
> more likely.
> In StreamEdge, only the ID of the StreamNode should be stored to avoid this 
> situation.
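A hedged sketch of the proposed change (the field and constructor shown follow 
the description above, not necessarily the final patch):

```java
import java.io.Serializable;

public class StreamEdge implements Serializable {
    // Before: private final StreamNode sourceVertex;  // serialized transitively!
    // After: store only the IDs and resolve the nodes via the StreamGraph when needed.
    private final int sourceId;
    private final int targetId;

    public StreamEdge(int sourceId, int targetId) {
        this.sourceId = sourceId;
        this.targetId = targetId;
    }
}
```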



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] bowenli86 commented on issue #7409: [FLINK-11064] [table] Setup a new flink-table module structure

2019-01-09 Thread GitBox
bowenli86 commented on issue #7409: [FLINK-11064] [table] Setup a new 
flink-table module structure
URL: https://github.com/apache/flink/pull/7409#issuecomment-452976188
 
 
   @zentol agree, it would be nice to split this into a couple or more PRs if 
it's not too late.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] walterddr commented on issue #6472: [FLINK-9294] [table] Improve type inference for UDFs with composite parameter and/or result type

2019-01-09 Thread GitBox
walterddr commented on issue #6472: [FLINK-9294] [table] Improve type inference 
for UDFs with composite parameter and/or result type
URL: https://github.com/apache/flink/pull/6472#issuecomment-452972995
 
 
   @twalthr could you kindly take a look at the overall changes for this 
support? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] walterddr commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager

2019-01-09 Thread GitBox
walterddr commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager
URL: https://github.com/apache/flink/pull/7356#discussion_r246635359
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -512,4 +517,53 @@ public void testOnContainerCompleted() throws Exception {
});
}};
}
+
+   /**
+*  Tests that YarnResourceManager rejects all pending slot requests when the 
maximum number of failed containers is hit.
+*/
+   @Test
+   public void testOnContainersAllocatedWithFailure() throws Exception {
+   new Context() {{
+   runTest(() -> {
+   CompletableFuture registerSlotRequestFuture 
= resourceManager.runInMainThread(() -> {
+   
rmServices.slotManager.registerSlotRequest(
+   new SlotRequest(new JobID(), 
new AllocationID(), resourceProfile1, taskHost));
+   return null;
+   });
+
+   // wait for the registerSlotRequest completion
+   registerSlotRequestFuture.get();
+
+   // Callback from YARN when container is 
allocated.
+   Container disconnectedContainer1 = 
mockContainer("container1", 1234, 1);
+   
resourceManager.onContainersAllocated(ImmutableList.of(disconnectedContainer1));
+   
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+   
verify(mockNMClient).startContainer(eq(disconnectedContainer1), 
any(ContainerLaunchContext.class));
+
+   ResourceID connectedTM = new 
ResourceID(disconnectedContainer1.getId().toString());
+
+   
resourceManager.registerTaskExecutor("container1", connectedTM, 1234,
+   hardwareDescription, Time.seconds(10L));
+
+   // force to unregister the task manager
+   
resourceManager.disconnectTaskManager(connectedTM, new TimeoutException());
+
+   // request second slot
+   registerSlotRequestFuture = 
resourceManager.runInMainThread(() -> {
+   
rmServices.slotManager.registerSlotRequest(
+   new SlotRequest(new JobID(), 
new AllocationID(), resourceProfile1, taskHost));
 
 Review comment:
   We can probably mock this so that there is no need to spy on the 
`slotManager`? Verifying that `request.completeExceptionally()` has been called 
should suffice.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] walterddr commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager

2019-01-09 Thread GitBox
walterddr commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager
URL: https://github.com/apache/flink/pull/7356#discussion_r246633969
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/MaximumFailedContainersException.java
 ##
 @@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.exceptions;
+
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+
+/**
+ * Exception for {@link ResourceManager} when it detects that the maximum 
number of failed containers has been reached.
+ */
+public class MaximumFailedContainersException extends ResourceManagerException 
{
 
 Review comment:
   probably rename to `MaximumFailedTaskManagerExceedingException` ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] walterddr commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager

2019-01-09 Thread GitBox
walterddr commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager
URL: https://github.com/apache/flink/pull/7356#discussion_r246633525
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -145,6 +147,12 @@
/** All registered listeners for status updates of the ResourceManager. 
*/
private ConcurrentMap 
infoMessageListeners;
 
+   /** The number of failed containers since the master became active. */
+   protected AtomicInteger failedContainerSoFar = new AtomicInteger(0);
+
+   /** Number of failed TaskManager containers before stopping the 
application. Default is  Integer.MAX_VALUE */
+   protected int maxFailedContainers = Integer.MAX_VALUE;
 
 Review comment:
   this should be `maximumAllowedTaskManagerFailureCount`, to avoid confusion 
since this is only used during `closeTaskManagerConnection` but not 
`closeJobManagerConnection`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] walterddr commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager

2019-01-09 Thread GitBox
walterddr commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum failed TMs in YarnResourceManager
URL: https://github.com/apache/flink/pull/7356#discussion_r246634064
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -825,6 +837,12 @@ protected void closeTaskManagerConnection(final 
ResourceID resourceID, final Exc

slotManager.unregisterTaskManager(workerRegistration.getInstanceID());
 

workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
+   failedContainerSoFar.getAndAdd(1);
+   if (failedContainerSoFar.intValue() >= 
maxFailedContainers) {
+   rejectAllPendingSlotRequests(new 
MaximumFailedContainersException(
 
 Review comment:
   do we need to do 
   `slotManager.unregisterTaskManager` here to clear all the registered 
TaskManagers as well?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] walterddr commented on a change in pull request #7420: [FLINK-11272][flink-yarn] Support for parsing multiple --yarnship par…

2019-01-09 Thread GitBox
walterddr commented on a change in pull request #7420: 
[FLINK-11272][flink-yarn] Support for parsing multiple --yarnship par…
URL: https://github.com/apache/flink/pull/7420#discussion_r246631239
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
 ##
 @@ -311,14 +311,16 @@ private AbstractYarnClusterDescriptor createDescriptor(
}
 
List shipFiles = new ArrayList<>();
-   // path to directory to ship
+   // path to directories to ship
if (cmd.hasOption(shipPath.getOpt())) {
-   String shipPath = 
cmd.getOptionValue(this.shipPath.getOpt());
-   File shipDir = new File(shipPath);
-   if (shipDir.isDirectory()) {
-   shipFiles.add(shipDir);
-   } else {
-   LOG.warn("Ship directory is not a directory. 
Ignoring it.");
+   String[] shipPaths = 
cmd.getOptionValues(this.shipPath.getOpt());
+   for (String shipPath : shipPaths) {
 
 Review comment:
   Can probably be simplified using:
   ```
    Arrays.stream(shipPaths)
        .map((shipPath) -> { if (shipDir.isDirectory()) ... else ... })  // the original code
   ```
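   Spelled out, that might look like the following sketch (assuming the 
surrounding `shipFiles` list, `LOG` logger, and imports of `FlinkYarnSessionCli`):

```java
Arrays.stream(cmd.getOptionValues(this.shipPath.getOpt()))
        .map(File::new)
        .forEach(shipDir -> {
            if (shipDir.isDirectory()) {
                shipFiles.add(shipDir);
            } else {
                LOG.warn("Ship directory {} is not a directory. Ignoring it.", shipDir);
            }
        });
```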


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] eaglewatcherwb commented on issue #7436: [FLINK-11071][core] add support for dynamic proxy classes resolution …

2019-01-09 Thread GitBox
eaglewatcherwb commented on issue #7436: [FLINK-11071][core] add support for 
dynamic proxy classes resolution …
URL: https://github.com/apache/flink/pull/7436#issuecomment-452959381
 
 
   @OlegZhukov  for the `resolveProxyClass(interfaces)` calling sequence, I 
think `super.resolveProxyClass(interfaces)` should be called last, since there 
is a user-defined class loader, which may be what the user really wants to use 
rather than the default one. And in `resolveClass`, `super.resolveClass` is also 
called last; should we be consistent with this? 
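   For reference, a minimal sketch of the ordering under discussion (it assumes 
a `classLoader` field like the one `resolveClass` already uses; not necessarily 
the final patch):

```java
@Override
protected Class<?> resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException {
    if (classLoader != null) {
        Class<?>[] resolved = new Class<?>[interfaces.length];
        for (int i = 0; i < interfaces.length; i++) {
            // resolve each interface against the user class loader first
            resolved[i] = Class.forName(interfaces[i], false, classLoader);
        }
        try {
            return java.lang.reflect.Proxy.getProxyClass(classLoader, resolved);
        } catch (IllegalArgumentException e) {
            throw new ClassNotFoundException(null, e);
        }
    }
    return super.resolveProxyClass(interfaces); // default resolution as the last resort
}
```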


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhijiangW commented on a change in pull request #7438: [FLINK-11282][network] Merge StreamRecordWriter into RecordWriter

2019-01-09 Thread GitBox
zhijiangW commented on a change in pull request #7438: [FLINK-11282][network] 
Merge StreamRecordWriter into RecordWriter
URL: https://github.com/apache/flink/pull/7438#discussion_r246626579
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -72,17 +73,29 @@
 
private Counter numBuffersOut = new SimpleCounter();
 
+   /** Default name for the output flush thread, if no name with a task 
reference is given. */
+   private static final String DEFAULT_OUTPUT_FLUSH_THREAD_NAME = 
"OutputFlusher";
+
+   /** The thread that periodically flushes the output, to give an upper 
latency bound. */
+   private final OutputFlusher outputFlusher;
+
+   /** The exception encountered in the flushing thread. */
+   private Throwable flusherException;
 
 Review comment:
   `AtomicReference<Throwable>` can be a final variable, which seems better if 
it does not affect performance.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhijiangW commented on a change in pull request #7438: [FLINK-11282][network] Merge StreamRecordWriter into RecordWriter

2019-01-09 Thread GitBox
zhijiangW commented on a change in pull request #7438: [FLINK-11282][network] 
Merge StreamRecordWriter into RecordWriter
URL: https://github.com/apache/flink/pull/7438#discussion_r246626442
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -72,17 +73,29 @@
 
private Counter numBuffersOut = new SimpleCounter();
 
+   /** Default name for the output flush thread, if no name with a task 
reference is given. */
+   private static final String DEFAULT_OUTPUT_FLUSH_THREAD_NAME = 
"OutputFlusher";
+
+   /** The thread that periodically flushes the output, to give an upper 
latency bound. */
+   private final OutputFlusher outputFlusher;
+
+   /** The exception encountered in the flushing thread. */
+   private Throwable flusherException;
 
 Review comment:
   You pointed out a pre-existing potential risk. I am not sure why the 
historical code did not add `sync/volatile` here, unless the real-time 
visibility of this variable is not very important for the task thread, as long 
as it is seen eventually.
   
   But such behavior still seems non-deterministic. I suggest adding the 
volatile keyword or changing this variable to an `AtomicReference<Throwable>`. 
Which one do you prefer?
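   The volatile variant under discussion could look like the following minimal 
sketch (the `checkErroneous()` helper name is an assumption):

```java
/** The exception encountered in the flushing thread; volatile so the task thread sees it promptly. */
private volatile Throwable flusherException;

/** Called from the task thread around writes, to surface flusher failures. */
private void checkErroneous() throws IOException {
    Throwable t = flusherException; // single volatile read
    if (t != null) {
        throw new IOException("An exception happened while flushing the outputs", t);
    }
}
```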


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] eaglewatcherwb commented on a change in pull request #7436: [FLINK-11071][core] add support for dynamic proxy classes resolution …

2019-01-09 Thread GitBox
eaglewatcherwb commented on a change in pull request #7436: [FLINK-11071][core] 
add support for dynamic proxy classes resolution …
URL: https://github.com/apache/flink/pull/7436#discussion_r246624417
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
 ##
 @@ -91,6 +92,40 @@ public ClassLoaderObjectInputStream(InputStream in, 
ClassLoader classLoader) thr
return super.resolveClass(desc);
}
 
+   @Override
+   protected Class resolveProxyClass(String[] interfaces) 
throws IOException, ClassNotFoundException {
 
 Review comment:
   I read the code; it seems that there is no test case for this. I will try to 
add one.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11296) Support truncate in TableAPI

2019-01-09 Thread xuqianjin (JIRA)
xuqianjin created FLINK-11296:
-

 Summary: Support truncate in TableAPI
 Key: FLINK-11296
 URL: https://issues.apache.org/jira/browse/FLINK-11296
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.7.1
Reporter: xuqianjin
Assignee: xuqianjin


Add {{truncate}} support in the Table API, as follows:
||Expression||Expected result||
|truncate(cast(42.345 as decimal(2, 3)), 2)|42.34|
|truncate(42, -1)|40|
|truncate(42.324)|42|
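The table above corresponds to truncation toward zero at n decimal places, where 
n defaults to 0 and may be negative; a small Java sketch of the expected 
semantics:

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

final class TruncateDemo {
    /** Drops digits beyond scale n without rounding (n may be negative). */
    static BigDecimal truncate(BigDecimal x, int n) {
        return x.setScale(n, RoundingMode.DOWN);
    }

    public static void main(String[] args) {
        System.out.println(truncate(new BigDecimal("42.345"), 2));  // 42.34
        System.out.println(truncate(new BigDecimal("42"), -1));     // 4E+1 (i.e. 40)
        System.out.println(truncate(new BigDecimal("42.324"), 0));  // 42
    }
}
```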



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11295) Rename configuration options of queryable state from query.x to queryable-state.x

2019-01-09 Thread BoWang (JIRA)
BoWang created FLINK-11295:
--

 Summary: Rename configuration options of queryable state from 
query.x to queryable-state.x
 Key: FLINK-11295
 URL: https://issues.apache.org/jira/browse/FLINK-11295
 Project: Flink
  Issue Type: Improvement
Reporter: BoWang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11295) Rename configuration options of queryable state from query.x to queryable-state.x

2019-01-09 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang updated FLINK-11295:
---
Component/s: Configuration

> Rename configuration options of queryable state from query.x to 
> queryable-state.x
> -
>
> Key: FLINK-11295
> URL: https://issues.apache.org/jira/browse/FLINK-11295
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration
>Reporter: BoWang
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-11295) Rename configuration options of queryable state from query.x to queryable-state.x

2019-01-09 Thread BoWang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BoWang reassigned FLINK-11295:
--

Assignee: BoWang

> Rename configuration options of queryable state from query.x to 
> queryable-state.x
> -
>
> Key: FLINK-11295
> URL: https://issues.apache.org/jira/browse/FLINK-11295
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration
>Reporter: BoWang
>Assignee: BoWang
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] eaglewatcherwb commented on issue #7412: [FLINK-10866][Runtime] 1. Explicitly enable qs server and proxy. 2. QS

2019-01-09 Thread GitBox
eaglewatcherwb commented on issue #7412: [FLINK-10866][Runtime] 1. Explicitly 
enable qs server and proxy. 2. QS
URL: https://github.com/apache/flink/pull/7412#issuecomment-452951913
 
 
   > This looks good now! I'll merge once Travis is green.
   > 
   > @eaglewatcherwb Do you want to open a follow-up Jira issue to rename the 
other options from `query.x` to `queryable-state.x`. We can keep the old 
options around with the `deprecatedKey()` method for now.
   
   Ok, I am glad to do this. The [follow-up 
jira](https://issues.apache.org/jira/browse/FLINK-11295) is open. 
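   For context, such a rename can keep the old keys working via Flink's 
`ConfigOptions` builder; a sketch (the option below is illustrative, and the 
builder method is actually named `withDeprecatedKeys`):

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public static final ConfigOption<String> PROXY_PORT_RANGE =
        ConfigOptions.key("queryable-state.proxy.ports")   // new key
                .defaultValue("9069")
                .withDeprecatedKeys("query.proxy.ports");  // old key keeps working
```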


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] eaglewatcherwb commented on issue #7412: [FLINK-10866][Runtime] 1. Explicitly enable qs server and proxy. 2. QS

2019-01-09 Thread GitBox
eaglewatcherwb commented on issue #7412: [FLINK-10866][Runtime] 1. Explicitly 
enable qs server and proxy. 2. QS
URL: https://github.com/apache/flink/pull/7412#issuecomment-452950837
 
 
   > @eaglewatcherwb Are both `eaglewatcherwb` and `yanyu.wb` your nicknames? I 
want to squash the git commits into one commit but which username should be on 
the final commit?
   
   Yes, both are my nicknames. eaglewatcherwb is OK


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] eaglewatcherwb commented on a change in pull request #7436: [FLINK-11071][core] add support for dynamic proxy classes resolution …

2019-01-09 Thread GitBox
eaglewatcherwb commented on a change in pull request #7436: [FLINK-11071][core] 
add support for dynamic proxy classes resolution …
URL: https://github.com/apache/flink/pull/7436#discussion_r246621028
 
 

 ##
 File path: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
 ##
 @@ -65,33 +70,47 @@ public static void main(String[] args) throws Exception {
DataStream text = env.socketTextStream(hostname, port, 
"\n");
 
// parse the data, group it, window it, and aggregate the counts
-   DataStream windowCounts = text
-
-   .flatMap(new FlatMapFunction() {
+   DataStream windowCounts = 
text.flatMap(serializableProxy(
 
 Review comment:
   Good comment, the original case should be retained. I will add another one.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11294) Remove legacy JobInfo usage in valid tests

2019-01-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11294:
---
Labels: pull-request-available  (was: )

> Remove legacy JobInfo usage in valid tests
> --
>
> Key: FLINK-11294
> URL: https://issues.apache.org/jira/browse/FLINK-11294
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{StandaloneSubmittedJobGraphStoreTest.java}} and 
> {{ZooKeeperSubmittedJobGraphsStoreITCase.java}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun opened a new pull request #7447: [FLINK-11294] [test] Remove legacy JobInfo usage in valid tests

2019-01-09 Thread GitBox
TisonKun opened a new pull request #7447: [FLINK-11294] [test] Remove legacy 
JobInfo usage in valid tests
URL: https://github.com/apache/flink/pull/7447
 
 
   ## What is the purpose of the change
   
   Remove legacy JobInfo usage in valid tests, 
StandaloneSubmittedJobGraphStoreTest.java and 
ZooKeeperSubmittedJobGraphsStoreITCase.java
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup and is itself a test.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  (no)
 - The serializers:  (no)
 - The runtime per-record code paths (performance sensitive):  (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper:  (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable )
   
   cc @tillrohrmann 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11294) Remove legacy JobInfo usage in valid tests

2019-01-09 Thread TisonKun (JIRA)
TisonKun created FLINK-11294:


 Summary: Remove legacy JobInfo usage in valid tests
 Key: FLINK-11294
 URL: https://issues.apache.org/jira/browse/FLINK-11294
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Affects Versions: 1.8.0
Reporter: TisonKun
Assignee: TisonKun
 Fix For: 1.8.0


{{StandaloneSubmittedJobGraphStoreTest.java}} and 
{{ZooKeeperSubmittedJobGraphsStoreITCase.java}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] hequn8128 commented on issue #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according

2019-01-09 Thread GitBox
hequn8128 commented on issue #7418: FLINK-11053 Documentation - update scala 
sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#issuecomment-452943225
 
 
   @123avi Yes, I think we should keep it as `Tuple2`. It would be helpful for 
users, especially for showing how to use `SequenceFileWriter`. Also, it stays 
consistent with the Java example.
   What do you think?
   
   Best, Hequn


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9700) Document FlinkKafkaProducer behaviour for Kafka versions > 0.11

2019-01-09 Thread leesf (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738900#comment-16738900
 ] 

leesf commented on FLINK-9700:
--

[~pnowojski], hi Piotr, it has been stated in the [kafka 
connectors|https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html]
 document. Is there anything else to state?

> Document FlinkKafkaProducer behaviour for Kafka versions > 0.11
> ---
>
> Key: FLINK-9700
> URL: https://issues.apache.org/jira/browse/FLINK-9700
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.5.0
>Reporter: Ufuk Celebi
>Assignee: leesf
>Priority: Minor
>
> FlinkKafkaProducer for Kafka 0.11 uses reflection to work around API 
> limitations of the Kafka client. Using reflection breaks with newer versions 
> of the Kafka client (due to internal changes of the client).
> The documentation does not mention newer Kafka versions. We should add the 
> following notes:
> - Only package Kafka connector with kafka.version property set to 0.11.*.*
> - Mention that it is possible to use the 0.11 connector with newer versions 
> of Kafka as the protocol seems to be backwards compatible (double check that 
> this is correct)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] klion26 commented on issue #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-09 Thread GitBox
klion26 commented on issue #7351: [FLINK-11008][State Backends, 
Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#issuecomment-452938913
 
 
   @azagrebin Thank you for your quick response and all the patient comments; 
I've addressed them. Thank you again.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11071) Dynamic proxy classes cannot be resolved when deserializing job graph

2019-01-09 Thread Oleg Zhukov (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738745#comment-16738745
 ] 

Oleg Zhukov commented on FLINK-11071:
-

BoWang, I don't think anyone has looked into it, so you're welcome to fix it!

I briefly looked through your pull request, and it looks good in general. I have 
a minor concern about backwards compatibility, which probably needs to be 
verified with the Flink core engineers. See my comments at 
https://github.com/apache/flink/pull/7436 .

> Dynamic proxy classes cannot be resolved when deserializing job graph
> -
>
> Key: FLINK-11071
> URL: https://issues.apache.org/jira/browse/FLINK-11071
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.6.2, 1.7.0, 1.8.0
>Reporter: Oleg Zhukov
>Assignee: BoWang
>Priority: Major
>  Labels: pull-request-available
> Attachments: SocketWindowWordCount.java
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> It turns out to be impossible to use Java dynamic proxy objects in the job 
> definition (for example, as a MapFunction implementation).
> During deserialization of the job graph, the default implementation of 
> ObjectInputStream.resolveProxyClass(..) is used, which does not use the 
> custom class loader (to look into the submitted jar) and therefore throws 
> ClassNotFoundException.
> Looks like in order to address this, 
> InstantiationUtil.ClassLoaderObjectInputStream needs to provide a custom 
> implementation of the resolveProxyClass(..) method as well (in addition to 
> resolveClass(..)).
> In order to reproduce the issue, run the attached SocketWindowWordCount Flink 
> app. It's a slight variation of the canonical [SocketWindowWordCount 
>  
> example|https://ci.apache.org/projects/flink/flink-docs-master/tutorials/local_setup.html]
>  with a dynamic proxy implementation of the flat map transformation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] OlegZhukov commented on a change in pull request #7436: [FLINK-11071][core] add support for dynamic proxy classes resolution …

2019-01-09 Thread GitBox
OlegZhukov commented on a change in pull request #7436: [FLINK-11071][core] add 
support for dynamic proxy classes resolution …
URL: https://github.com/apache/flink/pull/7436#discussion_r246565932
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
 ##
 @@ -91,6 +92,40 @@ public ClassLoaderObjectInputStream(InputStream in, 
ClassLoader classLoader) thr
return super.resolveClass(desc);
}
 
+   @Override
+   protected Class resolveProxyClass(String[] interfaces) 
throws IOException, ClassNotFoundException {
 
 Review comment:
   I'm wondering if a test case can be written for this. Is there a test case 
for correctly resolving classes in the basic (non-proxy) case?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] OlegZhukov commented on a change in pull request #7436: [FLINK-11071][core] add support for dynamic proxy classes resolution …

2019-01-09 Thread GitBox
OlegZhukov commented on a change in pull request #7436: [FLINK-11071][core] add 
support for dynamic proxy classes resolution …
URL: https://github.com/apache/flink/pull/7436#discussion_r246564637
 
 

 ##
 File path: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
 ##
 @@ -65,33 +70,47 @@ public static void main(String[] args) throws Exception {
DataStream text = env.socketTextStream(hostname, port, 
"\n");
 
// parse the data, group it, window it, and aggregate the counts
-   DataStream windowCounts = text
-
-   .flatMap(new FlatMapFunction() {
+   DataStream windowCounts = 
text.flatMap(serializableProxy(
 
 Review comment:
   I don't think the example should be changed. Using proxies in operators is a 
pretty advanced use case, while the example should focus on basic usage.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] 123avi commented on issue #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according

2019-01-09 Thread GitBox
123avi commented on issue #7418: FLINK-11053 Documentation - update scala 
sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#issuecomment-452835988
 
 
   I understand, nice, but that will not be a pure Scala example; I think it’s
   more like a workaround. IMO if we do as you suggest, we should explicitly
   mention the usage of Java Tuple2.
   What do you think?
   
   On Wed, 9 Jan 2019 at 3:55 Hequn Cheng  wrote:
   
   > *@hequn8128* commented on this pull request.
   > --
   >
   > In docs/dev/connectors/filesystem_sink.md
   > :
   >
   > >
   > -val sink = new BucketingSink[String]("/base/path")
   > -sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
   > -sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
   > +val sink = new BucketingSink[(IntWritable, Text)]("/base/path")
   > +sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
   > +sink.setWriter(new StringWriter[(IntWritable, Text)]())
   >
   > @123avi  What I mean is don't use scala tuple.
   > Use java tuple even for the scala example.
   > val input: DataStream[Tuple2[A, B]] is different from val input:
   > DataStream[(A, B)]. org.apache.flink.api.java.tuple.Tuple2 is a class in
   > Flink.
   >
   > I wrote a sample code for you. Take a look at the code here
   > 
.
   > You can also try to run the test. It works well.
   >
   > —
   > You are receiving this because you were mentioned.
   > Reply to this email directly, view it on GitHub
   > , or mute
   > the thread
   > 

   > .
   >
   -- 
   Sincerely,
   Avi Levi
   m: +972-52-3459959
   https://il.linkedin.com/in/leviavi
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] blublinsky commented on issue #7446: Added Model serving - implementation of the Flip -23.

2019-01-09 Thread GitBox
blublinsky commented on issue #7446: Added Model serving - implementation of 
the Flip -23.
URL: https://github.com/apache/flink/pull/7446#issuecomment-452805681
 
 
   ## What is the purpose of this change
   Implementation of FLIP-23 - Flink model serving
   
   ## Brief change log
   Adds the Flink model serving module
   
   ## Verifying this change
   Unit tests for the library.
   Sample application in Flink examples/model serving
   
   ## Documentation
   Does this pull request introduce a new feature? Yes, FLIP-23.




[GitHub] blublinsky opened a new pull request #7446: Added Model serving - implementation of the Flip -23.

2019-01-09 Thread GitBox
blublinsky opened a new pull request #7446: Added Model serving - 
implementation of the Flip -23.
URL: https://github.com/apache/flink/pull/7446
 
 
   This adds the module flink-modelserving, which implements the model serving 
library (both Java and Scala), and the module flink-examples-modelserving in the 
flink-examples module, containing examples of library usage.
   
   This implements FLIP-23: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-23+-+Model+Serving
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   




[GitHub] NicoK opened a new pull request #7445: [FLINK-11154][network] bump Netty to 4.1.32

2019-01-09 Thread GitBox
NicoK opened a new pull request #7445: [FLINK-11154][network] bump Netty to 
4.1.32
URL: https://github.com/apache/flink/pull/7445
 
 
   ## What is the purpose of the change
   
   This pull request bumps Netty to a newer version (4.1.32) for the following 
improvements/changes done since 4.1.24:
   - big improvements (performance, feature set) for using openSSL based SSL 
engine (useful for FLINK-9816)
   - allow multiple shaded versions of the same netty artifact (as long as the 
shaded prefix is different)
   - Ensure ByteToMessageDecoder.Cumulator implementations always release
   - Don't re-arm timerfd each epoll_wait
   - Use a non-volatile read for ensureAccessible() whenever possible to reduce 
overhead and allow better inlining.
   - Do not fail on runtime when an older version of Log4J2 is on the classpath
   - Fix leak and corruption bugs in CompositeByteBuf
   - Add support for TLSv1.3
   - Harden ref-counting concurrency semantics
   - bug fixes
   - Java 9-12 related fixes
   
   Please note that this PR is based on 
https://github.com/apache/flink-shaded/pull/55 and temporarily fetches from 
that repository to run the tests with the upgraded netty version.
   Beware NOT to merge the corresponding commit marked as `[DO-NOT-MERGE]`; only 
merge it once a new version of `flink-shaded` has been released that contains 
the new Netty version.
   
   ## Brief change log
   
   - bump netty dependency
   - update `AbstractByteBufTest` to the updated code
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **yes**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **no**
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **no**
 - If yes, how is the feature documented? **not applicable**
   




[jira] [Assigned] (FLINK-10134) UTF-16 support for TextInputFormat

2019-01-09 Thread xuqianjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuqianjin reassigned FLINK-10134:
-

Assignee: xuqianjin

> UTF-16 support for TextInputFormat
> --
>
> Key: FLINK-10134
> URL: https://issues.apache.org/jira/browse/FLINK-10134
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.2
>Reporter: David Dreyfus
>Assignee: xuqianjin
>Priority: Critical
>  Labels: pull-request-available
>
> It does not appear that Flink supports a charset encoding of "UTF-16". In 
> particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM) 
> to establish whether a UTF-16 file is UTF-16LE or UTF-16BE.
>  
> TextInputFormat.setCharset("UTF-16") calls DelimitedInputFormat.setCharset(), 
> which sets TextInputFormat.charsetName and then modifies the previously set 
> delimiterString to construct the proper byte string encoding of the the 
> delimiter. This same charsetName is also used in TextInputFormat.readRecord() 
> to interpret the bytes read from the file.
>  
> There are two problems that this implementation would seem to have when using 
> UTF-16.
>  # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java will 
> return a Big Endian byte sequence including the Byte Order Mark (BOM). The 
> actual text file will not contain a BOM at each line ending, so the delimiter 
> will never be read. Moreover, if the actual byte encoding of the file is 
> Little Endian, the bytes will be interpreted incorrectly.
>  # TextInputFormat.readRecord() will not see a BOM each time it decodes a 
> byte sequence with the String(bytes, offset, numBytes, charset) call. 
> Therefore, it will assume Big Endian, which may not always be correct. [1] 
> [https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95]
>  
> While there are likely many solutions, I would think that all of them would 
> have to start by reading the BOM from the file when a Split is opened and 
> then using that BOM to modify the specified encoding to a BOM specific one 
> when the caller doesn't specify one, and to overwrite the caller's 
> specification if the BOM is in conflict with the caller's specification. That 
> is, if the BOM indicates Little Endian and the caller indicates UTF-16BE, 
> Flink should rewrite the charsetName as UTF-16LE.
>  I hope this makes sense and that I haven't been testing incorrectly or 
> misreading the code.
>  
> I've verified the problem on version 1.4.2. I believe the problem exists on 
> all versions. 
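
A minimal sketch of the BOM probing the reporter describes, as it might run when a split is opened (a hypothetical helper, not existing Flink code):

{code:java}
import java.io.IOException;
import java.io.PushbackInputStream;
import java.nio.charset.Charset;

/** Sketch: probe the first two bytes of a split for a UTF-16 BOM. */
final class BomProbe {

    /**
     * Resolves plain "UTF-16" to a concrete endianness. The stream must be
     * created with a pushback buffer of at least 2 bytes. A BE BOM (0xFE 0xFF)
     * or LE BOM (0xFF 0xFE) is consumed; otherwise the bytes are pushed back
     * and UTF-16BE is assumed, matching the JDK default for BOM-less input.
     */
    static Charset resolveUtf16(PushbackInputStream in) throws IOException {
        byte[] bom = new byte[2];
        int read = in.read(bom);
        if (read == 2 && (bom[0] & 0xFF) == 0xFE && (bom[1] & 0xFF) == 0xFF) {
            return Charset.forName("UTF-16BE");
        }
        if (read == 2 && (bom[0] & 0xFF) == 0xFF && (bom[1] & 0xFF) == 0xFE) {
            return Charset.forName("UTF-16LE");
        }
        if (read > 0) {
            in.unread(bom, 0, read); // no BOM: push the bytes back for the reader
        }
        return Charset.forName("UTF-16BE");
    }
}
{code}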





[GitHub] GJL commented on a change in pull request #7430: [FLINK-10848] Remove container requests after successful container allocation

2019-01-09 Thread GitBox
GJL commented on a change in pull request #7430: [FLINK-10848] Remove container 
requests after successful container allocation
URL: https://github.com/apache/flink/pull/7430#discussion_r246448327
 
 

 ##
 File path: flink-yarn/src/test/resources/log4j-test.properties
 ##
 @@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
 
 Review comment:
   I think we always leave it off in the master. (this is not a review)




[GitHub] yanghua commented on a change in pull request #6876: [FLINK-10251] Handle oversized response messages in AkkaRpcActor

2019-01-09 Thread GitBox
yanghua commented on a change in pull request #6876: [FLINK-10251] Handle 
oversized response messages in AkkaRpcActor
URL: https://github.com/apache/flink/pull/6876#discussion_r246445593
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
 ##
 @@ -144,6 +147,68 @@ public void testMessageDiscarding() throws Exception {
rpcEndpoint.shutDown();
}
 
+   @Test(expected = ExecutionException.class)
+   public void testOversizedResponseMsg() throws Exception {
+   Configuration configuration = new Configuration();
+   configuration.setString(AkkaOptions.FRAMESIZE, "10 b");
+   AkkaRpcService rpcService = null;
+   OversizedResponseRpcEndpoint rpcEndpoint = null;
+   try {
+   rpcService = new TestingRemoteRpcService(configuration);
+
+   rpcEndpoint = new 
OversizedResponseRpcEndpoint(rpcService);
+
+   OversizedResponseMsgRpcGateway rpcGateway = 
rpcEndpoint.getSelfGateway(OversizedResponseMsgRpcGateway.class);
+
+   rpcEndpoint.start();
+
+   CompletableFuture result = 
rpcGateway.calculate();
+
+   result.get(timeout.getSize(), timeout.getUnit());
+
+   fail("Expected a ExecutionException about the 
oversize");
+   } finally {
 
 Review comment:
   So now this mode can pass the test:
   
   ```
   } catch (Exception e) {
assertTrue(e.getCause() instanceof IOException);
   } finally {
   ```




[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-09 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r246444142
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java
 ##
 @@ -138,6 +149,133 @@ public void testMultiThreadRestoreCorrectly() throws 
Exception {
}
}
 
+   /**
+* Test that an exception raised in the thread pool is rethrown to the main thread.
+*/
+   @Test
+   public void testMultiThreadUploadThreadPoolExceptionRethrow() throws 
IOException {
+   SpecifiedException expectedException = new 
SpecifiedException("throw exception while multi thread upload states.");
+
+   CheckpointStateOutputStream outputStream = 
createFailingCheckpointStateOutputStream(expectedException);
+   CheckpointStreamFactory checkpointStreamFactory = 
(CheckpointedStateScope scope) -> outputStream;
+
+   File file = 
temporaryFolder.newFile(String.valueOf(UUID.randomUUID()));
+   generateRandomFileContent(file.getPath(), 20);
+
+   Map<StateHandleID, Path> filePaths = new HashMap<>(1);
+   filePaths.put(new StateHandleID("mockHandleID"), new 
Path(file.getPath()));
+   try {
+   RocksDbStateDataTransfer.uploadFilesToCheckpointFs(
+   filePaths,
+   5,
+   checkpointStreamFactory,
+   new CloseableRegistry());
+   fail();
+   } catch (Exception e) {
+   assertEquals(expectedException, e);
+   }
+   }
+
+   /**
+* Test that uploading files with multiple threads works correctly.
+*/
+   @Test
+   public void testMultiThreadUploadCorrectly() throws Exception {
+
+   File checkpointPrivateFolder = 
temporaryFolder.newFolder("private");
+   Path checkpointPrivateDirectory = new 
Path(checkpointPrivateFolder.getPath());
+
+   File checkpointSharedFolder = 
temporaryFolder.newFolder("shared");
+   Path checkpointSharedDirectory = new 
Path(checkpointSharedFolder.getPath());
+
+   FileSystem fileSystem = 
checkpointPrivateDirectory.getFileSystem();
+   int fileStateSizeThreshold = 1024;
+   FsCheckpointStreamFactory checkpointStreamFactory =
+   new FsCheckpointStreamFactory(fileSystem, 
checkpointPrivateDirectory, checkpointSharedDirectory, fileStateSizeThreshold);
+
+   String localFolder = "local";
+   temporaryFolder.newFolder(localFolder);
+
+   int sstFileCount = 6;
+   Map<StateHandleID, Path> sstFilePaths = 
generateRandomSstFiles(localFolder, sstFileCount, fileStateSizeThreshold);
+
+   Map<StateHandleID, StreamStateHandle> sstFiles = new 
HashMap<>(sstFileCount);
+
+   
sstFiles.putAll(RocksDbStateDataTransfer.uploadFilesToCheckpointFs(
 
 Review comment:
   I think it can be directly assigned to 
   `Map<StateHandleID, StreamStateHandle> sstFiles = uploadFilesToCheckpointFs(..)`
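   
   For readers skimming this thread: the contract the rethrow test above asserts
   boils down to unwrapping the pool's `ExecutionException` in the caller. A
   minimal sketch with illustrative names, not the PR's actual code:
   
   ```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Sketch of the multi-threaded upload rethrow semantics (illustrative names). */
final class ParallelUploadSketch {

    static <T> List<T> runAll(List<Callable<T>> uploads, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<T>> futures = new ArrayList<>();
            for (Callable<T> upload : uploads) {
                futures.add(pool.submit(upload));
            }
            List<T> results = new ArrayList<>();
            for (Future<T> future : futures) {
                try {
                    results.add(future.get());
                } catch (ExecutionException e) {
                    // unwrap so the caller sees the original upload failure,
                    // matching assertEquals(expectedException, e) in the test
                    throw (Exception) e.getCause();
                }
            }
            return results;
        } finally {
            pool.shutdownNow();
        }
    }
}
   ```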




[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-09 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r246443750
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
 ##
 @@ -410,74 +412,51 @@ private void uploadSstFiles(
// write state data
Preconditions.checkState(localBackupDirectory.exists());
 
+   Map<StateHandleID, Path> sstFilePaths = new HashMap<>();
+   Map<StateHandleID, Path> miscFilePaths = new 
HashMap<>();
+
FileStatus[] fileStatuses = 
localBackupDirectory.listStatus();
if (fileStatuses != null) {
-   for (FileStatus fileStatus : fileStatuses) {
-   final Path filePath = 
fileStatus.getPath();
-   final String fileName = 
filePath.getName();
-   final StateHandleID stateHandleID = new 
StateHandleID(fileName);
-
-   if (fileName.endsWith(SST_FILE_SUFFIX)) 
{
-   final boolean existsAlready =
-   baseSstFiles != null && 
baseSstFiles.contains(stateHandleID);
-
-   if (existsAlready) {
-   // we introduce a 
placeholder state handle, that is replaced with the
-   // original from the 
shared state registry (created from a previous checkpoint)
-   sstFiles.put(
-   stateHandleID,
-   new 
PlaceholderStreamStateHandle());
-   } else {
-   
sstFiles.put(stateHandleID, uploadLocalFileToCheckpointFs(filePath));
-   }
-   } else {
-   StreamStateHandle fileHandle = 
uploadLocalFileToCheckpointFs(filePath);
-   miscFiles.put(stateHandleID, 
fileHandle);
-   }
-   }
+   createUploadFilePaths(fileStatuses, sstFiles, 
sstFilePaths, miscFilePaths);
+
+   sstFiles.putAll(uploadFilesToCheckpointFs(
+   sstFilePaths,
+   numberOfRestoringThreads,
+   checkpointStreamFactory,
+   getSnapshotCloseableRegistry()));
+   miscFiles.putAll(uploadFilesToCheckpointFs(
+   miscFilePaths,
+   numberOfRestoringThreads,
+   checkpointStreamFactory,
+   getSnapshotCloseableRegistry()));
}
}
 
-   private StreamStateHandle uploadLocalFileToCheckpointFs(Path 
filePath) throws Exception {
-   FSDataInputStream inputStream = null;
-   CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = null;
-
-   try {
-   final byte[] buffer = new 
byte[READ_BUFFER_SIZE];
-
-   FileSystem backupFileSystem = 
localBackupDirectory.getFileSystem();
-   inputStream = backupFileSystem.open(filePath);
-   registerCloseableForCancellation(inputStream);
-
-   outputStream = checkpointStreamFactory
-   
.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
-   registerCloseableForCancellation(outputStream);
-
-   while (true) {
-   int numBytes = inputStream.read(buffer);
-
-   if (numBytes == -1) {
-   break;
+   private void createUploadFilePaths(
+   FileStatus[] fileStatuses,
+   Map<StateHandleID, StreamStateHandle> sstFiles,
+   Map<StateHandleID, Path> sstFilePaths,
+   Map<StateHandleID, Path> miscFilePaths) {
+   for (FileStatus fileStatus : fileStatuses) {
+   final Path filePath = fileStatus.getPath();
+

[GitHub] azagrebin commented on a change in pull request #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-09 Thread GitBox
azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r246443897
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java
 ##
 @@ -138,6 +149,133 @@ public void testMultiThreadRestoreCorrectly() throws 
Exception {
}
}
 
+   /**
+* Test that an exception raised in the thread pool is rethrown to the main thread.
+*/
+   @Test
+   public void testMultiThreadUploadThreadPoolExceptionRethrow() throws 
IOException {
+   SpecifiedException expectedException = new 
SpecifiedException("throw exception while multi thread upload states.");
+
+   CheckpointStateOutputStream outputStream = 
createFailingCheckpointStateOutputStream(expectedException);
+   CheckpointStreamFactory checkpointStreamFactory = 
(CheckpointedStateScope scope) -> outputStream;
+
+   File file = 
temporaryFolder.newFile(String.valueOf(UUID.randomUUID()));
+   generateRandomFileContent(file.getPath(), 20);
+
+   Map<StateHandleID, Path> filePaths = new HashMap<>(1);
+   filePaths.put(new StateHandleID("mockHandleID"), new 
Path(file.getPath()));
+   try {
+   RocksDbStateDataTransfer.uploadFilesToCheckpointFs(
+   filePaths,
+   5,
+   checkpointStreamFactory,
+   new CloseableRegistry());
+   fail();
+   } catch (Exception e) {
+   assertEquals(expectedException, e);
+   }
+   }
+
+   /**
+* Test that uploading files with multiple threads works correctly.
+*/
+   @Test
+   public void testMultiThreadUploadCorrectly() throws Exception {
+
 
 Review comment:
   could you remove this new line?




[jira] [Commented] (FLINK-10724) Refactor failure handling in check point coordinator

2019-01-09 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738360#comment-16738360
 ] 

vinoyang commented on FLINK-10724:
--

[~azagrebin] when should I start this issue?

> Refactor failure handling in check point coordinator
> 
>
> Key: FLINK-10724
> URL: https://issues.apache.org/jira/browse/FLINK-10724
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: vinoyang
>Priority: Major
>
> At the moment failure handling of asynchronously triggered checkpoint in 
> check point coordinator happens in different places. We could organise it in a 
> similar way to the failure handling of synchronous triggering of checkpoint in 
> *CheckpointTriggerResult* where we classify error cases. This will simplify 
> e.g. integration of error counter for FLINK-4810.
> See also discussion here: [https://github.com/apache/flink/pull/6567]
> The specific design document : 
> https://docs.google.com/document/d/1ce7RtecuTxcVUJlnU44hzcO2Dwq9g4Oyd8_biy94hJc/edit?usp=sharing





[GitHub] yanghua commented on a change in pull request #6876: [FLINK-10251] Handle oversized response messages in AkkaRpcActor

2019-01-09 Thread GitBox
yanghua commented on a change in pull request #6876: [FLINK-10251] Handle 
oversized response messages in AkkaRpcActor
URL: https://github.com/apache/flink/pull/6876#discussion_r246443067
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
 ##
 @@ -144,6 +147,68 @@ public void testMessageDiscarding() throws Exception {
rpcEndpoint.shutDown();
}
 
+   @Test(expected = ExecutionException.class)
+   public void testOversizedResponseMsg() throws Exception {
+   Configuration configuration = new Configuration();
+   configuration.setString(AkkaOptions.FRAMESIZE, "10 b");
+   AkkaRpcService rpcService = null;
+   OversizedResponseRpcEndpoint rpcEndpoint = null;
+   try {
+   rpcService = new TestingRemoteRpcService(configuration);
+
+   rpcEndpoint = new 
OversizedResponseRpcEndpoint(rpcService);
+
+   OversizedResponseMsgRpcGateway rpcGateway = 
rpcEndpoint.getSelfGateway(OversizedResponseMsgRpcGateway.class);
+
+   rpcEndpoint.start();
+
+   CompletableFuture result = 
rpcGateway.calculate();
+
+   result.get(timeout.getSize(), timeout.getUnit());
+
+   fail("Expected a ExecutionException about the 
oversize");
+   } finally {
 
 Review comment:
   @tillrohrmann When I adopt the next suggestion (using two `RpcService` 
instances to mock a real remote RPC), it throws this exception: 
   
   ```
   java.lang.reflect.UndeclaredThrowableException
at org.apache.flink.runtime.rpc.akka.$Proxy8.calculate(Unknown Source)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActorTest.testOversizedResponseMsg(AkkaRpcActorTest.java:177)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at 
com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at 
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at 
com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
   Caused by: java.io.IOException: The rpc invocation size exceeds the maximum 
akka framesize.
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:262)
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:211)
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:127)
... 28 more
   ```




[jira] [Commented] (FLINK-10724) Refactor failure handling in check point coordinator

2019-01-09 Thread Andrey Zagrebin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738365#comment-16738365
 ] 

Andrey Zagrebin commented on FLINK-10724:
-

[~yanghua] let's give the community some time this week to review the design doc; 
maybe [~till.rohrmann] could also have a look at the design.

> Refactor failure handling in check point coordinator
> 
>
> Key: FLINK-10724
> URL: https://issues.apache.org/jira/browse/FLINK-10724
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: vinoyang
>Priority: Major
>
> At the moment failure handling of asynchronously triggered checkpoint in 
> check point coordinator happens in different places. We could organise it in a 
> similar way to the failure handling of synchronous triggering of checkpoint in 
> *CheckpointTriggerResult* where we classify error cases. This will simplify 
> e.g. integration of error counter for FLINK-4810.
> See also discussion here: [https://github.com/apache/flink/pull/6567]
> The specific design document : 
> https://docs.google.com/document/d/1ce7RtecuTxcVUJlnU44hzcO2Dwq9g4Oyd8_biy94hJc/edit?usp=sharing





[GitHub] zhuzhurk commented on a change in pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead…

2019-01-09 Thread GitBox
zhuzhurk commented on a change in pull request #7255: [FLINK-10945] Use 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7255#discussion_r246435193
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -726,6 +730,41 @@ void sendPartitionInfos() {
}
}
 
+   /**
+* Check whether the InputDependencyConstraint is satisfied for this 
vertex.
+*
+* @return whether the input constraint is satisfied
+*/
+   public boolean checkInputDependencyConstraints() {
+   if (getExecutionGraph().getInputDependencyConstraint() == 
InputDependencyConstraint.ANY) {
 
 Review comment:
   I've moved `InputDependencyConstraint` to JobVertex, and the job-wide default 
value can be configured in `ExecutionConfig` (see the sketch below). But I 
haven't made it configurable through the DataSet/DataStream API yet.
   
   I agree we should make the constraint configurable for each operator. But I'm 
not quite sure whether we should support it with the DataSet API or later with 
the stream/batch unified StreamGraph/Transformation API. Could you share your 
suggestion?
   
   In our production experience, a job-wide configured input constraint 
satisfies most users, together with the `BATCH_FORCED` execution mode, to ensure 
a batch job can finish with limited resources.
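   
   A sketch of that job-wide default (the setter name is an assumption based on
   this comment, not confirmed API):
   
   ```java
import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConstraintDefaultSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // assumed setter name; configures the default for all vertices of the job
        env.getConfig().setDefaultInputDependencyConstraint(InputDependencyConstraint.ALL);
    }
}
   ```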




[GitHub] pnowojski commented on a change in pull request #7438: [FLINK-11282][network] Merge StreamRecordWriter into RecordWriter

2019-01-09 Thread GitBox
pnowojski commented on a change in pull request #7438: [FLINK-11282][network] 
Merge StreamRecordWriter into RecordWriter
URL: https://github.com/apache/flink/pull/7438#discussion_r246416809
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -72,17 +73,29 @@
 
private Counter numBuffersOut = new SimpleCounter();
 
+   /** Default name for the output flush thread, if no name with a task 
reference is given. */
+   private static final String DEFAULT_OUTPUT_FLUSH_THREAD_NAME = 
"OutputFlusher";
+
+   /** The thread that periodically flushes the output, to give an upper 
latency bound. */
+   private final OutputFlusher outputFlusher;
+
+   /** The exception encountered in the flushing thread. */
+   private Throwable flusherException;
 
 Review comment:
   How can this be not `synchronized`/`volatile`? Adding synchronisation would 
affect performance, but not adding it requires some kind of comment that some 
magic is happening here.
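   
   For context, the pattern under discussion is a classic cross-thread error
   hand-off. A minimal sketch of the `volatile` variant (illustrative, not the
   PR's code), assuming the flusher thread reports its failure and the task
   thread polls for it:
   
   ```java
import java.io.IOException;

/** Minimal sketch of a cross-thread error hand-off between flusher and task. */
class FlusherErrorSketch {

    // volatile: written by the flusher thread, read by the task thread;
    // without it (or synchronization) the task thread may never see the error
    private volatile Throwable flusherException;

    void notifyFlusherException(Throwable t) {
        // called from the flusher thread; keep only the first failure
        if (flusherException == null) {
            flusherException = t;
        }
    }

    void checkErroneous() throws IOException {
        // called from the task thread on the hot path
        if (flusherException != null) {
            throw new IOException("OutputFlusher failed", flusherException);
        }
    }
}
   ```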




[GitHub] zhuzhurk commented on a change in pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead…

2019-01-09 Thread GitBox
zhuzhurk commented on a change in pull request #7255: [FLINK-10945] Use 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7255#discussion_r246429765
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -686,6 +689,7 @@ void scheduleOrUpdateConsumers(ResultPartitionID 
partitionId) {
 
if 
(partition.getIntermediateResult().getResultType().isPipelined()) {
// Schedule or update receivers of this partition
+   partition.markSomePipelinedDataProduced();
 
 Review comment:
   Sure.




[GitHub] zhuzhurk commented on a change in pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead…

2019-01-09 Thread GitBox
zhuzhurk commented on a change in pull request #7255: [FLINK-10945] Use 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7255#discussion_r246429791
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 ##
 @@ -179,6 +179,13 @@ public static ExecutionGraph buildGraph(
executionGraph.setScheduleMode(jobGraph.getScheduleMode());

executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());
 
+   try {
+   executionGraph.setInputDependencyConstraint(
+
jobGraph.getSerializedExecutionConfig().deserializeValue(classLoader).getInputDependencyConstraint());
 
 Review comment:
   Good suggestion. I've moved `InputDependencyConstraint` to JobVertex.




[GitHub] zhuzhurk commented on a change in pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead…

2019-01-09 Thread GitBox
zhuzhurk commented on a change in pull request #7255: [FLINK-10945] Use 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7255#discussion_r246429640
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -726,6 +730,41 @@ void sendPartitionInfos() {
}
}
 
+   /**
+* Check whether the InputDependencyConstraint is satisfied for this 
vertex.
+*
+* @return whether the input constraint is satisfied
+*/
+   public boolean checkInputDependencyConstraints() {
+   if (getExecutionGraph().getInputDependencyConstraint() == 
InputDependencyConstraint.ANY) {
+   // InputDependencyConstraint == ANY
+   return IntStream.range(0, 
inputEdges.length).anyMatch(this::isInputConsumable);
+   } else {
+   // InputDependencyConstraint == ALL
+   return IntStream.range(0, 
inputEdges.length).allMatch(this::isInputConsumable);
+   }
+   }
+
+   /**
+* An input is consumable when
+* 1. the source result is PIPELINED and one of the result partitions 
has produced data.
+* 2. the source result is BLOCKING and is FINISHED (all partitions are 
FINISHED).
+*
+* @return whether the input is consumable
+*/
+   public boolean isInputConsumable(int inputNumber) {
 
 Review comment:
   A vertex `Input` is a bit different from its corresponding 
`IntermediateResult` with `POINTWISE` edge. So we need the `inputEdges` info in 
ExecutionVertex.
   
   I changed it a bit to be more concise here: an input is consumable when any 
partition in it is consumable. (Whether a partition is consumable is different 
for PIPELINED and BLOCKING results.)
   
   BTW, I'm also thinking about a later improvement where we could decide whether 
the input is consumable according to its completeness percentage. It's also a 
configuration related to `inputEdges`.




[GitHub] pnowojski commented on a change in pull request #7438: [FLINK-11282][network] Merge StreamRecordWriter into RecordWriter

2019-01-09 Thread GitBox
pnowojski commented on a change in pull request #7438: [FLINK-11282][network] 
Merge StreamRecordWriter into RecordWriter
URL: https://github.com/apache/flink/pull/7438#discussion_r246427748
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -72,17 +73,29 @@
 
private Counter numBuffersOut = new SimpleCounter();
 
+   /** Default name for the output flush thread, if no name with a task 
reference is given. */
+   private static final String DEFAULT_OUTPUT_FLUSH_THREAD_NAME = 
"OutputFlusher";
+
+   /** The thread that periodically flushes the output, to give an upper 
latency bound. */
+   private final OutputFlusher outputFlusher;
 
 Review comment:
   make it `Optional` instead of a secret nullable field?
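   
   A small sketch of that suggestion; `Runnable` stands in for the PR's actual
   flusher type:
   
   ```java
import java.util.Optional;

class FlusherHolder {
    // the absence of a flusher is explicit instead of a hidden null
    private final Optional<Runnable> outputFlusher;

    FlusherHolder(Runnable flusherOrNull) {
        this.outputFlusher = Optional.ofNullable(flusherOrNull);
    }

    void close() {
        // no null checks scattered around the call sites
        outputFlusher.ifPresent(Runnable::run);
    }
}
   ```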




[GitHub] zentol closed pull request #7416: [FLINK-11267][release] Only create hadoop-free binary by default

2019-01-09 Thread GitBox
zentol closed pull request #7416: [FLINK-11267][release] Only create 
hadoop-free binary by default
URL: https://github.com/apache/flink/pull/7416
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/tools/releasing/create_binary_release.sh 
b/tools/releasing/create_binary_release.sh
index 444e01b28ce..137555356a9 100755
--- a/tools/releasing/create_binary_release.sh
+++ b/tools/releasing/create_binary_release.sh
@@ -95,27 +95,15 @@ make_binary_release() {
   cd ${FLINK_DIR}
 }
 
-HADOOP_CLASSIFIERS=("24" "26" "27" "28")
-HADOOP_VERSIONS=("2.4.1" "2.6.5" "2.7.5" "2.8.3")
-
 if [ "$SCALA_VERSION" == "none" ] && [ "$HADOOP_VERSION" == "none" ]; then
   make_binary_release "" "-DwithoutHadoop" "2.12"
-  for i in "${!HADOOP_CLASSIFIERS[@]}"; do
-make_binary_release "hadoop${HADOOP_CLASSIFIERS[$i]}" 
"-Dhadoop.version=${HADOOP_VERSIONS[$i]}" "2.12"
-  done
   make_binary_release "" "-DwithoutHadoop" "2.11"
-  for i in "${!HADOOP_CLASSIFIERS[@]}"; do
-make_binary_release "hadoop${HADOOP_CLASSIFIERS[$i]}" 
"-Dhadoop.version=${HADOOP_VERSIONS[$i]}" "2.11"
-  done
 elif [ "$SCALA_VERSION" == none ] && [ "$HADOOP_VERSION" != "none" ]
 then
   make_binary_release "hadoop2" "-Dhadoop.version=$HADOOP_VERSION" "2.11"
 elif [ "$SCALA_VERSION" != none ] && [ "$HADOOP_VERSION" == "none" ]
 then
   make_binary_release "" "-DwithoutHadoop" "$SCALA_VERSION"
-  for i in "${!HADOOP_CLASSIFIERS[@]}"; do
-make_binary_release "hadoop${HADOOP_CLASSIFIERS[$i]}" 
"-Dhadoop.version=${HADOOP_VERSIONS[$i]}" "$SCALA_VERSION"
-  done
 else
   make_binary_release "hadoop2x" "-Dhadoop.version=$HADOOP_VERSION" 
"$SCALA_VERSION"
 fi


 




[jira] [Updated] (FLINK-11288) Add separate flink-ml module for building fat jar

2019-01-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11288:
---
Labels: pull-request-available  (was: )

> Add separate flink-ml module for building fat jar
> -
>
> Key: FLINK-11288
> URL: https://issues.apache.org/jira/browse/FLINK-11288
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.7.1, 1.8.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.2, 1.8.0
>
>
> Similar to sql-connectors the flink-ml fat jar , that is included in 
> flink-dist, should be built in a separate module so that we can add proper 
> licensing to it.





[GitHub] zentol opened a new pull request #7444: [FLINK-11288][examples] Rework examples to account for licensing

2019-01-09 Thread GitBox
zentol opened a new pull request #7444: [FLINK-11288][examples] Rework examples 
to account for licensing
URL: https://github.com/apache/flink/pull/7444
 
 
   ## What is the purpose of the change
   
   This PR adds separate fat-jar modules for the twitter and state-machine 
examples so that we can properly handle licenses for them. These jars have been 
bundling dependencies without any license files, requiring us to handle these 
dependencies in the flink-dist NOTICE-binary manually.
   
   These modules work similar to the `flink-sql-connector` modules in that they 
do not contain any code themselves; they are purely used for assembling a jar 
with proper licensing. These jars are not deployed to maven, and only included 
in flink-dist.
   
   ## Brief change log
   
   * add new `flink-examples-build-helper` module
   * add sub modules for each example
   * add licensing to new modules
   * move shade-plugin configuration for jar assembly from 
flink-examples-streaming to respective module
   * update flink-dist assembly to account for new modules
   
   ## Verifying this change
   
   Manually verified. Ran both examples on a local cluster and inspected jar 
contents.
   
   The state-machine jar no longer includes classes from flink-core that were 
accidentally pulled in.
   The Twitter jar no longer bundles chill that was accidentally pulled in.




[jira] [Comment Edited] (FLINK-10550) Remove LegacyYarnClusterDescriptor

2019-01-09 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738334#comment-16738334
 ] 

vinoyang edited comment on FLINK-10550 at 1/9/19 3:19 PM:
--

No, I have not started this issue. Done.


was (Author: yanghua):
Yes, I have not started this issue. Done.

> Remove LegacyYarnClusterDescriptor
> --
>
> Key: FLINK-10550
> URL: https://issues.apache.org/jira/browse/FLINK-10550
> Project: Flink
>  Issue Type: Sub-task
>Reporter: vinoyang
>Priority: Minor
>






[jira] [Commented] (FLINK-10550) Remove LegacyYarnClusterDescriptor

2019-01-09 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738334#comment-16738334
 ] 

vinoyang commented on FLINK-10550:
--

Yes, I have not started this issue. Done.

> Remove LegacyYarnClusterDescriptor
> --
>
> Key: FLINK-10550
> URL: https://issues.apache.org/jira/browse/FLINK-10550
> Project: Flink
>  Issue Type: Sub-task
>Reporter: vinoyang
>Priority: Minor
>






[jira] [Assigned] (FLINK-10550) Remove LegacyYarnClusterDescriptor

2019-01-09 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-10550:


Assignee: (was: vinoyang)

> Remove LegacyYarnClusterDescriptor
> --
>
> Key: FLINK-10550
> URL: https://issues.apache.org/jira/browse/FLINK-10550
> Project: Flink
>  Issue Type: Sub-task
>Reporter: vinoyang
>Priority: Minor
>






[jira] [Closed] (FLINK-11263) yarn session script doesn't boot up task managers

2019-01-09 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-11263.
-
Resolution: Duplicate

> yarn session script doesn't boot up task managers
> -
>
> Key: FLINK-11263
> URL: https://issues.apache.org/jira/browse/FLINK-11263
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Affects Versions: 1.6.3
> Environment: Flink 1.6.3 
> Hadoop 2.7.5
>Reporter: Hongtao Zhang
>Priority: Critical
>
> {{./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m}}
> {{I want to boot up a YARN session cluster using the command above,}}
> {{but Flink doesn't create the task executors, only the 
> applicationMaster (YarnSessionClusterEntryPoint).}}
>  
> {{The task executors will only be created when a job is submitted.}}
>  
> {{The point is that the YARN ResourceManager doesn't know how many task 
> executors should be created, because the -n option was not accepted by the 
> ResourceManager.}}
>  





[jira] [Commented] (FLINK-10550) Remove LegacyYarnClusterDescriptor

2019-01-09 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738321#comment-16738321
 ] 

Till Rohrmann commented on FLINK-10550:
---

Hi [~yanghua], are you working on this issue? If not, then please unassign.

> Remove LegacyYarnClusterDescriptor
> --
>
> Key: FLINK-10550
> URL: https://issues.apache.org/jira/browse/FLINK-10550
> Project: Flink
>  Issue Type: Sub-task
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>






[jira] [Commented] (FLINK-11286) Support to send StreamStatus.IDLE for non-source operators

2019-01-09 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738318#comment-16738318
 ] 

vinoyang commented on FLINK-11286:
--

[~aljoscha] 

I think it makes sense to allow watermarks to be assigned from the pipeline 
(not just the source). It is not always the source that is suitable for 
assigning watermarks. In many of our business scenarios, we need to go through 
an ETL step to parse the format or filter the data before we assign the watermark.

I hope that {{markAsTemporarilyIdle}} (or a similar mechanism) can be called 
where the watermark could be assigned.

> Support to send StreamStatus.IDLE for non-source operators 
> ---
>
> Key: FLINK-11286
> URL: https://issues.apache.org/jira/browse/FLINK-11286
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> Currently, only stream source tasks can be marked as temporarily idle, but this 
> approach often has limitations.
> Consider such a scenario: there is a DAG as follows: 
> {{source->map->filter->flatmap->keyBy->window}}, with a degree of parallelism 
> of 10. Here, the watermark is not emitted by the source operator, but 
> downstream, e.g. in the flatmap. No source subtask will be idle. However, 
> after the filter, some pipelines become "idle". For example, there are 3 
> pipelines that will no longer send data downstream. At this time, we 
> can't call the {{markAsTemporarilyIdle}} method to mark the current pipeline 
> as idle, and this will affect the downstream window.
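
For context, a minimal sketch of the existing source-only API that this issue proposes to generalize (the poll() helper is hypothetical):

{code:java}
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/** Sketch of the source-only idleness mechanism available today. */
public class IdleAwareSource implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            String next = poll();
            if (next == null) {
                // today only a source can declare itself idle so that
                // downstream watermark progress is not held back
                ctx.markAsTemporarilyIdle();
                Thread.sleep(100); // back off while idle
            } else {
                ctx.collect(next);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private String poll() {
        return null; // hypothetical fetch from an external system
    }
}
{code}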





[jira] [Closed] (FLINK-10571) Remove support for topologies

2019-01-09 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-10571.

Resolution: Fixed

master: 7a9a6adf18bc6232e15e42252807793c8a94a131

> Remove support for topologies
> -
>
> Key: FLINK-10571
> URL: https://issues.apache.org/jira/browse/FLINK-10571
> Project: Flink
>  Issue Type: Sub-task
>  Components: Storm Compatibility
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The topology support is hard-wired to the legacy mode and we have no 
> intention of updating it. It should thus be removed to not block the removal 
> of the legacy mode.





[jira] [Assigned] (FLINK-10848) Flink's Yarn ResourceManager can allocate too many excess containers

2019-01-09 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-10848:
-

Assignee: Till Rohrmann  (was: Shuyi Chen)

> Flink's Yarn ResourceManager can allocate too many excess containers
> 
>
> Key: FLINK-10848
> URL: https://issues.apache.org/jira/browse/FLINK-10848
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2
>Reporter: Shuyi Chen
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, both the YarnFlinkResourceManager and YarnResourceManager do not 
> call removeContainerRequest() on container allocation success. Because the 
> YARN AM-RM protocol is not a delta protocol (please see YARN-1902), 
> AMRMClient will keep all ContainerRequests that are added and send them to RM.
> In production, we observe the following that verifies the theory: 16 
> containers are allocated and used upon cluster startup; when a TM is killed, 
> 17 containers are allocated, 1 container is used, and 16 excess containers 
> are returned; when another TM is killed, 18 containers are allocated, 1 
> container is used, and 17 excess containers are returned.
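
A sketch of the fix direction using the public YARN client API (illustrative, not the actual patch): on allocation, remove one matching request so AMRMClient stops re-sending satisfied requests.

{code:java}
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

/** Sketch: drop one matching request per allocated container. */
final class ContainerRequestCleanup {

    static void onContainersAllocated(
            AMRMClientAsync<ContainerRequest> client, List<Container> containers) {
        for (Container container : containers) {
            List<? extends Collection<ContainerRequest>> matching =
                client.getMatchingRequests(
                    container.getPriority(), ResourceRequest.ANY, container.getResource());
            if (!matching.isEmpty() && !matching.get(0).isEmpty()) {
                // remove exactly one satisfied request from the client's bookkeeping
                client.removeContainerRequest(matching.get(0).iterator().next());
            }
        }
    }
}
{code}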





[GitHub] zentol commented on a change in pull request #7409: [FLINK-11064] [table] Setup a new flink-table module structure

2019-01-09 Thread GitBox
zentol commented on a change in pull request #7409: [FLINK-11064] [table] Setup 
a new flink-table module structure
URL: https://github.com/apache/flink/pull/7409#discussion_r246399194
 
 

 ##
 File path: flink-table/flink-table-runtime/pom.xml
 ##
 @@ -0,0 +1,53 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-table</artifactId>
+		<version>1.8-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+   
+   
 
 Review comment:
   This could be moved into the `` field of the module.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11293) KafkaITCase.testConcurrentProducerConsumerTopology times out on Travis

2019-01-09 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738290#comment-16738290
 ] 

Till Rohrmann commented on FLINK-11293:
---

Another instance: https://api.travis-ci.org/v3/job/475294913/log.txt

> KafkaITCase.testConcurrentProducerConsumerTopology times out on Travis
> --
>
> Key: FLINK-11293
> URL: https://issues.apache.org/jira/browse/FLINK-11293
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.8.0
>Reporter: Till Rohrmann
>Priority: Critical
> Fix For: 1.8.0
>
>
> The {{KafkaITCase.testConcurrentProducerConsumerTopology}} times out on 
> Travis.
> {code}
> 20:26:29.579 [ERROR] Errors: 
> 20:26:29.579 [ERROR]   
> KafkaITCase.testConcurrentProducerConsumerTopology:73->KafkaConsumerTestBase.runSimpleConcurrentProducerConsumerTopology:824->KafkaTestBase.deleteTestTopic:206->Object.wait:502->Object.wait:-2
>  » TestTimedOut
> {code}
> The solution might be as simple as increasing the timeout.
> https://api.travis-ci.org/v3/job/476975725/log.txt
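> For illustration, the timeout in question is the JUnit test-level timeout; a 
> sketch with placeholder values:
> {code}
> @Test(timeout = 120_000L) // e.g. raised from 60s, since topic deletion can be slow
> public void testConcurrentProducerConsumerTopology() throws Exception {
> 	runSimpleConcurrentProducerConsumerTopology();
> }
> {code}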



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann commented on issue #7417: [FLINK-11268][release] Deploy multiple flink-shaded-hadoop2 artifacts

2019-01-09 Thread GitBox
tillrohrmann commented on issue #7417: [FLINK-11268][release] Deploy multiple 
flink-shaded-hadoop2 artifacts
URL: https://github.com/apache/flink/pull/7417#issuecomment-452713386
 
 
   Failing test case is reported via 
https://issues.apache.org/jira/browse/FLINK-11293.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] klion26 commented on issue #7351: [FLINK-11008][State Backends, Checkpointing]SpeedUp upload state files using multithread

2019-01-09 Thread GitBox
klion26 commented on issue #7351: [FLINK-11008][State Backends, 
Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#issuecomment-452707135
 
 
   @azagrebin I've rebased master and addressed all the comments. I can't 
fast-forward push the changed code to my remote branch, so I squashed the four 
commits and force pushed, sorry for the inconvenience.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on issue #7416: [FLINK-11267][release] Only create hadoop-free binary by default

2019-01-09 Thread GitBox
zentol commented on issue #7416: [FLINK-11267][release] Only create hadoop-free 
binary by default
URL: https://github.com/apache/flink/pull/7416#issuecomment-452704894
 
 
   reversing the profile is a follow-up, see FLINK-11270.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] TisonKun commented on a change in pull request #7340: [FLINK-11174] [prometheus] Flink Metrics Prometheus label values supp…

2019-01-09 Thread GitBox
TisonKun commented on a change in pull request #7340: [FLINK-11174] 
[prometheus] Flink Metrics Prometheus label values supp…
URL: https://github.com/apache/flink/pull/7340#discussion_r246389832
 
 

 ##
 File path: 
flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporterOptions.java
 ##
 @@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Config options for the {@link AbstractPrometheusReporter}.
+ */
+public class PrometheusReporterOptions {
+   public static final ConfigOption FILTER_LABEL_VALUE_CHARACTER 
= ConfigOptions
 
 Review comment:
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] KarmaGYZ opened a new pull request #7443: [hotfix][docs] The CLI doc could not properly shown due to markdown i…

2019-01-09 Thread GitBox
KarmaGYZ opened a new pull request #7443: [hotfix][docs] The CLI doc could not 
properly shown due to markdown i…
URL: https://github.com/apache/flink/pull/7443
 
 
   …ssue
   
   ## What is the purpose of the change
   
   The markdown treats the TOC as the last item of the CLI usage list, so the 
list cannot be shown properly. This PR reverts the two paragraphs to fix this issue.
   [CLI 
doc](https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html)
   
   ## Brief change log
   
   - Revert two paragraphs so that the CLI usage list is shown properly
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…

2019-01-09 Thread GitBox
tillrohrmann commented on a change in pull request #7020: [FLINK-10774] [Kafka] 
connection leak when partition discovery is disabled an…
URL: https://github.com/apache/flink/pull/7020#discussion_r246385319
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ##
 @@ -732,9 +745,7 @@ public void run() {
throw new RuntimeException(discoveryLoopError);
}
} else {
-   // won't be using the discoverer
-   partitionDiscoverer.close();
-
+   // partitionDiscoverer is already closed in open method
 
 Review comment:
   It wouldn't change the existing behaviour except for the case where 
`kafkaFetcher.runFetchLoop` throws an exception where we don't close the 
`partitionDiscoverer` in line 721 atm. If the `kafkaFetcher` fails, then it 
shouldn't matter whether the `discoveryLoopThread` also fails or not, right?
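
   A sketch of the behaviour under discussion (it relies on the existing 
`PARTITION_DISCOVERY_DISABLED` constant and `discoveryIntervalMillis` field of 
`FlinkKafkaConsumerBase`; this is not the actual diff):
   ```java
   @Override
   public void open(Configuration configuration) throws Exception {
       // ... initial partition discovery happens here in any case ...
       if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
           // close eagerly: run() no longer has to close it, and the Kafka
           // connection is not leaked if the fetch loop fails first
           partitionDiscoverer.close();
       }
   }
   ```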


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol closed pull request #7402: [hotfix][docs] Fulfill some empty description in Config Doc

2019-01-09 Thread GitBox
zentol closed pull request #7402: [hotfix][docs] Fulfill some empty description 
in Config Doc
URL: https://github.com/apache/flink/pull/7402
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/_includes/generated/common_section.html 
b/docs/_includes/generated/common_section.html
index 65804db2283..1c6685b2f4c 100644
--- a/docs/_includes/generated/common_section.html
+++ b/docs/_includes/generated/common_section.html
@@ -20,7 +20,7 @@
 
 parallelism.default
 1
-
+Default parallelism for jobs.
 
 
 taskmanager.numberOfTaskSlots
diff --git a/docs/_includes/generated/core_configuration.html 
b/docs/_includes/generated/core_configuration.html
index 4366e8b246f..540290045a8 100644
--- a/docs/_includes/generated/core_configuration.html
+++ b/docs/_includes/generated/core_configuration.html
@@ -25,12 +25,12 @@
 
 io.tmp.dirs
 'LOCAL_DIRS' on Yarn. 
'_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in 
standalone.
-
+Directories for temporary files, separated by ",", "|", or the 
system's java.io.File.pathSeparator.
 
 
 parallelism.default
 1
-
+Default parallelism for jobs.
 
 
 
diff --git a/docs/_includes/generated/history_server_configuration.html 
b/docs/_includes/generated/history_server_configuration.html
index 272606e2884..7691c0abe62 100644
--- a/docs/_includes/generated/history_server_configuration.html
+++ b/docs/_includes/generated/history_server_configuration.html
@@ -30,7 +30,7 @@
 
 historyserver.web.refresh-interval
 1
-
+The refresh interval for the HistoryServer web-frontend in 
milliseconds.
 
 
 historyserver.web.ssl.enabled
diff --git a/docs/_includes/generated/job_manager_configuration.html 
b/docs/_includes/generated/job_manager_configuration.html
index 99eec1dcec7..177c362fb8b 100644
--- a/docs/_includes/generated/job_manager_configuration.html
+++ b/docs/_includes/generated/job_manager_configuration.html
@@ -10,7 +10,7 @@
 
 jobmanager.archive.fs.dir
 (none)
-
+Directory for the JobManager to store the archives of completed 
jobs.
 
 
 jobmanager.execution.attempts-history-size
diff --git a/docs/_includes/generated/metric_configuration.html 
b/docs/_includes/generated/metric_configuration.html
index 420bb7f1da2..39a76cef165 100644
--- a/docs/_includes/generated/metric_configuration.html
+++ b/docs/_includes/generated/metric_configuration.html
@@ -50,12 +50,12 @@
 
 metrics.reporters
 (none)
-
+An optional list of reporter names. If configured, only 
reporters whose name matches any of the names in the list will be started. 
Otherwise, all reporters that could be found in the configuration will be 
started.
 
 
 metrics.scope.delimiter
 "."
-
+Delimiter used to assemble the metric identifier.
 
 
 metrics.scope.jm
@@ -90,12 +90,12 @@
 
 metrics.system-resource
 false
-
+Flag indicating whether Flink should report system resource 
metrics such as machine's CPU, memory or network usage.
 
 
 metrics.system-resource-probing-interval
 5000
-
+Interval between probing of system resource metrics specified 
in milliseconds. Has an effect only when 'metrics.system-resource' is 
enabled.
 
 
 
diff --git a/docs/_includes/generated/resource_manager_configuration.html 
b/docs/_includes/generated/resource_manager_configuration.html
index 9243fcd3cb9..3448aba8f52 100644
--- a/docs/_includes/generated/resource_manager_configuration.html
+++ b/docs/_includes/generated/resource_manager_configuration.html
@@ -20,7 +20,7 @@
 
 local.number-resourcemanager
 1
-
+The number of resource managers to start.
 
 
 resourcemanager.job.timeout
diff --git a/docs/_includes/generated/task_manager_configuration.html 
b/docs/_includes/generated/task_manager_configuration.html
index 63bfb043ea4..2093531211d 100644
--- a/docs/_includes/generated/task_manager_configuration.html
+++ b/docs/_includes/generated/task_manager_configuration.html
@@ -20,7 +20,7 @@
 
 task.cancellation.timers.timeout
 7500
-
+Time we wait for the timers in milliseconds to finish all 
pending timer threads when

[GitHub] tillrohrmann commented on a change in pull request #7441: [FLINK-11156][tests, runtime] Reconcile ZooKeeperCompletedCheckpointStoreMockitoTest with JDK 9

2019-01-09 Thread GitBox
tillrohrmann commented on a change in pull request #7441: [FLINK-11156][tests, 
runtime] Reconcile ZooKeeperCompletedCheckpointStoreMockitoTest with JDK 9
URL: https://github.com/apache/flink/pull/7441#discussion_r246381261
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 ##
 @@ -98,43 +92,23 @@
 *   least 1). Adding more 
checkpoints than this results
 *   in older checkpoints being 
discarded. On recovery,
 *   we will only start with a 
single checkpoint.
-* @param client The Curator ZooKeeper client
-* @param checkpointsPathThe ZooKeeper path for the 
checkpoints (needs to
-*   start with a '/')
-* @param stateStorage   State storage to be used to 
persist the completed
-*   checkpoint
 * @param executor to execute blocking calls
 * @throws Exception
 */
public ZooKeeperCompletedCheckpointStore(
int maxNumberOfCheckpointsToRetain,
-   CuratorFramework client,
-   String checkpointsPath,
-   RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage,
-   Executor executor) throws Exception {
+   ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper,
+   Executor executor) {
 
checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain 
at least one checkpoint.");
-   checkNotNull(stateStorage, "State storage");
 
this.maxNumberOfCheckpointsToRetain = 
maxNumberOfCheckpointsToRetain;
 
-   checkNotNull(client, "Curator client");
-   checkNotNull(checkpointsPath, "Checkpoints path");
-
-   // Ensure that the checkpoints path exists
-   client.newNamespaceAwareEnsurePath(checkpointsPath)
-   .ensure(client.getZookeeperClient());
-
-   // All operations will have the path as root
-   this.client = client.usingNamespace(client.getNamespace() + 
checkpointsPath);
-
-   this.checkpointsInZooKeeper = new 
ZooKeeperStateHandleStore<>(this.client, stateStorage);
+   this.checkpointsInZooKeeper = checkpointsInZooKeeper;
 
 Review comment:
   `checkNotNull`
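
   i.e., something along these lines (a sketch, assuming the 
`org.apache.flink.util.Preconditions` import used elsewhere in the class):
   ```java
   this.checkpointsInZooKeeper = checkNotNull(checkpointsInZooKeeper);
   ```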


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #7441: [FLINK-11156][tests, runtime] Reconcile ZooKeeperCompletedCheckpointStoreMockitoTest with JDK 9

2019-01-09 Thread GitBox
tillrohrmann commented on a change in pull request #7441: [FLINK-11156][tests, 
runtime] Reconcile ZooKeeperCompletedCheckpointStoreMockitoTest with JDK 9
URL: https://github.com/apache/flink/pull/7441#discussion_r246381058
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 ##
 @@ -98,43 +92,23 @@
 *   least 1). Adding more 
checkpoints than this results
 *   in older checkpoints being 
discarded. On recovery,
 *   we will only start with a 
single checkpoint.
-* @param client The Curator ZooKeeper client
-* @param checkpointsPathThe ZooKeeper path for the 
checkpoints (needs to
-*   start with a '/')
-* @param stateStorage   State storage to be used to 
persist the completed
-*   checkpoint
 
 Review comment:
   new parameter JavaDocs are missing.
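
   For example, a sketch of the missing tag (the wording is illustrative):
   ```java
    * @param checkpointsInZooKeeper Store for completed checkpoints in ZooKeeper
   ```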


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] TisonKun commented on issue #7302: [FLINK-11156] [tests] Reconcile ZooKeeperCompletedCheckpointStoreMockitoTest with JDK 9

2019-01-09 Thread GitBox
TisonKun commented on issue #7302: [FLINK-11156] [tests] Reconcile 
ZooKeeperCompletedCheckpointStoreMockitoTest with JDK 9
URL: https://github.com/apache/flink/pull/7302#issuecomment-452698412
 
 
   @GJL thanks for your follow-up :-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11288) Add separate flink-ml module for building fat jar

2019-01-09 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738230#comment-16738230
 ] 

Aljoscha Krettek commented on FLINK-11288:
--

I would be in favour of keeping {{flink-diet}} slim.

> Add separate flink-ml module for building fat jar
> -
>
> Key: FLINK-11288
> URL: https://issues.apache.org/jira/browse/FLINK-11288
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.7.1, 1.8.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.7.2, 1.8.0
>
>
> Similar to sql-connectors the flink-ml fat jar , that is included in 
> flink-dist, should be built in a separate module so that we can add proper 
> licensing to it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11293) KafkaITCase.testConcurrentProducerConsumerTopology times out on Travis

2019-01-09 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738234#comment-16738234
 ] 

Till Rohrmann commented on FLINK-11293:
---

In the test logs it looks as if the deletion of the topic completed while 
cleaning up the test case. However, the program then hangs for quite some time 
until it times out:
{code}
20:16:04,637 INFO  kafka.controller.TopicDeletionManager
 - [Topic Deletion Manager 0], Deletion of topic 
concurrentProducerConsumerTopic_f33b5fc7-586d-4a9c-9b1c-4e1f450652e9 
successfully completed
20:16:04,637 INFO  kafka.controller.KafkaController 
 - [Controller id=0] New topics: [Set()], deleted topics: [Set()], new 
partition replica assignment [Map()]
20:16:56,502 INFO  kafka.log.LogSegment 
 - Deleted log 
/tmp/kafkaITcase-kafka-dir-d834322d-d069-4586-8a82-7e7b39f879de/server-0/tstopic-1.de0e994871d847fca012c0351bcdfeca-delete/.log.
{code}
I'm not sure why this is happening.

> KafkaITCase.testConcurrentProducerConsumerTopology times out on Travis
> --
>
> Key: FLINK-11293
> URL: https://issues.apache.org/jira/browse/FLINK-11293
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.8.0
>Reporter: Till Rohrmann
>Priority: Critical
> Fix For: 1.8.0
>
>
> The {{KafkaITCase.testConcurrentProducerConsumerTopology}} times out on 
> Travis.
> {code}
> 20:26:29.579 [ERROR] Errors: 
> 20:26:29.579 [ERROR]   
> KafkaITCase.testConcurrentProducerConsumerTopology:73->KafkaConsumerTestBase.runSimpleConcurrentProducerConsumerTopology:824->KafkaTestBase.deleteTestTopic:206->Object.wait:502->Object.wait:-2
>  » TestTimedOut
> {code}
> The solution might be as simple as increasing the timeout.
> https://api.travis-ci.org/v3/job/476975725/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] KarmaGYZ commented on a change in pull request #7402: [hotfix][docs] Fulfill some empty description in Config Doc

2019-01-09 Thread GitBox
KarmaGYZ commented on a change in pull request #7402: [hotfix][docs] Fulfill 
some empty description in Config Doc
URL: https://github.com/apache/flink/pull/7402#discussion_r246380092
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
 ##
 @@ -135,14 +139,18 @@
 */
	public static final ConfigOption<Boolean> SYSTEM_RESOURCE_METRICS =
key("metrics.system-resource")
-   .defaultValue(false);
+   .defaultValue(false)
+   .withDescription("Flag indicating whether Flink should 
report system resource metrics such as machine's CPU," +
+   " memory or network usage.");
/**
 * Interval between probing of system resource metrics specified in 
milliseconds. Has an effect only when
 * {@link #SYSTEM_RESOURCE_METRICS} is enabled.
 */
	public static final ConfigOption<Long> SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL =
key("metrics.system-resource-probing-interval")
-   .defaultValue(5000L);
+   .defaultValue(5000L)
+   .withDescription("Interval between probing of system 
resource metrics specified in milliseconds. Has an effect" +
+   " only when SYSTEM_RESOURCE_METRICS is 
enabled");
 
 Review comment:
   Good catch! Revised.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11256) Referencing StreamNode objects directly in StreamEdge causes the sizes of JobGraph and TDD to become unnecessarily large

2019-01-09 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738228#comment-16738228
 ] 

Aljoscha Krettek commented on FLINK-11256:
--

Please don't make this too complicated. We will have to rework the basic Flink 
DAG and operator interfaces in the future, and any work that we put into the 
StreamGraph abstractions now will be legacy code by then.

If it's a very small fix it's probably OK, but I wouldn't do anything bigger.

> Referencing StreamNode objects directly in StreamEdge causes the sizes of 
> JobGraph and TDD to become unnecessarily large
> 
>
> Key: FLINK-11256
> URL: https://issues.apache.org/jira/browse/FLINK-11256
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.7.0, 1.7.1
>Reporter: Haibo Suen
>Assignee: Haibo Suen
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When a job graph is generated from a StreamGraph, the StreamEdge(s) of the 
> stream graph are serialized into the StreamConfig and stored in the job 
> graph. The serialized bytes are then included in the TDD and distributed to 
> the TMs. Because StreamEdge references StreamNode objects directly (its 
> sourceVertex and targetVertex), those objects are written transitively when 
> a StreamEdge is serialized, even though they are not needed by the JM or the 
> Task. For a large topology this causes the JobGraph/TDD to become much 
> larger than actually needed and makes RPC timeouts during transmission more 
> likely.
> To avoid this, StreamEdge should store only the IDs of the StreamNodes.
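> A sketch of the suggested shape (field and accessor names are illustrative):
> {code}
> public class StreamEdge implements Serializable {
> 	// store only the IDs instead of the StreamNode objects
> 	private final int sourceId;
> 	private final int targetId;
> 
> 	public StreamNode getSourceVertex(StreamGraph graph) {
> 		return graph.getStreamNode(sourceId);
> 	}
> }
> {code}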



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11187) StreamingFileSink with S3 backend transient socket timeout issues

2019-01-09 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738224#comment-16738224
 ] 

Aljoscha Krettek commented on FLINK-11187:
--

After the fix, does this even need the stream nature of 
{{RefCountedBufferingFileStream}} anymore?
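
For reference, the {{Resetting to invalid mark}} in the quoted trace below is 
the plain {{java.io}} mark/reset contract: once more bytes are read than the 
mark limit allows, the mark is invalidated and {{reset()}} fails. A minimal 
standalone sketch (not Flink code) that reproduces it:
{code}
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;

public class MarkResetDemo {
	public static void main(String[] args) throws IOException {
		BufferedInputStream in =
			new BufferedInputStream(new ByteArrayInputStream(new byte[8192]), 1024);
		in.mark(1024);           // the mark only stays valid for the next 1024 bytes
		in.read(new byte[4096]); // read past the mark limit, like a large upload part
		in.reset();              // throws IOException: Resetting to invalid mark
	}
}
{code}
The retry path in the quoted stack trace fails the same way when it has to 
rewind more data than the buffered mark allows.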

> StreamingFileSink with S3 backend transient socket timeout issues 
> --
>
> Key: FLINK-11187
> URL: https://issues.apache.org/jira/browse/FLINK-11187
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, Streaming Connectors
>Affects Versions: 1.7.0, 1.7.1
>Reporter: Addison Higham
>Assignee: Addison Higham
>Priority: Major
> Fix For: 1.7.2, 1.8.0
>
>
> When using the StreamingFileSink with S3A backend, occasionally, errors like 
> this will occur:
> {noformat}
> Caused by: 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>  Your socket connection to the server was not read from or written to within 
> the timeout period. Idle connections will be closed. (Service: Amazon S3; 
> Status Code: 400; Error Code: RequestTimeout; Request ID: xxx; S3 Extended 
> Request ID: xxx, S3 Extended Request ID: xxx
>    at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
>    at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
>    at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056){noformat}
> This causes a restart of the Flink job, which it is often able to recover 
> from, but under heavy load these restarts can become very frequent.
>  
> Turning on debug logs you can find the following relevant stack trace:
> {noformat}
> 2018-12-17 05:55:46,546 DEBUG 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient  - FYI: 
> failed to reset content inputstream before throwing up
> java.io.IOException: Resetting to invalid mark
>   at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)

[GitHub] zentol commented on a change in pull request #7402: [hotfix][docs] Fulfill some empty description in Config Doc

2019-01-09 Thread GitBox
zentol commented on a change in pull request #7402: [hotfix][docs] Fulfill some 
empty description in Config Doc
URL: https://github.com/apache/flink/pull/7402#discussion_r246375349
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
 ##
 @@ -135,14 +139,18 @@
 */
	public static final ConfigOption<Boolean> SYSTEM_RESOURCE_METRICS =
key("metrics.system-resource")
-   .defaultValue(false);
+   .defaultValue(false)
+   .withDescription("Flag indicating whether Flink should 
report system resource metrics such as machine's CPU," +
+   " memory or network usage.");
/**
 * Interval between probing of system resource metrics specified in 
milliseconds. Has an effect only when
 * {@link #SYSTEM_RESOURCE_METRICS} is enabled.
 */
	public static final ConfigOption<Long> SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL =
key("metrics.system-resource-probing-interval")
-   .defaultValue(5000L);
+   .defaultValue(5000L)
+   .withDescription("Interval between probing of system 
resource metrics specified in milliseconds. Has an effect" +
+   " only when SYSTEM_RESOURCE_METRICS is 
enabled");
 
 Review comment:
   please refer to the actual configuration key for `SYSTEM_RESOURCE_METRICS ` 
instead, i.e. `"Interval ... only when '" + SYSTEM_RESOURCE_METRICS.key() + "' 
is enabled."`.
   
   The description is also missing a final period.
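
   Something along these lines (a sketch of the suggested revision):
   ```java
   public static final ConfigOption<Long> SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL =
       key("metrics.system-resource-probing-interval")
           .defaultValue(5000L)
           .withDescription("Interval between probing of system resource metrics specified in milliseconds." +
               " Has an effect only when '" + SYSTEM_RESOURCE_METRICS.key() + "' is enabled.");
   ```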


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on issue #7402: [hotfix][docs] Fulfill some empty description in Config Doc

2019-01-09 Thread GitBox
zentol commented on issue #7402: [hotfix][docs] Fulfill some empty description 
in Config Doc
URL: https://github.com/apache/flink/pull/7402#issuecomment-452689975
 
 
   Let's stick to the missing ones for now, I don't think anyone has time to 
review such a cleanup.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11293) KafkaITCase.testConcurrentProducerConsumerTopology times out on Travis

2019-01-09 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-11293:
--
Issue Type: Bug  (was: Improvement)

> KafkaITCase.testConcurrentProducerConsumerTopology times out on Travis
> --
>
> Key: FLINK-11293
> URL: https://issues.apache.org/jira/browse/FLINK-11293
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.8.0
>Reporter: Till Rohrmann
>Priority: Critical
> Fix For: 1.8.0
>
>
> The {{KafkaITCase.testConcurrentProducerConsumerTopology}} times out on 
> Travis.
> {code}
> 20:26:29.579 [ERROR] Errors: 
> 20:26:29.579 [ERROR]   
> KafkaITCase.testConcurrentProducerConsumerTopology:73->KafkaConsumerTestBase.runSimpleConcurrentProducerConsumerTopology:824->KafkaTestBase.deleteTestTopic:206->Object.wait:502->Object.wait:-2
>  » TestTimedOut
> {code}
> The solution might be as simple as increasing the timeout.
> https://api.travis-ci.org/v3/job/476975725/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

