[GitHub] [flink] zhijiangW commented on a change in pull request #8857: [FLINK-12960][coordination][shuffle] Move ResultPartitionDeploymentDescriptor#releasedOnConsumption to PartitionDescriptor#releasedOnConsumption

2019-06-26 Thread GitBox
zhijiangW commented on a change in pull request #8857: 
[FLINK-12960][coordination][shuffle] Move 
ResultPartitionDeploymentDescriptor#releasedOnConsumption to 
PartitionDescriptor#releasedOnConsumption
URL: https://github.com/apache/flink/pull/8857#discussion_r297529541
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
 ##
 @@ -65,7 +65,7 @@ private static void testForceConsumptionOnRelease(boolean forceConsumptionOnRele
ResultPartitionType.BLOCKING,
1,
0),
-   NettyShuffleDescriptorBuilder.newBuilder().buildLocal(),
+   NettyShuffleDescriptorBuilder.newBuilder().setBlocking(true).buildLocal(),
 
 Review comment:
   it might be more appropriate to use `partitionType.isBlocking()` instead of 
the hard-coded `true`
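
   A sketch of the suggested change, assuming the test's `partitionType` 
variable is in scope at this call site:

   ```
   NettyShuffleDescriptorBuilder.newBuilder()
       .setBlocking(partitionType.isBlocking())
       .buildLocal(),
   ```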




[jira] [Commented] (FLINK-12536) Make BufferOrEventSequence#getNext() non-blocking

2019-06-26 Thread Congxian Qiu(klion26) (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16873088#comment-16873088
 ] 

Congxian Qiu(klion26) commented on FLINK-12536:
---

As discussed with [~pnowojski] offline, I will use a buffer pool (default size 
2, configurable) and reuse the read thread in IOManager to read the 
BufferOrEvent asynchronously.
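
A rough sketch of that plan in plain Java (hypothetical names; the actual 
change would reuse the existing IOManager read thread rather than create its 
own executor):

```
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.*;

// pre-fetch spilled BufferOrEvent bytes with a small, configurable pool of
// reusable buffers and a single dedicated read thread
class AsyncSpillReader {
    private final BlockingQueue<ByteBuffer> pool;
    private final ExecutorService readThread = Executors.newSingleThreadExecutor();
    private final FileChannel channel;

    AsyncSpillReader(FileChannel channel, int poolSize, int pageSize) {
        this.channel = channel;
        this.pool = new ArrayBlockingQueue<>(poolSize); // default 2, configurable
        for (int i = 0; i < poolSize; i++) {
            pool.add(ByteBuffer.allocate(pageSize));
        }
    }

    Future<ByteBuffer> readNext() {
        return readThread.submit(() -> {
            ByteBuffer buf = pool.take(); // reuse a pooled buffer, no fresh allocation
            buf.clear();
            channel.read(buf);
            buf.flip();
            return buf; // caller hands it back to the pool once consumed
        });
    }
}
```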

 

Created issue FLINK-12994 to track the improvement of buffer processing 
performance in SpilledBufferOrEventSequence#getNext.

 

The benchmark results are as follows:

 

Benchmark                                       Mode  Cnt      Score      Error   Units
ConsumerBench.testBuffer                       thrpt   30     99.528 ±    1.923  ops/ms
ConsumerBench.testBufferWithWrap               thrpt   30    827.931 ±   15.148  ops/ms
ConsumerBench.testCancelCheckpointMarkerEvent  thrpt   30  40144.914 ± 1244.112  ops/ms
ConsumerBench.testCheckpointBarrierEvent       thrpt   30  29784.258 ±  467.440  ops/ms
ConsumerBench.testEndOfPartitionEvent          thrpt   30  48562.810 ± 1599.328  ops/ms
FileIOBench.test1SyncIO                        thrpt   30    359.367 ±    3.203  ops/ms
FileIOBench.testAsyncIO                        thrpt   30     66.573 ±    3.384  ops/ms

[1] `testBuffer` tests `SpilledBufferOrEventSequence#getNext` using 
`MemorySegmentFactory.allocateUnpooledSegment(pageSize)`; `testBufferWithWrap` 
tests `SpilledBufferOrEventSequence#getNext` using 
`MemorySegmentFactory.wrap()`; the `test***Event` benchmarks cover all event 
processing; `test1SyncIO` tests `FileChannel` and `testAsyncIO` tests 
`AsynchronousFileChannel`.

[2] 
[https://github.com/klion26/FileIOBench/blob/8618fc4d6e745d8dd762b87505102e6cec78dc9b/src/main/java/com/klion26/data/ConsumerBench.java#L130]

[3] 
https://github.com/klion26/FileIOBench/blob/8618fc4d6e745d8dd762b87505102e6cec78dc9b/src/main/java/com/klion26/data/ConsumerBench.java#L169

> Make BufferOrEventSequence#getNext() non-blocking
> -
>
> Key: FLINK-12536
> URL: https://issues.apache.org/jira/browse/FLINK-12536
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Piotr Nowojski
>Assignee: Congxian Qiu(klion26)
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently it is non-blocking in case of credit-based flow control (default), 
> however for {{SpilledBufferOrEventSequence}} it is blocking on reading from 
> file. We might want to consider reimplementing it to be non-blocking with a 
> {{CompletableFuture<?> isAvailable()}} method.
>  
> Otherwise we will block mailbox processing for the duration of reading from 
> file - for example we will block processing time timers and potentially in 
> the future network flushes.
>  
> This is not a high priority change, since it affects non-default 
> configuration option AND at the moment only processing time timers are 
> planned to be moved to the mailbox for 1.9.





[GitHub] [flink] liyafan82 commented on a change in pull request #8757: [FLINK-12850][core] Introduce TypeInfo for LocalDate/LocalTime/LocalDateTime

2019-06-26 Thread GitBox
liyafan82 commented on a change in pull request #8757: [FLINK-12850][core] 
Introduce TypeInfo for LocalDate/LocalTime/LocalDateTime
URL: https://github.com/apache/flink/pull/8757#discussion_r297546738
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateComparator.java
 ##
 @@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.LocalDate;
+
+/**
+ * This class can not extend {@link BasicTypeComparator}, because LocalDate is a
+ * Comparable of ChronoLocalDate instead of Comparable of LocalDate.
+ */
+@Internal
+public final class LocalDateComparator extends TypeComparator<LocalDate> implements Serializable {
+
+   private transient LocalDate reference;
+
+   protected final boolean ascendingComparison;
+
+   // For use by getComparators
+   @SuppressWarnings("rawtypes")
+   private final LocalDateComparator[] comparators = new LocalDateComparator[] {this};
+
+   public LocalDateComparator(boolean ascending) {
+   this.ascendingComparison = ascending;
+   }
+
+   @Override
+   public int hash(LocalDate value) {
+   return value.hashCode();
+   }
+
+   @Override
+   public void setReference(LocalDate toCompare) {
+   this.reference = toCompare;
+   }
+
+   @Override
+   public boolean equalToReference(LocalDate candidate) {
+   return candidate.equals(reference);
 
 Review comment:
   use Objects.equals to avoid NPE?
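
   The null-safe variant being suggested (a sketch; `java.util.Objects.equals` 
tolerates a null `candidate`, unlike `candidate.equals(reference)`):

   ```
   @Override
   public boolean equalToReference(LocalDate candidate) {
       return Objects.equals(candidate, reference);
   }
   ```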




[GitHub] [flink] zentol commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM

2019-06-26 Thread GitBox
zentol commented on a change in pull request #8778: [FLINK-12615][coordination] 
Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r297555765
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
 ##
 @@ -883,6 +883,17 @@ private void jobStatusChanged(
validateRunsInMainThread();
 
if (newJobStatus.isGloballyTerminalState()) {
+   // other terminal job states are handled by the executions
+   if (newJobStatus == JobStatus.FINISHED) {
+   runAsync(() -> {
+   for (Map.Entry<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> entry : registeredTaskManagers.entrySet()) {
+   Collection<ResultPartitionID> storedPartitions = partitionTable.stopTrackingPartitions(entry.getKey());
+   // if this call fails TaskExecutors will cleanup partitions regardless once we close the connections
+   entry.getValue().f1.releasePartitions(jobGraph.getJobID(), storedPartitions);
 
 Review comment:
   ah yes, I got stuck here since the SM requires a ShuffleDescriptor, but so 
far we've only been tracking ResultPartitionIDs.
   I have updated the PR to include a PartitionTracker, which keeps track of 
shuffle descriptors, result partition IDs, etc., and acts as a central point 
for issuing release calls.
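
   A hedged sketch of what such a tracker could look like (names here are 
illustrative, not the final API of the PR):

   ```
   // central bookkeeping that keeps enough metadata (shuffle descriptors,
   // not just ResultPartitionIDs) to issue release calls later
   interface PartitionTracker {
       void startTrackingPartition(
           ResourceID producingTaskExecutor,
           ResultPartitionID partitionId,
           ShuffleDescriptor shuffleDescriptor);

       // everything needed to release the partitions of one TaskExecutor
       Collection<ShuffleDescriptor> stopTrackingPartitionsFor(ResourceID taskExecutor);
   }
   ```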




[GitHub] [flink] pnowojski closed pull request #8199: [FLINK-11955] Modify build to move filesystems from lib to plugins folder

2019-06-26 Thread GitBox
pnowojski closed pull request #8199: [FLINK-11955] Modify build to move 
filesystems from lib to plugins folder
URL: https://github.com/apache/flink/pull/8199
 
 
   




[GitHub] [flink] pnowojski commented on issue #8199: [FLINK-11955] Modify build to move filesystems from lib to plugins folder

2019-06-26 Thread GitBox
pnowojski commented on issue #8199: [FLINK-11955] Modify build to move 
filesystems from lib to plugins folder
URL: https://github.com/apache/flink/pull/8199#issuecomment-505795217
 
 
   > I think there is value in relocating the fs plugins together with 
un-shading them (FLINK-11956), because in that case the jars would not be 
backwards compatible and should be loaded only via the plugin manager. This 
way we can "force" Flink users to properly adjust their Flink setups. But in 
that case, I'd propose to consider keeping them under some opt/ subdir.
   
   This might still be problematic, since it would force all deployments to 
"load" all file system plugins at startup, regardless of whether they are 
desired or not.
   
   I think this ticket might be invalid. When we created it with 
@StefanRRichter, we didn't expect 
https://issues.apache.org/jira/browse/FLINK-12143 to basically subsume it. We 
thought that file system jars were being put into the `lib` directory by 
default by the build system; since users are in fact expected to move/copy the 
jars from `opt` to `lib` or `plugins` manually, it may well be that we don't 
need this (FLINK-11955) PR.
   
   I'm closing this PR & Jira ticket for now. We can continue discussing in the 
Jira ticket, and we can always re-open the PR and/or the Jira ticket if we 
decide otherwise.
   
   Sorry @yanghua for the confusion.




[GitHub] [flink] liyafan82 commented on a change in pull request #8757: [FLINK-12850][core] Introduce TypeInfo for LocalDate/LocalTime/LocalDateTime

2019-06-26 Thread GitBox
liyafan82 commented on a change in pull request #8757: [FLINK-12850][core] 
Introduce TypeInfo for LocalDate/LocalTime/LocalDateTime
URL: https://github.com/apache/flink/pull/8757#discussion_r297571612
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateComparator.java
 ##
 @@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.LocalDate;
+
+/**
+ * This class can not extend {@link BasicTypeComparator}, because LocalDate is a
+ * Comparable of ChronoLocalDate instead of Comparable of LocalDate.
+ */
+@Internal
+public final class LocalDateComparator extends TypeComparator<LocalDate> implements Serializable {
+
+   private transient LocalDate reference;
+
+   protected final boolean ascendingComparison;
+
+   // For use by getComparators
+   @SuppressWarnings("rawtypes")
+   private final LocalDateComparator[] comparators = new LocalDateComparator[] {this};
+
+   public LocalDateComparator(boolean ascending) {
+   this.ascendingComparison = ascending;
+   }
+
+   @Override
+   public int hash(LocalDate value) {
+   return value.hashCode();
+   }
+
+   @Override
+   public void setReference(LocalDate toCompare) {
+   this.reference = toCompare;
+   }
+
+   @Override
+   public boolean equalToReference(LocalDate candidate) {
+   return candidate.equals(reference);
+   }
+
+   @Override
+   public int compareToReference(TypeComparator<LocalDate> referencedComparator) {
+   int comp = ((LocalDateComparator) referencedComparator).reference.compareTo(reference);
+   return ascendingComparison ? comp : -comp;
+   }
+
+   @Override
+   public int compare(LocalDate first, LocalDate second) {
+   int cmp = first.compareTo(second);
+   return ascendingComparison ? cmp : -cmp;
+   }
+
+   @Override
+   public boolean invertNormalizedKey() {
+   return !ascendingComparison;
+   }
+
+   @Override
+   public boolean supportsSerializationWithKeyNormalization() {
+   return false;
+   }
+
+   @Override
+   public void writeWithKeyNormalization(LocalDate record, DataOutputView target) throws IOException {
+   throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public int extractKeys(Object record, Object[] target, int index) {
+   target[index] = record;
+   return 1;
+   }
+
+   @SuppressWarnings("rawtypes")
+   @Override
+   public TypeComparator[] getFlatComparators() {
+   return comparators;
+   }
+
+   @Override
+   public LocalDate readWithKeyDenormalization(LocalDate reuse, DataInputView source) throws IOException {
+   throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+   return compareSerializedLocalDate(firstSource, secondSource, ascendingComparison);
+   }
+
+   @Override
+   public boolean supportsNormalizedKey() {
+   return true;
+   }
+
+   @Override
+   public int getNormalizeKeyLen() {
+   return 6;
+   }
+
+   @Override
+   public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+   return keyBytes < getNormalizeKeyLen();
+   }
+
+   @Override
+   public void putNormalizedKey(LocalDate record, MemorySegment target, int offset, int numBytes) {
+   putNormalizedKeyLocalDate(record, target, offset, numBytes);
+   }
+
+   @Override
+   public LocalDateComparator duplicate() {
+   

[jira] [Closed] (FLINK-12612) Track stored partitions on the TM

2019-06-26 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-12612.

Resolution: Fixed

master: 847e7e79387b0347015733fdfacb0595e00bcda0

> Track stored partitions on the TM
> -
>
> Key: FLINK-12612
> URL: https://issues.apache.org/jira/browse/FLINK-12612
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Network
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The TaskExecutor has to track which partitions it has stored, grouped by 
> JobID.
> This is necessary so that the TaskExecutor only disconnects from the 
> JobManager if it no longer stores relevant (i.e., belonging to the 
> corresponding job) partitions (similar to the taskSlotTable).
> Additionally this is needed so that the TaskExecutor can independently 
> clean up partitions for a given job if the connection to the corresponding 
> JobManager is lost.
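
A minimal sketch of such per-job bookkeeping in plain Java (illustrative only; 
names are assumptions, not the committed implementation):

```
// partitions grouped by JobID, so a job's partitions can be queried
// (may we disconnect from the JobManager?) and dropped (connection lost)
// as a single unit
class JobPartitionIndex {
    private final Map<JobID, Set<ResultPartitionID>> partitionsPerJob = new HashMap<>();

    void startTracking(JobID job, ResultPartitionID partition) {
        partitionsPerJob.computeIfAbsent(job, k -> new HashSet<>()).add(partition);
    }

    boolean hasPartitionsFor(JobID job) {
        return partitionsPerJob.containsKey(job);
    }

    // returns the removed partitions so the caller can clean them up
    Set<ResultPartitionID> stopTracking(JobID job) {
        Set<ResultPartitionID> removed = partitionsPerJob.remove(job);
        return removed == null ? Collections.emptySet() : removed;
    }
}
```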





[GitHub] [flink] dianfu commented on issue #8715: [FLINK-12821][table][cep] Fix the bug that fix time quantifier can not be the last element of a pattern

2019-06-26 Thread GitBox
dianfu commented on issue #8715: [FLINK-12821][table][cep] Fix the bug that fix 
time quantifier can not be the last element of a pattern
URL: https://github.com/apache/flink/pull/8715#issuecomment-505799482
 
 
   @dawidwys thanks a lot for the review and commit. :)




[GitHub] [flink] yanghua closed pull request #8684: [FLINK-12790] Implement KeyScope enum to distinguish the key of localKeyBy and general keyBy API

2019-06-26 Thread GitBox
yanghua closed pull request #8684: [FLINK-12790] Implement KeyScope enum to 
distinguish the key of localKeyBy and general keyBy API
URL: https://github.com/apache/flink/pull/8684
 
 
   




[GitHub] [flink] dianfu commented on issue #8892: [FLINK-FLINK-12990][python] Date type doesn't consider the local TimeZone

2019-06-26 Thread GitBox
dianfu commented on issue #8892: [FLINK-FLINK-12990][python] Date type doesn't 
consider the local TimeZone
URL: https://github.com/apache/flink/pull/8892#issuecomment-505802703
 
 
   @sunjincheng121 Makes sense and I will add some test cases.




[GitHub] [flink] yanghua commented on issue #8684: [FLINK-12790] Implement KeyScope enum to distinguish the key of localKeyBy and general keyBy API

2019-06-26 Thread GitBox
yanghua commented on issue #8684: [FLINK-12790] Implement KeyScope enum to 
distinguish the key of localKeyBy and general keyBy API
URL: https://github.com/apache/flink/pull/8684#issuecomment-505802673
 
 
   @pnowojski I opened this PR before @aljoscha asked me to create a FLIP, so I 
also think it would be better to close it for now. In addition, could you 
continue to join the discussion and give your opinion?




[jira] [Updated] (FLINK-12960) Introduce ShuffleDescriptor#ReleaseType

2019-06-26 Thread Andrey Zagrebin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrey Zagrebin updated FLINK-12960:

Summary: Introduce ShuffleDescriptor#ReleaseType  (was: Move 
ResultPartitionDeploymentDescriptor#releasedOnConsumption to 
PartitionDescriptor#releasedOnConsumption)

> Introduce ShuffleDescriptor#ReleaseType
> ---
>
> Key: FLINK-12960
> URL: https://issues.apache.org/jira/browse/FLINK-12960
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> {{ResultPartitionDeploymentDescriptor#releasedOnConsumption}} shows the 
> intention of how the partition is going to be used by the shuffle user and 
> released. The {{ShuffleDescriptor}} should provide a way to query which 
> release type is supported by the shuffle service for this partition. If the 
> requested release type is not supported by the shuffle service for a certain 
> type of partition, the job should fail fast.





[jira] [Updated] (FLINK-12960) Move ResultPartitionDeploymentDescriptor#releasedOnConsumption to PartitionDescriptor#releasedOnConsumption

2019-06-26 Thread Andrey Zagrebin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrey Zagrebin updated FLINK-12960:

Description: {{ResultPartitionDeploymentDescriptor#releasedOnConsumption}} 
shows the intention of how the partition is going to be used by the shuffle 
user and released. The {{ShuffleDescriptor}} should provide a way to query 
which release type is supported by the shuffle service for this partition. If 
the requested release type is not supported by the shuffle service for a 
certain type of partition, the job should fail fast.  (was: 
ResultPartitionDeploymentDescriptor#releasedOnConsumption shows the intention 
how the partition is going to be used by the shuffle user. If it is not 
supported by the shuffle service for a certain type of partition, 
ShuffleMaster#registerPartitionWithProducer and 
ShuffleEnvironment#createResultPartitionWriters should throw an exception. 
ShuffleMaster#registerPartitionWithProducer takes PartitionDescriptor. 
ResultPartitionDeploymentDescriptor#releasedOnConsumption should be part of 
PartitionDescriptor so that not only ShuffleEnvironment but also ShuffleMaster 
is already aware about releasedOnConsumption.)

> Move ResultPartitionDeploymentDescriptor#releasedOnConsumption to 
> PartitionDescriptor#releasedOnConsumption
> ---
>
> Key: FLINK-12960
> URL: https://issues.apache.org/jira/browse/FLINK-12960
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> {{ResultPartitionDeploymentDescriptor#releasedOnConsumption}} shows the 
> intention of how the partition is going to be used by the shuffle user and 
> released. The {{ShuffleDescriptor}} should provide a way to query which 
> release type is supported by the shuffle service for this partition. If the 
> requested release type is not supported by the shuffle service for a certain 
> type of partition, the job should fail fast.





[GitHub] [flink] zhijiangW commented on a change in pull request #8804: [FLINK-12883][WIP][runtime] Add elaborated partition release logic

2019-06-26 Thread GitBox
zhijiangW commented on a change in pull request #8804: 
[FLINK-12883][WIP][runtime] Add elaborated partition release logic
URL: https://github.com/apache/flink/pull/8804#discussion_r297592199
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -1630,6 +1627,79 @@ public boolean updateState(TaskExecutionState state) {
}
}
 
+   private boolean updateStateInternal(final TaskExecutionState state, final Execution attempt) {
+   Map<String, Accumulator<?, ?>> accumulators;
+
+   switch (state.getExecutionState()) {
+   case RUNNING:
+   return attempt.switchToRunning();
+
+   case FINISHED:
+   // this deserialization is exception-free
+   accumulators = deserializeAccumulators(state);
+   attempt.markFinished(accumulators, state.getIOMetrics());
+   return true;
+
+   case CANCELED:
+   // this deserialization is exception-free
+   accumulators = deserializeAccumulators(state);
+   attempt.completeCancelling(accumulators, state.getIOMetrics());
+   return true;
+
+   case FAILED:
+   // this deserialization is exception-free
+   accumulators = deserializeAccumulators(state);
+   attempt.markFailed(state.getError(userClassLoader), accumulators, state.getIOMetrics());
+   return true;
+
+   default:
+   // we mark as failed and return false, which triggers the TaskManager
+   // to remove the task
+   attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState()));
+   return false;
+   }
+   }
+
+   private void maybeReleasePartitions(final TaskExecutionState state, final Execution attempt) {
+   final ExecutionVertexID finishedExecutionVertex = attempt.getVertex().getID();
+
+   if (state.getExecutionState() == ExecutionState.FINISHED) {
+   final List<IntermediateResultPartitionID> releasablePartitions = partitionReleaseStrategy.vertexFinished(finishedExecutionVertex);
+   releasePartitions(releasablePartitions);
 
 Review comment:
   We might only need `SchedulingTopology`, which is already used in the 
`createResultPartitionId` method below. It might look like this in 
`createResultPartitionId`:
   
   ```
   SchedulingResultPartition schedulingResultPartition = schedulingTopology.getResultPartitionOrThrow(resultPartition);
   ResultPartitionID resultPartitionId = schedulingResultPartition.getResultPartitionId();
   ```
   
   This works if we add the `ResultPartitionID` info to the constructor of 
`DefaultSchedulingResultPartition`; a sketch of that change follows.
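
   A sketch of that constructor change (field and accessor names assumed, the 
remaining fields elided):

   ```
   class DefaultSchedulingResultPartition {
       private final ResultPartitionID resultPartitionId;

       DefaultSchedulingResultPartition(ResultPartitionID resultPartitionId) {
           this.resultPartitionId = resultPartitionId;
       }

       ResultPartitionID getResultPartitionId() {
           return resultPartitionId;
       }
   }
   ```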




[jira] [Commented] (FLINK-11956) Remove shading from filesystems build

2019-06-26 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16873244#comment-16873244
 ] 

Chesnay Schepler commented on FLINK-11956:
--

Are all of our filesystems compatible with the plugin mechanism? Can they still 
be loaded without the plugin mechanism?

> Remove shading from filesystems build
> -
>
> Key: FLINK-11956
> URL: https://issues.apache.org/jira/browse/FLINK-11956
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / FileSystem
>Affects Versions: 1.9.0
>Reporter: Stefan Richter
>Assignee: vinoyang
>Priority: Blocker
>






[jira] [Comment Edited] (FLINK-11956) Remove shading from filesystems build

2019-06-26 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16873247#comment-16873247
 ] 

Piotr Nowojski edited comment on FLINK-11956 at 6/26/19 11:40 AM:
--

We haven't touched any existing {{FileSystem}} implementation, so yes, they 
should be loadable without the plugin mechanism. It also should work in the 
opposite direction.

As long as:
 # there are no un-declared implicit dependencies that were leaking from the 
user class loader
 # {{FileSystemFactory}} plugins either do not use 
{{Thread.currentThread().getContextClassLoader()}} or use it only during 
{{FileSystem}} creation ({{org.apache.flink.core.fs.FileSystemFactory#create}})

existing file systems should work just as well with plugins. In other words, 
plugin vs. non-plugin {{FileSystem}}s differ only at the packaging & 
dependency-leaking level; there are no API changes. One caveat here is the 
{{getContextClassLoader}} issue: a badly written {{FileSystem}} can be 
inherently incompatible with plugins.

This will no longer be true once we remove shading. Non-shaded {{FileSystems}} 
should work with plugins by design, but the opposite might no longer be true 
(because of dependency clashes). This is why we cannot resolve this ticket for 
1.9, as it would break backwards compatibility.
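
To illustrate the {{getContextClassLoader}} caveat, here is a self-contained 
sketch (plain Java, not Flink's API) contrasting the fragile pattern with a 
plugin-safe one:

```
// Fragile: the context classloader at call time may be the user classloader,
// not the plugin's own classloader, so the lookup can fail or load the wrong class.
class LazyLoadingFactory {
    Class<?> load(String name) throws ClassNotFoundException {
        return Class.forName(name, true, Thread.currentThread().getContextClassLoader());
    }
}

// Safer: pin the classloader that loaded the plugin itself, once, at creation.
class PinnedLoadingFactory {
    private final ClassLoader pluginLoader = PinnedLoadingFactory.class.getClassLoader();

    Class<?> load(String name) throws ClassNotFoundException {
        return Class.forName(name, true, pluginLoader);
    }
}
```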


was (Author: pnowojski):
We haven't touched any existing {{FileSystem}} implementation, so yes, they 
should be loadable without the plugin mechanism (we have some end-to-end tests 
that use existing non-shaded S3 {{FileSystem}}s as plugins). It also should 
work in the opposite direction. As long as there are no un-declared implicit 
dependencies that were leaking from the user class loader, existing file 
systems should work just as well with plugins. In other words, plugin vs. 
non-plugin {{FileSystem}}s differ only at the packaging & dependency-leaking 
level; there are no API changes.

This will no longer be true once we remove shading. Non-shaded {{FileSystems}} 
should work with plugins by design, but the opposite might no longer be true 
(because of dependency clashes). This is why we cannot resolve this ticket for 
1.9, as it would break backwards compatibility.

> Remove shading from filesystems build
> -
>
> Key: FLINK-11956
> URL: https://issues.apache.org/jira/browse/FLINK-11956
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / FileSystem
>Affects Versions: 1.9.0
>Reporter: Stefan Richter
>Assignee: vinoyang
>Priority: Blocker
>






[jira] [Created] (FLINK-13001) Add ExecutionGraphBuilder for testing

2019-06-26 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-13001:


 Summary: Add ExecutionGraphBuilder for testing
 Key: FLINK-13001
 URL: https://issues.apache.org/jira/browse/FLINK-13001
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination, Tests
Affects Versions: 1.9.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.9.0


Creating an ExecutionGraph for testing is quite a hassle since we have no 
builder for doing so easily.
Quite a few classes contain utility methods accepting a subset of the required 
arguments, which are painful to extend.





[GitHub] [flink] knaufk commented on a change in pull request #8607: [FLINK-12652] [documentation] add first version of a glossary

2019-06-26 Thread GitBox
knaufk commented on a change in pull request #8607: [FLINK-12652] 
[documentation] add first version of a glossary
URL: https://github.com/apache/flink/pull/8607#discussion_r297633905
 
 

 ##
 File path: docs/concepts/glossary.md
 ##
 @@ -0,0 +1,168 @@
+---
+title: Glossary
+nav-pos: 3
+nav-title: Glossary
+nav-parent_id: concepts
+---
+
+
+#### Flink Application Cluster
+
+A Flink Application Cluster is a dedicated [Flink 
Cluster](./glossary#flink-cluster) that only
+executes a single [Flink Job](./glossary#flink-job). The lifetime of the
+[Flink Cluster](./glossary#flink-cluster) is bound to the lifetime of the 
Flink Job. Formerly
+Flink Application Clusters were also known as Flink Clusters in *job mode*. 
Compare to
+[Flink Session Cluster](./glossary#flink-session-cluster).
+
+#### Flink Cluster
+
+The distributed system consisting of (typically) one Flink Master process and one or more Flink
+TaskManager processes.
+
+#### Event
+
+An event is a statement about a change of the state of the domain modelled by 
the
+application. Events can be input and/or output of a stream or batch processing 
application.
+Events are special types of [records](./glossary#Record).
+
+#### ExecutionGraph
+
+see [Physical Graph](./glossary#physical-graph)
+
+#### Function
+
+Functions are implemented by the user and encapsulate the
+application logic of a Flink program. Most Functions are wrapped by a 
corresponding
+[Operator](./glossary#operator).
+
+#### Instance
+
+The term *instance* is used to describe a specific instance of a specific type 
(usually
+[Operator](./glossary#operator) or [Function](./glossary#function)) during 
runtime. As Apache Flink
+is mostly written in Java, this corresponds to the definition of *Instance* or 
*Object* in Java.
+In the context of Apache Flink, the term *parallel instance* is also 
frequently used to emphasize
+that multiple instances of the same [Operator](./glossary#operator) or
+[Function](./glossary#function) type are running in parallel.
+
+#### Flink Job
+
+A Flink Job is the runtime representation of a Flink program. A Flink Job can 
either be submitted
+to a long running [Flink Session Cluster](./glossary#flink-session-cluster) or 
it can be started as a
+self-contained [Flink Application 
Cluster](./glossary#flink-application-cluster).
+
+#### JobGraph
+
+see [Logical Graph](./glossary#logical-graph)
+
+#### Flink JobManager
+
+JobManagers are one of the components running in the
+[Flink JobManager Process](./glossary#flink-jobmanager-process). A JobManager is responsible for
+supervising the execution of the [Tasks](./glossary#task) of a single job.
+
+#### Logical Graph
+
+A logical graph is a directed graph describing the high-level logic of a 
stream processing program.
+The nodes are [Operators](./glossary#operator) and the edges indicate 
input/output-relationships or
+data streams or data sets.
+
+#### Managed State
+
+Managed State describes application state which has been registered with the 
framework. For
+Managed State, Apache Flink will take care of persistence and rescaling among other things.
+
+#### Flink JobManager Process
+
+The JobManager Process is the master of a [Flink 
Cluster](./glossary#flink-cluster). It is called
+*JobManager* for historical reasons, but actually contains three distinct 
components:
+Flink Resource Manager, Flink Dispatcher and one [Flink 
JobManager](./glossary#flink-jobmanager)
+per running [Flink Job](./glossary#flink-job).
+
+#### Operator
+
+Node of a [Logical Graph](./glossary#logical-graph). An Operator performs a 
certain operation,
+which is usually executed by a [Function](./glossary#function). Sources and 
Sinks are special
+Operators for data ingestion and data egress.
+
+#### Operator Chain
+
+An Operator Chain consists of two or more consecutive 
[Operators](./glossary#operator) without any
+repartitioning in between. Operators within the same Operator Chain forward records to each other
+directly without going through serialization or Flink's network stack.
+
+#### Partition
+
+A partition is an independent subset of the overall data stream or data set. A 
data stream or
+data set is divided into partitions by assigning each 
[record](./glossary#Record) to one or more
+partitions. Partitions of data streams or data sets are consumed by 
[Tasks](./glossary#task) during
+runtime. A transformation which changes the way a data stream or data set is 
partitioned is often
+called repartitioning.
+
+#### Physical Graph
+
+A physical graph is the result of translating a [Logical 
Graph](./glossary#logical-graph) for
+execution in a distributed runtime. The nodes are [Tasks](./glossary#task) and 
the edges indicate
+input/output-relationships or [partitions](./glossary#partition) of data 
streams or data sets.
+
+#### Record
+
+Records are the constituent elements of a data set or data stream.
+[Operators](./glossary#operator) and [Functions](./glossary#Function) receive 
records as input

[jira] [Commented] (FLINK-10879) Align Flink clients on env.execute()

2019-06-26 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16873283#comment-16873283
 ] 

TisonKun commented on FLINK-10879:
--

[~f.pompermaier] I suspect that if you deploy a job cluster, any code after 
{{env.execute}} would not be executed either.

I'd be glad to learn what "REST API" exactly means here, because in my mind 
there are 3 approaches to job submission:

1. Directly run in IDE
2. Submit via CliFrontend
3. Programmatically obtain a ClusterClient and submit a JobGraph

> Align Flink clients on env.execute()
> 
>
> Key: FLINK-10879
> URL: https://issues.apache.org/jira/browse/FLINK-10879
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Affects Versions: 1.6.2
>Reporter: Flavio Pompermaier
>Priority: Major
>
> Right now the REST APIs do not support any code after env.execute while the 
> Flink API, CLI client or the code executed within the IDE do.
> Both clients should behave in the same way (supporting env.execute() to 
> return something and continue the code execution or not).
> See the discussion on the DEV ML for more details: 
> http://mail-archives.apache.org/mod_mbox/flink-dev/201811.mbox/%3CCAELUF_DhjzL9FECvx040_GE3d85Ykb-HcGVCh0O4y9h-cThq7A%40mail.gmail.com%3E





[jira] [Commented] (FLINK-10879) Align Flink clients on env.execute()

2019-06-26 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16873295#comment-16873295
 ] 

TisonKun commented on FLINK-10879:
--

Well, I can see the "REST API" way now. It compiles the job via 
{{OptimizedPlanEnvironment}}, which aborts once {{#execute}} is called, and 
thus any code after {{env.execute}} would not be executed.
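
An illustrative sketch of that "abort on execute" pattern in plain Java (not 
the actual {{OptimizedPlanEnvironment}} code):

```
// The environment only wants the plan; it captures it in execute() and then
// aborts the program, so user code after env.execute() never runs.
class ProgramAbort extends Error {}

class PlanCapturingEnvironment {
    private Object capturedPlan;

    Object execute() {
        capturedPlan = buildPlan();  // capture what the client needs
        throw new ProgramAbort();    // unwind main(); later code is skipped
    }

    private Object buildPlan() {
        return new Object();         // stand-in for the optimized plan
    }
}
```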

> Align Flink clients on env.execute()
> 
>
> Key: FLINK-10879
> URL: https://issues.apache.org/jira/browse/FLINK-10879
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Affects Versions: 1.6.2
>Reporter: Flavio Pompermaier
>Priority: Major
>
> Right now the REST APIs do not support any code after env.execute while the 
> Flink API, CLI client or the code executed within the IDE do.
> Both clients should behave in the same way (supporting env.execute() to 
> return something and continue the code execution or not).
> See the discussion on the DEV ML for more details: 
> http://mail-archives.apache.org/mod_mbox/flink-dev/201811.mbox/%3CCAELUF_DhjzL9FECvx040_GE3d85Ykb-HcGVCh0O4y9h-cThq7A%40mail.gmail.com%3E





[jira] [Assigned] (FLINK-12023) Override TaskManager memory when submitting a Flink job

2019-06-26 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-12023:


Assignee: (was: vinoyang)

> Override TaskManager memory when submitting a Flink job
> ---
>
> Key: FLINK-12023
> URL: https://issues.apache.org/jira/browse/FLINK-12023
> Project: Flink
>  Issue Type: Wish
>  Components: Runtime / Configuration
>Reporter: Michel Davit
>Priority: Minor
>
> Currently a Flink session can only run TaskManagers of the same size.
> However, depending on the jar or even the program arguments, some jobs can be 
> more intensive than others.
> In order to improve memory usage and avoid resource waste, it would be useful 
> to have an option that overrides the default task manager memory setting when 
> submitting a new job.
>  
>  





[GitHub] [flink] dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.

2019-06-26 Thread GitBox
dianfu commented on a change in pull request #8863: [FLINK-12962][python] 
Allows pyflink to be pip installed.
URL: https://github.com/apache/flink/pull/8863#discussion_r297531579
 
 

 ##
 File path: flink-python/pyflink/find_flink_home.py
 ##
 @@ -15,12 +16,21 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 

-
+import glob
 import logging
 import os
 import sys
 
 
+def is_flink_home(path):
 
 Review comment:
   is_flink_home -> _is_flink_home




[GitHub] [flink] zentol opened a new pull request #8896: [FLINK-12993][runtime] Refactor forceReleaseOnConsumption to JM concept

2019-06-26 Thread GitBox
zentol opened a new pull request #8896: [FLINK-12993][runtime] Refactor 
forceReleaseOnConsumption to JM concept
URL: https://github.com/apache/flink/pull/8896
 
 
   The `forceReleaseOnConsumption` flag allows us to switch between the current 
behavior (all partitions are released on consumption) and the planned new 
behavior (only pipelined partitions are released on consumption).
   
   This PR moves the processing of this flag from the TM side (where it was 
handled in the netty shuffle environment) to the JM side (now handled in the 
Execution when creating the ResultPartitionDeploymentDescriptor).
   
   The corresponding config option is evaluated by the `ExecutionGraphBuilder`.
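
   A hedged sketch of the decision that moves to the JM side (illustrative, not 
the exact code from the PR):

   ```
   // with the flag set, every partition keeps the old release-on-consumption
   // behavior; otherwise only pipelined partitions are released on consumption
   static boolean releasedOnConsumption(ResultPartitionType type, boolean forceRelease) {
       return forceRelease || type.isPipelined();
   }
   ```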




[GitHub] [flink] dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.

2019-06-26 Thread GitBox
dianfu commented on a change in pull request #8863: [FLINK-12962][python] 
Allows pyflink to be pip installed.
URL: https://github.com/apache/flink/pull/8863#discussion_r297546146
 
 

 ##
 File path: flink-dist/src/main/flink-bin/bin/pyflink-shell.sh
 ##
 @@ -18,8 +18,13 @@
 

 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
+. "$bin"/find-flink-home.sh
 
-. "$bin"/config.sh
+_FLINK_HOME_DETERMINED=1
+
+cd "$FLINK_HOME"/bin
 
 Review comment:
   Seems that this line can be removed?




[GitHub] [flink] dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.

2019-06-26 Thread GitBox
dianfu commented on a change in pull request #8863: [FLINK-12962][python] 
Allows pyflink to be pip installed.
URL: https://github.com/apache/flink/pull/8863#discussion_r297526228
 
 

 ##
 File path: flink-python/pyflink/shell.py
 ##
 @@ -15,7 +15,10 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 

+import codecs
+import io
 
 Review comment:
   io is not used




[GitHub] [flink] dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.

2019-06-26 Thread GitBox
dianfu commented on a change in pull request #8863: [FLINK-12962][python] 
Allows pyflink to be pip installed.
URL: https://github.com/apache/flink/pull/8863#discussion_r297540088
 
 

 ##
 File path: flink-python/pyflink/find_flink_home.py
 ##
 @@ -1,4 +1,5 @@
-
+#!/usr/bin/env python
 
 Review comment:
   Good catch. What about also adding this header for shell.py?




[GitHub] [flink] dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.

2019-06-26 Thread GitBox
dianfu commented on a change in pull request #8863: [FLINK-12962][python] 
Allows pyflink to be pip installed.
URL: https://github.com/apache/flink/pull/8863#discussion_r297532944
 
 

 ##
 File path: flink-python/pyflink/find_flink_home.py
 ##
 @@ -30,12 +40,31 @@ def _find_flink_home():
 return os.environ['FLINK_HOME']
 else:
 try:
-flink_root_dir = os.path.abspath(os.path.dirname(os.path.abspath(__file__)) + "/../../")
+current_dir = os.path.abspath(os.path.dirname(os.path.abspath(__file__)))
+flink_root_dir = os.path.abspath(current_dir + "/../../")
 build_target = flink_root_dir + "/build-target"
-pyflink_file = build_target + "/bin/pyflink-gateway-server.sh"
-if os.path.isfile(pyflink_file):
+if is_flink_home(build_target):
 os.environ['FLINK_HOME'] = build_target
 return build_target
+
+if sys.version < "3":
+import imp
+try:
+module_home = imp.find_module("pyflink")[1]
+if is_flink_home(module_home):
+os.environ['FLINK_HOME'] = module_home
+return module_home
+except ImportError:
+pass
+else:
+from importlib.util import find_spec
+try:
+module_home = os.path.dirname(find_spec("pyflink").origin)
 
 Review comment:
   Most of the code in the if/else is duplicated. We could keep only the 
`module_home` assignment inside the if/else to eliminate the duplication.




[GitHub] [flink] dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.

2019-06-26 Thread GitBox
dianfu commented on a change in pull request #8863: [FLINK-12962][python] 
Allows pyflink to be pip installed.
URL: https://github.com/apache/flink/pull/8863#discussion_r297543943
 
 

 ##
 File path: flink-python/MANIFEST.in
 ##
 @@ -0,0 +1,29 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+global-exclude *.py[cod] __pycache__ .DS_Store
+recursive-include deps/lib *.jar
 
 Review comment:
   What about changing these two lines to the following?
   graft deps/lib *.jar
   recursive-include deps/opt




[GitHub] [flink] dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.

2019-06-26 Thread GitBox
dianfu commented on a change in pull request #8863: [FLINK-12962][python] 
Allows pyflink to be pip installed.
URL: https://github.com/apache/flink/pull/8863#discussion_r297519065
 
 

 ##
 File path: flink-python/setup.py
 ##
 @@ -42,31 +44,165 @@
 with io.open(os.path.join(this_directory, 'README.md'), 'r', encoding='utf-8') as f:
 long_description = f.read()
 
-setup(
-name='pyflink',
-version=VERSION,
-packages=['pyflink',
-  'pyflink.table',
-  'pyflink.util',
-  'pyflink.datastream',
-  'pyflink.dataset',
-  'pyflink.common'],
-url='http://flink.apache.org',
-license='http://www.apache.org/licenses/LICENSE-2.0',
-author='Flink Developers',
-author_email='d...@flink.apache.org',
-install_requires=['py4j==0.10.8.1'],
-tests_require=['pytest==4.4.1'],
-description='Apache Flink Python API',
-long_description=long_description,
-long_description_content_type='text/markdown',
-classifiers=[
-'Development Status :: 1 - Planning',
-'License :: OSI Approved :: Apache Software License',
-'Programming Language :: Python :: 2.7',
-'Programming Language :: Python :: 3.3',
-'Programming Language :: Python :: 3.4',
-'Programming Language :: Python :: 3.5',
-'Programming Language :: Python :: 3.6',
-'Programming Language :: Python :: 3.7']
-)
+TEMP_PATH = "deps"
+
+LIB_TEMP_PATH = os.path.join(TEMP_PATH, "lib")
+OPT_TEMP_PATH = os.path.join(TEMP_PATH, "opt")
+CONF_TEMP_PATH = os.path.join(TEMP_PATH, "conf")
+EXAMPLES_TEMP_PATH = os.path.join(TEMP_PATH, "examples")
+LICENSES_TEMP_PATH = os.path.join(TEMP_PATH, "licenses")
+SCRIPTS_TEMP_PATH = os.path.join(TEMP_PATH, "bin")
+
+LICENSE_FILE_TEMP_PATH = os.path.join("pyflink", "LICENSE")
+NOTICE_FILE_TEMP_PATH = os.path.join("pyflink", "NOTICE")
+README_FILE_TEMP_PATH = os.path.join("pyflink", "README.txt")
+
+in_flink_source = os.path.isfile("../flink-java/src/main/java/org/apache/flink/api/java/"
+ "ExecutionEnvironment.java")
+
+try:
+if in_flink_source:
+
+try:
+os.mkdir(TEMP_PATH)
+except:
+print("Temp path for symlink to parent already exists 
{0}".format(TEMP_PATH),
+  file=sys.stderr)
+sys.exit(-1)
+
+FLINK_HOME = os.path.abspath("../build-target")
+
+incorrect_invocation_message = """
+If you are installing pyflink from flink source, you must first build Flink and
+run sdist.
+
+To build Flink with maven you can run:
+  mvn -DskipTests clean package
+Building the source dist is done in the flink-python directory:
+  cd flink-python
+  python setup.py sdist
+  pip install dist/*.tar.gz"""
+
+LIB_PATH = os.path.join(FLINK_HOME, "lib")
+OPT_PATH = os.path.join(FLINK_HOME, "opt")
+CONF_PATH = os.path.join(FLINK_HOME, "conf")
+EXAMPLES_PATH = os.path.join(FLINK_HOME, "examples")
+LICENSES_PATH = os.path.join(FLINK_HOME, "licenses")
+SCRIPTS_PATH = os.path.join(FLINK_HOME, "bin")
+
+LICENSE_FILE_PATH = os.path.join(FLINK_HOME, "LICENSE")
+NOTICE_FILE_PATH = os.path.join(FLINK_HOME, "NOTICE")
+README_FILE_PATH = os.path.join(FLINK_HOME, "README.txt")
+
+if not os.path.isdir(LIB_PATH):
+print(incorrect_invocation_message, file=sys.stderr)
+sys.exit(-1)
+
+if getattr(os, "symlink", None) is not None:
+os.symlink(LIB_PATH, LIB_TEMP_PATH)
+os.symlink(OPT_PATH, OPT_TEMP_PATH)
+os.symlink(CONF_PATH, CONF_TEMP_PATH)
+os.symlink(EXAMPLES_PATH, EXAMPLES_TEMP_PATH)
+os.symlink(LICENSES_PATH, LICENSES_TEMP_PATH)
+os.symlink(SCRIPTS_PATH, SCRIPTS_TEMP_PATH)
+os.symlink(LICENSE_FILE_PATH, LICENSE_FILE_TEMP_PATH)
+os.symlink(NOTICE_FILE_PATH, NOTICE_FILE_TEMP_PATH)
+os.symlink(README_FILE_PATH, README_FILE_TEMP_PATH)
+else:
+copytree(LIB_PATH, LIB_TEMP_PATH)
+copytree(OPT_PATH, OPT_TEMP_PATH)
+copytree(CONF_PATH, CONF_TEMP_PATH)
+copytree(EXAMPLES_PATH, EXAMPLES_TEMP_PATH)
+copytree(LICENSES_PATH, LICENSES_TEMP_PATH)
+copytree(SCRIPTS_PATH, SCRIPTS_TEMP_PATH)
+copy(LICENSE_FILE_PATH, LICENSE_FILE_TEMP_PATH)
+copy(NOTICE_FILE_PATH, NOTICE_FILE_TEMP_PATH)
+copy(README_FILE_PATH, README_FILE_TEMP_PATH)
+else:
+if not os.path.isdir(LIB_TEMP_PATH) or not os.path.isdir(OPT_TEMP_PATH) \
+or not os.path.isdir(SCRIPTS_TEMP_PATH):
+print("The flink core files are not found. Please make sure your installation package "
+  "is complete, or do this in the flink-python directory of the flink source "
+  "directory.")
+sys.exit(-1)
+
+

[GitHub] [flink] dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.

2019-06-26 Thread GitBox
dianfu commented on a change in pull request #8863: [FLINK-12962][python] 
Allows pyflink to be pip installed.
URL: https://github.com/apache/flink/pull/8863#discussion_r297460497
 
 

 ##
 File path: flink-python/setup.py
 ##
 @@ -42,31 +44,165 @@
 with io.open(os.path.join(this_directory, 'README.md'), 'r', encoding='utf-8') as f:
 long_description = f.read()
 
-setup(
-name='pyflink',
-version=VERSION,
-packages=['pyflink',
-  'pyflink.table',
-  'pyflink.util',
-  'pyflink.datastream',
-  'pyflink.dataset',
-  'pyflink.common'],
-url='http://flink.apache.org',
-license='http://www.apache.org/licenses/LICENSE-2.0',
-author='Flink Developers',
-author_email='d...@flink.apache.org',
-install_requires=['py4j==0.10.8.1'],
-tests_require=['pytest==4.4.1'],
-description='Apache Flink Python API',
-long_description=long_description,
-long_description_content_type='text/markdown',
-classifiers=[
-'Development Status :: 1 - Planning',
-'License :: OSI Approved :: Apache Software License',
-'Programming Language :: Python :: 2.7',
-'Programming Language :: Python :: 3.3',
-'Programming Language :: Python :: 3.4',
-'Programming Language :: Python :: 3.5',
-'Programming Language :: Python :: 3.6',
-'Programming Language :: Python :: 3.7']
-)
+TEMP_PATH = "deps"
+
+LIB_TEMP_PATH = os.path.join(TEMP_PATH, "lib")
+OPT_TEMP_PATH = os.path.join(TEMP_PATH, "opt")
+CONF_TEMP_PATH = os.path.join(TEMP_PATH, "conf")
+EXAMPLES_TEMP_PATH = os.path.join(TEMP_PATH, "examples")
+LICENSES_TEMP_PATH = os.path.join(TEMP_PATH, "licenses")
+SCRIPTS_TEMP_PATH = os.path.join(TEMP_PATH, "bin")
 
 Review comment:
   Should we also consider the directory `plugins` in build-target?




[GitHub] [flink] dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.

2019-06-26 Thread GitBox
dianfu commented on a change in pull request #8863: [FLINK-12962][python] 
Allows pyflink to be pip installed.
URL: https://github.com/apache/flink/pull/8863#discussion_r297550056
 
 

 ##
 File path: flink-dist/src/main/flink-bin/bin/config.sh
 ##
 @@ -296,7 +296,10 @@ bin=`dirname "$target"`
 SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P`
 
 # Define the main directory of the flink installation
-FLINK_HOME=`dirname "$SYMLINK_RESOLVED_BIN"`
+# If config.sh is called by pyflink-shell.sh in the python bin directory (pip installed), use the _PYFLINK_HOME
+if [ -z "$_FLINK_HOME_DETERMINED" ]; then
 
 Review comment:
   In the case that `FLINK_HOME` is already determined, `FLINK_HOME` will be 
set. So we can just check whether `FLINK_HOME` is already set, and there is no 
need to add the variable `_FLINK_HOME_DETERMINED`. What do you think?




[jira] [Commented] (FLINK-12953) View logs from Job view in Web Dashboard

2019-06-26 Thread Robert Metzger (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16873121#comment-16873121
 ] 

Robert Metzger commented on FLINK-12953:


[~chadrik] thanks a lot for opening this ticket!

I agree with you that it would be a great improvement to Flink's UX.
Does Cloud Dataflow only show custom log messages (from the user code), or also 
system log messages from a transformation?
If it is only custom log messages: are they providing a logger instance that 
the user has to use?

> View logs from Job view in Web Dashboard
> 
>
> Key: FLINK-12953
> URL: https://issues.apache.org/jira/browse/FLINK-12953
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Reporter: Chad Dombrova
>Priority: Major
>
> As a (beam) developer I want to be able to print/log information from my 
> custom transforms, and then monitor that output within the job view of the 
> Web Dashboard, so that I don't have to go hunting through the combined log in 
> the Job Manager view.   The Job Manager log has way too much in it, spanning 
> all jobs, including output logged by both flink and user code.
> A good example of how this UX should work can be found in Google Dataflow:  
> - click on a job, and see the logged output for that job
> - click on a transform, and see the logged output for just that transform
> thanks!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12786) Implement local aggregation in Flink

2019-06-26 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873162#comment-16873162
 ] 

vinoyang commented on FLINK-12786:
--

Right now, the main participants in this discussion are me and the developers from 
Alibaba, and it seems we have major differences at the API level. We hope that 
[~aljoscha] and [~StephanEwen] can give more professional advice, maybe just 
directions. The current discussion seems inefficient and wastes a lot of time; 
otherwise, much of the work could have been done in parallel.

> Implement local aggregation in Flink
> 
>
> Key: FLINK-12786
> URL: https://issues.apache.org/jira/browse/FLINK-12786
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> Currently, keyed streams are widely used to perform aggregating operations 
> (e.g., reduce, sum and window) on the elements that have the same key. When 
> executed at runtime, the elements with the same key will be sent to and 
> aggregated by the same task.
>  
> The performance of these aggregating operations is very sensitive to the 
> distribution of keys. In cases where the distribution of keys follows a 
> power law, the performance will be significantly degraded. Worse still, 
> increasing the degree of parallelism does not help when a task is 
> overloaded by a single key.
>  
> Local aggregation is a widely-adopted method to reduce the performance 
> degraded by data skew. We can decompose the aggregating operations into two 
> phases. In the first phase, we aggregate the elements of the same key at the 
> sender side to obtain partial results. Then at the second phase, these 
> partial results are sent to receivers according to their keys and are 
> combined to obtain the final result. Since the number of partial results 
> received by each receiver is limited by the number of senders, the imbalance 
> among receivers can be reduced. Besides, by reducing the amount of 
> transferred data the performance can be further improved.
> The design documentation is here: 
> [https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing]
> The discussion thread is here: 
> [http://mail-archives.apache.org/mod_mbox/flink-dev/201906.mbox/%3CCAA_=o7dvtv8zjcxknxyoyy7y_ktvgexrvb4zhxjwzuhsulz...@mail.gmail.com%3E]
>  
>  
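
As a minimal illustration of the two phases described above, a sender-side pre-aggregator could look like the sketch below (hypothetical names, not Flink's actual API; the flush threshold and the emit call are placeholders):

    import java.util.HashMap;
    import java.util.Map;

    // Phase 1: pre-aggregate elements of the same key at the sender side.
    final class LocalSumAggregator {

        private final Map<String, Long> partialSums = new HashMap<>();
        private final int flushThreshold;

        LocalSumAggregator(int flushThreshold) {
            this.flushThreshold = flushThreshold;
        }

        void add(String key, long value) {
            partialSums.merge(key, value, Long::sum);
            if (partialSums.size() >= flushThreshold) {
                flush();
            }
        }

        // Ship partial results downstream; phase 2 combines them per key at the
        // receiver, bounded by the number of senders rather than the number of records.
        private void flush() {
            partialSums.forEach(this::emitToReceiver);
            partialSums.clear();
        }

        private void emitToReceiver(String key, long partialSum) {
            System.out.println(key + " -> " + partialSum); // stand-in for the network shuffle
        }
    }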



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter

2019-06-26 Thread GitBox
pnowojski commented on a change in pull request #8621: 
[FLINK-12682][connectors] StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8621#discussion_r297579509
 
 

 ##
 File path: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
 ##
 @@ -82,7 +89,10 @@ public void open(FileSystem fs, Path path) throws 
IOException {
public void write(T element) throws IOException {
FSDataOutputStream outputStream = getStream();
outputStream.write(element.toString().getBytes(charset));
-   outputStream.write('\n');
+   if (rowDelimiterBytes == null) {
+   rowDelimiterBytes = rowDelimiter.getBytes(charset);
 
 Review comment:
   Why are you "caching" `rowDelimiterBytes`? Isn't this a premature 
optimisation?
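   
   One way to avoid the lazy caching entirely would be to compute the bytes once where the charset becomes available; a sketch against the PR's field names, assuming the charset is resolved in `open()` as in the current writer:
   
       // Sketch: compute the delimiter bytes once in open(), where the charset is
       // resolved, instead of lazily caching them on the first write() call.
       @Override
       public void open(FileSystem fs, Path path) throws IOException {
           super.open(fs, path);
           this.charset = Charset.forName(charsetName);
           this.rowDelimiterBytes = rowDelimiter.getBytes(charset);
       }

       @Override
       public void write(T element) throws IOException {
           FSDataOutputStream outputStream = getStream();
           outputStream.write(element.toString().getBytes(charset));
           outputStream.write(rowDelimiterBytes); // no null check on the hot path
       }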


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zentol merged pull request #8857: [FLINK-12960] Introduce ShuffleDescriptor#ReleaseType and ShuffleDescriptor#getSupportedReleaseTypes

2019-06-26 Thread GitBox
zentol merged pull request #8857: [FLINK-12960] Introduce 
ShuffleDescriptor#ReleaseType and ShuffleDescriptor#getSupportedReleaseTypes
URL: https://github.com/apache/flink/pull/8857
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager

2019-06-26 Thread GitBox
zhijiangW commented on a change in pull request #8646: [FLINK-12735][network] 
Make shuffle environment implementation independent with IOManager
URL: https://github.com/apache/flink/pull/8646#discussion_r297581901
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
 ##
 @@ -71,7 +71,7 @@ public void beforeTest() {
}
 
@After
-   public void afterTest() {
+   public void afterTest() throws Exception {
this.ioManager.close();
if (!this.ioManager.isProperlyShutDown()) {
 
 Review comment:
   We could add a try-catch and ignore the exception here. WDYT?
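   
   Something like this (a sketch of the suggested pattern; the failure message is assumed):
   
       @After
       public void afterTest() {
           try {
               this.ioManager.close();
           } catch (Exception e) {
               // ignored: the check below already fails the test on a bad shutdown
           }
           if (!this.ioManager.isProperlyShutDown()) {
               Assert.fail("I/O Manager was not properly shut down.");
           }
       }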


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter

2019-06-26 Thread GitBox
pnowojski commented on a change in pull request #8621: 
[FLINK-12682][connectors] StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8621#discussion_r297581797
 
 

 ##
 File path: 
flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
 ##
 @@ -32,7 +32,7 @@
 
@Test
public void testDuplicate() {
 
 Review comment:
   Please also add a unit test (in this file?) to cover that the custom 
delimiter is actually used somehow.
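   
   For example, something along these lines (a sketch; it assumes the new `(charsetName, rowDelimiter)` constructor from this PR and a JUnit `TemporaryFolder` rule named `tempFolder`):
   
       @Test
       public void testCustomRowDelimiterIsUsed() throws Exception {
           File outFile = tempFolder.newFile();
           StringWriter<String> writer = new StringWriter<>("UTF-8", ";");

           writer.open(FileSystem.getLocalFileSystem(), new Path(outFile.toURI()));
           writer.write("a");
           writer.write("b");
           writer.close();

           String content = new String(Files.readAllBytes(outFile.toPath()), StandardCharsets.UTF_8);
           // the custom delimiter must appear after each written element
           assertEquals("a;b;", content);
       }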


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter

2019-06-26 Thread GitBox
pnowojski commented on a change in pull request #8621: 
[FLINK-12682][connectors] StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8621#discussion_r297580610
 
 

 ##
 File path: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
 ##
 @@ -54,13 +59,15 @@ public StringWriter() {
 *
 * @param charsetName Name of the charset to be used, must be valid 
input for {@code Charset.forName(charsetName)}
 */
-   public StringWriter(String charsetName) {
+   public StringWriter(String charsetName, String rowDelimiter) {
 
 Review comment:
   please update the `@param` java doc.
   
   Secondly, I'm not sure, but isn't `StringWriter` part of the public API, and 
shouldn't we preserve the old constructor?
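   
   If it is public API, the old constructor could simply be kept and delegate to the new one, e.g. (a sketch; the `"\n"` default mirrors the previous behaviour, the field assignments are assumed):
   
       /** Kept for backwards compatibility: defaults to the previous "\n" delimiter. */
       public StringWriter(String charsetName) {
           this(charsetName, "\n");
       }

       /**
        * @param charsetName  Name of the charset to be used, must be valid input for
        *                     {@code Charset.forName(charsetName)}
        * @param rowDelimiter Delimiter that is appended after every written element
        */
       public StringWriter(String charsetName, String rowDelimiter) {
           this.charsetName = charsetName;
           this.rowDelimiter = rowDelimiter;
       }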


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] knaufk commented on a change in pull request #8607: [FLINK-12652] [documentation] add first version of a glossary

2019-06-26 Thread GitBox
knaufk commented on a change in pull request #8607: [FLINK-12652] 
[documentation] add first version of a glossary
URL: https://github.com/apache/flink/pull/8607#discussion_r297636397
 
 

 ##
 File path: docs/concepts/glossary.md
 ##
 @@ -0,0 +1,168 @@
+---
+title: Glossary
+nav-pos: 3
+nav-title: Glossary
+nav-parent_id: concepts
+---
+
+
+ Flink Application Cluster
+
+A Flink Application Cluster is a dedicated [Flink 
Cluster](./glossary#flink-cluster) that only
+executes a single [Flink Job](./glossary#flink-job). The lifetime of the
+[Flink Cluster](./glossary#flink-cluster) is bound to the lifetime of the 
Flink Job. Formerly
+Flink Application Clusters were also known as Flink Clusters in *job mode*. 
Compare to
+[Flink Session Cluster](./glossary#flink-session-cluster).
+
+ Flink Cluster
+
+The distributed system consisting of (typically) one Flink Master process and 
one or more Flink
+TaskManager processes.
+
+ Event
+
+An event is a statement about a change of the state of the domain modelled by 
the
+application. Events can be input and/or output of a stream or batch processing 
application.
+Events are special types of [records](./glossary#Record).
 
 Review comment:
   In Domain-Driven Design, event-driven architectures, etc., this is exactly 
what an event is: always an event (= state change) in the domain. I could 
add a textbook example like "checkout" or "item added to cart". Would this 
help?
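   
   For instance, such a textbook event could look like this (an illustrative sketch, not part of the glossary text):
   
       // "Item added to cart": a statement about a state change in the shopping domain.
       final class ItemAddedToCart {
           final String cartId;                // which cart changed
           final String itemId;                // what was added
           final java.time.Instant occurredAt; // when the state change happened

           ItemAddedToCart(String cartId, String itemId, java.time.Instant occurredAt) {
               this.cartId = cartId;
               this.itemId = itemId;
               this.occurredAt = occurredAt;
           }
       }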


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] leesf opened a new pull request #8899: [hotfix][readme] fix how to contribute to Apache Flink url

2019-06-26 Thread GitBox
leesf opened a new pull request #8899: [hotfix][readme] fix how to contribute 
to Apache Flink url
URL: https://github.com/apache/flink/pull/8899
 
 
   *Fix URL error in README.md*
   
   cc @zentol @fhueske 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8857: [FLINK-12960][coordination][shuffle] Move ResultPartitionDeploymentDescriptor#releasedOnConsumption to PartitionDescriptor#relea

2019-06-26 Thread GitBox
zhijiangW commented on a change in pull request #8857: 
[FLINK-12960][coordination][shuffle] Move 
ResultPartitionDeploymentDescriptor#releasedOnConsumption to 
PartitionDescriptor#releasedOnConsumption
URL: https://github.com/apache/flink/pull/8857#discussion_r297530334
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
 ##
 @@ -73,19 +74,26 @@ public NettyShuffleDescriptorBuilder 
setConnectionIndex(int connectionIndex) {
return this;
}
 
+   public NettyShuffleDescriptorBuilder setBlocking(boolean blocking) {
 
 Review comment:
   `setIsBlocking(boolean isBlocking)` ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-11767) Update TypeSerializerSnapshotMigrationTestBase and subclasses for 1.8

2019-06-26 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-11767:


Assignee: (was: Tzu-Li (Gordon) Tai)

> Update TypeSerializerSnapshotMigrationTestBase and subclasses for 1.8
> -
>
> Key: FLINK-11767
> URL: https://issues.apache.org/jira/browse/FLINK-11767
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Type Serialization System, Tests
>Affects Versions: 1.8.0
>Reporter: vinoyang
>Priority: Blocker
> Fix For: 1.9.0
>
>
> Update {{TypeSerializerSnapshotMigrationTestBase}} and subclasses to cover 
> restoring from Flink 1.8.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11767) Update TypeSerializerSnapshotMigrationTestBase and subclasses for 1.8

2019-06-26 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873067#comment-16873067
 ] 

vinoyang commented on FLINK-11767:
--

I have removed the assignee.

> Update TypeSerializerSnapshotMigrationTestBase and subclasses for 1.8
> -
>
> Key: FLINK-11767
> URL: https://issues.apache.org/jira/browse/FLINK-11767
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Type Serialization System, Tests
>Affects Versions: 1.8.0
>Reporter: vinoyang
>Priority: Blocker
> Fix For: 1.9.0
>
>
> Update {{TypeSerializerSnapshotMigrationTestBase}} and subclasses to cover 
> restoring from Flink 1.8.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-11767) Update TypeSerializerSnapshotMigrationTestBase and subclasses for 1.8

2019-06-26 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-11767:


Assignee: Tzu-Li (Gordon) Tai  (was: vinoyang)

> Update TypeSerializerSnapshotMigrationTestBase and subclasses for 1.8
> -
>
> Key: FLINK-11767
> URL: https://issues.apache.org/jira/browse/FLINK-11767
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Type Serialization System, Tests
>Affects Versions: 1.8.0
>Reporter: vinoyang
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.9.0
>
>
> Update {{TypeSerializerSnapshotMigrationTestBase}} and subclasses to cover 
> restoring from Flink 1.8.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] ifndef-SleePy opened a new pull request #8894: [FLINK-12961][datastream] Providing an internal execution method of StreamExecutionEnvironment accepting StreamGraph as input parameter

2019-06-26 Thread GitBox
ifndef-SleePy opened a new pull request #8894:  [FLINK-12961][datastream] 
Providing an internal execution method of StreamExecutionEnvironment accepting 
StreamGraph as input parameter
URL: https://github.com/apache/flink/pull/8894
 
 
   ## What is the purpose of the change
   
   * Expose an internal method `execute(StreamGraph)` of 
`StreamExecutionEnvironment`. So the pluggable runner could get a chance to set 
properties of `StreamGraph`.
   
   ## Brief change log
   
   * Add a new abstract method `execute(StreamGraph)` of 
`StreamExecutionEnvironment`
   * Make `execute(jobName)` of `StreamExecutionEnvironment` a concrete implementation, since all subclasses share the same logic (see the sketch below)
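   
   A rough sketch of the proposed shape, based only on the description above (hypothetical signatures, not necessarily the merged code):
   
       public abstract class StreamExecutionEnvironment {

           /** Existing entry point; now a shared concrete implementation. */
           public JobExecutionResult execute(String jobName) throws Exception {
               StreamGraph streamGraph = getStreamGraph();
               streamGraph.setJobName(jobName);
               return execute(streamGraph);
           }

           /** New internal hook: a pluggable runner can adjust the StreamGraph first. */
           public abstract JobExecutionResult execute(StreamGraph streamGraph) throws Exception;
       }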
   
   ## Verifying this change
   
   * This change is already covered by existing tests
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12961) StreamExecutionEnvironment supports executing job with StreamGraph

2019-06-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12961:
---
Labels: pull-request-available  (was: )

> StreamExecutionEnvironment supports executing job with StreamGraph
> --
>
> Key: FLINK-12961
> URL: https://issues.apache.org/jira/browse/FLINK-12961
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> Expose an internal method {{execute(StreamGraph)}} of 
> {{StreamExecutionEnvironment}}. So the pluggable runner could get a chance to 
> set properties of {{StreamGraph}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-26 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297538465
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
 ##
 @@ -0,0 +1,763 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.mock.Whitebox;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
+import 
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import 
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends 
TestLogger {
+
+   @ClassRule
+   public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+   new 

[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-26 Thread GitBox
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297567190
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
 ##
 @@ -0,0 +1,641 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.mock.Whitebox;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
+import 
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import 
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends 
TestLogger {
+
+   @ClassRule
+   public static 

[jira] [Closed] (FLINK-11955) Modify build to move filesystems from lib to plugins folder

2019-06-26 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-11955.
--
Resolution: Invalid

Users were always expected to move the file system jars manually from `opt` to 
`lib`, so we can expect them to move them manually from `opt` to `plugins` as 
well. Because of that I'm closing the ticket.

For the details, please check out the [discussion in the pull 
request|https://github.com/apache/flink/pull/8199#issuecomment-500820126].

> Modify build to move filesystems from lib to plugins folder
> ---
>
> Key: FLINK-11955
> URL: https://issues.apache.org/jira/browse/FLINK-11955
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / FileSystem
>Affects Versions: 1.9.0
>Reporter: Stefan Richter
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot edited a comment on issue #8893: [hotfix][python] Align the signature of type utility methods with Java

2019-06-26 Thread GitBox
flinkbot edited a comment on issue #8893: [hotfix][python] Align the signature 
of type utility methods with Java
URL: https://github.com/apache/flink/pull/8893#issuecomment-505713502
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @sunjincheng121 [committer]
   * ✅ 2. There is [consensus] that the contribution should go into to Flink.
   - Approved by @sunjincheng121 [committer]
   * ❓ 3. Needs [attention] from.
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @sunjincheng121 [committer]
   * ✅ 5. Overall code [quality] is good.
   - Approved by @sunjincheng121 [committer]
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on issue #8893: [hotfix][python] Align the signature of type utility methods with Java

2019-06-26 Thread GitBox
sunjincheng121 commented on issue #8893: [hotfix][python] Align the signature 
of type utility methods with Java
URL: https://github.com/apache/flink/pull/8893#issuecomment-505801756
 
 
   @flinkbot approve all


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] alpinegizmo commented on issue #8607: [FLINK-12652] [documentation] add first version of a glossary

2019-06-26 Thread GitBox
alpinegizmo commented on issue #8607: [FLINK-12652] [documentation] add first 
version of a glossary
URL: https://github.com/apache/flink/pull/8607#issuecomment-505807000
 
 
   For a first version, this looks quite good already. Are we collecting 
somewhere a list of terms that might be added at some point? Job scheduling and 
back pressure come to mind as examples.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] liyafan82 commented on a change in pull request #8757: [FLINK-12850][core] Introduce TypeInfo for LocalDate/LocalTime/LocalDateTime

2019-06-26 Thread GitBox
liyafan82 commented on a change in pull request #8757: [FLINK-12850][core] 
Introduce TypeInfo for LocalDate/LocalTime/LocalDateTime
URL: https://github.com/apache/flink/pull/8757#discussion_r297586451
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeSerializer.java
 ##
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+
+@Internal
+public final class LocalDateTimeSerializer extends 
TypeSerializerSingleton<LocalDateTime> {
+
+   private static final long serialVersionUID = 1L;
+
+   public static final LocalDateTimeSerializer INSTANCE = new 
LocalDateTimeSerializer();
+
+   @Override
+   public boolean isImmutableType() {
+   return false;
+   }
+
+   @Override
+   public LocalDateTime createInstance() {
+   return LocalDateTime.of(
+   LocalDateSerializer.INSTANCE.createInstance(),
+   LocalTimeSerializer.INSTANCE.createInstance());
+   }
+
+   @Override
+   public LocalDateTime copy(LocalDateTime from) {
+   return from;
+   }
+
+   @Override
+   public LocalDateTime copy(LocalDateTime from, LocalDateTime reuse) {
+   return from;
+   }
+
+   @Override
+   public int getLength() {
+   return LocalDateSerializer.INSTANCE.getLength() + 
LocalTimeSerializer.INSTANCE.getLength();
+   }
+
+   @Override
+   public void serialize(LocalDateTime record, DataOutputView target) 
throws IOException {
+   if (record == null) {
+   LocalDateSerializer.INSTANCE.serialize(null, target);
+   LocalTimeSerializer.INSTANCE.serialize(null, target);
+   } else {
+   
LocalDateSerializer.INSTANCE.serialize(record.toLocalDate(), target);
+   
LocalTimeSerializer.INSTANCE.serialize(record.toLocalTime(), target);
+   }
+   }
+
+   @Override
+   public LocalDateTime deserialize(DataInputView source) throws 
IOException {
+   LocalDate localDate = 
LocalDateSerializer.INSTANCE.deserialize(source);
+   LocalTime localTime = 
LocalTimeSerializer.INSTANCE.deserialize(source);
+   if (localDate == null && localTime == null) {
+   return null;
+   } else if (localDate == null || localTime == null) {
+   throw new IOException("LocalDate and LocalTime are 
either null or not null together.");
 
 Review comment:
   Exactly one of LocalDate and LocalTime is null.
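   
   In other words, the condition could be made explicit, e.g. (a sketch of the deserialize logic with the reworded message):
   
       LocalDate localDate = LocalDateSerializer.INSTANCE.deserialize(source);
       LocalTime localTime = LocalTimeSerializer.INSTANCE.deserialize(source);
       if (localDate == null && localTime == null) {
           return null;
       } else if (localDate == null ^ localTime == null) {
           // exactly one of the two is null, which indicates corrupt input
           throw new IOException("Exactly one of LocalDate and LocalTime is null; "
                   + "they must be null or non-null together.");
       }
       return LocalDateTime.of(localDate, localTime);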


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gaoyunhaii commented on a change in pull request #8846: [FLINK-12766][runtime] Dynamically allocate TaskExecutor's managed memory to slots.

2019-06-26 Thread GitBox
gaoyunhaii commented on a change in pull request #8846: [FLINK-12766][runtime] 
Dynamically allocate TaskExecutor's managed memory to slots.
URL: https://github.com/apache/flink/pull/8846#discussion_r297586457
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ##
 @@ -128,6 +130,20 @@
/** Resource profiles of slots that can be acquired. */
private final HashSet<ResourceProfile> availableSlotResourceProfiles = new HashSet<>();
 
+   /**
+* All allocated slots's allocation resource profile.
 
 Review comment:
   slots's -> slots', or The resource profiles of all allocated slots


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] gaoyunhaii commented on a change in pull request #8846: [FLINK-12766][runtime] Dynamically allocate TaskExecutor's managed memory to slots.

2019-06-26 Thread GitBox
gaoyunhaii commented on a change in pull request #8846: [FLINK-12766][runtime] 
Dynamically allocate TaskExecutor's managed memory to slots.
URL: https://github.com/apache/flink/pull/8846#discussion_r297586457
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ##
 @@ -128,6 +130,20 @@
/** Resource profiles of slots that can be acquired. */
private final HashSet<ResourceProfile> availableSlotResourceProfiles = new HashSet<>();
 
+   /**
+* All allocated slots's allocation resource profile.
 
 Review comment:
   slots's -> slots'


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] 1u0 commented on a change in pull request #8858: [hotfix][tests] Change some StreamTask tests to create a test task in the task's thread

2019-06-26 Thread GitBox
1u0 commented on a change in pull request #8858: [hotfix][tests] Change some 
StreamTask tests to create a test task in the task's thread
URL: https://github.com/apache/flink/pull/8858#discussion_r297592011
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ##
 @@ -1023,25 +1014,63 @@ protected AbstractStateBackend 
createInnerBackend(Configuration config) {
// 

// 

 
+   private enum Event {
+   TASK_IS_RUNNING,
+   }
+
private static class EmptyStreamTask extends StreamTask<String, StreamOperator<String>> {
 
-   public EmptyStreamTask(Environment env) {
+   private boolean isRunningFoo;
+   private final LinkedBlockingQueue<Event> eventQueue;
+   private final OperatorChain<String, StreamOperator<String>> overrideOperatorChain;
+   private volatile boolean sourceFinished;
+
+   EmptyStreamTask(Environment env, LinkedBlockingQueue<Event> eventQueue, OperatorChain<String, StreamOperator<String>> operatorChain) {
super(env, null);
+   this.eventQueue = eventQueue;
+   this.overrideOperatorChain = operatorChain;
}
 
@Override
-   protected void init() throws Exception {}
+   protected void init() {
+   if (overrideOperatorChain != null) {
+   super.operatorChain = 
this.overrideOperatorChain;
+   super.headOperator = 
super.operatorChain.getHeadOperator();
+   }
+   }
 
@Override
-   protected void performDefaultAction(ActionContext context) 
throws Exception {
-   context.allActionsCompleted();
+   protected void performDefaultAction(ActionContext context) {
 
 Review comment:
   I have changed the way the tests wait for the task to be running, so it 
should not conflict with the future changes in your PR.
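   
   The signalling idea, in a nutshell (a sketch; the `runningReported` flag is a hypothetical name):
   
       // Task side: report once that the mailbox loop has been reached.
       @Override
       protected void performDefaultAction(ActionContext context) throws Exception {
           if (!runningReported) {
               runningReported = true;
               eventQueue.put(Event.TASK_IS_RUNNING);
           }
           if (sourceFinished) {
               context.allActionsCompleted();
           }
       }

       // Test side: block until the event arrives instead of sleeping or polling.
       assertEquals(Event.TASK_IS_RUNNING, eventQueue.take());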


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8804: [FLINK-12883][WIP][runtime] Add elaborated partition release logic

2019-06-26 Thread GitBox
zhijiangW commented on a change in pull request #8804: 
[FLINK-12883][WIP][runtime] Add elaborated partition release logic
URL: https://github.com/apache/flink/pull/8804#discussion_r297592199
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -1630,6 +1627,79 @@ public boolean updateState(TaskExecutionState state) {
}
}
 
+   private boolean updateStateInternal(final TaskExecutionState state, 
final Execution attempt) {
+   Map<String, Accumulator<?, ?>> accumulators;
+
+   switch (state.getExecutionState()) {
+   case RUNNING:
+   return attempt.switchToRunning();
+
+   case FINISHED:
+   // this deserialization is exception-free
+   accumulators = deserializeAccumulators(state);
+   attempt.markFinished(accumulators, 
state.getIOMetrics());
+   return true;
+
+   case CANCELED:
+   // this deserialization is exception-free
+   accumulators = deserializeAccumulators(state);
+   attempt.completeCancelling(accumulators, 
state.getIOMetrics());
+   return true;
+
+   case FAILED:
+   // this deserialization is exception-free
+   accumulators = deserializeAccumulators(state);
+   
attempt.markFailed(state.getError(userClassLoader), accumulators, 
state.getIOMetrics());
+   return true;
+
+   default:
+   // we mark as failed and return false, which 
triggers the TaskManager
+   // to remove the task
+   attempt.fail(new Exception("TaskManager sent 
illegal state update: " + state.getExecutionState()));
+   return false;
+   }
+   }
+
+   private void maybeReleasePartitions(final TaskExecutionState state, 
final Execution attempt) {
+   final ExecutionVertexID finishedExecutionVertex = 
attempt.getVertex().getID();
+
+   if (state.getExecutionState() == ExecutionState.FINISHED) {
+   final List<IntermediateResultPartitionID> releasablePartitions = partitionReleaseStrategy.vertexFinished(finishedExecutionVertex);
+   releasePartitions(releasablePartitions);
 
 Review comment:
   We might only need `SchedulingTopology`, which is already used in the 
`createResultPartitionId` method below. It might look like this in 
`createResultPartitionId`:
   
   `SchedulingResultPartition schedulingResultPartition = schedulingTopology.getResultPartitionOrThrow(resultPartition);`
   
   `ResultPartitionID resultPartitionId = schedulingResultPartition.getResultPartitionId();`
   
   This works if we add the `ResultPartitionID` info to the constructor of 
`DefaultSchedulingResultPartition`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11956) Remove shading from filesystems build

2019-06-26 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-11956:
---
Priority: Blocker  (was: Major)

> Remove shading from filesystems build
> -
>
> Key: FLINK-11956
> URL: https://issues.apache.org/jira/browse/FLINK-11956
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / FileSystem
>Affects Versions: 1.9.0
>Reporter: Stefan Richter
>Assignee: vinoyang
>Priority: Blocker
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11956) Remove shading from filesystems build

2019-06-26 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873211#comment-16873211
 ] 

Piotr Nowojski commented on FLINK-11956:


I think we should decide how we want to proceed with this one, and I see two 
options:
 # Treat plugins as an experimental feature in 1.9, deprecate non plugin 
{{FileSystems}} in 1.10 so we can remove the shading (this ticket) in 1.11
 # Deprecate non plugin {{FileSystems}} in 1.9 and already encourage users to 
switch to plugins so we can remove the shading in 1.10

I'm marking this as a blocker for 1.9 release to resolve this discussion before 
the actual release.

> Remove shading from filesystems build
> -
>
> Key: FLINK-11956
> URL: https://issues.apache.org/jira/browse/FLINK-11956
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / FileSystem
>Affects Versions: 1.9.0
>Reporter: Stefan Richter
>Assignee: vinoyang
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] 1u0 commented on a change in pull request #8858: [hotfix][tests] Change some StreamTask tests to create a test task in the task's thread

2019-06-26 Thread GitBox
1u0 commented on a change in pull request #8858: [hotfix][tests] Change some 
StreamTask tests to create a test task in the task's thread
URL: https://github.com/apache/flink/pull/8858#discussion_r29760
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ##
 @@ -1023,25 +1014,63 @@ protected AbstractStateBackend 
createInnerBackend(Configuration config) {
// 

// 

 
+   private enum Event {
+   TASK_IS_RUNNING,
+   }
+
private static class EmptyStreamTask extends StreamTask<String, StreamOperator<String>> {
 
 Review comment:
   I have renamed it to `MockStreamTask`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] 1u0 commented on a change in pull request #8858: [hotfix][tests] Change some StreamTask tests to create a test task in the task's thread

2019-06-26 Thread GitBox
1u0 commented on a change in pull request #8858: [hotfix][tests] Change some 
StreamTask tests to create a test task in the task's thread
URL: https://github.com/apache/flink/pull/8858#discussion_r297609906
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ##
 @@ -805,29 +723,102 @@ public void 
testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable
new MockEnvironmentBuilder()
.setUserCodeClassLoader(new 
TestUserCodeClassLoader())
.build()) {
-   TimeServiceTask timerServiceTask = new 
TimeServiceTask(mockEnvironment);
+   RunningTask task = runTask(() -> new 
TimeServiceTask(mockEnvironment));
+   task.waitForTaskCompletion(false);
 
-   CompletableFuture invokeFuture = 
CompletableFuture.runAsync(
-   () -> {
-   try {
-   timerServiceTask.invoke();
-   } catch (Exception e) {
-   throw new 
CompletionException(e);
-   }
-   },
-   TestingUtils.defaultExecutor());
-
-   invokeFuture.get();
-
-   assertThat(timerServiceTask.getClassLoaders(), 
hasSize(greaterThanOrEqualTo(1)));
-   assertThat(timerServiceTask.getClassLoaders(), 
everyItem(instanceOf(TestUserCodeClassLoader.class)));
+   assertThat(task.streamTask.getClassLoaders(), 
hasSize(greaterThanOrEqualTo(1)));
+   assertThat(task.streamTask.getClassLoaders(), 
everyItem(instanceOf(TestUserCodeClassLoader.class)));
}
}
 
// 

//  Test Utilities
// 

 
+   private static StreamOperator 
streamOperatorWithSnapshot(OperatorSnapshotFutures operatorSnapshotResult) 
throws Exception {
+   StreamOperator operator = mock(StreamOperator.class);
+   when(operator.getOperatorID()).thenReturn(new OperatorID());
+
+   when(operator.snapshotState(anyLong(), anyLong(), 
any(CheckpointOptions.class), any(CheckpointStreamFactory.class)))
+   .thenReturn(operatorSnapshotResult);
+
+   return operator;
+   }
+
+   private static StreamOperator 
streamOperatorWithSnapshotException(Exception exception) throws Exception {
+   StreamOperator operator = mock(StreamOperator.class);
+   when(operator.getOperatorID()).thenReturn(new OperatorID());
+
+   when(operator.snapshotState(anyLong(), anyLong(), 
any(CheckpointOptions.class), any(CheckpointStreamFactory.class)))
+   .thenThrow(exception);
+
+   return operator;
+   }
+
+   private static <T> OperatorChain<T, StreamOperator<T>> operatorChain(StreamOperator<T>... streamOperators) {
+   OperatorChain<T, StreamOperator<T>> operatorChain = mock(OperatorChain.class);
+   
when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+   return operatorChain;
+   }
+
+   private static class RunningTask<T extends StreamTask<?, ?>> {
+   final T streamTask;
+   final CompletableFuture<Void> invocationFuture;
+
+   RunningTask(T streamTask, CompletableFuture<Void> invocationFuture) {
+   this.streamTask = streamTask;
+   this.invocationFuture = invocationFuture;
+   }
+
+   void waitForTaskCompletion(boolean cancelled) throws Exception {
+   if (!cancelled) {
+   invocationFuture.get();
+   return;
+   }
+   try {
+   invocationFuture.get();
+   } catch (Exception e) {
+   assertThat(e.getCause(), 
is(instanceOf(CancelTaskException.class)));
 
 Review comment:
   In this case, the `CancelTaskException` is the original exception thrown by 
`task.invoke()`.
   It's not expected to be wrapped by more exceptions.
   
   The only wrapping exception comes from the `invocationFuture`.
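   
   A minimal illustration of that wrapping (a standalone sketch with plain JUnit/Hamcrest, outside the actual test):
   
       CompletableFuture<Void> invocationFuture = CompletableFuture.runAsync(() -> {
           throw new CancelTaskException(); // the "original" exception
       });
       try {
           invocationFuture.get();
           fail("the future should have completed exceptionally");
       } catch (ExecutionException e) {
           // get() wraps exactly once; the cause is still the original CancelTaskException
           assertThat(e.getCause(), is(instanceOf(CancelTaskException.class)));
       }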


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] knaufk commented on a change in pull request #8607: [FLINK-12652] [documentation] add first version of a glossary

2019-06-26 Thread GitBox
knaufk commented on a change in pull request #8607: [FLINK-12652] 
[documentation] add first version of a glossary
URL: https://github.com/apache/flink/pull/8607#discussion_r297632384
 
 

 ##
 File path: docs/concepts/glossary.md
 ##
 @@ -0,0 +1,168 @@
+---
+title: Glossary
+nav-pos: 3
+nav-title: Glossary
+nav-parent_id: concepts
+---
+
+
+ Flink Application Cluster
+
+A Flink Application Cluster is a dedicated [Flink 
Cluster](./glossary#flink-cluster) that only
+executes a single [Flink Job](./glossary#flink-job). The lifetime of the
+[Flink Cluster](./glossary#flink-cluster) is bound to the lifetime of the 
Flink Job. Formerly
+Flink Application Clusters were also known as Flink Clusters in *job mode*. 
Compare to
+[Flink Session Cluster](./glossary#flink-session-cluster).
+
+ Flink Cluster
+
+The distributed system consisting of (typically) one Flink Master process and 
one or more Flink
+TaskManager processes.
 
 Review comment:
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zentol commented on a change in pull request #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager

2019-06-26 Thread GitBox
zentol commented on a change in pull request #8646: [FLINK-12735][network] Make 
shuffle environment implementation independent with IOManager
URL: https://github.com/apache/flink/pull/8646#discussion_r297632721
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
 ##
 @@ -71,7 +71,7 @@ public void beforeTest() {
}
 
@After
-   public void afterTest() {
+   public void afterTest() throws Exception {
this.ioManager.close();
if (!this.ioManager.isProperlyShutDown()) {
 
 Review comment:
   Most of the tests were failing anyway if the ioManager shutdown failed, due to 
subsequent assertions on `isProperlyShutdown`; thus b) is no longer a new 
problem.
   
   I would suggest going through all tests, removing the `isProperlyShutDown` 
call where the test follows the same pattern, and potentially removing the method 
altogether if it is unused at that point.
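   
   I.e., for tests following that pattern, the teardown could shrink to (a sketch):
   
       @After
       public void afterTest() throws Exception {
           // close() already surfaces shutdown problems as an exception,
           // so the extra isProperlyShutDown() assertion becomes redundant.
           this.ioManager.close();
       }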


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] knaufk commented on a change in pull request #8607: [FLINK-12652] [documentation] add first version of a glossary

2019-06-26 Thread GitBox
knaufk commented on a change in pull request #8607: [FLINK-12652] 
[documentation] add first version of a glossary
URL: https://github.com/apache/flink/pull/8607#discussion_r297632771
 
 

 ##
 File path: docs/concepts/glossary.md
 ##
 @@ -0,0 +1,168 @@
+---
+title: Glossary
+nav-pos: 3
+nav-title: Glossary
+nav-parent_id: concepts
+---
+
+
+ Flink Application Cluster
+
+A Flink Application Cluster is a dedicated [Flink 
Cluster](./glossary#flink-cluster) that only
+executes a single [Flink Job](./glossary#flink-job). The lifetime of the
+[Flink Cluster](./glossary#flink-cluster) is bound to the lifetime of the 
Flink Job. Formerly
+Flink Application Clusters were also known as Flink Clusters in *job mode*. 
Compare to
+[Flink Session Cluster](./glossary#flink-session-cluster).
+
+ Flink Cluster
+
+The distributed system consisting of (typically) one Flink Master process and 
one or more Flink
+TaskManager processes.
+
+ Event
+
+An event is a statement about a change of the state of the domain modelled by 
the
+application. Events can be input and/or output of a stream or batch processing 
application.
+Events are special types of [records](./glossary#Record).
+
+ ExecutionGraph
+
+see [Physical Graph](./glossary#physical-graph)
+
+ Function
+
+Functions are implemented by the user and encapsulate the
+application logic of a Flink program. Most Functions are wrapped by a 
corresponding
+[Operator](./glossary#operator).
+
+ Instance
+
+The term *instance* is used to describe a specific instance of a specific type 
(usually
+[Operator](./glossary#operator) or [Function](./glossary#function)) during 
runtime. As Apache Flink
+is mostly written in Java, this corresponds to the definition of *Instance* or 
*Object* in Java.
+In the context of Apache Flink, the term *parallel instance* is also 
frequently used to emphasize
+that multiple instances of the same [Operator](./glossary#operator) or
+[Function](./glossary#function) type are running in parallel.
+
+ Flink Job
+
+A Flink Job is the runtime representation of a Flink program. A Flink Job can 
either be submitted
+to a long running [Flink Session Cluster](./glossary#flink-session-cluster) or 
it can be started as a
+self-contained [Flink Application 
Cluster](./glossary#flink-application-cluster).
+
+ JobGraph
+
+see [Logical Graph](./glossary#logical-graph)
+
+ Flink JobManager
+
+JobManagers are one of the components running in the
+[Flink JobManager Process](./glossary#flink-jobmanager-process). A JobManager 
is responsible for
+supervising the execution of the [Tasks](./glossary#task) of a single job.
+
+ Logical Graph
+
+A logical graph is a directed graph describing the high-level logic of a 
stream processing program.
+The nodes are [Operators](./glossary#operator) and the edges indicate 
input/output-relationships or
+data streams or data sets.
+
+ Managed State
+
+Managed State describes application state which has been registered with the 
framework. For
+Managed State, Apache Flink will take care of persistence and rescaling 
among other things.
+
+ Flink JobManager Process
+
+The JobManager Process is the master of a [Flink 
Cluster](./glossary#flink-cluster). It is called
+*JobManager* for historical reasons, but actually contains three distinct 
components:
+Flink Resource Manager, Flink Dispatcher and one [Flink 
JobManager](./glossary#flink-jobmanager)
+per running [Flink Job](./glossary#flink-job).
+
+ Operator
+
+Node of a [Logical Graph](./glossary#logical-graph). An Operator performs a 
certain operation,
+which is usually executed by a [Function](./glossary#function). Sources and 
Sinks are special
+Operators for data ingestion and data egress.
+
+ Operator Chain
+
+An Operator Chain consists of two or more consecutive 
[Operators](./glossary#operator) without any
+repartitioning in between. Operators within the same Operator Chain forward records to each other
+directly without going through serialization or Flink's network stack.
+
+ Partition
+
+A partition is an independent subset of the overall data stream or data set. A 
data stream or
+data set is divided into partitions by assigning each 
[record](./glossary#Record) to one or more
+partitions. Partitions of data streams or data sets are consumed by 
[Tasks](./glossary#task) during
+runtime. A transformation which changes the way a data stream or data set is 
partitioned is often
+called repartitioning.
+
+ Physical Graph
+
+A physical graph is the result of translating a [Logical 
Graph](./glossary#logical-graph) for
+execution in a distributed runtime. The nodes are [Tasks](./glossary#task) and 
the edges indicate
+input/output-relationships or [partitions](./glossary#partition) of data 
streams or data sets.
+
+ Record
+
+Records are the constituent elements of a data set or data stream.
+[Operators](./glossary#operator) and [Functions](./glossary#Function) receive 
records as input

[GitHub] [flink] knaufk commented on a change in pull request #8607: [FLINK-12652] [documentation] add first version of a glossary

2019-06-26 Thread GitBox
knaufk commented on a change in pull request #8607: [FLINK-12652] 
[documentation] add first version of a glossary
URL: https://github.com/apache/flink/pull/8607#discussion_r297632737
 
 

 ##
 File path: docs/concepts/glossary.md
 ##
 @@ -0,0 +1,168 @@
+---
+title: Glossary
+nav-pos: 3
+nav-title: Glossary
+nav-parent_id: concepts
+---
+
+
+ Flink Application Cluster
+
+A Flink Application Cluster is a dedicated [Flink 
Cluster](./glossary#flink-cluster) that only
+executes a single [Flink Job](./glossary#flink-job). The lifetime of the
+[Flink Cluster](./glossary#flink-cluster) is bound to the lifetime of the 
Flink Job. Formerly
+Flink Application Clusters were also known as Flink Clusters in *job mode*. 
Compare to
+[Flink Session Cluster](./glossary#flink-session-cluster).
+
+ Flink Cluster
+
+The distributed system consisting of (typically) one Flink Master process and one or more
+Flink TaskManager processes.
+
+ Event
+
+An event is a statement about a change of the state of the domain modelled by 
the
+application. Events can be input and/or output of a stream or batch processing 
application.
+Events are special types of [records](./glossary#Record).
+
+ ExecutionGraph
+
+see [Physical Graph](./glossary#physical-graph)
+
+ Function
+
+Functions are implemented by the user and encapsulate the
+application logic of a Flink program. Most Functions are wrapped by a 
corresponding
+[Operator](./glossary#operator).
+
+ Instance
+
+The term *instance* is used to describe a specific instance of a specific type 
(usually
+[Operator](./glossary#operator) or [Function](./glossary#function)) during 
runtime. As Apache Flink
+is mostly written in Java, this corresponds to the definition of *Instance* or 
*Object* in Java.
+In the context of Apache Flink, the term *parallel instance* is also 
frequently used to emphasize
+that multiple instances of the same [Operator](./glossary#operator) or
+[Function](./glossary#function) type are running in parallel.
+
+ Flink Job
+
+A Flink Job is the runtime representation of a Flink program. A Flink Job can 
either be submitted
+to a long running [Flink Session Cluster](./glossary#flink-session-cluster) or 
it can be started as a
+self-contained [Flink Application 
Cluster](./glossary#flink-application-cluster).
+
+ JobGraph
+
+see [Logical Graph](./glossary#logical-graph)
+
+ Flink JobManager
+
+JobManagers are one of the components running in the
+[Flink JobManager Process](./glossary#flink-jobmanager-process). A JobManager is responsible for
 
 Review comment:
   Done.




[jira] [Closed] (FLINK-12914) Remove legacy InstanceListener

2019-06-26 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-12914.

Resolution: Fixed

master: 126a489e20f4743c361bbf2f2b4a2c890f8026ab

> Remove legacy InstanceListener
> --
>
> Key: FLINK-12914
> URL: https://issues.apache.org/jira/browse/FLINK-12914
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: leesf
>Assignee: leesf
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>






[GitHub] [flink] 1u0 commented on a change in pull request #8826: [FLINK-12479][operators] Integrate StreamInputProcessor(s) with mailbox

2019-06-26 Thread GitBox
1u0 commented on a change in pull request #8826: [FLINK-12479][operators] 
Integrate StreamInputProcessor(s) with mailbox
URL: https://github.com/apache/flink/pull/8826#discussion_r297625182
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 ##
 @@ -18,213 +18,16 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.SimpleCounter;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
-import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
-import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
-import org.apache.flink.streaming.runtime.tasks.OperatorChain;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.runtime.io.AvailabilityListener;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
+import java.io.Closeable;
 
 /**
- * Input reader for {@link 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
- *
- * This internally uses a {@link StatusWatermarkValve} to keep track of 
{@link Watermark} and
- * {@link StreamStatus} events, and forwards them to event subscribers once the
- * {@link StatusWatermarkValve} determines the {@link Watermark} from all 
inputs has advanced, or
- * that a {@link StreamStatus} needs to be propagated downstream to denote a 
status change.
- *
- * Forwarding elements, watermarks, or status status elements must be 
protected by synchronizing
- * on the given lock object. This ensures that we don't call methods on a
- * {@link OneInputStreamOperator} concurrently with the timer callback or 
other things.
- *
- * @param  The type of the record that can be read with this record reader.
+ * Interface for processing records by StreamTask.
  */
-@Internal
-public class StreamInputProcessor {
-
-   private static final Logger LOG = 
LoggerFactory.getLogger(StreamInputProcessor.class);
-
-   private final StreamTaskInput input;
-
-   private final Object lock;
-
-   private final OperatorChain operatorChain;
-
-   //  Status and Watermark Valve --
-
-   /** Valve that controls how watermarks and stream statuses are 
forwarded. */
-   private StatusWatermarkValve statusWatermarkValve;
-
-   private final StreamStatusMaintainer streamStatusMaintainer;
-
-   private final OneInputStreamOperator streamOperator;
-
-   //  Metrics --
-
-   private final WatermarkGauge watermarkGauge;
-   private Counter numRecordsIn;
-
-   @SuppressWarnings("unchecked")
-   public StreamInputProcessor(
-   InputGate[] inputGates,
-   TypeSerializer inputSerializer,
-   StreamTask checkpointedTask,
-   CheckpointingMode checkpointMode,
-   Object lock,
-   IOManager ioManager,
-   Configuration taskManagerConfig,
-   StreamStatusMaintainer streamStatusMaintainer,
-   OneInputStreamOperator streamOperator,
-   TaskIOMetricGroup metrics,
-   WatermarkGauge watermarkGauge,
-   String taskName,
-   OperatorChain operatorChain) throws IOException {
-
-   InputGate inputGate = InputGateUtil.createInputGate(inputGates);
-
-   CheckpointBarrierHandler barrierHandler = 
InputProcessorUtil.createCheckpointBarrierHandler(
-   checkpointedTask,
-   checkpointMode,
-   ioManager,
-   inputGate,
-   taskManagerConfig,
-   taskName);
-   this.input = new StreamTaskNetworkInput(barrierHandler, 
inputSerializer, ioManager, 0);
-
-   
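The quoted hunk is truncated by the digest, but the surviving imports (`AvailabilityListener`, `Closeable`) and the new class comment hint at the refactored shape. A minimal sketch of what such an interface could look like; the `processInput` method name is an assumption, not something shown in the hunk:

```java
package org.apache.flink.streaming.runtime.io;

import org.apache.flink.runtime.io.AvailabilityListener;

import java.io.Closeable;

/**
 * Sketch: record processing becomes a small interface that the StreamTask
 * mailbox loop can drive. AvailabilityListener contributes a non-blocking
 * availability future, so the loop only calls processInput() when input can
 * be consumed without blocking.
 */
public interface StreamInputProcessor extends AvailabilityListener, Closeable {

	/**
	 * Processes one unit of input.
	 *
	 * @return true if the input is not yet exhausted and processing should continue
	 */
	boolean processInput() throws Exception;
}
```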

[jira] [Resolved] (FLINK-12957) Fix thrift and protobuf dependency examples in documentation

2019-06-26 Thread Nico Kruber (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber resolved FLINK-12957.
-
   Resolution: Fixed
Fix Version/s: 1.8.1
   1.7.3

merged into:
- 1.8: 9ad7cda7a145537e2968416a361e0d22d85828e6
- 1.7: 5b8154c3c6c60ca8a850d82856db84d4334bc327

> Fix thrift and protobuf dependency examples in documentation
> 
>
> Key: FLINK-12957
> URL: https://issues.apache.org/jira/browse/FLINK-12957
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.7.2, 1.8.0, 1.9.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.8.1, 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The examples in the docs are not up-to-date anymore and should be updated.





[jira] [Closed] (FLINK-12957) Fix thrift and protobuf dependency examples in documentation

2019-06-26 Thread Nico Kruber (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber closed FLINK-12957.
---

> Fix thrift and protobuf dependency examples in documentation
> 
>
> Key: FLINK-12957
> URL: https://issues.apache.org/jira/browse/FLINK-12957
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.7.2, 1.8.0, 1.9.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.8.1, 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The examples in the docs are not up-to-date anymore and should be updated.





[GitHub] [flink] zhijiangW commented on a change in pull request #8857: [FLINK-12960][coordination][shuffle] Move ResultPartitionDeploymentDescriptor#releasedOnConsumption to PartitionDescriptor#relea

2019-06-26 Thread GitBox
zhijiangW commented on a change in pull request #8857: 
[FLINK-12960][coordination][shuffle] Move 
ResultPartitionDeploymentDescriptor#releasedOnConsumption to 
PartitionDescriptor#releasedOnConsumption
URL: https://github.com/apache/flink/pull/8857#discussion_r297532281
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
 ##
 @@ -103,6 +137,9 @@ public boolean sendScheduleOrUpdateConsumersMessage() {
 * by {@link 
ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)}
 * and {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}.
 *
+* The partition has to support the corresponding {@link 
ReleaseType} in
 
 Review comment:
   I think it might be better to indicate which `releaseType` each value (true/false) must support; otherwise the mapping between this flag and the release type is unclear.
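To make the suggested mapping concrete, a hedged sketch; `AUTO` and `MANUAL` are assumed constant names for illustration, not necessarily the enum values in the PR:

```java
// Illustrative only: the constant names are assumptions, not the PR's actual API.
enum ReleaseType {
	AUTO,   // partition is released as soon as all consumers have consumed it
	MANUAL  // partition is kept until released externally (ShuffleMaster / ShuffleEnvironment)
}

final class ReleaseTypes {
	/** Maps the releasedOnConsumption flag to the release type the partition must support. */
	static ReleaseType requiredReleaseType(boolean releasedOnConsumption) {
		return releasedOnConsumption ? ReleaseType.AUTO : ReleaseType.MANUAL;
	}
}
```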




[jira] [Commented] (FLINK-5018) Make source idle timeout user configurable

2019-06-26 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873065#comment-16873065
 ] 

vinoyang commented on FLINK-5018:
-

If we are not going to put effort into this issue and expose this config option, I think we can close it. What do you think? [~aljoscha] [~tzulitai]

> Make source idle timeout user configurable
> --
>
> Key: FLINK-5018
> URL: https://issues.apache.org/jira/browse/FLINK-5018
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> There are 2 cases where sources are considered idle and should emit an idle 
> {{StreamStatus}} downstream, taking Kafka consumer as example:
> - The source instance was not assigned any partitions
> - The source instance was assigned partitions, but they currently don't have 
> any data.
> For the second case, we can only consider it idle after a timeout threshold. 
> It would be good to make this timeout user configurable besides a default 
> value.





[jira] [Closed] (FLINK-12971) Remove the constraint that lookup join needs a primary key or index key

2019-06-26 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-12971.
---
   Resolution: Fixed
Fix Version/s: 1.9.0

Fixed in 1.9.0: cb922bbfa0a26e147638d22aeceaf3e83b01e1d3

> Remove the constraint that lookup join needs a primary key or index key
> ---
>
> Key: FLINK-12971
> URL: https://issues.apache.org/jira/browse/FLINK-12971
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, we add a constraint in dimension table lookup joins that the lookup 
> fields must be a primary key or index key. This is not a logical constraint but 
> a performance constraint: if there are no indexes on the lookup key, 
> the lookup performance will be poor. 
> We will remove this constraint because not every table has a primary key or 
> indexes (e.g. Hive tables). It is the user's responsibility if the lookup 
> fields are not keys and performance suffers as a result. In this case, users 
> should add indexes on these fields. In the future, we can also propagate this 
> tuning information before the SQL is executed.





[GitHub] [flink] klion26 opened a new pull request #8895: [FLINK-12536][coordinator]Make BufferOrEventSequence#getNext non blocking

2019-06-26 Thread GitBox
klion26 opened a new pull request #8895: [FLINK-12536][coordinator]Make 
BufferOrEventSequence#getNext non blocking
URL: https://github.com/apache/flink/pull/8895
 
 
   ## What is the purpose of the change
   
   Currently it is non-blocking in the case of credit-based flow control (the default); however, for SpilledBufferOrEventSequence it blocks on reading from file. We might want to consider reimplementing it to be non-blocking with a CompletableFuture isAvailable() method.
   
   Otherwise we will block mailbox processing for the duration of the file read - for example, we will block processing-time timers and, potentially, future network flushes.
   
   ## Brief change log
   
   Use a ByteBuffer pool to read the BufferOrEvent asynchronously.
   - the pool size defaults to 2 and can be configured via the key `taskmanager.memory.async-load.buffer-count`
   - reuses the read thread in `IOManager`
   
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as:
   - `SpilledBufferOrEventSequenceTest.java`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (**no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**no**)
 - The serializers: (**no**)
 - The runtime per-record code paths (performance sensitive): (**no**)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**)
 - The S3 file system connector: (**no**)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**no**)
 - If yes, how is the feature documented? (**not applicable**)
   




[jira] [Updated] (FLINK-12536) Make BufferOrEventSequence#getNext() non-blocking

2019-06-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12536:
---
Labels: pull-request-available  (was: )

> Make BufferOrEventSequence#getNext() non-blocking
> -
>
> Key: FLINK-12536
> URL: https://issues.apache.org/jira/browse/FLINK-12536
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Piotr Nowojski
>Assignee: Congxian Qiu(klion26)
>Priority: Major
>  Labels: pull-request-available
>
> Currently it is non-blocking in the case of credit-based flow control (the 
> default); however, for {{SpilledBufferOrEventSequence}} it blocks on reading 
> from file. We might want to consider reimplementing it to be non-blocking with 
> a {{CompletableFuture isAvailable()}} method.
>  
> Otherwise we will block mailbox processing for the duration of the file read - 
> for example, we will block processing-time timers and, potentially, future 
> network flushes.
>  
> This is not a high priority change, since it affects non-default 
> configuration option AND at the moment only processing time timers are 
> planned to be moved to the mailbox for 1.9.





[GitHub] [flink] GJL commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-26 Thread GitBox
GJL commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an 
adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8851#discussion_r297556096
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
 ##
 @@ -0,0 +1,641 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.mock.Whitebox;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
+import 
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import 
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends 
TestLogger {
+
+   @ClassRule
+   public static 

[GitHub] [flink] 1u0 commented on a change in pull request #8741: [FLINK-12752] Add Option to Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint

2019-06-26 Thread GitBox
1u0 commented on a change in pull request #8741: [FLINK-12752] Add Option to 
Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint
URL: https://github.com/apache/flink/pull/8741#discussion_r297561629
 
 

 ##
 File path: 
flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
 ##
 @@ -67,54 +65,109 @@ public void testDontOverwriteExecutionMode() {
}
 
@Test
-   public void configuredJobIDTakesPrecedenceWithHA() {
-   Optional jobID = Optional.of(JobID.generate());
+   public void configuredJobIdTakesPrecedenceWithHA() {
 
 Review comment:
   Not required here, but imo, those tests that check precedence order would be 
better if they are rewritten as one test that checks all cases as table input.
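
A sketch of that table-driven variant with JUnit 4's `Parameterized` runner; `ZERO_JOB_ID`, `enableHighAvailability`, and `resolveJobIdForCluster` are taken from the diff above, everything else is illustrative:

```java
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.Collection;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

/** Sketch: all deterministic precedence cases of resolveJobIdForCluster as one table. */
@RunWith(Parameterized.class)
public class JobIdPrecedenceTest {

	private static final JobID CONFIGURED_ID = JobID.generate();

	@Parameterized.Parameter(0) public JobID jobId;
	@Parameterized.Parameter(1) public String jobIdSeed;
	@Parameterized.Parameter(2) public boolean highAvailability;
	@Parameterized.Parameter(3) public JobID expected;

	@Parameterized.Parameters(name = "jobId={0}, seed={1}, ha={2}")
	public static Collection<Object[]> cases() {
		return Arrays.asList(new Object[][] {
			// jobId,         jobIdSeed,    HA,     expected
			{ CONFIGURED_ID,  null,         true,   CONFIGURED_ID },
			{ CONFIGURED_ID,  null,         false,  CONFIGURED_ID },
			{ CONFIGURED_ID,  "some-seed",  true,   CONFIGURED_ID },
			{ null,           "some-seed",  true,   JobID.fromSeed("some-seed") },
			{ null,           "some-seed",  false,  JobID.fromSeed("some-seed") },
			{ null,           null,         true,   ZERO_JOB_ID },
			// the (null, null, no-HA) -> random case stays a separate test,
			// since its expected value is non-deterministic
		});
	}

	@Test
	public void resolvesJobIdByPrecedence() {
		Configuration configuration = new Configuration();
		if (highAvailability) {
			enableHighAvailability(configuration); // helper from the existing test class
		}
		assertThat(
			StandaloneJobClusterEntryPoint.resolveJobIdForCluster(jobId, jobIdSeed, configuration),
			is(expected));
	}
}
```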




[jira] [Closed] (FLINK-12821) Fix the bug that fix time quantifier can not be the last element of a pattern

2019-06-26 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz closed FLINK-12821.

   Resolution: Fixed
Fix Version/s: 1.9.0

Implemented in 09a4a3ad522fd95cacc4598f69edb6965b7151b5

> Fix the bug that fix time quantifier can not be the last element of a pattern
> -
>
> Key: FLINK-12821
> URL: https://issues.apache.org/jira/browse/FLINK-12821
> Project: Flink
>  Issue Type: Sub-task
>  Components: Library / CEP, Table SQL / API
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, the exception "Greedy quantifiers are not allowed as the last element 
> of a Pattern yet. Finish your pattern with either a simple variable or 
> reluctant quantifier." is thrown for patterns such as "a{2}". Actually, the 
> greedy property is not meaningful for this kind of pattern.





[GitHub] [flink] 1u0 commented on a change in pull request #8741: [FLINK-12752] Add Option to Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint

2019-06-26 Thread GitBox
1u0 commented on a change in pull request #8741: [FLINK-12752] Add Option to 
Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint
URL: https://github.com/apache/flink/pull/8741#discussion_r297555970
 
 

 ##
 File path: 
flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
 ##
 @@ -67,54 +65,109 @@ public void testDontOverwriteExecutionMode() {
}
 
@Test
-   public void configuredJobIDTakesPrecedenceWithHA() {
-   Optional jobID = Optional.of(JobID.generate());
+   public void configuredJobIdTakesPrecedenceWithHA() {
+   JobID jobId = JobID.generate();
+   String jobIdSeed = null;
+
+   Configuration globalConfiguration = new Configuration();
+   enableHighAvailability(globalConfiguration);
+
+   JobID jobIdForCluster = 
StandaloneJobClusterEntryPoint.resolveJobIdForCluster(
+   jobId,
+   jobIdSeed,
+   globalConfiguration);
+
+   assertThat(jobIdForCluster, is(jobId));
+   }
+
+   @Test
+   public void configuredJobIdTakesPrecedenceWithoutHA() {
+   JobID jobId = JobID.generate();
+   String jobIdSeed = null;
+
+   Configuration globalConfiguration = new Configuration();
+
+   JobID jobIdForCluster = 
StandaloneJobClusterEntryPoint.resolveJobIdForCluster(
+   jobId,
+   jobIdSeed,
+   globalConfiguration);
+
+   assertThat(jobIdForCluster, is(jobId));
+   }
+
+   @Test
+   public void configuredJobIdSeedTakesPrecedenceWithoutHA() {
+   JobID jobId = null;
+   String jobIdSeed = "some-seed";
+
+   Configuration globalConfiguration = new Configuration();
+
+   JobID jobIdForCluster = 
StandaloneJobClusterEntryPoint.resolveJobIdForCluster(
+   jobId,
+   jobIdSeed,
+   globalConfiguration);
+
+   assertThat(jobIdForCluster, is(JobID.fromSeed(jobIdSeed)));
+   }
+
+   @Test
+   public void configuredJobIdSeedTakesPrecedenceWithHA() {
+   JobID jobId = null;
+   String jobIdSeed = "some-seed";
 
Configuration globalConfiguration = new Configuration();
enableHighAvailability(globalConfiguration);
 
JobID jobIdForCluster = 
StandaloneJobClusterEntryPoint.resolveJobIdForCluster(
-   jobID,
+   jobId,
+   jobIdSeed,
globalConfiguration);
 
-   assertThat(jobIdForCluster, is(jobID.get()));
+   assertThat(jobIdForCluster, is(JobID.fromSeed(jobIdSeed)));
}
 
@Test
-   public void configuredJobIDTakesPrecedenceWithoutHA() {
-   Optional jobID = Optional.of(JobID.generate());
+   public void configuredJobIdTakesPrecedenceOverJobIdSeed() {
+   JobID jobId = JobID.generate();
+   String jobIdSeed = "some-seed";
 
Configuration globalConfiguration = new Configuration();
+   enableHighAvailability(globalConfiguration);
 
JobID jobIdForCluster = 
StandaloneJobClusterEntryPoint.resolveJobIdForCluster(
-   jobID,
+   jobId,
+   jobIdSeed,
globalConfiguration);
 
-   assertThat(jobIdForCluster, is(jobID.get()));
+   assertThat(jobIdForCluster, is(jobId));
}
 
@Test
-   public void jobIDdefaultsToZeroWithHA() {
-   Optional jobID = Optional.empty();
+   public void jobIdDefaultsToZeroWithHA() {
+   JobID jobId = null;
+   String jobIdSeed = null;
 
Configuration globalConfiguration = new Configuration();
enableHighAvailability(globalConfiguration);
 
JobID jobIdForCluster = 
StandaloneJobClusterEntryPoint.resolveJobIdForCluster(
-   jobID,
+   jobId,
+   jobIdSeed,
globalConfiguration);
 
assertThat(jobIdForCluster, is(ZERO_JOB_ID));
}
 
@Test
-   public void jobIDdefaultsToRandomJobIDWithoutHA() {
-   Optional jobID = Optional.empty();
+   public void jobIdDefaultsToRandomJobIDWithoutHA() {
 
 Review comment:
   Rename `jobIdDefaultsToRandomJobIDWithoutHA` to 
`jobIdDefaultsToRandomJobIdWithoutHA`, to finish getting rid of `ID` in the 
method/variables names in this file?
   
   **Side note:** not required here, but imo, those tests that check precedence 
order would be better if they are rewritten as one test that checks all cases 
as table input.



[GitHub] [flink] 1u0 commented on a change in pull request #8741: [FLINK-12752] Add Option to Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint

2019-06-26 Thread GitBox
1u0 commented on a change in pull request #8741: [FLINK-12752] Add Option to 
Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint
URL: https://github.com/apache/flink/pull/8741#discussion_r297553510
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_kubernetes_embedded_job.sh
 ##
 @@ -64,14 +64,14 @@ fi
 
 eval $(minikube docker-env)
 cd "$DOCKER_MODULE_DIR"
-./build.sh --from-local-dist --job-jar 
${FLINK_DIR}/examples/batch/WordCount.jar --image-name ${FLINK_IMAGE_NAME}
+./build.sh --from-local-dist --job-artifacts 
${FLINK_DIR}/examples/batch/WordCount.jar --image-name ${FLINK_IMAGE_NAME}
 cd "$END_TO_END_DIR"
 
 
-kubectl create -f ${KUBERNETES_MODULE_DIR}/job-cluster-service.yaml
-envsubst '${FLINK_IMAGE_NAME} ${FLINK_JOB} ${FLINK_JOB_PARALLELISM} 
${FLINK_JOB_ARGUMENTS}' < ${CONTAINER_SCRIPTS}/job-cluster-job.yaml.template | 
kubectl create -f -
-envsubst '${FLINK_IMAGE_NAME} ${FLINK_JOB_PARALLELISM}' < 
${CONTAINER_SCRIPTS}/task-manager-deployment.yaml.template | kubectl create -f -
-kubectl wait --for=condition=complete job/flink-job-cluster --timeout=1h
-kubectl cp `kubectl get pods | awk '/task-manager/ {print 
$1}'`:/cache/${OUTPUT_FILE} ${OUTPUT_VOLUME}/${OUTPUT_FILE}
+envsubst '${FLINK_APPLICATION_NAME}' < 
${KUBERNETES_MODULE_DIR}/job-cluster-service.yaml.template | kubectl apply -f -
+envsubst '${FLINK_APPLICATION_NAME} ${FLINK_IMAGE_NAME} ${FLINK_JOB} 
${FLINK_JOB_PARALLELISM} ${FLINK_JOB_ARGUMENTS}' < 
${KUBERNETES_MODULE_DIR}/job-cluster-job.yaml.template | kubectl apply -f -
+envsubst '${FLINK_APPLICATION_NAME} ${FLINK_IMAGE_NAME} 
${FLINK_JOB_PARALLELISM}' < 
${KUBERNETES_MODULE_DIR}/task-manager-deployment.yaml.template | kubectl apply 
-f -
 
 Review comment:
   Personally, I'd stick with `kubectl create ...` here.




[GitHub] [flink] 1u0 commented on a change in pull request #8741: [FLINK-12752] Add Option to Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint

2019-06-26 Thread GitBox
1u0 commented on a change in pull request #8741: [FLINK-12752] Add Option to 
Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint
URL: https://github.com/apache/flink/pull/8741#discussion_r297534362
 
 

 ##
 File path: 
flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
 ##
 @@ -92,9 +105,23 @@ public StandaloneJobClusterConfiguration 
createResult(@Nonnull CommandLine comma
restPort,
savepointRestoreSettings,
jobId,
+   jobIdSeed,
jobClassName);
}
 
+   private String getJobIdSeed(@Nonnull final CommandLine commandLine) 
throws FlinkParseException {
+
+   boolean isJobIdSeedConfigured = 
commandLine.hasOption(JOB_ID_SEED_OPTION.getOpt());
+   boolean isJobIdConfigured = 
commandLine.hasOption(JOB_ID_OPTION.getOpt());
+
+   if (isJobIdSeedConfigured && isJobIdConfigured) {
 
 Review comment:
   The ticket description says that both options (`--job-id` and `--job-id-seed`) can be passed and that they will be resolved by precedence order.
   I think here you are making them mutually exclusive.
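
For reference, a sketch of the precedence chain the ticket describes; `JobID.fromSeed` and `ZERO_JOB_ID` come from this PR, while the `HighAvailabilityMode.isHighAvailabilityModeActivated` check is my assumption of how the entry point detects HA:

```java
// drops into StandaloneJobClusterEntryPoint; @Nullable is javax.annotation.Nullable
static JobID resolveJobIdForCluster(
		@Nullable JobID jobId,
		@Nullable String jobIdSeed,
		Configuration configuration) {

	if (jobId != null) {
		return jobId;                         // 1. an explicit --job-id wins
	}
	if (jobIdSeed != null) {
		return JobID.fromSeed(jobIdSeed);     // 2. deterministic ID from --job-id-seed
	}
	return HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)
		? ZERO_JOB_ID                         // 3. fixed ID so HA can recover the same job
		: JobID.generate();                   // 4. otherwise a random ID
}
```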




[GitHub] [flink] 1u0 commented on a change in pull request #8741: [FLINK-12752] Add Option to Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint

2019-06-26 Thread GitBox
1u0 commented on a change in pull request #8741: [FLINK-12752] Add Option to 
Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint
URL: https://github.com/apache/flink/pull/8741#discussion_r297534771
 
 

 ##
 File path: 
flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
 ##
 @@ -60,13 +60,25 @@
.desc("Job ID of the job to run.")
.build();
 
+   private static final Option JOB_ID_SEED_OPTION = Option.builder("jids")
+   .longOpt("job-id-seed")
 
 Review comment:
   What do you think about an alternative and more explicit name, like 
`--random-job-id-seed`?




[GitHub] [flink] dawidwys merged pull request #8715: [FLINK-12821][table][cep] Fix the bug that fix time quantifier can not be the last element of a pattern

2019-06-26 Thread GitBox
dawidwys merged pull request #8715: [FLINK-12821][table][cep] Fix the bug that 
fix time quantifier can not be the last element of a pattern
URL: https://github.com/apache/flink/pull/8715
 
 
   




[GitHub] [flink] 1u0 commented on a change in pull request #8741: [FLINK-12752] Add Option to Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint

2019-06-26 Thread GitBox
1u0 commented on a change in pull request #8741: [FLINK-12752] Add Option to 
Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint
URL: https://github.com/apache/flink/pull/8741#discussion_r297541840
 
 

 ##
 File path: flink-core/src/main/java/org/apache/flink/api/common/JobID.java
 ##
 @@ -64,27 +65,40 @@ public JobID(long lowerPart, long upperPart) {
public JobID(byte[] bytes) {
super(bytes);
}
-   
+
// 

//  Static factory methods
// 

 
/**
 * Creates a new (statistically) random JobID.
-* 
+*
 * @return A new random JobID.
 */
public static JobID generate() {
return new JobID();
}
 
+   /**
+* Creates a new JobID based on the given seed. The JobIDs returned by 
two invocations of this
+* method with the same seed will be equal.
+*
+* @param seed the seed to base the generation of the JobID
+*
+* @return A new JobID based on the given seed.
+*/
+   public static JobID fromSeed(String seed) {
+   Random rnd = new Random(seed.hashCode());
 
 Review comment:
   In the case of HA mode, do I get it right that it is desirable for different Flink jobs to have different IDs?
   
   I think `String.hashCode()` is too weak a source of entropy to generate such job IDs.
   Maybe consider a stronger [hash function](https://docs.oracle.com/javase/8/docs/api/java/security/MessageDigest.html) instead?
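
A sketch of what a digest-based variant could look like, reusing the `JobID(byte[])` constructor shown in the diff; this is one possible replacement, not the PR's implementation:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

/**
 * Sketch: derive the 16 JobID bytes from a cryptographic digest of the seed
 * instead of new Random(seed.hashCode()), avoiding the 32-bit hashCode bottleneck.
 */
public static JobID fromSeed(String seed) {
	try {
		MessageDigest digest = MessageDigest.getInstance("SHA-256");
		byte[] hash = digest.digest(seed.getBytes(StandardCharsets.UTF_8));
		// a JobID is 16 bytes; take the first half of the 32-byte digest
		return new JobID(Arrays.copyOf(hash, 16));
	} catch (NoSuchAlgorithmException e) {
		throw new IllegalStateException("every JDK is required to provide SHA-256", e);
	}
}
```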




[GitHub] [flink] zentol commented on a change in pull request #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager

2019-06-26 Thread GitBox
zentol commented on a change in pull request #8646: [FLINK-12735][network] Make 
shuffle environment implementation independent with IOManager
URL: https://github.com/apache/flink/pull/8646#discussion_r297569347
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
 ##
 @@ -149,16 +150,15 @@ private void createSubpartitions(
}
}
 
-   private static void initializeBoundedBlockingPartitions(
+   private void initializeBoundedBlockingPartitions(
ResultSubpartition[] subpartitions,
-   ResultPartition parent,
-   IOManager ioManager) {
 
 Review comment:
   couldn't you pass in the channelManager instead and keep it static?
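
A sketch of that suggestion; the signature mirrors the diff, the channel-manager call is the one used elsewhere in this PR's tests, and the subpartition construction is schematic (`createSpilledSubpartition` is a hypothetical helper):

```java
// Keep the method static by passing in the channel manager, the only
// per-instance state the method needs.
private static void initializeBoundedBlockingPartitions(
		ResultSubpartition[] subpartitions,
		ResultPartition parent,
		FileChannelManager channelManager) {

	for (int i = 0; i < subpartitions.length; i++) {
		Path spillPath = channelManager.createChannel().getPathFile().toPath();
		subpartitions[i] = createSpilledSubpartition(i, parent, spillPath);
	}
}
```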




[GitHub] [flink] zentol commented on a change in pull request #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager

2019-06-26 Thread GitBox
zentol commented on a change in pull request #8646: [FLINK-12735][network] Make 
shuffle environment implementation independent with IOManager
URL: https://github.com/apache/flink/pull/8646#discussion_r297572793
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
 ##
 @@ -99,8 +93,8 @@ ResultSubpartition createFailingWritesSubpartition() throws 
Exception {
 
// 

 
-   static Path tmpPath() throws IOException {
-   return new File(TMP_DIR.newFolder(), "subpartition").toPath();
+   static Path tmpPath() {
+   return 
fileChannelManager.createChannel().getPathFile().toPath();
 
 Review comment:
   This commit is, strictly speaking, not necessary, correct? If so, I'd like to exclude it to prevent merge conflicts with #8880.




[GitHub] [flink] zentol commented on a change in pull request #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager

2019-06-26 Thread GitBox
zentol commented on a change in pull request #8646: [FLINK-12735][network] Make 
shuffle environment implementation independent with IOManager
URL: https://github.com/apache/flink/pull/8646#discussion_r297567048
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
 ##
 @@ -71,7 +71,7 @@ public void beforeTest() {
}
 
@After
-   public void afterTest() {
+   public void afterTest() throws Exception {
this.ioManager.close();
if (!this.ioManager.isProperlyShutDown()) {
 
 Review comment:
   Looking at this, it seems as though ioManager.close() should never throw exceptions. We can already see downsides of this change, since if close() throws an exception:
   a) isProperlyShutDown() is never called
   b) the memory manager is never shut down
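
A sketch of a teardown that addresses both points; the `memoryManager` field and the failure message are assumed from the surrounding test class:

```java
@After
public void afterTest() throws Exception {
	try {
		this.ioManager.close();
		if (!this.ioManager.isProperlyShutDown()) {
			fail("I/O Manager was not properly shut down.");
		}
	} finally {
		// runs even when close() throws, so the memory manager is always torn down
		this.memoryManager.shutdown();
	}
}
```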




[GitHub] [flink] becketqin commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-06-26 Thread GitBox
becketqin commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source 
connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#issuecomment-505800856
 
 
   @Xeli Yeah, let's do that. I can give a final pass on the patch after you 
merge the latest changes. @rmetzger what do you think?




[jira] [Commented] (FLINK-10862) REST API does not show program descriptions of "simple" ProgramDescription

2019-06-26 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873164#comment-16873164
 ] 

TisonKun commented on FLINK-10862:
--

Also, I find the {{Program}} interface useless. How would users define how to return a {{Plan}}?

I highly doubt that any user implements their job via the {{Program}} interface. If nobody does, we can deprecate or remove it to rationalize the codebase.
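
For context, the shapes of the two interfaces under discussion, as recalled from flink-core (verify against the source for the exact signatures and Javadoc):

```java
// Program forces the user to hand back a pre-assembled Plan, bypassing the
// usual ExecutionEnvironment#execute() path, which is why few users implement it:
public interface Program {
	Plan getPlan(String... args);
}

// ProgramDescription merely attaches a human-readable description to a main class,
// which the REST API could surface when listing an uploaded jar:
public interface ProgramDescription {
	String getDescription();
}
```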

> REST API does not show program descriptions of "simple" ProgramDescription
> --
>
> Key: FLINK-10862
> URL: https://issues.apache.org/jira/browse/FLINK-10862
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Affects Versions: 1.6.2
>Reporter: Flavio Pompermaier
>Priority: Major
>  Labels: rest_api
>
> When uploading a jar containing a main class that implements the ProgramDescription 
> interface, the REST API doesn't list its description. It works only if the 
> class implements Program (which I find pretty useless... why should I return 
> the plan?)





[jira] [Updated] (FLINK-12960) Introduce ShuffleDescriptor#ReleaseType and ShuffleDescriptor#getSupportedReleaseTypes

2019-06-26 Thread Andrey Zagrebin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrey Zagrebin updated FLINK-12960:

Summary: Introduce ShuffleDescriptor#ReleaseType and 
ShuffleDescriptor#getSupportedReleaseTypes  (was: Introduce 
ShuffleDescriptor#ReleaseType)

> Introduce ShuffleDescriptor#ReleaseType and 
> ShuffleDescriptor#getSupportedReleaseTypes
> --
>
> Key: FLINK-12960
> URL: https://issues.apache.org/jira/browse/FLINK-12960
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> {{ResultPartitionDeploymentDescriptor#releasedOnConsumption}} indicates how the 
> partition is going to be used by the shuffle user and then released. The 
> {{ShuffleDescriptor}} should provide a way to query which 
> release type is supported by shuffle service for this partition. If the 
> requested release type is not supported by the shuffle service for a certain 
> type of partition, the job should fail fast.





[GitHub] [flink] zentol commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM

2019-06-26 Thread GitBox
zentol commented on a change in pull request #8778: [FLINK-12615][coordination] 
Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r297603449
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
 ##
 @@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.partition.PartitionTable;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Utility for tracking partitions and issuing release calls to task executors 
and shuffle masters.
+ */
+public class PartitionTracker {
 
 Review comment:
   should introduce an interface for this
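
A sketch of that split; the method names are assumptions based on the class Javadoc, not the PR's final API:

```java
// A narrow interface, with the current class becoming its implementation.
public interface PartitionTracker {

	/** Starts tracking a partition produced on the given task executor. */
	void startTrackingPartition(ResourceID producingTaskExecutorId, ResultPartitionDeploymentDescriptor descriptor);

	/** Stops tracking and issues release calls for all partitions of the given task executor. */
	void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId);
}

// the existing logic moves unchanged into e.g. PartitionTrackerImpl implements PartitionTracker
```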




[jira] [Updated] (FLINK-12998) Document Plugins mechanism

2019-06-26 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-12998:
---
Description: 
The plugins mechanism must be documented before the release. We should write down:
 * benefits
 * how to use the existing FileSystem plugins
 * how to implement a custom FileSystem plugin
 * potential issues of relying on 
{{Thread.currentThread().getContextClassLoader()}} (currently it is set only 
for {{FileSystemFactory}} class loading and the {{FileSystemFactory#create()}} 
method call - if a {{FileSystem}} accesses {{getContextClassLoader}} at 
runtime (writing/reading), it will not work properly as a plugin).

  was:
The plugins mechanism must be documented before the release. We should write down:
 * benefits
 * how to use the existing FileSystem plugins
 * how to implement a custom FileSystem plugin
 * potential issues of relying on 
{{Thread.currentThread().getContextClassLoader()}} 


> Document Plugins mechanism
> --
>
> Key: FLINK-12998
> URL: https://issues.apache.org/jira/browse/FLINK-12998
> Project: Flink
>  Issue Type: Sub-task
>  Components: FileSystems
>Affects Versions: 1.9.0
>Reporter: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.9.0
>
>
> The plugins mechanism must be documented before the release. We should write down:
>  * benefits
>  * how to use the existing FileSystem plugins
>  * how to implement a custom FileSystem plugin
>  * potential issues of relying on 
> {{Thread.currentThread().getContextClassLoader()}} (currently it is set only 
> for {{FileSystemFactory}} class loading and the {{FileSystemFactory#create()}} 
> method call - if a {{FileSystem}} accesses {{getContextClassLoader}} at 
> runtime (writing/reading), it will not work properly as a plugin).
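
To accompany the "how to implement a custom FileSystem plugin" bullet, a minimal factory sketch that also illustrates the class-loader caveat; the `FileSystemFactory` method set is recalled from flink-core (verify against your release), and `MyFileSystem`/`myfs` are placeholders:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;

import java.io.IOException;
import java.net.URI;

public class MyFileSystemFactory implements FileSystemFactory {

	@Override
	public String getScheme() {
		return "myfs";
	}

	@Override
	public void configure(Configuration config) {
		// no configuration needed for this sketch
	}

	@Override
	public FileSystem create(URI fsUri) throws IOException {
		// Safe: the plugin mechanism sets the context class loader around factory
		// loading and this create() call. Capture it here if the file system needs
		// it later; reading it lazily during read/write is exactly the pitfall the
		// ticket describes.
		ClassLoader pluginClassLoader = Thread.currentThread().getContextClassLoader();
		return new MyFileSystem(fsUri, pluginClassLoader); // MyFileSystem: placeholder
	}
}
```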





[GitHub] [flink] knaufk commented on a change in pull request #8607: [FLINK-12652] [documentation] add first version of a glossary

2019-06-26 Thread GitBox
knaufk commented on a change in pull request #8607: [FLINK-12652] 
[documentation] add first version of a glossary
URL: https://github.com/apache/flink/pull/8607#discussion_r297635084
 
 

 ##
 File path: docs/concepts/glossary.md
 ##
 @@ -0,0 +1,168 @@
+---
+title: Glossary
+nav-pos: 3
+nav-title: Glossary
+nav-parent_id: concepts
+---
+
+
+ Flink Application Cluster
+
+A Flink Application Cluster is a dedicated [Flink 
Cluster](./glossary#flink-cluster) that only
+executes a single [Flink Job](./glossary#flink-job). The lifetime of the
+[Flink Cluster](./glossary#flink-cluster) is bound to the lifetime of the 
Flink Job. Formerly
+Flink Application Clusters were also known as Flink Clusters in *job mode*. 
Compare to
+[Flink Session Cluster](./glossary#flink-session-cluster).
+
+ Flink Cluster
+
+The distributed system consisting of (typically) one Flink Master process and one or more
+Flink TaskManager processes.
+
+ Event
+
+An event is a statement about a change of the state of the domain modelled by 
the
+application. Events can be input and/or output of a stream or batch processing 
application.
+Events are special types of [records](./glossary#record).
+
+ ExecutionGraph
+
+see [Physical Graph](./glossary#physical-graph)
+
+ Function
+
+Functions are implemented by the user and encapsulate the
+application logic of a Flink program. Most Functions are wrapped by a 
corresponding
+[Operator](./glossary#operator).
+
+ Instance
+
+The term *instance* is used to describe a specific instance of a specific type 
(usually
+[Operator](./glossary#operator) or [Function](./glossary#function)) during 
runtime. As Apache Flink
+is mostly written in Java, this corresponds to the definition of *Instance* or 
*Object* in Java.
+In the context of Apache Flink, the term *parallel instance* is also 
frequently used to emphasize
+that multiple instances of the same [Operator](./glossary#operator) or
+[Function](./glossary#function) type are running in parallel.
+
+ Flink Job
+
+A Flink Job is the runtime representation of a Flink program. A Flink Job can 
either be submitted
+to a long running [Flink Session Cluster](./glossary#flink-session-cluster) or 
it can be started as a
+self-contained [Flink Application 
Cluster](./glossary#flink-application-cluster).
+
+ JobGraph
+
+see [Logical Graph](./glossary#logical-graph)
+
+ Flink JobManager
+
+JobManagers are one of the components running in the
+[Flink JobManager Process](./glossary#flink-jobmanager-process). A JobManager 
is responsible for
+supervising the execution of the [Tasks](./glossary#task) of a single job.
+
+ Logical Graph
+
+A logical graph is a directed graph describing the high-level logic of a 
stream processing program.
+The nodes are [Operators](./glossary#operator) and the edges indicate 
input/output-relationships,
+data streams, or data sets.
+
+ Managed State
+
+Managed State describes application state which has been registered with the 
framework. For
+Managed State, Apache Flink will take care of persistence and rescaling 
among other things.
+
+ Flink JobManager Process
+
+The JobManager Process is the master of a [Flink 
Cluster](./glossary#flink-cluster). It is called
+*JobManager* for historical reasons, but actually contains three distinct 
components:
+Flink Resource Manager, Flink Dispatcher and one [Flink 
JobManager](./glossary#flink-jobmanager)
+per running [Flink Job](./glossary#flink-job).
+
+ Operator
+
+Node of a [Logical Graph](./glossary#logical-graph). An Operator performs a 
certain operation,
+which is usually executed by a [Function](./glossary#function). Sources and 
Sinks are special
+Operators for data ingestion and data egress.
+
+ Operator Chain
+
+An Operator Chain consists of two or more consecutive 
[Operators](./glossary#operator) without any
+repartitioning in between. Operators within the same Operator Chain forward 
records to each other
+directly without going through serialization or Flink's network stack.
+
+ Partition
+
+A partition is an independent subset of the overall data stream or data set. A 
data stream or
+data set is divided into partitions by assigning each 
[record](./glossary#record) to one or more
+partitions. Partitions of data streams or data sets are consumed by 
[Tasks](./glossary#task) during
+runtime. A transformation which changes the way a data stream or data set is 
partitioned is often
+called repartitioning.
+
+ Physical Graph
+
+A physical graph is the result of translating a [Logical 
Graph](./glossary#logical-graph) for
+execution in a distributed runtime. The nodes are [Tasks](./glossary#task) and 
the edges indicate
+input/output-relationships or [partitions](./glossary#partition) of data 
streams or data sets.
+
+ Record
+
+Records are the constituent elements of a data set or data stream.
+[Operators](./glossary#operator) and [Functions](./glossary#function) receive 
records as input

[GitHub] [flink] azagrebin commented on a change in pull request #8896: [FLINK-12993][runtime] Refactor forceReleaseOnConsumption to JM concept

2019-06-26 Thread GitBox
azagrebin commented on a change in pull request #8896: [FLINK-12993][runtime] 
Refactor forceReleaseOnConsumption to JM concept
URL: https://github.com/apache/flink/pull/8896#discussion_r297633733
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
 ##
 @@ -39,23 +40,22 @@
 public class ResultPartitionFactoryTest extends TestLogger {
 
@Test
-   public void testForceConsumptionOnReleaseEnabled() {
-   testForceConsumptionOnRelease(true);
+   public void testConsumptionOnReleaseEnabled() {
+   testConsumptionOnRelease(true);
 
 Review comment:
   if `testConsumptionOnRelease` returned the `ResultPartition`, you could do 
the assertion here.
   Also, `testConsumptionOnRelease` could directly accept the `ReleaseType`; 
then there would be fewer ifs. A sketch of that shape follows below.
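
   For concreteness, a minimal JUnit sketch of the suggested shape. The 
`ReleaseType` enum, the simplified `ResultPartition`, and the `createPartition` 
helper are stand-ins invented for this example, not the real Flink types:

```java
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

import org.junit.Test;

public class ResultPartitionFactoryTestSketch {

    // simplified stand-ins for the real Flink types, invented for this sketch
    enum ReleaseType { AUTO, MANUAL }

    static class ResultPartition {
        private final ReleaseType releaseType;
        ResultPartition(ReleaseType releaseType) { this.releaseType = releaseType; }
        ReleaseType getReleaseType() { return releaseType; }
    }

    @Test
    public void testConsumptionOnReleaseEnabled() {
        // the helper takes the ReleaseType directly, so no boolean flag and no ifs
        ResultPartition partition = createPartition(ReleaseType.AUTO);
        // ... and returns the partition, so the assertion lives in the test
        assertThat(partition.getReleaseType(), is(ReleaseType.AUTO));
    }

    @Test
    public void testConsumptionOnReleaseDisabled() {
        ResultPartition partition = createPartition(ReleaseType.MANUAL);
        assertThat(partition.getReleaseType(), is(ReleaseType.MANUAL));
    }

    private static ResultPartition createPartition(ReleaseType releaseType) {
        return new ResultPartition(releaseType);
    }
}
```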


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12176) Unify JobGraph creation in CliFrontend

2019-06-26 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873287#comment-16873287
 ] 

TisonKun commented on FLINK-12176:
--

[~till.rohrmann] I can see your concern now, because 
{{ContextExecutionEnvironment}} really executes all commands inside the main 
method of the user program, while {{OptimizerPlanEnvironment}} aborts control.

I am going to draft a design doc on separating job compilation, cluster 
deployment and job submission, in which I will discuss this "unification of 
job compilation" topic more closely.

> Unify JobGraph creation in CliFrontend
> --
>
> Key: FLINK-12176
> URL: https://issues.apache.org/jira/browse/FLINK-12176
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Affects Versions: 1.9.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Attachments: patch.diff
>
>
> Currently, we create the {{JobGraph}} by the following process:
> * if the cluster starts in job mode, we create the {{JobGraph}} by 
> {{PackagedProgramUtils#createJobGraph}} and deploy a job cluster
> * if the cluster starts in session mode, we create the {{JobGraph}} and 
> submit it by {{CliFrontend#executeProgram}}, which is internally the same as 
> above but uses {{ContextEnvironment}} instead of {{OptimizerPlanEnvironment}}.
> {{ContextEnvironment}} not only creates the job graph but also submits it. 
> However, the processes of {{JobGraph}} creation in job mode and session mode 
> are similar. That means we can unify the process by always creating the 
> {{JobGraph}} via {{PackagedProgramUtils#createJobGraph}}. And,
> * in job mode, deploy a job cluster with the {{JobGraph}}
> * in session mode, submit the {{JobGraph}} to the session cluster
> From a higher view, this is helpful for a common view of job submission in 
> both job and session modes and gives opportunities to refactor legacy client 
> code.
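
To make the proposed unification concrete, here is a rough sketch assuming the 
1.9-era client APIs ({{PackagedProgramUtils#createJobGraph}}, 
{{ClusterDescriptor#deployJobCluster}}, {{ClusterClient#submitJob}}); the 
surrounding method and its parameters are invented for illustration:

{code:java}
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class UnifiedSubmissionSketch {

    // Unified flow: always build the JobGraph the same way, then branch only
    // on how it reaches a cluster.
    static <T> void run(
            PackagedProgram program,
            Configuration configuration,
            int defaultParallelism,
            boolean jobMode,
            ClusterDescriptor<T> clusterDescriptor,
            ClusterSpecification clusterSpecification,
            ClusterClient<T> sessionClusterClient) throws Exception {

        JobGraph jobGraph = PackagedProgramUtils.createJobGraph(
                program, configuration, defaultParallelism);

        if (jobMode) {
            // job mode: deploy a dedicated job cluster with the JobGraph
            clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, true);
        } else {
            // session mode: submit the JobGraph to the existing session cluster
            sessionClusterClient.submitJob(jobGraph, program.getUserCodeClassLoader());
        }
    }
}
{code}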



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] aljoscha commented on a change in pull request #8852: [FLINK-12798][table-api][table-planner] Add a proper discover mechanism that will enable switching between Flink & Blink Planner/

2019-06-26 Thread GitBox
aljoscha commented on a change in pull request #8852:  
[FLINK-12798][table-api][table-planner] Add a proper discover mechanism that 
will  enable switching between Flink & Blink Planner/Executor
URL: https://github.com/apache/flink/pull/8852#discussion_r297529895
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/PlannerDescriptor.java
 ##
 @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.Planner;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/**
+ * Common class that defines necessary properties to choose and create both
+ * {@link Executor} and {@link Planner}.
+ */
+public class PlannerDescriptor {
 
 Review comment:
   I didn't have any in mind; I just noticed it here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #8858: [hotfix][tests] Change some StreamTask tests to create a test task in the task's thread

2019-06-26 Thread GitBox
pnowojski commented on a change in pull request #8858: [hotfix][tests] Change 
some StreamTask tests to create a test task in the task's thread
URL: https://github.com/apache/flink/pull/8858#discussion_r297531416
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ##
 @@ -1023,25 +1014,63 @@ protected AbstractStateBackend 
createInnerBackend(Configuration config) {
// 

// 

 
+   private enum Event {
+   TASK_IS_RUNNING,
+   }
+
private static class EmptyStreamTask extends StreamTask> {
 
 Review comment:
   I'm not sure. I think that `EmptyStreamTask` is not the best, but I also 
don't think that my proposals are much better. I would lean towards 
`AwaitableStreamTask` or `AwaitableEmptyStreamTask`, but if you don't like it, 
I'm fine with leaving it as it is.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #8858: [hotfix][tests] Change some StreamTask tests to create a test task in the task's thread

2019-06-26 Thread GitBox
pnowojski commented on a change in pull request #8858: [hotfix][tests] Change 
some StreamTask tests to create a test task in the task's thread
URL: https://github.com/apache/flink/pull/8858#discussion_r297528668
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ##
 @@ -805,29 +723,102 @@ public void 
testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable
new MockEnvironmentBuilder()
.setUserCodeClassLoader(new 
TestUserCodeClassLoader())
.build()) {
-   TimeServiceTask timerServiceTask = new 
TimeServiceTask(mockEnvironment);
+   RunningTask task = runTask(() -> new 
TimeServiceTask(mockEnvironment));
+   task.waitForTaskCompletion(false);
 
-   CompletableFuture invokeFuture = 
CompletableFuture.runAsync(
-   () -> {
-   try {
-   timerServiceTask.invoke();
-   } catch (Exception e) {
-   throw new 
CompletionException(e);
-   }
-   },
-   TestingUtils.defaultExecutor());
-
-   invokeFuture.get();
-
-   assertThat(timerServiceTask.getClassLoaders(), 
hasSize(greaterThanOrEqualTo(1)));
-   assertThat(timerServiceTask.getClassLoaders(), 
everyItem(instanceOf(TestUserCodeClassLoader.class)));
+   assertThat(task.streamTask.getClassLoaders(), 
hasSize(greaterThanOrEqualTo(1)));
+   assertThat(task.streamTask.getClassLoaders(), 
everyItem(instanceOf(TestUserCodeClassLoader.class)));
}
}
 
// 

//  Test Utilities
// 

 
+   private static StreamOperator 
streamOperatorWithSnapshot(OperatorSnapshotFutures operatorSnapshotResult) 
throws Exception {
+   StreamOperator operator = mock(StreamOperator.class);
+   when(operator.getOperatorID()).thenReturn(new OperatorID());
 
 Review comment:
   I understand. Regarding `MockStreamOperatorBuilder`, we can leave that for 
later. Implementing a proper mock just for this (as a private static class in 
this file) should be super quick, but if it takes too much time we can also 
postpone it. A rough sketch of such a hand-written stub is below.
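
   For illustration, a minimal hand-written stub in the spirit of that 
suggestion. All types below are simplified stand-ins invented for this sketch; 
the real `StreamOperator` interface is much larger:

```java
public class MockOperatorSketch {

    // simplified stand-ins for the real Flink types
    static class OperatorID {}

    static class OperatorSnapshotFutures {}

    interface StreamOperatorStub {
        OperatorID getOperatorID();
        OperatorSnapshotFutures snapshotState();
    }

    // hand-written replacement for mock(StreamOperator.class) +
    // when(operator.getOperatorID()).thenReturn(new OperatorID())
    static StreamOperatorStub operatorWithSnapshot(OperatorSnapshotFutures snapshotResult) {
        OperatorID operatorID = new OperatorID();
        return new StreamOperatorStub() {
            @Override
            public OperatorID getOperatorID() {
                return operatorID;
            }

            @Override
            public OperatorSnapshotFutures snapshotState() {
                return snapshotResult;
            }
        };
    }

    public static void main(String[] args) {
        OperatorSnapshotFutures prepared = new OperatorSnapshotFutures();
        StreamOperatorStub operator = operatorWithSnapshot(prepared);
        // the stub returns exactly what the test prepared, no Mockito needed
        System.out.println(operator.snapshotState() == prepared); // true
    }
}
```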


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

