[GitHub] [flink] zhijiangW commented on a change in pull request #8857: [FLINK-12960][coordination][shuffle] Move ResultPartitionDeploymentDescriptor#releasedOnConsumption to PartitionDescriptor#relea
zhijiangW commented on a change in pull request #8857: [FLINK-12960][coordination][shuffle] Move ResultPartitionDeploymentDescriptor#releasedOnConsumption to PartitionDescriptor#releasedOnConsumption URL: https://github.com/apache/flink/pull/8857#discussion_r297529541 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java ## @@ -65,7 +65,7 @@ private static void testForceConsumptionOnRelease(boolean forceConsumptionOnRele ResultPartitionType.BLOCKING, 1, 0), - NettyShuffleDescriptorBuilder.newBuilder().buildLocal(), + NettyShuffleDescriptorBuilder.newBuilder().setBlocking(true).buildLocal(), Review comment: It might be better to use `partitionType.isBlocking()` here instead of a hard-coded `true`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-12536) Make BufferOrEventSequence#getNext() non-blocking
[ https://issues.apache.org/jira/browse/FLINK-12536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873088#comment-16873088 ] Congxian Qiu(klion26) commented on FLINK-12536: --- As discussed with [~pnowojski] offline, we will use a buffer pool (default size 2, configurable) and reuse the read thread in IOManager to read the BufferOrEvent asynchronously. I created FLINK-12994 to track the improvement of buffer processing performance in SpilledBufferOrEventSequence#getNext. The benchmark results are as follows:

Benchmark                                      Mode   Cnt      Score       Error   Units
ConsumerBench.testBuffer                       thrpt   30     99.528 ±     1.923  ops/ms
ConsumerBench.testBufferWithWrap               thrpt   30    827.931 ±    15.148  ops/ms
ConsumerBench.testCancelCheckpointMarkerEvent  thrpt   30  40144.914 ±  1244.112  ops/ms
ConsumerBench.testCheckpointBarrierEvent       thrpt   30  29784.258 ±   467.440  ops/ms
ConsumerBench.testEndOfPartitionEvent          thrpt   30  48562.810 ±  1599.328  ops/ms
FileIOBench.test1SyncIO                        thrpt   30    359.367 ±     3.203  ops/ms
FileIOBench.testAsyncIO                        thrpt   30     66.573 ±     3.384  ops/ms

[1] `testBuffer` tests `SpilledBufferOrEventSequence#getNext` using `MemorySegmentFactory.allocateUnpooledSegment(pageSize)`, `testBufferWithWrap` tests `SpilledBufferOrEventSequence#getNext` using `MemorySegmentFactory.wrap()`, `test***Event` tests all the event processing, `test1SyncIO` tests the `FileChannel` and `testAsyncIO` tests the `AsynchronousFileChannel`
[2] https://github.com/klion26/FileIOBench/blob/8618fc4d6e745d8dd762b87505102e6cec78dc9b/src/main/java/com/klion26/data/ConsumerBench.java#L130
[3] https://github.com/klion26/FileIOBench/blob/8618fc4d6e745d8dd762b87505102e6cec78dc9b/src/main/java/com/klion26/data/ConsumerBench.java#L169
> Make BufferOrEventSequence#getNext() non-blocking > - > > Key: FLINK-12536 > URL: https://issues.apache.org/jira/browse/FLINK-12536 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Piotr Nowojski >Assignee: Congxian Qiu(klion26)
>Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Currently it is non-blocking in case of credit-based flow control (default), > however for {{SpilledBufferOrEventSequence}} it is blocking on reading from > file. We might want to consider reimplementing it to be non-blocking with a > {{CompletableFuture isAvailable()}} method. > > Otherwise we will block mailbox processing for the duration of reading from > file - for example we will block processing time timers and potentially in > the future network flushes. > > This is not a high priority change, since it affects a non-default > configuration option AND at the moment only processing time timers are > planned to be moved to the mailbox for 1.9. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
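The non-blocking contract described in the issue can be sketched roughly as follows. This is only an illustration under stated assumptions: `AsyncSequence`, `onElementRead` and `pollNext` are hypothetical names, not Flink classes or methods, and the I/O reader thread is represented only by whoever calls `onElementRead`.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch (not Flink code): a sequence whose consumer never blocks on file I/O.
final class AsyncSequence<T> {
    private final Queue<T> ready = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    // Called by the (assumed) I/O reader thread once an element has been read from the file.
    synchronized void onElementRead(T element) {
        ready.add(element);
        available.complete(null); // no-op if the future is already completed
    }

    // Completed as soon as pollNext() would return an element.
    synchronized CompletableFuture<Void> isAvailable() {
        return available;
    }

    // Non-blocking: returns an empty Optional instead of waiting for the reader.
    synchronized Optional<T> pollNext() {
        T next = ready.poll();
        // Reset only a completed future, so callers waiting on a pending one are not lost.
        if (ready.isEmpty() && available.isDone()) {
            available = new CompletableFuture<>();
        }
        return Optional.ofNullable(next);
    }

    public static void main(String[] args) {
        AsyncSequence<String> sequence = new AsyncSequence<>();
        sequence.isAvailable().thenRun(() -> System.out.println("element available"));
        sequence.onElementRead("buffer-1");
        System.out.println(sequence.pollNext().isPresent());
    }
}
```

With such a contract the mailbox thread could register a callback on `isAvailable()` and keep processing timers in the meantime, instead of waiting inside `getNext()`.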
[GitHub] [flink] liyafan82 commented on a change in pull request #8757: [FLINK-12850][core] Introduce TypeInfo for LocalDate/LocalTime/LocalDateTime
liyafan82 commented on a change in pull request #8757: [FLINK-12850][core] Introduce TypeInfo for LocalDate/LocalTime/LocalDateTime URL: https://github.com/apache/flink/pull/8757#discussion_r297546738 ## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateComparator.java ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; + +import java.io.IOException; +import java.io.Serializable; +import java.time.LocalDate; + +/** + * This class can not extend {@link BasicTypeComparator}, because LocalDate is a + * Comparable of ChronoLocalDate instead of Comparable of LocalDate. 
+ */ +@Internal +public final class LocalDateComparator extends TypeComparator implements Serializable { + + private transient LocalDate reference; + + protected final boolean ascendingComparison; + + // For use by getComparators + @SuppressWarnings("rawtypes") + private final LocalDateComparator[] comparators = new LocalDateComparator[] {this}; + + public LocalDateComparator(boolean ascending) { + this.ascendingComparison = ascending; + } + + @Override + public int hash(LocalDate value) { + return value.hashCode(); + } + + @Override + public void setReference(LocalDate toCompare) { + this.reference = toCompare; + } + + @Override + public boolean equalToReference(LocalDate candidate) { + return candidate.equals(reference); Review comment: use Objects.equals to avoid NPE?
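A minimal sketch of what the reviewer's suggestion could look like, assuming the comparator may be handed a null candidate. `NullSafeReferenceHolder` is a hypothetical stand-in used only for illustration; it is not the Flink comparator.

```java
import java.time.LocalDate;
import java.util.Objects;

// Hypothetical stand-in for the comparator's reference handling; not the Flink class.
final class NullSafeReferenceHolder {
    private LocalDate reference;

    void setReference(LocalDate toCompare) {
        this.reference = toCompare;
    }

    // candidate.equals(reference) throws a NullPointerException when candidate is null.
    // Objects.equals treats two nulls as equal and a single null as unequal.
    boolean equalToReference(LocalDate candidate) {
        return Objects.equals(candidate, reference);
    }

    public static void main(String[] args) {
        NullSafeReferenceHolder holder = new NullSafeReferenceHolder();
        holder.setReference(LocalDate.of(2019, 6, 26));
        System.out.println(holder.equalToReference(LocalDate.of(2019, 6, 26)));
        System.out.println(holder.equalToReference(null));
    }
}
```

Whether null candidates can actually reach this method depends on how the comparator is used, which the review thread does not settle.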
[GitHub] [flink] zentol commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
zentol commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM URL: https://github.com/apache/flink/pull/8778#discussion_r297555765 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ## @@ -883,6 +883,17 @@ private void jobStatusChanged( validateRunsInMainThread(); if (newJobStatus.isGloballyTerminalState()) { + // other terminal job states are handled by the executions + if (newJobStatus == JobStatus.FINISHED) { + runAsync(() -> { + for (Map.Entry> entry : registeredTaskManagers.entrySet()) { + Collection storedPartitions = partitionTable.stopTrackingPartitions(entry.getKey()); + // if this call fails TaskExecutors will cleanup partitions regardless once we close the connections + entry.getValue().f1.releasePartitions(jobGraph.getJobID(), storedPartitions); Review comment: ah yes, I got stuck here since the SM requires a ShuffleDescriptor, but so far we've only been tracking ResultPartitionIDs. I have updated the PR to include a PartitionTracker which keeps track of shuffle descriptor, result partition ids etc. and acts as a central point for issuing release calls.
[GitHub] [flink] pnowojski closed pull request #8199: [FLINK-11955] Modify build to move filesystems from lib to plugins folder
pnowojski closed pull request #8199: [FLINK-11955] Modify build to move filesystems from lib to plugins folder URL: https://github.com/apache/flink/pull/8199
[GitHub] [flink] pnowojski commented on issue #8199: [FLINK-11955] Modify build to move filesystems from lib to plugins folder
pnowojski commented on issue #8199: [FLINK-11955] Modify build to move filesystems from lib to plugins folder URL: https://github.com/apache/flink/pull/8199#issuecomment-505795217 > I think there is a value in relocating the fs plugins done together with un-shading them (FLINK-11956). Because in that case, the jars would be not backwards compatible and should be loaded only via plugin manager. This way we can "force" Flink users to properly adjust their Flink setups. But in that case, I'd propose to consider keeping them under some opt/ subdir. This might be still problematic, since it would force all deployments to "load" all file system plugins at the startup, regardless whether they are desired or not. I think this ticket might be invalid. When we were creating it with @StefanRRichter, we didn't expect https://issues.apache.org/jira/browse/FLINK-12143 to basically subsume this. We thought that currently file system jars are being put into the `lib` directory by default by the build system. Since users are expected to move/copy the jars from `opt` to `lib` or `plugins` manually, it might well be that we don't need this (FLINK-11955) PR. I'm closing this PR & Jira ticket for now. We can continue discussing in the Jira ticket and we can always re-open both PR and/or Jira ticket if we decide otherwise. Sorry @yanghua for the confusion.
[GitHub] [flink] liyafan82 commented on a change in pull request #8757: [FLINK-12850][core] Introduce TypeInfo for LocalDate/LocalTime/LocalDateTime
liyafan82 commented on a change in pull request #8757: [FLINK-12850][core] Introduce TypeInfo for LocalDate/LocalTime/LocalDateTime URL: https://github.com/apache/flink/pull/8757#discussion_r297571612 ## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateComparator.java ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; + +import java.io.IOException; +import java.io.Serializable; +import java.time.LocalDate; + +/** + * This class can not extend {@link BasicTypeComparator}, because LocalDate is a + * Comparable of ChronoLocalDate instead of Comparable of LocalDate. 
+ */ +@Internal +public final class LocalDateComparator extends TypeComparator implements Serializable { + + private transient LocalDate reference; + + protected final boolean ascendingComparison; + + // For use by getComparators + @SuppressWarnings("rawtypes") + private final LocalDateComparator[] comparators = new LocalDateComparator[] {this}; + + public LocalDateComparator(boolean ascending) { + this.ascendingComparison = ascending; + } + + @Override + public int hash(LocalDate value) { + return value.hashCode(); + } + + @Override + public void setReference(LocalDate toCompare) { + this.reference = toCompare; + } + + @Override + public boolean equalToReference(LocalDate candidate) { + return candidate.equals(reference); + } + + @Override + public int compareToReference(TypeComparator referencedComparator) { + int comp = ((LocalDateComparator) referencedComparator).reference.compareTo(reference); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compare(LocalDate first, LocalDate second) { + int cmp = first.compareTo(second); + return ascendingComparison ? 
cmp : -cmp; + } + + @Override + public boolean invertNormalizedKey() { + return !ascendingComparison; + } + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public void writeWithKeyNormalization(LocalDate record, DataOutputView target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + target[index] = record; + return 1; + } + + @SuppressWarnings("rawtypes") + @Override + public TypeComparator[] getFlatComparators() { + return comparators; + } + + @Override + public LocalDate readWithKeyDenormalization(LocalDate reuse, DataInputView source) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { + return compareSerializedLocalDate(firstSource, secondSource, ascendingComparison); + } + + @Override + public boolean supportsNormalizedKey() { + return true; + } + + @Override + public int getNormalizeKeyLen() { + return 6; + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return keyBytes < getNormalizeKeyLen(); + } + + @Override + public void putNormalizedKey(LocalDate record, MemorySegment target, int offset, int numBytes) { + putNormalizedKeyLocalDate(record, target, offset, numBytes); + } + + @Override + public LocalDateComparator duplicate() { +
[jira] [Closed] (FLINK-12612) Track stored partitions on the TM
[ https://issues.apache.org/jira/browse/FLINK-12612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-12612. Resolution: Fixed master: 847e7e79387b0347015733fdfacb0595e00bcda0 > Track stored partitions on the TM > - > > Key: FLINK-12612 > URL: https://issues.apache.org/jira/browse/FLINK-12612 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination, Runtime / Network >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > The TaskExecutor has to track which partitions it has stored, grouped by > JobID. > This is necessary so that the TaskExecutor only disconnects from the > JobManager if it no longer stores relevant (i.e., belonging to the > corresponding job) partitions (similar to the taskSlotTable). > Additionally this is needed so that the TaskExecutor can independently > clean up partitions for a given job if the connection to the corresponding > JobManager is lost.
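The bookkeeping described in this ticket can be sketched as a map from job to stored partitions. This is an illustrative sketch only: `PartitionTableSketch` and its methods are hypothetical, and plain strings stand in for Flink's `JobID` and partition identifiers.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; strings replace Flink's JobID and ResultPartitionID types.
final class PartitionTableSketch {
    private final Map<String, Set<String>> partitionsByJob = new HashMap<>();

    void startTracking(String jobId, String partitionId) {
        partitionsByJob.computeIfAbsent(jobId, ignored -> new HashSet<>()).add(partitionId);
    }

    // True while partitions of the job are still stored, i.e. the connection should be kept.
    boolean hasTrackedPartitions(String jobId) {
        return partitionsByJob.containsKey(jobId);
    }

    // Removes and returns everything stored for the job, e.g. after the JobManager connection is lost.
    Collection<String> stopTrackingAll(String jobId) {
        Set<String> removed = partitionsByJob.remove(jobId);
        return removed == null ? Collections.<String>emptySet() : removed;
    }

    public static void main(String[] args) {
        PartitionTableSketch table = new PartitionTableSketch();
        table.startTracking("job-a", "partition-1");
        table.startTracking("job-a", "partition-2");
        System.out.println(table.hasTrackedPartitions("job-a"));
        System.out.println(table.stopTrackingAll("job-a").size());
        System.out.println(table.hasTrackedPartitions("job-a"));
    }
}
```

Grouping by job keeps both questions from the ticket cheap to answer: "may I disconnect from this JobManager?" and "what do I have to clean up for this job?".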
[GitHub] [flink] dianfu commented on issue #8715: [FLINK-12821][table][cep] Fix the bug that fix time quantifier can not be the last element of a pattern
dianfu commented on issue #8715: [FLINK-12821][table][cep] Fix the bug that fix time quantifier can not be the last element of a pattern URL: https://github.com/apache/flink/pull/8715#issuecomment-505799482 @dawidwys thanks a lot for the review and commit. :)
[GitHub] [flink] yanghua closed pull request #8684: [FLINK-12790] Implement KeyScope enum to distinguish the key of localKeyBy and general keyBy API
yanghua closed pull request #8684: [FLINK-12790] Implement KeyScope enum to distinguish the key of localKeyBy and general keyBy API URL: https://github.com/apache/flink/pull/8684
[GitHub] [flink] dianfu commented on issue #8892: [FLINK-FLINK-12990][python] Date type doesn't consider the local TimeZone
dianfu commented on issue #8892: [FLINK-FLINK-12990][python] Date type doesn't consider the local TimeZone URL: https://github.com/apache/flink/pull/8892#issuecomment-505802703 @sunjincheng121 Makes sense and I will add some test cases.
[GitHub] [flink] yanghua commented on issue #8684: [FLINK-12790] Implement KeyScope enum to distinguish the key of localKeyBy and general keyBy API
yanghua commented on issue #8684: [FLINK-12790] Implement KeyScope enum to distinguish the key of localKeyBy and general keyBy API URL: https://github.com/apache/flink/pull/8684#issuecomment-505802673 @pnowojski I opened this PR before @aljoscha asked me to create a FLIP, so I also think it would be better to close it first. In addition, could you keep joining the discussion and give your opinion?
[jira] [Updated] (FLINK-12960) Introduce ShuffleDescriptor#ReleaseType
[ https://issues.apache.org/jira/browse/FLINK-12960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrey Zagrebin updated FLINK-12960: Summary: Introduce ShuffleDescriptor#ReleaseType (was: Move ResultPartitionDeploymentDescriptor#releasedOnConsumption to PartitionDescriptor#releasedOnConsumption) > Introduce ShuffleDescriptor#ReleaseType > --- > > Key: FLINK-12960 > URL: https://issues.apache.org/jira/browse/FLINK-12960 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 0.5h > Remaining Estimate: 0h > > {{ResultPartitionDeploymentDescriptor#releasedOnConsumption}} shows the > intention how the partition is going to be used by the shuffle user and > released. The {{ShuffleDescriptor}} should provide a way to query which > release type is supported by shuffle service for this partition. If the > requested release type is not supported by the shuffle service for a certain > type of partition, the job should fail fast.
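The fail-fast check proposed in the description could look roughly like the sketch below. Every name here (`ReleaseTypeCheck`, `ReleaseType`, `Descriptor`, `getSupportedReleaseTypes`, `checkSupported`) is hypothetical and chosen for illustration; the ticket does not specify the eventual Flink API.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch of "query the supported release types, fail fast otherwise".
final class ReleaseTypeCheck {

    // Assumed release types; the real set of values is not given in the ticket.
    enum ReleaseType { AUTO, MANUAL }

    // Stand-in for a shuffle descriptor that can report what the shuffle service supports.
    static final class Descriptor {
        private final Set<ReleaseType> supported;

        Descriptor(EnumSet<ReleaseType> supported) {
            this.supported = EnumSet.copyOf(supported);
        }

        Set<ReleaseType> getSupportedReleaseTypes() {
            return supported;
        }
    }

    // Throws immediately instead of letting the job discover the mismatch later.
    static void checkSupported(Descriptor descriptor, ReleaseType requested) {
        if (!descriptor.getSupportedReleaseTypes().contains(requested)) {
            throw new IllegalStateException(
                "Release type " + requested + " is not supported for this partition");
        }
    }

    public static void main(String[] args) {
        Descriptor descriptor = new Descriptor(EnumSet.of(ReleaseType.AUTO));
        checkSupported(descriptor, ReleaseType.AUTO); // passes silently
        try {
            checkSupported(descriptor, ReleaseType.MANUAL);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```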
[jira] [Updated] (FLINK-12960) Move ResultPartitionDeploymentDescriptor#releasedOnConsumption to PartitionDescriptor#releasedOnConsumption
[ https://issues.apache.org/jira/browse/FLINK-12960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrey Zagrebin updated FLINK-12960: Description: {{ResultPartitionDeploymentDescriptor#releasedOnConsumption}} shows the intention how the partition is going to be used by the shuffle user and released. The {{ShuffleDescriptor}} should provide a way to query which release type is supported by shuffle service for this partition. If the requested release type is not supported by the shuffle service for a certain type of partition, the job should fail fast. (was: ResultPartitionDeploymentDescriptor#releasedOnConsumption shows the intention how the partition is going to be used by the shuffle user. If it is not supported by the shuffle service for a certain type of partition, ShuffleMaster#registerPartitionWithProducer and ShuffleEnvironment#createResultPartitionWriters should throw an exception. ShuffleMaster#registerPartitionWithProducer takes PartitionDescriptor. ResultPartitionDeploymentDescriptor#releasedOnConsumption should be part of PartitionDescriptor so that not only ShuffleEnvironment but also ShuffleMaster is already aware about releasedOnConsumption.) > Move ResultPartitionDeploymentDescriptor#releasedOnConsumption to > PartitionDescriptor#releasedOnConsumption > --- > > Key: FLINK-12960 > URL: https://issues.apache.org/jira/browse/FLINK-12960 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 0.5h > Remaining Estimate: 0h > > {{ResultPartitionDeploymentDescriptor#releasedOnConsumption}} shows the > intention how the partition is going to be used by the shuffle user and > released. The {{ShuffleDescriptor}} should provide a way to query which > release type is supported by shuffle service for this partition. 
If the > requested release type is not supported by the shuffle service for a certain > type of partition, the job should fail fast.
[GitHub] [flink] zhijiangW commented on a change in pull request #8804: [FLINK-12883][WIP][runtime] Add elaborated partition release logic
zhijiangW commented on a change in pull request #8804: [FLINK-12883][WIP][runtime] Add elaborated partition release logic URL: https://github.com/apache/flink/pull/8804#discussion_r297592199 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ## @@ -1630,6 +1627,79 @@ public boolean updateState(TaskExecutionState state) { } } + private boolean updateStateInternal(final TaskExecutionState state, final Execution attempt) { + Map> accumulators; + + switch (state.getExecutionState()) { + case RUNNING: + return attempt.switchToRunning(); + + case FINISHED: + // this deserialization is exception-free + accumulators = deserializeAccumulators(state); + attempt.markFinished(accumulators, state.getIOMetrics()); + return true; + + case CANCELED: + // this deserialization is exception-free + accumulators = deserializeAccumulators(state); + attempt.completeCancelling(accumulators, state.getIOMetrics()); + return true; + + case FAILED: + // this deserialization is exception-free + accumulators = deserializeAccumulators(state); + attempt.markFailed(state.getError(userClassLoader), accumulators, state.getIOMetrics()); + return true; + + default: + // we mark as failed and return false, which triggers the TaskManager + // to remove the task + attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState())); + return false; + } + } + + private void maybeReleasePartitions(final TaskExecutionState state, final Execution attempt) { + final ExecutionVertexID finishedExecutionVertex = attempt.getVertex().getID(); + + if (state.getExecutionState() == ExecutionState.FINISHED) { + final List releasablePartitions = partitionReleaseStrategy.vertexFinished(finishedExecutionVertex); + releasePartitions(releasablePartitions); Review comment: We might only need `SchedulingTopology` which was already used in below `createResultPartitionId` method. It might be like this in `createResultPartitionId`. 
```
SchedulingResultPartition schedulingResultPartition = schedulingTopology.getResultPartitionOrThrow(resultPartition);
ResultPartitionID resultPartitionId = schedulingResultPartition.getResultPartitionId();
```
This would work if we added the `ResultPartitionID` info to the constructor of `DefaultSchedulingResultPartition`.
[jira] [Commented] (FLINK-11956) Remove shading from filesystems build
[ https://issues.apache.org/jira/browse/FLINK-11956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873244#comment-16873244 ] Chesnay Schepler commented on FLINK-11956: -- Are all of our filesystems compatible with the plugin mechanism? Can they still be loaded without the plugin mechanism? > Remove shading from filesystems build > - > > Key: FLINK-11956 > URL: https://issues.apache.org/jira/browse/FLINK-11956 > Project: Flink > Issue Type: Sub-task > Components: Connectors / FileSystem >Affects Versions: 1.9.0 >Reporter: Stefan Richter >Assignee: vinoyang >Priority: Blocker >
[jira] [Comment Edited] (FLINK-11956) Remove shading from filesystems build
[ https://issues.apache.org/jira/browse/FLINK-11956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873247#comment-16873247 ] Piotr Nowojski edited comment on FLINK-11956 at 6/26/19 11:40 AM: -- We haven't touched any existing {{FileSystem}} implementation, so yes, they should be loadable without the plugin mechanism. It should also work in the opposite direction. As long as: # there are no undeclared implicit dependencies that were leaking from the user class loader # the {{FileSystemFactory}} plugin either does not use {{Thread.currentThread().getContextClassLoader()}} or uses it only during {{FileSystem}} creation ({{org.apache.flink.core.fs.FileSystemFactory#create}}) existing file systems should work just as well with plugins. In other words, plugin and non-plugin {{FileSystem}} implementations differ only at the packaging and dependency-leaking level; there are no API changes. One caveat here is this {{getContextClassLoader}} issue: a badly written {{FileSystem}} can be inherently incompatible with plugins. This will no longer be true once we remove shading. Non-shaded {{FileSystems}} should work with plugins by design, but the opposite might no longer be true (because of dependency clashes). This is why we cannot resolve this ticket for 1.9, as this would break backwards compatibility. was (Author: pnowojski): We haven't touched any existing {{FileSystem}} implementation, so yes, they should be loadable without the plugin mechanism (we have some end-to-end tests that use the existing non-shaded S3 {{FileSystem}} as plugins). It should also work in the opposite direction. As long as there are no undeclared implicit dependencies that were leaking from the user class loader, existing file systems should work just as well with plugins. In other words, plugin and non-plugin {{FileSystem}} implementations differ only at the packaging and dependency-leaking level; there are no API changes. This will no longer be true once we remove shading. Non-shaded {{FileSystems}} should work with plugins by design, but the opposite might no longer be true (because of dependency clashes). This is why we cannot resolve this ticket for 1.9, as this would break backwards compatibility. > Remove shading from filesystems build > - > > Key: FLINK-11956 > URL: https://issues.apache.org/jira/browse/FLINK-11956 > Project: Flink > Issue Type: Sub-task > Components: Connectors / FileSystem >Affects Versions: 1.9.0 >Reporter: Stefan Richter >Assignee: vinoyang >Priority: Blocker >
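The {{getContextClassLoader}} caveat above can be illustrated with a small sketch. It assumes that the plugin class loader is installed as the thread's context class loader only while the factory's create method runs; `ContextClassLoaderSketch`, `FileSystemSketch` and `Factory` are hypothetical stand-ins, not Flink's `FileSystemFactory` API.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical sketch: read the context class loader during creation only and keep the result.
final class ContextClassLoaderSketch {

    // Stand-in for a file system that later needs a class loader for lookups.
    interface FileSystemSketch {
        ClassLoader loaderForLookups();
    }

    static final class Factory {
        // Captures the context class loader here, while (by assumption) it is the plugin loader.
        // Reading it lazily inside the file system would see whatever loader the calling
        // thread happens to carry at that later point.
        FileSystemSketch create() {
            final ClassLoader captured = Thread.currentThread().getContextClassLoader();
            return () -> captured;
        }
    }

    public static void main(String[] args) {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        ClassLoader pluginLoader = new URLClassLoader(new URL[0], original);

        FileSystemSketch fileSystem;
        Thread.currentThread().setContextClassLoader(pluginLoader);
        try {
            fileSystem = new Factory().create();
        } finally {
            Thread.currentThread().setContextClassLoader(original);
        }

        // The captured loader is still the plugin loader after the context loader was restored.
        System.out.println(fileSystem.loaderForLookups() == pluginLoader);
    }
}
```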
[jira] [Created] (FLINK-13001) Add ExecutionGraphBuilder for testing
Chesnay Schepler created FLINK-13001: Summary: Add ExecutionGraphBuilder for testing Key: FLINK-13001 URL: https://issues.apache.org/jira/browse/FLINK-13001 Project: Flink Issue Type: Improvement Components: Runtime / Coordination, Tests Affects Versions: 1.9.0 Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.9.0 Creating an ExecutionGraph for testing is quite a hassle since we have no builder for doing so easily. Quite a few classes contain utility methods accepting a subset of the required arguments, which are painful to extend.
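The motivation for a test builder can be shown with a generic sketch: defaults for every argument, and fluent setters for the few a test cares about. `GraphBuilderSketch`, `TestGraph` and the three fields are hypothetical; they do not mirror the real ExecutionGraph constructor.

```java
// Hypothetical sketch of a test builder; the fields are illustrative, not ExecutionGraph's.
final class GraphBuilderSketch {

    // Stand-in for an object with many required constructor arguments.
    static final class TestGraph {
        final String jobName;
        final int parallelism;
        final long timeoutMillis;

        TestGraph(String jobName, int parallelism, long timeoutMillis) {
            this.jobName = jobName;
            this.parallelism = parallelism;
            this.timeoutMillis = timeoutMillis;
        }
    }

    static final class Builder {
        // Sensible defaults, so a test only sets what it actually exercises.
        private String jobName = "test-job";
        private int parallelism = 1;
        private long timeoutMillis = 10_000L;

        Builder setJobName(String jobName) {
            this.jobName = jobName;
            return this;
        }

        Builder setParallelism(int parallelism) {
            this.parallelism = parallelism;
            return this;
        }

        Builder setTimeoutMillis(long timeoutMillis) {
            this.timeoutMillis = timeoutMillis;
            return this;
        }

        TestGraph build() {
            return new TestGraph(jobName, parallelism, timeoutMillis);
        }
    }

    public static void main(String[] args) {
        TestGraph graph = new Builder().setParallelism(4).build();
        System.out.println(graph.jobName + " with parallelism " + graph.parallelism);
    }
}
```

Adding a new argument then means one new field with a default and one setter, rather than another overloaded utility method in each test class.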
[GitHub] [flink] knaufk commented on a change in pull request #8607: [FLINK-12652] [documentation] add first version of a glossary
knaufk commented on a change in pull request #8607: [FLINK-12652] [documentation] add first version of a glossary URL: https://github.com/apache/flink/pull/8607#discussion_r297633905 ## File path: docs/concepts/glossary.md ## @@ -0,0 +1,168 @@ +--- +title: Glossary +nav-pos: 3 +nav-title: Glossary +nav-parent_id: concepts +--- + + + Flink Application Cluster + +A Flink Application Cluster is a dedicated [Flink Cluster](./glossary#flink-cluster) that only +executes a single [Flink Job](./glossary#flink-job). The lifetime of the +[Flink Cluster](./glossary#flink-cluster) is bound to the lifetime of the Flink Job. Formerly +Flink Application Clusters were also known as Flink Clusters in *job mode*. Compare to +[Flink Session Cluster](./glossary#flink-session-cluster). + + Flink Cluster + +The distributed system consisting of (typically) one Flink Master process and one or more Flink +Taskmanagers processes. + + Event + +An event is a statement about a change of the state of the domain modelled by the +application. Events can be input and/or output of a stream or batch processing application. +Events are special types of [records](./glossary#Record). + + ExecutionGraph + +see [Physical Graph](./glossary#physical-graph) + + Function + +Functions are implemented by the user and encapsulate the +application logic of a Flink program. Most Functions are wrapped by a corresponding +[Operator](./glossary#operator). + + Instance + +The term *instance* is used to describe a specific instance of a specific type (usually +[Operator](./glossary#operator) or [Function](./glossary#function)) during runtime. As Apache Flink +is mostly written in Java, this corresponds to the definition of *Instance* or *Object* in Java. +In the context of Apache Flink, the term *parallel instance* is also frequently used to emphasize +that multiple instances of the same [Operator](./glossary#operator) or +[Function](./glossary#function) type are running in parallel. 
+ + Flink Job + +A Flink Job is the runtime representation of a Flink program. A Flink Job can either be submitted +to a long running [Flink Session Cluster](./glossary#flink-session-cluster) or it can be started as a +self-contained [Flink Application Cluster](./glossary#flink-application-cluster). + + JobGraph + +see [Logical Graph](./glossary#logical-graph) + + Flink JobManager + +JobManagers are one of the components running in the +[Flink JobManger Process](./glossary#flink-jobmanager-process). A JobManager is responsible for +supervising the execution of the [Tasks](./glossary#task) of a single job. + + Logical Graph + +A logical graph is a directed graph describing the high-level logic of a stream processing program. +The nodes are [Operators](./glossary#operator) and the edges indicate input/output-relationships or +data streams or data sets. + + Managed State + +Managed State describes application state which has been registered with the framework. For +Managed State, Apache Flink will take care about persistence and rescaling among other things. + + Flink JobManager Process + +The JobManager Process is the master of a [Flink Cluster](./glossary#flink-cluster). It is called +*JobManager* for historical reasons, but actually contains three distinct components: +Flink Resource Manager, Flink Dispatcher and one [Flink JobManager](./glossary#flink-jobmanager) +per running [Flink Job](./glossary#flink-job). + + Operator + +Node of a [Logical Graph](./glossary#logical-graph). An Operator performs a certain operation, +which is usually executed by a [Function](./glossary#function). Sources and Sinks are special +Operators for data ingestion and data egress. + + Operator Chain + +An Operator Chain consists of two or more consecutive [Operators](./glossary#operator) without any +repartitioning in between. Operators within the same Operation Chain forward records to each other +directly without going through serialization or Flink's network stack. 
+ + Partition + +A partition is an independent subset of the overall data stream or data set. A data stream or +data set is divided into partitions by assigning each [record](./glossary#Record) to one or more +partitions. Partitions of data streams or data sets are consumed by [Tasks](./glossary#task) during +runtime. A transformation which changes the way a data stream or data set is partitioned is often +called repartitioning. + + Physical Graph + +A physical graph is the result of translating a [Logical Graph](./glossary#logical-graph) for +execution in a distributed runtime. The nodes are [Tasks](./glossary#task) and the edges indicate +input/output-relationships or [partitions](./glossary#partition) of data streams or data sets. + + Record + +Records are the constituent elements of a data set or data stream. +[Operators](./glossary#operator) and [Functions](./glossary#Function) receive records as input
[jira] [Commented] (FLINK-10879) Align Flink clients on env.execute()
[ https://issues.apache.org/jira/browse/FLINK-10879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873283#comment-16873283 ] TisonKun commented on FLINK-10879: -- [~f.pompermaier] I suspect that if you deploy a job cluster, any code after {{env.execute}} would not be executed either. I'd be glad to learn what "REST API" means exactly, because as I see it there are three approaches to job submission: 1. Run directly in the IDE 2. Submit via CliFrontend 3. Programmatically obtain a ClusterClient and submit the JobGraph > Align Flink clients on env.execute() > > > Key: FLINK-10879 > URL: https://issues.apache.org/jira/browse/FLINK-10879 > Project: Flink > Issue Type: Improvement > Components: Command Line Client >Affects Versions: 1.6.2 >Reporter: Flavio Pompermaier >Priority: Major > > Right now the REST APIs do not support any code after env.execute while the > Flink API, CLI client or the code executed within the IDE do. > Both clients should behave in the same way (supporting env.execute() to > return something and continue the code execution or not). > See the discussion on the DEV ML for more details: > http://mail-archives.apache.org/mod_mbox/flink-dev/201811.mbox/%3CCAELUF_DhjzL9FECvx040_GE3d85Ykb-HcGVCh0O4y9h-cThq7A%40mail.gmail.com%3E -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10879) Align Flink clients on env.execute()
[ https://issues.apache.org/jira/browse/FLINK-10879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873295#comment-16873295 ] TisonKun commented on FLINK-10879: -- Well, I can see how the "REST API" path works now. It compiles the job via {{OptimizedPlanEnvironment}}, which aborts as soon as {{#execute}} is called, and thus any code after {{env.execute}} is not executed. > Align Flink clients on env.execute() > > > Key: FLINK-10879 > URL: https://issues.apache.org/jira/browse/FLINK-10879 > Project: Flink > Issue Type: Improvement > Components: Command Line Client >Affects Versions: 1.6.2 >Reporter: Flavio Pompermaier >Priority: Major > > Right now the REST APIs do not support any code after env.execute while the > Flink API, CLI client or the code executed within the IDE do. > Both clients should behave in the same way (supporting env.execute() to > return something and continue the code execution or not). > See the discussion on the DEV ML for more details: > http://mail-archives.apache.org/mod_mbox/flink-dev/201811.mbox/%3CCAELUF_DhjzL9FECvx040_GE3d85Ykb-HcGVCh0O4y9h-cThq7A%40mail.gmail.com%3E
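To illustrate the behaviour described in the comment above, here is a minimal sketch of an environment that captures the plan and then aborts the user program from inside execute. It is a Python analogue for illustration only: `ProgramAbort`, `PlanOnlyEnvironment`, `user_main` and `extract_plan` are hypothetical names, not Flink classes, and the real {{OptimizedPlanEnvironment}} may differ in detail.

```python
class ProgramAbort(Exception):
    """Hypothetical signal raised to stop the user program once the plan is captured."""


class PlanOnlyEnvironment:
    """Hypothetical environment that records the plan instead of running the job."""

    def __init__(self):
        self.captured_plan = None

    def execute(self, job_name):
        # Record the plan, then abort the caller by raising instead of returning.
        self.captured_plan = "plan for " + job_name
        raise ProgramAbort()


def user_main(env, log):
    log.append("before execute")
    env.execute("my-job")
    log.append("after execute")  # not reached when execute() raises


def extract_plan(main):
    # Run the user's main only far enough to obtain the plan.
    env, log = PlanOnlyEnvironment(), []
    try:
        main(env, log)
    except ProgramAbort:
        pass
    return env.captured_plan, log
```

Calling `extract_plan(user_main)` yields the captured plan, while the log stops at the entry written before `execute`, which is why code placed after {{env.execute}} is skipped on this path.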
[jira] [Assigned] (FLINK-12023) Override TaskManager memory when submitting a Flink job
[ https://issues.apache.org/jira/browse/FLINK-12023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-12023: Assignee: (was: vinoyang) > Override TaskManager memory when submitting a Flink job > --- > > Key: FLINK-12023 > URL: https://issues.apache.org/jira/browse/FLINK-12023 > Project: Flink > Issue Type: Wish > Components: Runtime / Configuration >Reporter: Michel Davit >Priority: Minor > > Currently a Flink session can only run TaskManagers of the same size. > However, depending on the jar or even the program arguments, some jobs can be > more intensive than others. > In order to improve memory usage and avoid wasting resources, it would be useful > to have an option that overrides the default TaskManager memory setting when > submitting a new job.
[GitHub] [flink] dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.
dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed. URL: https://github.com/apache/flink/pull/8863#discussion_r297531579 ## File path: flink-python/pyflink/find_flink_home.py ## @@ -15,12 +16,21 @@ # See the License for the specific language governing permissions and # limitations under the License. - +import glob import logging import os import sys +def is_flink_home(path): Review comment: is_flink_home -> _is_flink_home This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zentol opened a new pull request #8896: [FLINK-12993][runtime] Refactor forceReleaseOnConsumption to JM concept
zentol opened a new pull request #8896: [FLINK-12993][runtime] Refactor forceReleaseOnConsumption to JM concept URL: https://github.com/apache/flink/pull/8896 The `forceReleaseOnConsumption` flag allows us to switch between the current behavior (all partitions are released on consumption) and the planned new behavior (only pipelined partitions are released on consumption). This PR moves the processing of this flag from the TM side (where it was handled in the netty shuffle environment) to the JM side (now handled in the Execution when creating the ResultPartitionDeploymentDescriptor). The corresponding config option is evaluated by the `ExecutionGraphBuilder`.
[GitHub] [flink] dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.
dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed. URL: https://github.com/apache/flink/pull/8863#discussion_r297546146 ## File path: flink-dist/src/main/flink-bin/bin/pyflink-shell.sh ## @@ -18,8 +18,13 @@ bin=`dirname "$0"` bin=`cd "$bin"; pwd` +. "$bin"/find-flink-home.sh -. "$bin"/config.sh +_FLINK_HOME_DETERMINED=1 + +cd "$FLINK_HOME"/bin Review comment: Seems that this line can be removed?
[GitHub] [flink] dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.
dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed. URL: https://github.com/apache/flink/pull/8863#discussion_r297526228 ## File path: flink-python/pyflink/shell.py ## @@ -15,7 +15,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import codecs +import io Review comment: io is not used
[GitHub] [flink] dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.
dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed. URL: https://github.com/apache/flink/pull/8863#discussion_r297540088 ## File path: flink-python/pyflink/find_flink_home.py ## @@ -1,4 +1,5 @@ - +#!/usr/bin/env python Review comment: Good catch. What about also adding this header for shell.py?
[GitHub] [flink] dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.
dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed. URL: https://github.com/apache/flink/pull/8863#discussion_r297532944 ## File path: flink-python/pyflink/find_flink_home.py ## @@ -30,12 +40,31 @@ def _find_flink_home(): return os.environ['FLINK_HOME'] else: try: -flink_root_dir = os.path.abspath(os.path.dirname(os.path.abspath(__file__)) + "/../../") +current_dir = os.path.abspath(os.path.dirname(os.path.abspath(__file__))) +flink_root_dir = os.path.abspath(current_dir + "/../../") build_target = flink_root_dir + "/build-target" -pyflink_file = build_target + "/bin/pyflink-gateway-server.sh" -if os.path.isfile(pyflink_file): +if is_flink_home(build_target): os.environ['FLINK_HOME'] = build_target return build_target + +if sys.version < "3": +import imp +try: +module_home = imp.find_module("pyflink")[1] +if is_flink_home(module_home): +os.environ['FLINK_HOME'] = module_home +return module_home +except ImportError: +pass +else: +from importlib.util import find_spec +try: +module_home = os.path.dirname(find_spec("pyflink").origin) Review comment: Most of the code in the if/else is duplicate. We can put only the module_name statement in the if/else to eliminate duplication.
[GitHub] [flink] dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.
dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed. URL: https://github.com/apache/flink/pull/8863#discussion_r297543943 ## File path: flink-python/MANIFEST.in ## @@ -0,0 +1,29 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +global-exclude *.py[cod] __pycache__ .DS_Store +recursive-include deps/lib *.jar Review comment: What about change these two lines to the following? graft deps/lib *.jar recursive-include deps/opt This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.
dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed. URL: https://github.com/apache/flink/pull/8863#discussion_r297519065 ## File path: flink-python/setup.py ## @@ -42,31 +44,165 @@ with io.open(os.path.join(this_directory, 'README.md'), 'r', encoding='utf-8') as f: long_description = f.read() -setup( -name='pyflink', -version=VERSION, -packages=['pyflink', - 'pyflink.table', - 'pyflink.util', - 'pyflink.datastream', - 'pyflink.dataset', - 'pyflink.common'], -url='http://flink.apache.org', -license='http://www.apache.org/licenses/LICENSE-2.0', -author='Flink Developers', -author_email='d...@flink.apache.org', -install_requires=['py4j==0.10.8.1'], -tests_require=['pytest==4.4.1'], -description='Apache Flink Python API', -long_description=long_description, -long_description_content_type='text/markdown', -classifiers=[ -'Development Status :: 1 - Planning', -'License :: OSI Approved :: Apache Software License', -'Programming Language :: Python :: 2.7', -'Programming Language :: Python :: 3.3', -'Programming Language :: Python :: 3.4', -'Programming Language :: Python :: 3.5', -'Programming Language :: Python :: 3.6', -'Programming Language :: Python :: 3.7'] -) +TEMP_PATH = "deps" + +LIB_TEMP_PATH = os.path.join(TEMP_PATH, "lib") +OPT_TEMP_PATH = os.path.join(TEMP_PATH, "opt") +CONF_TEMP_PATH = os.path.join(TEMP_PATH, "conf") +EXAMPLES_TEMP_PATH = os.path.join(TEMP_PATH, "examples") +LICENSES_TEMP_PATH = os.path.join(TEMP_PATH, "licenses") +SCRIPTS_TEMP_PATH = os.path.join(TEMP_PATH, "bin") + +LICENSE_FILE_TEMP_PATH = os.path.join("pyflink", "LICENSE") +NOTICE_FILE_TEMP_PATH = os.path.join("pyflink", "NOTICE") +README_FILE_TEMP_PATH = os.path.join("pyflink", "README.txt") + +in_flink_source = os.path.isfile("../flink-java/src/main/java/org/apache/flink/api/java/" + "ExecutionEnvironment.java") + +try: +if in_flink_source: + +try: +os.mkdir(TEMP_PATH) +except: +print("Temp path for symlink to parent already 
exists {0}".format(TEMP_PATH), + file=sys.stderr) +sys.exit(-1) + +FLINK_HOME = os.path.abspath("../build-target") + +incorrect_invocation_message = """ +If you are installing pyflink from flink source, you must first build Flink and +run sdist. + +To build Flink with maven you can run: + mvn -DskipTests clean package +Building the source dist is done in the flink-python directory: + cd flink-python + python setup.py sdist + pip install dist/*.tar.gz""" + +LIB_PATH = os.path.join(FLINK_HOME, "lib") +OPT_PATH = os.path.join(FLINK_HOME, "opt") +CONF_PATH = os.path.join(FLINK_HOME, "conf") +EXAMPLES_PATH = os.path.join(FLINK_HOME, "examples") +LICENSES_PATH = os.path.join(FLINK_HOME, "licenses") +SCRIPTS_PATH = os.path.join(FLINK_HOME, "bin") + +LICENSE_FILE_PATH = os.path.join(FLINK_HOME, "LICENSE") +NOTICE_FILE_PATH = os.path.join(FLINK_HOME, "NOTICE") +README_FILE_PATH = os.path.join(FLINK_HOME, "README.txt") + +if not os.path.isdir(LIB_PATH): +print(incorrect_invocation_message, file=sys.stderr) +sys.exit(-1) + +if getattr(os, "symlink", None) is not None: +os.symlink(LIB_PATH, LIB_TEMP_PATH) +os.symlink(OPT_PATH, OPT_TEMP_PATH) +os.symlink(CONF_PATH, CONF_TEMP_PATH) +os.symlink(EXAMPLES_PATH, EXAMPLES_TEMP_PATH) +os.symlink(LICENSES_PATH, LICENSES_TEMP_PATH) +os.symlink(SCRIPTS_PATH, SCRIPTS_TEMP_PATH) +os.symlink(LICENSE_FILE_PATH, LICENSE_FILE_TEMP_PATH) +os.symlink(NOTICE_FILE_PATH, NOTICE_FILE_TEMP_PATH) +os.symlink(README_FILE_PATH, README_FILE_TEMP_PATH) +else: +copytree(LIB_PATH, LIB_TEMP_PATH) +copytree(OPT_PATH, OPT_TEMP_PATH) +copytree(CONF_PATH, CONF_TEMP_PATH) +copytree(EXAMPLES_PATH, EXAMPLES_TEMP_PATH) +copytree(LICENSES_PATH, LICENSES_TEMP_PATH) +copytree(SCRIPTS_PATH, SCRIPTS_TEMP_PATH) +copy(LICENSE_FILE_PATH, LICENSE_FILE_TEMP_PATH) +copy(NOTICE_FILE_PATH, NOTICE_FILE_TEMP_PATH) +copy(README_FILE_PATH, README_FILE_TEMP_PATH) +else: +if not os.path.isdir(LIB_TEMP_PATH) or not os.path.isdir(OPT_TEMP_PATH) \ +or not 
os.path.isdir(SCRIPTS_TEMP_PATH): +print("The flink core files are not found. Please make sure your installation package " + "is complete, or do this in the flink-python directory of the flink source " + "directory.") +sys.exit(-1) + +
[GitHub] [flink] dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.
dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed. URL: https://github.com/apache/flink/pull/8863#discussion_r297460497 ## File path: flink-python/setup.py ## @@ -42,31 +44,165 @@ with io.open(os.path.join(this_directory, 'README.md'), 'r', encoding='utf-8') as f: long_description = f.read() -setup( -name='pyflink', -version=VERSION, -packages=['pyflink', - 'pyflink.table', - 'pyflink.util', - 'pyflink.datastream', - 'pyflink.dataset', - 'pyflink.common'], -url='http://flink.apache.org', -license='http://www.apache.org/licenses/LICENSE-2.0', -author='Flink Developers', -author_email='d...@flink.apache.org', -install_requires=['py4j==0.10.8.1'], -tests_require=['pytest==4.4.1'], -description='Apache Flink Python API', -long_description=long_description, -long_description_content_type='text/markdown', -classifiers=[ -'Development Status :: 1 - Planning', -'License :: OSI Approved :: Apache Software License', -'Programming Language :: Python :: 2.7', -'Programming Language :: Python :: 3.3', -'Programming Language :: Python :: 3.4', -'Programming Language :: Python :: 3.5', -'Programming Language :: Python :: 3.6', -'Programming Language :: Python :: 3.7'] -) +TEMP_PATH = "deps" + +LIB_TEMP_PATH = os.path.join(TEMP_PATH, "lib") +OPT_TEMP_PATH = os.path.join(TEMP_PATH, "opt") +CONF_TEMP_PATH = os.path.join(TEMP_PATH, "conf") +EXAMPLES_TEMP_PATH = os.path.join(TEMP_PATH, "examples") +LICENSES_TEMP_PATH = os.path.join(TEMP_PATH, "licenses") +SCRIPTS_TEMP_PATH = os.path.join(TEMP_PATH, "bin") Review comment: Should we also consider the directory `plugins` in build-target? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.
dianfu commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed. URL: https://github.com/apache/flink/pull/8863#discussion_r297550056 ## File path: flink-dist/src/main/flink-bin/bin/config.sh ## @@ -296,7 +296,10 @@ bin=`dirname "$target"` SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P` # Define the main directory of the flink installation -FLINK_HOME=`dirname "$SYMLINK_RESOLVED_BIN"` +# If config.sh is called by pyflink-shell.sh in python bin directory(pip installed), use the _PYFLINK_HOME +if [ -z "$_FLINK_HOME_DETERMINED" ]; then Review comment: If `FLINK_HOME` has already been determined, it will be set. So we can simply check whether `FLINK_HOME` is set, and there is no need to add the variable `_FLINK_HOME_DETERMINED`. What do you think?
[jira] [Commented] (FLINK-12953) View logs from Job view in Web Dashboard
[ https://issues.apache.org/jira/browse/FLINK-12953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873121#comment-16873121 ] Robert Metzger commented on FLINK-12953: [~chadrik] thanks a lot for opening this ticket! I agree with you that it would be a great improvement to Flink's UX. Does Cloud Dataflow only show custom log messages (from the user code), or also system log messages from a transformation? If it is only custom log messages: are they providing a logger instance that the user has to use? > View logs from Job view in Web Dashboard > > > Key: FLINK-12953 > URL: https://issues.apache.org/jira/browse/FLINK-12953 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Reporter: Chad Dombrova >Priority: Major > > As a (beam) developer I want to be able to print/log information from my > custom transforms, and then monitor that output within the job view of the > Web Dashboard, so that I don't have to go hunting through the combined log in > the Job Manager view. The Job Manager log has way too much in it, spanning > all jobs, including output logged by both flink and user code. > A good example of how this UX should work can be found in Google Dataflow: > - click on a job, and see the logged output for that job > - click on a transform, and see the logged output for just that transform > thanks!
[jira] [Commented] (FLINK-12786) Implement local aggregation in Flink
[ https://issues.apache.org/jira/browse/FLINK-12786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873162#comment-16873162 ] vinoyang commented on FLINK-12786: -- At the moment, the main participants in the discussion are me and the developers from Alibaba. It seems that we have major differences at the API level. We hope that [~aljoscha] and [~StephanEwen] can give more expert advice, even if it is just a direction. Otherwise the current discussion is likely to stay inefficient and waste a lot of time, while a lot of work could have been done in parallel. > Implement local aggregation in Flink > > > Key: FLINK-12786 > URL: https://issues.apache.org/jira/browse/FLINK-12786 > Project: Flink > Issue Type: New Feature > Components: API / DataStream >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > Currently, keyed streams are widely used to perform aggregating operations > (e.g., reduce, sum and window) on the elements that have the same key. When > executed at runtime, the elements with the same key will be sent to and > aggregated by the same task. > > The performance of these aggregating operations is very sensitive to the > distribution of keys. In cases where the distribution of keys follows a > power law, performance degrades significantly. Worse, > increasing the degree of parallelism does not help when a task is > overloaded by a single key. > > Local aggregation is a widely adopted method to reduce the performance > degradation caused by data skew. We can decompose the aggregating operations into two > phases. In the first phase, we aggregate the elements of the same key at the > sender side to obtain partial results. Then, in the second phase, these > partial results are sent to receivers according to their keys and are > combined to obtain the final result. Since the number of partial results > received by each receiver is limited by the number of senders, the imbalance > among receivers can be reduced. Besides, by reducing the amount of > transferred data, performance can be further improved. > The design documentation is here: > [https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing] > The discussion thread is here: > [http://mail-archives.apache.org/mod_mbox/flink-dev/201906.mbox/%3CCAA_=o7dvtv8zjcxknxyoyy7y_ktvgexrvb4zhxjwzuhsulz...@mail.gmail.com%3E]
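The two-phase decomposition described in the issue can be sketched in plain Python as follows. This is only an illustration of the idea using a sum aggregate; the function names and the sample data are made up, and it says nothing about how the proposed Flink API would look.

```python
from collections import defaultdict


def local_aggregate(records):
    """Phase 1: pre-aggregate (key, value) records on the sender side."""
    partial = defaultdict(int)
    for key, value in records:
        partial[key] += value
    return dict(partial)


def global_aggregate(partials):
    """Phase 2: combine the partial results of all senders per key."""
    final = defaultdict(int)
    for partial in partials:
        for key, value in partial.items():
            final[key] += value
    return dict(final)


# Two senders with a skewed key "a" (hypothetical sample data).
senders = [
    [("a", 1), ("a", 2), ("b", 5)],
    [("a", 3), ("a", 4)],
]
partials = [local_aggregate(records) for records in senders]
result = global_aggregate(partials)
```

In this sample the receiver responsible for key "a" gets two partial results (one per sender) instead of four raw records, which is the bound on per-receiver input that the description refers to.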
[GitHub] [flink] pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter
pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter URL: https://github.com/apache/flink/pull/8621#discussion_r297579509 ## File path: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java ## @@ -82,7 +89,10 @@ public void open(FileSystem fs, Path path) throws IOException { public void write(T element) throws IOException { FSDataOutputStream outputStream = getStream(); outputStream.write(element.toString().getBytes(charset)); - outputStream.write('\n'); + if (rowDelimiterBytes == null) { + rowDelimiterBytes = rowDelimiter.getBytes(charset); Review comment: Why are you "caching" `rowDelimiterBytes`? Isn't this a premature optimisation?
[GitHub] [flink] zentol merged pull request #8857: [FLINK-12960] Introduce ShuffleDescriptor#ReleaseType and ShuffleDescriptor#getSupportedReleaseTypes
zentol merged pull request #8857: [FLINK-12960] Introduce ShuffleDescriptor#ReleaseType and ShuffleDescriptor#getSupportedReleaseTypes URL: https://github.com/apache/flink/pull/8857
[GitHub] [flink] zhijiangW commented on a change in pull request #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager
zhijiangW commented on a change in pull request #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager URL: https://github.com/apache/flink/pull/8646#discussion_r297581901 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java ## @@ -71,7 +71,7 @@ public void beforeTest() { } @After - public void afterTest() { + public void afterTest() throws Exception { this.ioManager.close(); if (!this.ioManager.isProperlyShutDown()) { Review comment: we could add try catch and ignore the exception here. WDYT?
[GitHub] [flink] pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter
pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter URL: https://github.com/apache/flink/pull/8621#discussion_r297581797 ## File path: flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java ## @@ -32,7 +32,7 @@ @Test public void testDuplicate() { Review comment: Please also add a unit test (in this file?) to cover that the custom delimiter is actually used somehow.
[GitHub] [flink] pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter
pnowojski commented on a change in pull request #8621: [FLINK-12682][connectors] StringWriter support custom row delimiter URL: https://github.com/apache/flink/pull/8621#discussion_r297580610 ## File path: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java ## @@ -54,13 +59,15 @@ public StringWriter() { * * @param charsetName Name of the charset to be used, must be valid input for {@code Charset.forName(charsetName)} */ - public StringWriter(String charsetName) { + public StringWriter(String charsetName, String rowDelimiter) { Review comment: please update the `@param` java doc. Secondly, I'm not sure, but isn't `StringWriter` a part of public API and shouldn't we preserve the old constructor?
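The review points on this PR (keep the old constructor usable, encode the delimiter without caching it, and test that a custom delimiter is really written) can be illustrated with a small Python analogue. This is a sketch under assumptions, not the Java `StringWriter` from the connector: the class below, its parameter names and the `written_bytes` helper are hypothetical, and default arguments stand in for Java constructor overloads.

```python
import io


class StringWriter:
    """Hypothetical Python analogue of a writer that appends a row delimiter."""

    def __init__(self, charset_name="UTF-8", row_delimiter="\n"):
        # Defaults keep the old call forms StringWriter() and StringWriter(charset) working.
        self.charset_name = charset_name
        self.row_delimiter = row_delimiter

    def write(self, stream, element):
        stream.write(str(element).encode(self.charset_name))
        # Encoded on every call instead of being cached in a field.
        stream.write(self.row_delimiter.encode(self.charset_name))


def written_bytes(writer, elements):
    """Write all elements to an in-memory stream and return the raw bytes."""
    stream = io.BytesIO()
    for element in elements:
        writer.write(stream, element)
    return stream.getvalue()
```

A unit test in the spirit of the request could then compare `written_bytes(StringWriter("UTF-8", "\r\n"), ["a", 1])` with the expected bytes, and check separately that the default construction still ends rows with a newline.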
[GitHub] [flink] knaufk commented on a change in pull request #8607: [FLINK-12652] [documentation] add first version of a glossary
knaufk commented on a change in pull request #8607: [FLINK-12652] [documentation] add first version of a glossary URL: https://github.com/apache/flink/pull/8607#discussion_r297636397 ## File path: docs/concepts/glossary.md ## @@ -0,0 +1,168 @@ +--- +title: Glossary +nav-pos: 3 +nav-title: Glossary +nav-parent_id: concepts +--- + + + Flink Application Cluster + +A Flink Application Cluster is a dedicated [Flink Cluster](./glossary#flink-cluster) that only +executes a single [Flink Job](./glossary#flink-job). The lifetime of the +[Flink Cluster](./glossary#flink-cluster) is bound to the lifetime of the Flink Job. Formerly +Flink Application Clusters were also known as Flink Clusters in *job mode*. Compare to +[Flink Session Cluster](./glossary#flink-session-cluster). + + Flink Cluster + +The distributed system consisting of (typically) one Flink Master process and one or more Flink +Taskmanagers processes. + + Event + +An event is a statement about a change of the state of the domain modelled by the +application. Events can be input and/or output of a stream or batch processing application. +Events are special types of [records](./glossary#Record). Review comment: In Domain Driven Design, Event-Driven Architectures, etc. this is exactly, what an event is. It is always an event (=state change) in the domain. I could add a text book example like "checkout" or "item added to cart". Would this help? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leesf opened a new pull request #8899: [hotfix][readme] fix how to contribute to Apache Flink url
leesf opened a new pull request #8899: [hotfix][readme] fix how to contribute to Apache Flink url URL: https://github.com/apache/flink/pull/8899 *Fix url error in README.md* cc @zentol @fhueske
[GitHub] [flink] zhijiangW commented on a change in pull request #8857: [FLINK-12960][coordination][shuffle] Move ResultPartitionDeploymentDescriptor#releasedOnConsumption to PartitionDescriptor#relea
zhijiangW commented on a change in pull request #8857: [FLINK-12960][coordination][shuffle] Move ResultPartitionDeploymentDescriptor#releasedOnConsumption to PartitionDescriptor#releasedOnConsumption URL: https://github.com/apache/flink/pull/8857#discussion_r297530334 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java ## @@ -73,19 +74,26 @@ public NettyShuffleDescriptorBuilder setConnectionIndex(int connectionIndex) { return this; } + public NettyShuffleDescriptorBuilder setBlocking(boolean blocking) { Review comment: `setIsBlocking(boolean isBlocking)` ?
[jira] [Assigned] (FLINK-11767) Update TypeSerializerSnapshotMigrationTestBase and subclasses for 1.8
[ https://issues.apache.org/jira/browse/FLINK-11767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-11767: Assignee: (was: Tzu-Li (Gordon) Tai) > Update TypeSerializerSnapshotMigrationTestBase and subclasses for 1.8 > - > > Key: FLINK-11767 > URL: https://issues.apache.org/jira/browse/FLINK-11767 > Project: Flink > Issue Type: Sub-task > Components: API / Type Serialization System, Tests >Affects Versions: 1.8.0 >Reporter: vinoyang >Priority: Blocker > Fix For: 1.9.0 > > > Update {{TypeSerializerSnapshotMigrationTestBase}} and subclasses to cover > restoring from Flink 1.8. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11767) Update TypeSerializerSnapshotMigrationTestBase and subclasses for 1.8
[ https://issues.apache.org/jira/browse/FLINK-11767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873067#comment-16873067 ] vinoyang commented on FLINK-11767: -- I have released the assignee. > Update TypeSerializerSnapshotMigrationTestBase and subclasses for 1.8 > - > > Key: FLINK-11767 > URL: https://issues.apache.org/jira/browse/FLINK-11767 > Project: Flink > Issue Type: Sub-task > Components: API / Type Serialization System, Tests >Affects Versions: 1.8.0 >Reporter: vinoyang >Priority: Blocker > Fix For: 1.9.0 > > > Update {{TypeSerializerSnapshotMigrationTestBase}} and subclasses to cover > restoring from Flink 1.8.
[jira] [Assigned] (FLINK-11767) Update TypeSerializerSnapshotMigrationTestBase and subclasses for 1.8
[ https://issues.apache.org/jira/browse/FLINK-11767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-11767: Assignee: Tzu-Li (Gordon) Tai (was: vinoyang) > Update TypeSerializerSnapshotMigrationTestBase and subclasses for 1.8 > - > > Key: FLINK-11767 > URL: https://issues.apache.org/jira/browse/FLINK-11767 > Project: Flink > Issue Type: Sub-task > Components: API / Type Serialization System, Tests >Affects Versions: 1.8.0 >Reporter: vinoyang >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.9.0 > > > Update {{TypeSerializerSnapshotMigrationTestBase}} and subclasses to cover > restoring from Flink 1.8.
[GitHub] [flink] ifndef-SleePy opened a new pull request #8894: [FLINK-12961][datastream] Providing an internal execution method of StreamExecutionEnvironment accepting StreamGraph as input parameter
ifndef-SleePy opened a new pull request #8894: [FLINK-12961][datastream] Providing an internal execution method of StreamExecutionEnvironment accepting StreamGraph as input parameter URL: https://github.com/apache/flink/pull/8894 ## What is the purpose of the change * Expose an internal method `execute(StreamGraph)` of `StreamExecutionEnvironment`, so that the pluggable runner gets a chance to set properties of the `StreamGraph`. ## Brief change log * Add a new abstract method `execute(StreamGraph)` to `StreamExecutionEnvironment` * Implement `execute(jobName)` in `StreamExecutionEnvironment` itself, since all subclasses share the same logic ## Verifying this change * This change is already covered by existing tests ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable
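The change log above describes a template-method arrangement: the name-based `execute` lives in the base class and delegates to an abstract graph-based `execute`. A minimal sketch of that shape follows. All class names here (`SketchStreamGraph`, `SketchEnvironment`, `SketchLocalEnvironment`) and the `buildStreamGraph` helper are hypothetical stand-ins, not Flink's actual classes or signatures, and the string return value only exists to make the behaviour observable.

```java
// Hypothetical stand-in for a stream graph; only the job name is modelled.
final class SketchStreamGraph {
    private String jobName;

    SketchStreamGraph(String jobName) {
        this.jobName = jobName;
    }

    String getJobName() {
        return jobName;
    }

    void setJobName(String jobName) {
        this.jobName = jobName;
    }
}

// Hypothetical stand-in for the execution environment base class.
abstract class SketchEnvironment {

    // Shared by all subclasses: build the graph, then delegate to the hook.
    final String execute(String jobName) {
        return execute(buildStreamGraph(jobName));
    }

    // The hook each environment implements. A runner could also call it
    // directly after adjusting properties of the graph.
    abstract String execute(SketchStreamGraph graph);

    // Assumed helper that turns the job name into a graph.
    SketchStreamGraph buildStreamGraph(String jobName) {
        return new SketchStreamGraph(jobName);
    }
}

// One concrete environment; it only needs to implement the graph-based hook.
final class SketchLocalEnvironment extends SketchEnvironment {
    @Override
    String execute(SketchStreamGraph graph) {
        return "executed " + graph.getJobName();
    }
}
```

With this shape, a runner that wants to tweak the graph would call `buildStreamGraph`, change what it needs, and pass the result to `execute(SketchStreamGraph)` instead of going through the name-based overload.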
[jira] [Updated] (FLINK-12961) StreamExecutionEnvironment supports executing job with StreamGraph
[ https://issues.apache.org/jira/browse/FLINK-12961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12961: --- Labels: pull-request-available (was: ) > StreamExecutionEnvironment supports executing job with StreamGraph > -- > > Key: FLINK-12961 > URL: https://issues.apache.org/jira/browse/FLINK-12961 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Reporter: Biao Liu >Assignee: Biao Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > > Expose an internal method {{execute(StreamGraph)}} of > {{StreamExecutionEnvironment}}. So the pluggable runner could get a chance to > set properties of {{StreamGraph}}.
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297538465 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java ## @@ -0,0 +1,763 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.mock.Whitebox; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointProperties; +import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; +import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.PendingCheckpoint; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory; +import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException; +import 
org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling. + */ +public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLogger { + + @ClassRule + public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = + new
[GitHub] [flink] zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297567190 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java ## @@ -0,0 +1,641 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.mock.Whitebox; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointProperties; +import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; +import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.PendingCheckpoint; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy; +import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException; +import org.apache.flink.runtime.jobgraph.DistributionPattern; 
+import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; + +import javax.annotation.Nonnull; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling. + */ +public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLogger { + + @ClassRule + public static
[jira] [Closed] (FLINK-11955) Modify build to move filesystems from lib to plugins folder
[ https://issues.apache.org/jira/browse/FLINK-11955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-11955. -- Resolution: Invalid Users were always expected to move the file system jars manually from `opt` to `lib`, so we can expect them to move them manually from `opt` to `plugins` as well. Because of that I'm closing the ticket. For the details, please check out the [discussion in the pull request|https://github.com/apache/flink/pull/8199#issuecomment-500820126]. > Modify build to move filesystems from lib to plugins folder > --- > > Key: FLINK-11955 > URL: https://issues.apache.org/jira/browse/FLINK-11955 > Project: Flink > Issue Type: Sub-task > Components: Connectors / FileSystem >Affects Versions: 1.9.0 >Reporter: Stefan Richter >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h >
[GitHub] [flink] flinkbot edited a comment on issue #8893: [hotfix][python] Align the signature of type utility methods with Java
flinkbot edited a comment on issue #8893: [hotfix][python] Align the signature of type utility methods with Java URL: https://github.com/apache/flink/pull/8893#issuecomment-505713502 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @sunjincheng121 [committer] * ✅ 2. There is [consensus] that the contribution should go into Flink. - Approved by @sunjincheng121 [committer] * ❓ 3. Needs [attention] from. * ✅ 4. The change fits into the overall [architecture]. - Approved by @sunjincheng121 [committer] * ✅ 5. Overall code [quality] is good. - Approved by @sunjincheng121 [committer] Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands: The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] sunjincheng121 commented on issue #8893: [hotfix][python] Align the signature of type utility methods with Java
sunjincheng121 commented on issue #8893: [hotfix][python] Align the signature of type utility methods with Java URL: https://github.com/apache/flink/pull/8893#issuecomment-505801756 @flinkbot approve all
[GitHub] [flink] alpinegizmo commented on issue #8607: [FLINK-12652] [documentation] add first version of a glossary
alpinegizmo commented on issue #8607: [FLINK-12652] [documentation] add first version of a glossary URL: https://github.com/apache/flink/pull/8607#issuecomment-505807000 For a first version, this looks quite good already. Are we collecting a list somewhere of terms that might be added at some point? Job scheduling and back pressure come to mind as examples.
[GitHub] [flink] liyafan82 commented on a change in pull request #8757: [FLINK-12850][core] Introduce TypeInfo for LocalDate/LocalTime/LocalDateTime
liyafan82 commented on a change in pull request #8757: [FLINK-12850][core] Introduce TypeInfo for LocalDate/LocalTime/LocalDateTime URL: https://github.com/apache/flink/pull/8757#discussion_r297586451 ## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LocalDateTimeSerializer.java ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; + +@Internal +public final class LocalDateTimeSerializer extends TypeSerializerSingleton { + + private static final long serialVersionUID = 1L; + + public static final LocalDateTimeSerializer INSTANCE = new LocalDateTimeSerializer(); + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public LocalDateTime createInstance() { + return LocalDateTime.of( + LocalDateSerializer.INSTANCE.createInstance(), + LocalTimeSerializer.INSTANCE.createInstance()); + } + + @Override + public LocalDateTime copy(LocalDateTime from) { + return from; + } + + @Override + public LocalDateTime copy(LocalDateTime from, LocalDateTime reuse) { + return from; + } + + @Override + public int getLength() { + return LocalDateSerializer.INSTANCE.getLength() + LocalTimeSerializer.INSTANCE.getLength(); + } + + @Override + public void serialize(LocalDateTime record, DataOutputView target) throws IOException { + if (record == null) { + LocalDateSerializer.INSTANCE.serialize(null, target); + LocalTimeSerializer.INSTANCE.serialize(null, target); + } else { + LocalDateSerializer.INSTANCE.serialize(record.toLocalDate(), target); + LocalTimeSerializer.INSTANCE.serialize(record.toLocalTime(), target); + } + } + + @Override + public LocalDateTime deserialize(DataInputView source) throws IOException { + LocalDate localDate = LocalDateSerializer.INSTANCE.deserialize(source); + LocalTime localTime = LocalTimeSerializer.INSTANCE.deserialize(source); + if (localDate == null && localTime == null) { + return 
null; + } else if (localDate == null || localTime == null) { + throw new IOException("LocalDate and LocalTime are either null or not null together."); Review comment: Exactly one of LocalDate and LocalTime is null.
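The null handling under review can be isolated into a small helper: both parts null means a null record, both present means a real value, and a mix is a corrupt encoding. The sketch below shows only that rule. `NullPairSketch` and `combine` are hypothetical names, the code does not use Flink's `DataInputView` or its serializers, and the exception text follows the wording suggested in the review comment.

```java
import java.io.IOException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;

// Hypothetical helper illustrating the "both null or neither" rule.
final class NullPairSketch {

    private NullPairSketch() {
    }

    // Combines two independently decoded parts into one value.
    static LocalDateTime combine(LocalDate date, LocalTime time) throws IOException {
        if (date == null && time == null) {
            // Both parts were written as null: the record itself was null.
            return null;
        }
        if (date == null || time == null) {
            // A mixed state cannot come from a valid write, so treat it as corrupt data.
            throw new IOException("Exactly one of LocalDate and LocalTime is null.");
        }
        return LocalDateTime.of(date, time);
    }
}
```

The message names the state that was actually observed, which makes a corrupted stream easier to diagnose than a statement of the invariant.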
[GitHub] [flink] gaoyunhaii commented on a change in pull request #8846: [FLINK-12766][runtime] Dynamically allocate TaskExecutor's managed memory to slots.
gaoyunhaii commented on a change in pull request #8846: [FLINK-12766][runtime] Dynamically allocate TaskExecutor's managed memory to slots. URL: https://github.com/apache/flink/pull/8846#discussion_r297586457 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ## @@ -128,6 +130,20 @@ /** Resource profiles of slots that can be acquired. */ private final HashSet availableSlotResourceProfiles = new HashSet<>(); + /** +* All allocated slots's allocation resource profile. Review comment: slots's -> slots', or The resource profiles of all allocated slots
[GitHub] [flink] gaoyunhaii commented on a change in pull request #8846: [FLINK-12766][runtime] Dynamically allocate TaskExecutor's managed memory to slots.
gaoyunhaii commented on a change in pull request #8846: [FLINK-12766][runtime] Dynamically allocate TaskExecutor's managed memory to slots. URL: https://github.com/apache/flink/pull/8846#discussion_r297586457 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ## @@ -128,6 +130,20 @@ /** Resource profiles of slots that can be acquired. */ private final HashSet availableSlotResourceProfiles = new HashSet<>(); + /** +* All allocated slots's allocation resource profile. Review comment: slots's -> slots'
[GitHub] [flink] 1u0 commented on a change in pull request #8858: [hotfix][tests] Change some StreamTask tests to create a test task in the task's thread
1u0 commented on a change in pull request #8858: [hotfix][tests] Change some StreamTask tests to create a test task in the task's thread URL: https://github.com/apache/flink/pull/8858#discussion_r297592011 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ## @@ -1023,25 +1014,63 @@ protected AbstractStateBackend createInnerBackend(Configuration config) { // // + private enum Event { + TASK_IS_RUNNING, + } + private static class EmptyStreamTask extends StreamTask> { - public EmptyStreamTask(Environment env) { + private boolean isRunningFoo; + private final LinkedBlockingQueue eventQueue; + private final OperatorChain> overrideOperatorChain; + private volatile boolean sourceFinished; + + EmptyStreamTask(Environment env, LinkedBlockingQueue eventQueue, OperatorChain> operatorChain) { super(env, null); + this.eventQueue = eventQueue; + this.overrideOperatorChain = operatorChain; } @Override - protected void init() throws Exception {} + protected void init() { + if (overrideOperatorChain != null) { + super.operatorChain = this.overrideOperatorChain; + super.headOperator = super.operatorChain.getHeadOperator(); + } + } @Override - protected void performDefaultAction(ActionContext context) throws Exception { - context.allActionsCompleted(); + protected void performDefaultAction(ActionContext context) { Review comment: I have changed how the tests wait for the task to be running, so it should no longer conflict with the future changes in your PR.
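The diff above passes a `LinkedBlockingQueue` of `Event` values into the test task, which suggests that the task announces `TASK_IS_RUNNING` on the queue and the test blocks on it. A minimal sketch of that handshake, under that assumption, could look as follows. `RunningSignalSketch`, `SketchTask` and `startAndAwaitRunning` are hypothetical names, and the real test task does considerably more than post one event.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class RunningSignalSketch {

    enum Event {
        TASK_IS_RUNNING
    }

    // Hypothetical task: announces that it is running, then finishes.
    static final class SketchTask implements Runnable {
        private final LinkedBlockingQueue<Event> eventQueue;

        SketchTask(LinkedBlockingQueue<Event> eventQueue) {
            this.eventQueue = eventQueue;
        }

        @Override
        public void run() {
            // The queue is unbounded, so add() never blocks the task thread.
            eventQueue.add(Event.TASK_IS_RUNNING);
        }
    }

    private RunningSignalSketch() {
    }

    // Starts the task in its own thread and waits until it reports that it is
    // running, or until the timeout expires.
    static boolean startAndAwaitRunning(long timeoutMillis) throws InterruptedException {
        LinkedBlockingQueue<Event> queue = new LinkedBlockingQueue<>();
        Thread thread = new Thread(new SketchTask(queue), "sketch-task");
        thread.start();
        // poll() returns null on timeout instead of blocking forever.
        Event event = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        thread.join();
        return event == Event.TASK_IS_RUNNING;
    }
}
```

Blocking on a queue avoids polling a flag in a sleep loop, and the timeout keeps a broken task from hanging the test.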
[GitHub] [flink] zhijiangW commented on a change in pull request #8804: [FLINK-12883][WIP][runtime] Add elaborated partition release logic
zhijiangW commented on a change in pull request #8804: [FLINK-12883][WIP][runtime] Add elaborated partition release logic URL: https://github.com/apache/flink/pull/8804#discussion_r297592199 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ## @@ -1630,6 +1627,79 @@ public boolean updateState(TaskExecutionState state) { } } + private boolean updateStateInternal(final TaskExecutionState state, final Execution attempt) { + Map> accumulators; + + switch (state.getExecutionState()) { + case RUNNING: + return attempt.switchToRunning(); + + case FINISHED: + // this deserialization is exception-free + accumulators = deserializeAccumulators(state); + attempt.markFinished(accumulators, state.getIOMetrics()); + return true; + + case CANCELED: + // this deserialization is exception-free + accumulators = deserializeAccumulators(state); + attempt.completeCancelling(accumulators, state.getIOMetrics()); + return true; + + case FAILED: + // this deserialization is exception-free + accumulators = deserializeAccumulators(state); + attempt.markFailed(state.getError(userClassLoader), accumulators, state.getIOMetrics()); + return true; + + default: + // we mark as failed and return false, which triggers the TaskManager + // to remove the task + attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState())); + return false; + } + } + + private void maybeReleasePartitions(final TaskExecutionState state, final Execution attempt) { + final ExecutionVertexID finishedExecutionVertex = attempt.getVertex().getID(); + + if (state.getExecutionState() == ExecutionState.FINISHED) { + final List releasablePartitions = partitionReleaseStrategy.vertexFinished(finishedExecutionVertex); + releasePartitions(releasablePartitions); Review comment: We might only need `SchedulingTopology` which was already used in below `createResultPartitionId` method. It might be like this in `createResultPartitionId`. 
` SchedulingResultPartition schedulingResultPartition = schedulingTopology.getResultPartitionOrThrow(resultPartition); ResultPartitionID resultPartitionId = schedulingResultPartition.getResultPartitionId(); ` This would work if we add the `ResultPartitionID` info in the constructor of `DefaultSchedulingResultPartition`.
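The suggestion above amounts to storing the runtime partition id in the scheduling-level partition at construction time, so that a single topology lookup yields it. The following sketch illustrates that idea only. Every type in it (`SketchSchedulingPartition`, `SketchTopology`) is a hypothetical simplification that uses plain strings as ids, not the Flink interfaces named in the comment.

```java
import java.util.HashMap;
import java.util.Map;

final class TopologyLookupSketch {

    // Hypothetical scheduling-level partition that receives its runtime id
    // in the constructor, as proposed in the review comment.
    static final class SketchSchedulingPartition {
        private final String resultPartitionId;

        SketchSchedulingPartition(String resultPartitionId) {
            this.resultPartitionId = resultPartitionId;
        }

        String getResultPartitionId() {
            return resultPartitionId;
        }
    }

    // Hypothetical topology: maps an intermediate partition id to its partition.
    static final class SketchTopology {
        private final Map<String, SketchSchedulingPartition> partitions = new HashMap<>();

        void add(String intermediateId, SketchSchedulingPartition partition) {
            partitions.put(intermediateId, partition);
        }

        SketchSchedulingPartition getResultPartitionOrThrow(String intermediateId) {
            SketchSchedulingPartition partition = partitions.get(intermediateId);
            if (partition == null) {
                throw new IllegalArgumentException("Unknown partition: " + intermediateId);
            }
            return partition;
        }
    }

    private TopologyLookupSketch() {
    }

    // With the id stored in the partition, the topology is the only dependency.
    static String createResultPartitionId(SketchTopology topology, String intermediateId) {
        return topology.getResultPartitionOrThrow(intermediateId).getResultPartitionId();
    }
}
```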
[jira] [Updated] (FLINK-11956) Remove shading from filesystems build
[ https://issues.apache.org/jira/browse/FLINK-11956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-11956: --- Priority: Blocker (was: Major) > Remove shading from filesystems build > - > > Key: FLINK-11956 > URL: https://issues.apache.org/jira/browse/FLINK-11956 > Project: Flink > Issue Type: Sub-task > Components: Connectors / FileSystem >Affects Versions: 1.9.0 >Reporter: Stefan Richter >Assignee: vinoyang >Priority: Blocker >
[jira] [Commented] (FLINK-11956) Remove shading from filesystems build
[ https://issues.apache.org/jira/browse/FLINK-11956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873211#comment-16873211 ] Piotr Nowojski commented on FLINK-11956: I think we should decide how we want to proceed with this one. I see two options: # Treat plugins as an experimental feature in 1.9 and deprecate non-plugin {{FileSystems}} in 1.10, so that we can remove the shading (this ticket) in 1.11. # Deprecate non-plugin {{FileSystems}} in 1.9 and already encourage users to switch to plugins, so that we can remove the shading in 1.10. I'm marking this as a blocker for the 1.9 release so that this discussion is resolved before the actual release. > Remove shading from filesystems build > - > > Key: FLINK-11956 > URL: https://issues.apache.org/jira/browse/FLINK-11956 > Project: Flink > Issue Type: Sub-task > Components: Connectors / FileSystem >Affects Versions: 1.9.0 >Reporter: Stefan Richter >Assignee: vinoyang >Priority: Major >
[GitHub] [flink] 1u0 commented on a change in pull request #8858: [hotfix][tests] Change some StreamTask tests to create a test task in the task's thread
1u0 commented on a change in pull request #8858: [hotfix][tests] Change some StreamTask tests to create a test task in the task's thread
URL: https://github.com/apache/flink/pull/8858#discussion_r29760
## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ##
@@ -1023,25 +1014,63 @@ protected AbstractStateBackend createInnerBackend(Configuration config) { // // + private enum Event { + TASK_IS_RUNNING, + } + private static class EmptyStreamTask extends StreamTask> {
Review comment:
I have renamed it to `MockStreamTask`.
[GitHub] [flink] 1u0 commented on a change in pull request #8858: [hotfix][tests] Change some StreamTask tests to create a test task in the task's thread
1u0 commented on a change in pull request #8858: [hotfix][tests] Change some StreamTask tests to create a test task in the task's thread URL: https://github.com/apache/flink/pull/8858#discussion_r297609906 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ## @@ -805,29 +723,102 @@ public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable new MockEnvironmentBuilder() .setUserCodeClassLoader(new TestUserCodeClassLoader()) .build()) { - TimeServiceTask timerServiceTask = new TimeServiceTask(mockEnvironment); + RunningTask task = runTask(() -> new TimeServiceTask(mockEnvironment)); + task.waitForTaskCompletion(false); - CompletableFuture invokeFuture = CompletableFuture.runAsync( - () -> { - try { - timerServiceTask.invoke(); - } catch (Exception e) { - throw new CompletionException(e); - } - }, - TestingUtils.defaultExecutor()); - - invokeFuture.get(); - - assertThat(timerServiceTask.getClassLoaders(), hasSize(greaterThanOrEqualTo(1))); - assertThat(timerServiceTask.getClassLoaders(), everyItem(instanceOf(TestUserCodeClassLoader.class))); + assertThat(task.streamTask.getClassLoaders(), hasSize(greaterThanOrEqualTo(1))); + assertThat(task.streamTask.getClassLoaders(), everyItem(instanceOf(TestUserCodeClassLoader.class))); } } // // Test Utilities // + private static StreamOperator streamOperatorWithSnapshot(OperatorSnapshotFutures operatorSnapshotResult) throws Exception { + StreamOperator operator = mock(StreamOperator.class); + when(operator.getOperatorID()).thenReturn(new OperatorID()); + + when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class))) + .thenReturn(operatorSnapshotResult); + + return operator; + } + + private static StreamOperator streamOperatorWithSnapshotException(Exception exception) throws Exception { + StreamOperator operator = mock(StreamOperator.class); + when(operator.getOperatorID()).thenReturn(new 
OperatorID()); + + when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class))) + .thenThrow(exception); + + return operator; + } + + private static OperatorChain> operatorChain(StreamOperator... streamOperators) { + OperatorChain> operatorChain = mock(OperatorChain.class); + when(operatorChain.getAllOperators()).thenReturn(streamOperators); + return operatorChain; + } + + private static class RunningTask { + final T streamTask; + final CompletableFuture invocationFuture; + + RunningTask(T streamTask, CompletableFuture invocationFuture) { + this.streamTask = streamTask; + this.invocationFuture = invocationFuture; + } + + void waitForTaskCompletion(boolean cancelled) throws Exception { + if (!cancelled) { + invocationFuture.get(); + return; + } + try { + invocationFuture.get(); + } catch (Exception e) { + assertThat(e.getCause(), is(instanceOf(CancelTaskException.class)));
Review comment:
In this case, the `CancelTaskException` is the original exception thrown by `task.invoke()`. It is not expected to be wrapped in further exceptions; the only wrapping exception comes from the `invocationFuture`.
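The wrapping behaviour described in this comment can be reproduced with the JDK alone. The following is a minimal sketch, not code from the pull request: `TaskCancelledException` is a hypothetical stand-in for Flink's `CancelTaskException`, and the failing lambda stands in for `task.invoke()`. It shows that `CompletableFuture#get()` reports an `ExecutionException` whose direct cause is the exception thrown by the task, which is why asserting on `e.getCause()` is sufficient.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class FutureCauseSketch {

    /** Hypothetical stand-in for Flink's CancelTaskException. */
    static class TaskCancelledException extends RuntimeException {
        TaskCancelledException(String message) {
            super(message);
        }
    }

    /** Runs a task that always fails and returns the exception that get() reports. */
    static Throwable failureSeenByGet() throws InterruptedException {
        CompletableFuture<Void> invocation = CompletableFuture.runAsync(() -> {
            // Stands in for task.invoke() throwing on cancellation.
            throw new TaskCancelledException("cancelled");
        });
        try {
            invocation.get();
            return null; // not reached: the task always throws
        } catch (ExecutionException e) {
            // get() adds exactly one wrapper; the original exception is its cause.
            return e;
        }
    }

    public static void main(String[] args) throws Exception {
        Throwable wrapper = failureSeenByGet();
        System.out.println(wrapper.getClass().getSimpleName());
        System.out.println(wrapper.getCause().getClass().getSimpleName());
    }
}
```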
[GitHub] [flink] knaufk commented on a change in pull request #8607: [FLINK-12652] [documentation] add first version of a glossary
knaufk commented on a change in pull request #8607: [FLINK-12652] [documentation] add first version of a glossary URL: https://github.com/apache/flink/pull/8607#discussion_r297632384 ## File path: docs/concepts/glossary.md ## @@ -0,0 +1,168 @@ +--- +title: Glossary +nav-pos: 3 +nav-title: Glossary +nav-parent_id: concepts +--- + + + Flink Application Cluster + +A Flink Application Cluster is a dedicated [Flink Cluster](./glossary#flink-cluster) that only +executes a single [Flink Job](./glossary#flink-job). The lifetime of the +[Flink Cluster](./glossary#flink-cluster) is bound to the lifetime of the Flink Job. Formerly +Flink Application Clusters were also known as Flink Clusters in *job mode*. Compare to +[Flink Session Cluster](./glossary#flink-session-cluster). + + Flink Cluster + +The distributed system consisting of (typically) one Flink Master process and one or more Flink +Taskmanagers processes. Review comment: Done.
[GitHub] [flink] zentol commented on a change in pull request #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager
zentol commented on a change in pull request #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager
URL: https://github.com/apache/flink/pull/8646#discussion_r297632721
## File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java ##
@@ -71,7 +71,7 @@ public void beforeTest() { } @After - public void afterTest() { + public void afterTest() throws Exception { this.ioManager.close(); if (!this.ioManager.isProperlyShutDown()) {
Review comment:
Most of the tests were failing anyway if the ioManager shutdown failed, due to the subsequent assertions on `isProperlyShutDown`; thus b) is no longer a new problem. I would suggest going through all tests, removing the `isProperlyShutDown` call if the test follows the same pattern, and potentially removing the method altogether if it is unused at that point.
[GitHub] [flink] knaufk commented on a change in pull request #8607: [FLINK-12652] [documentation] add first version of a glossary
knaufk commented on a change in pull request #8607: [FLINK-12652] [documentation] add first version of a glossary URL: https://github.com/apache/flink/pull/8607#discussion_r297632771 ## File path: docs/concepts/glossary.md ## @@ -0,0 +1,168 @@ +--- +title: Glossary +nav-pos: 3 +nav-title: Glossary +nav-parent_id: concepts +--- + + + Flink Application Cluster + +A Flink Application Cluster is a dedicated [Flink Cluster](./glossary#flink-cluster) that only +executes a single [Flink Job](./glossary#flink-job). The lifetime of the +[Flink Cluster](./glossary#flink-cluster) is bound to the lifetime of the Flink Job. Formerly +Flink Application Clusters were also known as Flink Clusters in *job mode*. Compare to +[Flink Session Cluster](./glossary#flink-session-cluster). + + Flink Cluster + +The distributed system consisting of (typically) one Flink Master process and one or more Flink +Taskmanagers processes. + + Event + +An event is a statement about a change of the state of the domain modelled by the +application. Events can be input and/or output of a stream or batch processing application. +Events are special types of [records](./glossary#Record). + + ExecutionGraph + +see [Physical Graph](./glossary#physical-graph) + + Function + +Functions are implemented by the user and encapsulate the +application logic of a Flink program. Most Functions are wrapped by a corresponding +[Operator](./glossary#operator). + + Instance + +The term *instance* is used to describe a specific instance of a specific type (usually +[Operator](./glossary#operator) or [Function](./glossary#function)) during runtime. As Apache Flink +is mostly written in Java, this corresponds to the definition of *Instance* or *Object* in Java. +In the context of Apache Flink, the term *parallel instance* is also frequently used to emphasize +that multiple instances of the same [Operator](./glossary#operator) or +[Function](./glossary#function) type are running in parallel. 
+ + Flink Job + +A Flink Job is the runtime representation of a Flink program. A Flink Job can either be submitted +to a long running [Flink Session Cluster](./glossary#flink-session-cluster) or it can be started as a +self-contained [Flink Application Cluster](./glossary#flink-application-cluster). + + JobGraph + +see [Logical Graph](./glossary#logical-graph) + + Flink JobManager + +JobManagers are one of the components running in the +[Flink JobManger Process](./glossary#flink-jobmanager-process). A JobManager is responsible for +supervising the execution of the [Tasks](./glossary#task) of a single job. + + Logical Graph + +A logical graph is a directed graph describing the high-level logic of a stream processing program. +The nodes are [Operators](./glossary#operator) and the edges indicate input/output-relationships or +data streams or data sets. + + Managed State + +Managed State describes application state which has been registered with the framework. For +Managed State, Apache Flink will take care about persistence and rescaling among other things. + + Flink JobManager Process + +The JobManager Process is the master of a [Flink Cluster](./glossary#flink-cluster). It is called +*JobManager* for historical reasons, but actually contains three distinct components: +Flink Resource Manager, Flink Dispatcher and one [Flink JobManager](./glossary#flink-jobmanager) +per running [Flink Job](./glossary#flink-job). + + Operator + +Node of a [Logical Graph](./glossary#logical-graph). An Operator performs a certain operation, +which is usually executed by a [Function](./glossary#function). Sources and Sinks are special +Operators for data ingestion and data egress. + + Operator Chain + +An Operator Chain consists of two or more consecutive [Operators](./glossary#operator) without any +repartitioning in between. Operators within the same Operation Chain forward records to each other +directly without going through serialization or Flink's network stack. 
+ + Partition + +A partition is an independent subset of the overall data stream or data set. A data stream or +data set is divided into partitions by assigning each [record](./glossary#Record) to one or more +partitions. Partitions of data streams or data sets are consumed by [Tasks](./glossary#task) during +runtime. A transformation which changes the way a data stream or data set is partitioned is often +called repartitioning. + + Physical Graph + +A physical graph is the result of translating a [Logical Graph](./glossary#logical-graph) for +execution in a distributed runtime. The nodes are [Tasks](./glossary#task) and the edges indicate +input/output-relationships or [partitions](./glossary#partition) of data streams or data sets. + + Record + +Records are the constituent elements of a data set or data stream. +[Operators](./glossary#operator) and [Functions](./glossary#Function) receive records as input
[GitHub] [flink] knaufk commented on a change in pull request #8607: [FLINK-12652] [documentation] add first version of a glossary
knaufk commented on a change in pull request #8607: [FLINK-12652] [documentation] add first version of a glossary URL: https://github.com/apache/flink/pull/8607#discussion_r297632737 ## File path: docs/concepts/glossary.md ## @@ -0,0 +1,168 @@ +--- +title: Glossary +nav-pos: 3 +nav-title: Glossary +nav-parent_id: concepts +--- + + + Flink Application Cluster + +A Flink Application Cluster is a dedicated [Flink Cluster](./glossary#flink-cluster) that only +executes a single [Flink Job](./glossary#flink-job). The lifetime of the +[Flink Cluster](./glossary#flink-cluster) is bound to the lifetime of the Flink Job. Formerly +Flink Application Clusters were also known as Flink Clusters in *job mode*. Compare to +[Flink Session Cluster](./glossary#flink-session-cluster). + + Flink Cluster + +The distributed system consisting of (typically) one Flink Master process and one or more Flink +Taskmanagers processes. + + Event + +An event is a statement about a change of the state of the domain modelled by the +application. Events can be input and/or output of a stream or batch processing application. +Events are special types of [records](./glossary#Record). + + ExecutionGraph + +see [Physical Graph](./glossary#physical-graph) + + Function + +Functions are implemented by the user and encapsulate the +application logic of a Flink program. Most Functions are wrapped by a corresponding +[Operator](./glossary#operator). + + Instance + +The term *instance* is used to describe a specific instance of a specific type (usually +[Operator](./glossary#operator) or [Function](./glossary#function)) during runtime. As Apache Flink +is mostly written in Java, this corresponds to the definition of *Instance* or *Object* in Java. +In the context of Apache Flink, the term *parallel instance* is also frequently used to emphasize +that multiple instances of the same [Operator](./glossary#operator) or +[Function](./glossary#function) type are running in parallel. 
+ + Flink Job + +A Flink Job is the runtime representation of a Flink program. A Flink Job can either be submitted +to a long running [Flink Session Cluster](./glossary#flink-session-cluster) or it can be started as a +self-contained [Flink Application Cluster](./glossary#flink-application-cluster). + + JobGraph + +see [Logical Graph](./glossary#logical-graph) + + Flink JobManager + +JobManagers are one of the components running in the +[Flink JobManger Process](./glossary#flink-jobmanager-process). A JobManager is responsible for Review comment: Done.
[jira] [Closed] (FLINK-12914) Remove legacy InstanceListener
[ https://issues.apache.org/jira/browse/FLINK-12914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-12914. Resolution: Fixed master: 126a489e20f4743c361bbf2f2b4a2c890f8026ab > Remove legacy InstanceListener > -- > > Key: FLINK-12914 > URL: https://issues.apache.org/jira/browse/FLINK-12914 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: leesf >Assignee: leesf >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] 1u0 commented on a change in pull request #8826: [FLINK-12479][operators] Integrate StreamInputProcessor(s) with mailbox
1u0 commented on a change in pull request #8826: [FLINK-12479][operators] Integrate StreamInputProcessor(s) with mailbox URL: https://github.com/apache/flink/pull/8826#discussion_r297625182 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java ## @@ -18,213 +18,16 @@ package org.apache.flink.streaming.runtime.io; -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.SimpleCounter; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.network.partition.consumer.InputGate; -import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; -import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; -import org.apache.flink.streaming.api.CheckpointingMode; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.metrics.WatermarkGauge; -import org.apache.flink.streaming.runtime.streamrecord.StreamElement; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve; -import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; -import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; -import org.apache.flink.streaming.runtime.tasks.OperatorChain; -import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.runtime.io.AvailabilityListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; +import java.io.Closeable; /** - * Input reader for {@link 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}. - * - * This internally uses a {@link StatusWatermarkValve} to keep track of {@link Watermark} and - * {@link StreamStatus} events, and forwards them to event subscribers once the - * {@link StatusWatermarkValve} determines the {@link Watermark} from all inputs has advanced, or - * that a {@link StreamStatus} needs to be propagated downstream to denote a status change. - * - * Forwarding elements, watermarks, or status status elements must be protected by synchronizing - * on the given lock object. This ensures that we don't call methods on a - * {@link OneInputStreamOperator} concurrently with the timer callback or other things. - * - * @param The type of the record that can be read with this record reader. + * Interface for processing records by StreamTask. */ -@Internal -public class StreamInputProcessor { - - private static final Logger LOG = LoggerFactory.getLogger(StreamInputProcessor.class); - - private final StreamTaskInput input; - - private final Object lock; - - private final OperatorChain operatorChain; - - // Status and Watermark Valve -- - - /** Valve that controls how watermarks and stream statuses are forwarded. 
*/ - private StatusWatermarkValve statusWatermarkValve; - - private final StreamStatusMaintainer streamStatusMaintainer; - - private final OneInputStreamOperator streamOperator; - - // Metrics -- - - private final WatermarkGauge watermarkGauge; - private Counter numRecordsIn; - - @SuppressWarnings("unchecked") - public StreamInputProcessor( - InputGate[] inputGates, - TypeSerializer inputSerializer, - StreamTask checkpointedTask, - CheckpointingMode checkpointMode, - Object lock, - IOManager ioManager, - Configuration taskManagerConfig, - StreamStatusMaintainer streamStatusMaintainer, - OneInputStreamOperator streamOperator, - TaskIOMetricGroup metrics, - WatermarkGauge watermarkGauge, - String taskName, - OperatorChain operatorChain) throws IOException { - - InputGate inputGate = InputGateUtil.createInputGate(inputGates); - - CheckpointBarrierHandler barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler( - checkpointedTask, - checkpointMode, - ioManager, - inputGate, - taskManagerConfig, - taskName); - this.input = new StreamTaskNetworkInput(barrierHandler, inputSerializer, ioManager, 0); - -
[jira] [Resolved] (FLINK-12957) Fix thrift and protobuf dependency examples in documentation
[ https://issues.apache.org/jira/browse/FLINK-12957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber resolved FLINK-12957. - Resolution: Fixed Fix Version/s: 1.8.1 1.7.3 merged into: - 1.8: 9ad7cda7a145537e2968416a361e0d22d85828e6 - 1.7: 5b8154c3c6c60ca8a850d82856db84d4334bc327 > Fix thrift and protobuf dependency examples in documentation > > > Key: FLINK-12957 > URL: https://issues.apache.org/jira/browse/FLINK-12957 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.7.2, 1.8.0, 1.9.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Minor > Labels: pull-request-available > Fix For: 1.7.3, 1.8.1, 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > The examples in the docs are not up-to-date anymore and should be updated. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-12957) Fix thrift and protobuf dependency examples in documentation
[ https://issues.apache.org/jira/browse/FLINK-12957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber closed FLINK-12957. --- > Fix thrift and protobuf dependency examples in documentation > > > Key: FLINK-12957 > URL: https://issues.apache.org/jira/browse/FLINK-12957 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.7.2, 1.8.0, 1.9.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Minor > Labels: pull-request-available > Fix For: 1.7.3, 1.8.1, 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > The examples in the docs are not up-to-date anymore and should be updated. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] zhijiangW commented on a change in pull request #8857: [FLINK-12960][coordination][shuffle] Move ResultPartitionDeploymentDescriptor#releasedOnConsumption to PartitionDescriptor#relea
zhijiangW commented on a change in pull request #8857: [FLINK-12960][coordination][shuffle] Move ResultPartitionDeploymentDescriptor#releasedOnConsumption to PartitionDescriptor#releasedOnConsumption
URL: https://github.com/apache/flink/pull/8857#discussion_r297532281
## File path: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java ##
@@ -103,6 +137,9 @@ public boolean sendScheduleOrUpdateConsumersMessage() { * by {@link ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)} * and {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}. * +* The partition has to support the corresponding {@link ReleaseType} in
Review comment:
I think it would be better to state which `releaseType` each value (true/false) requires the partition to support; otherwise the mapping between this value and the release type is unclear.
[jira] [Commented] (FLINK-5018) Make source idle timeout user configurable
[ https://issues.apache.org/jira/browse/FLINK-5018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873065#comment-16873065 ]

vinoyang commented on FLINK-5018:

If we are not going to put effort into this issue and expose this config option, I think we can close it. What do you think? [~aljoscha] [~tzulitai]

> Make source idle timeout user configurable
>
> Key: FLINK-5018
> URL: https://issues.apache.org/jira/browse/FLINK-5018
> Project: Flink
> Issue Type: Sub-task
> Components: API / DataStream
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: vinoyang
> Priority: Major
> Labels: pull-request-available
> Time Spent: 20m
> Remaining Estimate: 0h
>
> There are 2 cases where sources are considered idle and should emit an idle
> {{StreamStatus}} downstream, taking Kafka consumer as example:
> - The source instance was not assigned any partitions
> - The source instance was assigned partitions, but they currently don't have
> any data.
> For the second case, we can only consider it idle after a timeout threshold.
> It would be good to make this timeout user configurable besides a default
> value.
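The two idle cases in the issue description can be sketched as a small state check. This is only an illustration under stated assumptions: `IdleTimeoutSketch`, its `Status` enum and the way the timeout is passed in are hypothetical names, not Flink's `StreamStatus` API or an existing configuration option. A source with no assigned partitions is idle immediately; a source with partitions becomes idle only once no record has arrived for the configured timeout.

```java
import java.time.Duration;

class IdleTimeoutSketch {

    /** Hypothetical status values; Flink's real type is StreamStatus. */
    enum Status { ACTIVE, IDLE }

    private final long idleTimeoutMillis;
    private final boolean hasAssignedPartitions;
    private long lastRecordMillis;

    IdleTimeoutSketch(Duration idleTimeout, boolean hasAssignedPartitions, long nowMillis) {
        this.idleTimeoutMillis = idleTimeout.toMillis();
        this.hasAssignedPartitions = hasAssignedPartitions;
        this.lastRecordMillis = nowMillis;
    }

    /** Called whenever the source emits a record. */
    void onRecord(long nowMillis) {
        lastRecordMillis = nowMillis;
    }

    Status statusAt(long nowMillis) {
        if (!hasAssignedPartitions) {
            return Status.IDLE; // case 1: nothing was assigned, idle right away
        }
        // case 2: partitions are assigned, idle only after the timeout has elapsed
        return nowMillis - lastRecordMillis >= idleTimeoutMillis ? Status.IDLE : Status.ACTIVE;
    }
}
```

Passing the clock value in explicitly keeps the sketch deterministic; a real source would read it from its time service.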
[jira] [Closed] (FLINK-12971) Remove the constraint that lookup join needs a primary key or index key
[ https://issues.apache.org/jira/browse/FLINK-12971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu closed FLINK-12971.
Resolution: Fixed
Fix Version/s: 1.9.0

Fixed in 1.9.0: cb922bbfa0a26e147638d22aeceaf3e83b01e1d3

> Remove the constraint that lookup join needs a primary key or index key
>
> Key: FLINK-12971
> URL: https://issues.apache.org/jira/browse/FLINK-12971
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Reporter: Jark Wu
> Assignee: Jark Wu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.9.0
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Currently, we add a constraint in dimension table lookup join that the lookup
> fields must be a primary key or index key. This is not a logical constraint but
> a performance constraint: if there are no indexes on the lookup key,
> the lookup performance will be poor.
> We will remove this constraint because not every table has a primary key or
> indexes (e.g. Hive tables). If the lookup fields are not keys and performance
> is poor, that is the user's responsibility. In this case, users should add
> indexes on these fields. In the future, we can also surface this
> tuning information before the SQL is executed.
[GitHub] [flink] klion26 opened a new pull request #8895: [FLINK-12536][coordinator]Make BufferOrEventSequence#getNext non blocking
klion26 opened a new pull request #8895: [FLINK-12536][coordinator]Make BufferOrEventSequence#getNext non blocking
URL: https://github.com/apache/flink/pull/8895

## What is the purpose of the change

Currently `BufferOrEventSequence#getNext` is non-blocking in the case of credit-based flow control (the default); for `SpilledBufferOrEventSequence`, however, it blocks while reading from file. We might want to consider reimplementing it to be non-blocking with a `CompletableFuture isAvailable()` method. Otherwise we will block mailbox processing for the duration of the file read: for example, we will block processing-time timers and, potentially in the future, network flushes.

## Brief change log

Use a ByteBuffer pool to read the BufferOrEvent asynchronously.
- the pool size defaults to 2 and can be configured with the key `taskmanager.memory.async-load.buffer-count`
- the read thread in `IOManager` is reused

## Verifying this change

This change is already covered by existing tests, such as:
- `SpilledBufferOrEventSequenceTest.java`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (**no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**no**)
- The serializers: (**no**)
- The runtime per-record code paths (performance sensitive): (**no**)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**)
- The S3 file system connector: (**no**)

## Documentation

- Does this pull request introduce a new feature? (**no**)
- If yes, how is the feature documented? (**not applicable**)
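To make the buffer-pool idea in the change log concrete, here is a minimal sketch under stated assumptions. It is not the pull request's implementation: the PR reuses the reader thread of Flink's `IOManager`, whereas this sketch swaps in the JDK's `AsynchronousFileChannel` so that it stays self-contained. `AsyncSpillReaderSketch`, `readAsync` and `recycle` are hypothetical names. A fixed number of `ByteBuffer`s bounds how many reads can be in flight, and each read completes a `CompletableFuture` instead of blocking the caller.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

class AsyncSpillReaderSketch implements AutoCloseable {

    private final AsynchronousFileChannel channel;
    private final BlockingQueue<ByteBuffer> pool;

    AsyncSpillReaderSketch(Path file, int bufferCount, int bufferSize) throws IOException {
        this.channel = AsynchronousFileChannel.open(file, StandardOpenOption.READ);
        this.pool = new ArrayBlockingQueue<>(bufferCount);
        for (int i = 0; i < bufferCount; i++) {
            pool.add(ByteBuffer.allocate(bufferSize));
        }
    }

    /** Starts a read at the given file position; returns null if no pooled buffer is free. */
    CompletableFuture<ByteBuffer> readAsync(long position) {
        ByteBuffer buffer = pool.poll();
        if (buffer == null) {
            return null; // all buffers are in flight or not yet recycled
        }
        buffer.clear();
        CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
        channel.read(buffer, position, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer bytesRead, ByteBuffer filled) {
                filled.flip(); // make the bytes that were read available to the consumer
                result.complete(filled);
            }

            @Override
            public void failed(Throwable cause, ByteBuffer unused) {
                pool.add(unused); // give the buffer back before reporting the failure
                result.completeExceptionally(cause);
            }
        });
        return result;
    }

    /** Returns a consumed buffer to the pool so that another read can start. */
    void recycle(ByteBuffer buffer) {
        pool.add(buffer);
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }
}
```

A single read may return fewer bytes than the buffer holds; a real reader would loop or track record boundaries, which this sketch leaves out.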
[jira] [Updated] (FLINK-12536) Make BufferOrEventSequence#getNext() non-blocking
[ https://issues.apache.org/jira/browse/FLINK-12536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-12536:
Labels: pull-request-available (was: )

> Make BufferOrEventSequence#getNext() non-blocking
>
> Key: FLINK-12536
> URL: https://issues.apache.org/jira/browse/FLINK-12536
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Network
> Affects Versions: 1.9.0
> Reporter: Piotr Nowojski
> Assignee: Congxian Qiu(klion26)
> Priority: Major
> Labels: pull-request-available
>
> Currently it is non-blocking in case of credit-based flow control (default),
> however for {{SpilledBufferOrEventSequence}} it is blocking on reading from
> file. We might want to consider reimplementing it to be non blocking with
> {{CompletableFuture isAvailable()}} method.
>
> Otherwise we will block mailbox processing for the duration of reading from
> file - for example we will block processing time timers and potentially in
> the future network flushes.
>
> This is not a high priority change, since it affects non-default
> configuration option AND at the moment only processing time timers are
> planned to be moved to the mailbox for 1.9.
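The `isAvailable()` contract mentioned in the description can be illustrated with a small, self-contained sketch. The class and method names below (`NonBlockingSequenceSketch`, `pollNext`, `add`) are assumptions made for illustration, not Flink's actual interfaces. The consumer never blocks: it polls, and when nothing is ready it registers on the returned future, which a background reader completes once an element has been added.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

class NonBlockingSequenceSketch<T> {

    private static final CompletableFuture<Void> AVAILABLE = CompletableFuture.completedFuture(null);

    private final Queue<T> ready = new ArrayDeque<>();
    private CompletableFuture<Void> availability = new CompletableFuture<>();

    /** Completed if an element can be polled now; otherwise completes when one arrives. */
    synchronized CompletableFuture<Void> isAvailable() {
        return ready.isEmpty() ? availability : AVAILABLE;
    }

    /** Never blocks: returns the next element, or empty if none is ready yet. */
    synchronized Optional<T> pollNext() {
        return Optional.ofNullable(ready.poll());
    }

    /** Called by the (hypothetical) background reader once an element has been loaded. */
    void add(T element) {
        CompletableFuture<Void> toComplete;
        synchronized (this) {
            ready.add(element);
            toComplete = availability;
            availability = new CompletableFuture<>();
        }
        // Complete outside the lock so that callbacks cannot run while it is held.
        toComplete.complete(null);
    }
}
```

A mailbox-style caller would use `isAvailable().thenRun(...)` to schedule the next `pollNext()` instead of waiting on the file read.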
[GitHub] [flink] GJL commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler
GJL commented on a change in pull request #8851: [FLINK-12876] [runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8851#discussion_r297556096 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java ## @@ -0,0 +1,641 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.mock.Whitebox; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointProperties; +import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; +import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.PendingCheckpoint; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy; +import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException; +import org.apache.flink.runtime.jobgraph.DistributionPattern; 
+import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; + +import javax.annotation.Nonnull; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling. + */ +public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLogger { + + @ClassRule + public static
[GitHub] [flink] 1u0 commented on a change in pull request #8741: [FLINK-12752] Add Option to Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint
1u0 commented on a change in pull request #8741: [FLINK-12752] Add Option to Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint
URL: https://github.com/apache/flink/pull/8741#discussion_r297561629

## File path: flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java ##

@@ -67,54 +65,109 @@ public void testDontOverwriteExecutionMode() {
     }

     @Test
-    public void configuredJobIDTakesPrecedenceWithHA() {
-        Optional jobID = Optional.of(JobID.generate());
+    public void configuredJobIdTakesPrecedenceWithHA() {

Review comment:
Not required here, but imo, those tests that check precedence order would be better if they are rewritten as one test that checks all cases as table input.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[jira] [Closed] (FLINK-12821) Fix the bug that fix time quantifier can not be the last element of a pattern
[ https://issues.apache.org/jira/browse/FLINK-12821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dawid Wysakowicz closed FLINK-12821.

    Resolution: Fixed
    Fix Version/s: 1.9.0

Implemented in 09a4a3ad522fd95cacc4598f69edb6965b7151b5

> Fix the bug that fix time quantifier can not be the last element of a pattern
> -
>
> Key: FLINK-12821
> URL: https://issues.apache.org/jira/browse/FLINK-12821
> Project: Flink
> Issue Type: Sub-task
> Components: Library / CEP, Table SQL / API
> Reporter: Dian Fu
> Assignee: Dian Fu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.9.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Currently, exception "Greedy quantifiers are not allowed as the last element
> of a Pattern yet. Finish your pattern with either a simple variable or
> reluctant quantifier." will be thrown for patterns such as "a\{2}". Actually
> greedy property is not meaningful for this kind of pattern.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] 1u0 commented on a change in pull request #8741: [FLINK-12752] Add Option to Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint
1u0 commented on a change in pull request #8741: [FLINK-12752] Add Option to Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint URL: https://github.com/apache/flink/pull/8741#discussion_r297555970 ## File path: flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java ## @@ -67,54 +65,109 @@ public void testDontOverwriteExecutionMode() { } @Test - public void configuredJobIDTakesPrecedenceWithHA() { - Optional jobID = Optional.of(JobID.generate()); + public void configuredJobIdTakesPrecedenceWithHA() { + JobID jobId = JobID.generate(); + String jobIdSeed = null; + + Configuration globalConfiguration = new Configuration(); + enableHighAvailability(globalConfiguration); + + JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster( + jobId, + jobIdSeed, + globalConfiguration); + + assertThat(jobIdForCluster, is(jobId)); + } + + @Test + public void configuredJobIdTakesPrecedenceWithoutHA() { + JobID jobId = JobID.generate(); + String jobIdSeed = null; + + Configuration globalConfiguration = new Configuration(); + + JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster( + jobId, + jobIdSeed, + globalConfiguration); + + assertThat(jobIdForCluster, is(jobId)); + } + + @Test + public void configuredJobIdSeedTakesPrecedenceWithoutHA() { + JobID jobId = null; + String jobIdSeed = "some-seed"; + + Configuration globalConfiguration = new Configuration(); + + JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster( + jobId, + jobIdSeed, + globalConfiguration); + + assertThat(jobIdForCluster, is(JobID.fromSeed(jobIdSeed))); + } + + @Test + public void configuredJobIdSeedTakesPrecedenceWithHA() { + JobID jobId = null; + String jobIdSeed = "some-seed"; Configuration globalConfiguration = new Configuration(); enableHighAvailability(globalConfiguration); JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster( - jobID, + jobId, + 
jobIdSeed, globalConfiguration); - assertThat(jobIdForCluster, is(jobID.get())); + assertThat(jobIdForCluster, is(JobID.fromSeed(jobIdSeed))); } @Test - public void configuredJobIDTakesPrecedenceWithoutHA() { - Optional jobID = Optional.of(JobID.generate()); + public void configuredJobIdTakesPrecedenceOverJobIdSeed() { + JobID jobId = JobID.generate(); + String jobIdSeed = "some-seed"; Configuration globalConfiguration = new Configuration(); + enableHighAvailability(globalConfiguration); JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster( - jobID, + jobId, + jobIdSeed, globalConfiguration); - assertThat(jobIdForCluster, is(jobID.get())); + assertThat(jobIdForCluster, is(jobId)); } @Test - public void jobIDdefaultsToZeroWithHA() { - Optional jobID = Optional.empty(); + public void jobIdDefaultsToZeroWithHA() { + JobID jobId = null; + String jobIdSeed = null; Configuration globalConfiguration = new Configuration(); enableHighAvailability(globalConfiguration); JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster( - jobID, + jobId, + jobIdSeed, globalConfiguration); assertThat(jobIdForCluster, is(ZERO_JOB_ID)); } @Test - public void jobIDdefaultsToRandomJobIDWithoutHA() { - Optional jobID = Optional.empty(); + public void jobIdDefaultsToRandomJobIDWithoutHA() { Review comment: Rename `jobIdDefaultsToRandomJobIDWithoutHA` to `jobIdDefaultsToRandomJobIdWithoutHA`, to finish getting rid of `ID` in the method/variables names in this file? **Side note:** not required here, but imo, those tests that check precedence order would be better if they are rewritten as one test that checks all cases as table input.
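The side note about checking all precedence cases "as table input" could look roughly like the sketch below. It is only an illustration: `PrecedenceTableSketch`, `resolve`, `Case` and `ZERO_ID` are hypothetical stand-ins, plain strings replace Flink's `JobID` and `Configuration`, and the precedence order is the one implied by the test names in the hunk above, not taken from the actual `resolveJobIdForCluster` implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

/** Hypothetical stand-in: plain strings replace Flink's JobID and Configuration. */
final class PrecedenceTableSketch {

    static final String ZERO_ID = "zero-job-id";

    /** Assumed precedence: explicit ID, then seed, then zero ID (HA) or random ID (no HA). */
    static String resolve(String jobId, String jobIdSeed, boolean haEnabled) {
        if (jobId != null) {
            return jobId;
        }
        if (jobIdSeed != null) {
            return "seeded:" + jobIdSeed; // placeholder for a seed-derived ID
        }
        return haEnabled ? ZERO_ID : UUID.randomUUID().toString();
    }

    /** One table row; expected == null means "any ID other than ZERO_ID". */
    static final class Case {
        final String name;
        final String jobId;
        final String jobIdSeed;
        final boolean haEnabled;
        final String expected;

        Case(String name, String jobId, String jobIdSeed, boolean haEnabled, String expected) {
            this.name = name;
            this.jobId = jobId;
            this.jobIdSeed = jobIdSeed;
            this.haEnabled = haEnabled;
            this.expected = expected;
        }
    }

    /** The table: one row per former test method. */
    static List<Case> cases() {
        return Arrays.asList(
            new Case("job ID wins with HA", "id", null, true, "id"),
            new Case("job ID wins without HA", "id", null, false, "id"),
            new Case("seed wins without HA", null, "some-seed", false, "seeded:some-seed"),
            new Case("seed wins with HA", null, "some-seed", true, "seeded:some-seed"),
            new Case("job ID wins over seed", "id", "some-seed", true, "id"),
            new Case("defaults to zero with HA", null, null, true, ZERO_ID),
            new Case("defaults to random without HA", null, null, false, null));
    }

    /** Evaluates a single row against the resolver. */
    static boolean passes(Case c) {
        String actual = resolve(c.jobId, c.jobIdSeed, c.haEnabled);
        return c.expected == null ? !ZERO_ID.equals(actual) : c.expected.equals(actual);
    }

    public static void main(String[] args) {
        for (Case c : cases()) {
            System.out.println((passes(c) ? "ok   " : "FAIL ") + c.name);
        }
    }
}
```

In a real test class the same table would more likely feed a parameterized JUnit runner; the loop in `main` only keeps the sketch free of test-framework dependencies.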
[GitHub] [flink] 1u0 commented on a change in pull request #8741: [FLINK-12752] Add Option to Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint
1u0 commented on a change in pull request #8741: [FLINK-12752] Add Option to Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint
URL: https://github.com/apache/flink/pull/8741#discussion_r297553510

## File path: flink-end-to-end-tests/test-scripts/test_kubernetes_embedded_job.sh ##

@@ -64,14 +64,14 @@ fi
 eval $(minikube docker-env)
 cd "$DOCKER_MODULE_DIR"
-./build.sh --from-local-dist --job-jar ${FLINK_DIR}/examples/batch/WordCount.jar --image-name ${FLINK_IMAGE_NAME}
+./build.sh --from-local-dist --job-artifacts ${FLINK_DIR}/examples/batch/WordCount.jar --image-name ${FLINK_IMAGE_NAME}
 cd "$END_TO_END_DIR"
-kubectl create -f ${KUBERNETES_MODULE_DIR}/job-cluster-service.yaml
-envsubst '${FLINK_IMAGE_NAME} ${FLINK_JOB} ${FLINK_JOB_PARALLELISM} ${FLINK_JOB_ARGUMENTS}' < ${CONTAINER_SCRIPTS}/job-cluster-job.yaml.template | kubectl create -f -
-envsubst '${FLINK_IMAGE_NAME} ${FLINK_JOB_PARALLELISM}' < ${CONTAINER_SCRIPTS}/task-manager-deployment.yaml.template | kubectl create -f -
-kubectl wait --for=condition=complete job/flink-job-cluster --timeout=1h
-kubectl cp `kubectl get pods | awk '/task-manager/ {print $1}'`:/cache/${OUTPUT_FILE} ${OUTPUT_VOLUME}/${OUTPUT_FILE}
+envsubst '${FLINK_APPLICATION_NAME}' < ${KUBERNETES_MODULE_DIR}/job-cluster-service.yaml.template | kubectl apply -f -
+envsubst '${FLINK_APPLICATION_NAME} ${FLINK_IMAGE_NAME} ${FLINK_JOB} ${FLINK_JOB_PARALLELISM} ${FLINK_JOB_ARGUMENTS}' < ${KUBERNETES_MODULE_DIR}/job-cluster-job.yaml.template | kubectl apply -f -
+envsubst '${FLINK_APPLICATION_NAME} ${FLINK_IMAGE_NAME} ${FLINK_JOB_PARALLELISM}' < ${KUBERNETES_MODULE_DIR}/task-manager-deployment.yaml.template | kubectl apply -f -

Review comment:
Personally, I'd stick with `kubectl create ...` here.
[GitHub] [flink] 1u0 commented on a change in pull request #8741: [FLINK-12752] Add Option to Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint
1u0 commented on a change in pull request #8741: [FLINK-12752] Add Option to Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint
URL: https://github.com/apache/flink/pull/8741#discussion_r297534362

## File path: flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java ##

@@ -92,9 +105,23 @@ public StandaloneJobClusterConfiguration createResult(@Nonnull CommandLine comma
             restPort,
             savepointRestoreSettings,
             jobId,
+            jobIdSeed,
             jobClassName);
     }

+    private String getJobIdSeed(@Nonnull final CommandLine commandLine) throws FlinkParseException {
+
+        boolean isJobIdSeedConfigured = commandLine.hasOption(JOB_ID_SEED_OPTION.getOpt());
+        boolean isJobIdConfigured = commandLine.hasOption(JOB_ID_OPTION.getOpt());
+
+        if (isJobIdSeedConfigured && isJobIdConfigured) {

Review comment:
The ticket description says that both options (`--job-id` and `--job-id-seed`) can be passed and that they are resolved by precedence order. I think here you are making them mutually exclusive.
[GitHub] [flink] 1u0 commented on a change in pull request #8741: [FLINK-12752] Add Option to Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint
1u0 commented on a change in pull request #8741: [FLINK-12752] Add Option to Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint
URL: https://github.com/apache/flink/pull/8741#discussion_r297534771

## File path: flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java ##

@@ -60,13 +60,25 @@
         .desc("Job ID of the job to run.")
         .build();

+    private static final Option JOB_ID_SEED_OPTION = Option.builder("jids")
+        .longOpt("job-id-seed")

Review comment:
What do you think about an alternative and more explicit name, like `--random-job-id-seed`?
[GitHub] [flink] dawidwys merged pull request #8715: [FLINK-12821][table][cep] Fix the bug that fix time quantifier can not be the last element of a pattern
dawidwys merged pull request #8715: [FLINK-12821][table][cep] Fix the bug that fix time quantifier can not be the last element of a pattern
URL: https://github.com/apache/flink/pull/8715
[GitHub] [flink] 1u0 commented on a change in pull request #8741: [FLINK-12752] Add Option to Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint
1u0 commented on a change in pull request #8741: [FLINK-12752] Add Option to Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint
URL: https://github.com/apache/flink/pull/8741#discussion_r297541840

## File path: flink-core/src/main/java/org/apache/flink/api/common/JobID.java ##

@@ -64,27 +65,40 @@ public JobID(long lowerPart, long upperPart) {
     public JobID(byte[] bytes) {
         super(bytes);
     }
-
+
     //
     //  Static factory methods
     //

     /**
      * Creates a new (statistically) random JobID.
-     *
+     *
      * @return A new random JobID.
      */
     public static JobID generate() {
         return new JobID();
     }

+    /**
+     * Creates a new JobID based on the given seed. The JobIDs returned by two invocations of this
+     * method with the same seed will be equal.
+     *
+     * @param seed the seed to base the generation of the JobID
+     *
+     * @return A new JobID based on the given seed.
+     */
+    public static JobID fromSeed(String seed) {
+        Random rnd = new Random(seed.hashCode());

Review comment:
In HA mode, do I understand correctly that different Flink jobs must have different IDs?
I think `String.hashCode()` may be too weak a source for the random seed used to generate such job IDs. Maybe consider a stronger [hash function](https://docs.oracle.com/javase/8/docs/api/java/security/MessageDigest.html) instead?
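To make the concern about `String.hashCode()` concrete, here is a minimal sketch of the `MessageDigest` alternative the comment links to. It is not the code from the pull request: `SeededIdSketch` and `idPartsFromSeed` are hypothetical names, and the choice of SHA-256 plus taking the first 16 bytes of the digest is an assumption made for illustration. The two resulting longs could then be passed to the `JobID(long lowerPart, long upperPart)` constructor visible in the hunk header.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper, not part of Flink: derives two 64-bit ID halves from a seed string. */
final class SeededIdSketch {

    /** Hashes the seed with SHA-256 and returns the first 16 digest bytes as two longs. */
    static long[] idPartsFromSeed(String seed) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(seed.getBytes(StandardCharsets.UTF_8));
            ByteBuffer buffer = ByteBuffer.wrap(hash);
            return new long[] {buffer.getLong(), buffer.getLong()};
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 must be supported by every Java platform implementation.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // "Aa" and "BB" are a classic String.hashCode() collision (both hash to 2112),
        // so new Random(seed.hashCode()) would produce the same ID for both seeds.
        System.out.println("Aa".hashCode() == "BB".hashCode()); // prints "true"

        long[] a = idPartsFromSeed("Aa");
        long[] b = idPartsFromSeed("BB");
        System.out.println(a[0] == b[0] && a[1] == b[1]);
    }
}
```

The 32-bit `hashCode()` limits the number of distinct IDs to about four billion and allows trivial collisions such as the one above, whereas the digest-based variant uses 128 bits of a cryptographic hash while staying deterministic for a given seed.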
[GitHub] [flink] zentol commented on a change in pull request #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager
zentol commented on a change in pull request #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager
URL: https://github.com/apache/flink/pull/8646#discussion_r297569347

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java ##

@@ -149,16 +150,15 @@ private void createSubpartitions(
         }
     }

-    private static void initializeBoundedBlockingPartitions(
+    private void initializeBoundedBlockingPartitions(
         ResultSubpartition[] subpartitions,
-        ResultPartition parent,
-        IOManager ioManager) {

Review comment:
couldn't you pass in the channelManager instead and keep it static?
[GitHub] [flink] zentol commented on a change in pull request #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager
zentol commented on a change in pull request #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager
URL: https://github.com/apache/flink/pull/8646#discussion_r297572793

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java ##

@@ -99,8 +93,8 @@ ResultSubpartition createFailingWritesSubpartition() throws Exception {
     //

-    static Path tmpPath() throws IOException {
-        return new File(TMP_DIR.newFolder(), "subpartition").toPath();
+    static Path tmpPath() {
+        return fileChannelManager.createChannel().getPathFile().toPath();

Review comment:
This commit is, strictly speaking, not necessary, correct? If so, I'd like to exclude it to prevent a merge conflict with #8880.
[GitHub] [flink] zentol commented on a change in pull request #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager
zentol commented on a change in pull request #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager
URL: https://github.com/apache/flink/pull/8646#discussion_r297567048

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java ##

@@ -71,7 +71,7 @@ public void beforeTest() {
     }

     @After
-    public void afterTest() {
+    public void afterTest() throws Exception {
         this.ioManager.close();
         if (!this.ioManager.isProperlyShutDown()) {

Review comment:
Looking at this, it seems like `ioManager.close()` should never throw exceptions.
We can already see the downsides of this change: if `close()` throws an exception,
a) `isProperlyShutDown` is never called
b) the memory manager would not be shut down
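The failure mode described in points a) and b) is the usual problem of a throwing `close()` skipping later cleanup steps. One common way around it, sketched below under stated assumptions, is to run every cleanup step regardless and rethrow the first failure afterwards. `TeardownSketch` and `closeAll` are hypothetical names, and the two lambdas only stand in for the IO manager and memory manager of the test; this is not what the pull request does.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical teardown helper: no cleanup step is skipped because an earlier one threw. */
final class TeardownSketch {

    /** Closes every resource; rethrows the first failure with later ones attached as suppressed. */
    static void closeAll(AutoCloseable... resources) throws Exception {
        Exception first = null;
        for (AutoCloseable resource : resources) {
            try {
                resource.close();
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        // Stand-ins for the real components; the first one fails on close.
        AutoCloseable ioManager = () -> {
            log.add("ioManager");
            throw new IllegalStateException("close failed");
        };
        AutoCloseable memoryManager = () -> log.add("memoryManager");

        try {
            closeAll(ioManager, memoryManager);
        } catch (Exception e) {
            log.add("rethrown: " + e.getMessage());
        }
        System.out.println(log); // prints "[ioManager, memoryManager, rethrown: close failed]"
    }
}
```

A plain `try`/`finally` around `ioManager.close()` would achieve the same for two resources; the loop form simply scales to more cleanup steps.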
[GitHub] [flink] becketqin commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
becketqin commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#issuecomment-505800856

@Xeli Yeah, let's do that. I can give a final pass on the patch after you merge the latest changes. @rmetzger what do you think?
[jira] [Commented] (FLINK-10862) REST API does not show program descriptions of "simple" ProgramDescription
[ https://issues.apache.org/jira/browse/FLINK-10862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873164#comment-16873164 ]

TisonKun commented on FLINK-10862:
--

I also find {{Program}} rather useless. How would users define how to return a {{Plan}}? I strongly doubt that any user implements their job via the {{Program}} interface. If nobody does, we can deprecate or remove it to keep the codebase lean.

> REST API does not show program descriptions of "simple" ProgramDescription
> --
>
> Key: FLINK-10862
> URL: https://issues.apache.org/jira/browse/FLINK-10862
> Project: Flink
> Issue Type: Bug
> Components: Runtime / REST
> Affects Versions: 1.6.2
> Reporter: Flavio Pompermaier
> Priority: Major
> Labels: rest_api
>
> When uploading a jar containing a main class implementing ProgramDescription
> interface, the REST API doesn't list its description. It works only if the
> class implements Program (that I find pretty useless...why should I return
> the plan?)
[jira] [Updated] (FLINK-12960) Introduce ShuffleDescriptor#ReleaseType and ShuffleDescriptor#getSupportedReleaseTypes
[ https://issues.apache.org/jira/browse/FLINK-12960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrey Zagrebin updated FLINK-12960:

    Summary: Introduce ShuffleDescriptor#ReleaseType and ShuffleDescriptor#getSupportedReleaseTypes (was: Introduce ShuffleDescriptor#ReleaseType)

> Introduce ShuffleDescriptor#ReleaseType and ShuffleDescriptor#getSupportedReleaseTypes
> --
>
> Key: FLINK-12960
> URL: https://issues.apache.org/jira/browse/FLINK-12960
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Reporter: Andrey Zagrebin
> Assignee: Andrey Zagrebin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.9.0
>
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> {{ResultPartitionDeploymentDescriptor#releasedOnConsumption}} shows the
> intention how the partition is going to be used by the shuffle user and
> released. The {{ShuffleDescriptor}} should provide a way to query which
> release type is supported by shuffle service for this partition. If the
> requested release type is not supported by the shuffle service for a certain
> type of partition, the job should fail fast.
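The "fail fast" requirement in the ticket description can be pictured as a simple membership check before deployment. The sketch below is an illustration only: the ticket names `ReleaseType` and `getSupportedReleaseTypes` but does not list enum values or signatures, so `ON_CONSUMPTION`, `ON_REQUEST`, `ReleaseTypeCheckSketch` and `checkSupported` are all hypothetical.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical illustration of a fail-fast release type check; not Flink's API. */
final class ReleaseTypeCheckSketch {

    /** Assumed values; the ticket does not list the actual release types. */
    enum ReleaseType { ON_CONSUMPTION, ON_REQUEST }

    /** Throws immediately if the requested release type is not among the supported ones. */
    static void checkSupported(ReleaseType requested, Set<ReleaseType> supported) {
        if (!supported.contains(requested)) {
            throw new IllegalStateException(
                "Release type " + requested + " is not supported; supported: " + supported);
        }
    }

    public static void main(String[] args) {
        // Stand-in for what a shuffle descriptor might report for one partition.
        Set<ReleaseType> supported = EnumSet.of(ReleaseType.ON_CONSUMPTION);

        checkSupported(ReleaseType.ON_CONSUMPTION, supported); // passes silently
        try {
            checkSupported(ReleaseType.ON_REQUEST, supported);
        } catch (IllegalStateException e) {
            System.out.println("failed fast: " + e.getMessage());
        }
    }
}
```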
[GitHub] [flink] zentol commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
zentol commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM URL: https://github.com/apache/flink/pull/8778#discussion_r297603449 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; +import org.apache.flink.runtime.shuffle.ShuffleMaster; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.partition.PartitionTable; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static java.util.stream.Collectors.toList; + +/** + * Utility for tracking partitions and issuing release calls to task executors and shuffle masters. + */ +public class PartitionTracker { Review comment: should introduce an interface for this This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12998) Document Plugins mechanism
[ https://issues.apache.org/jira/browse/FLINK-12998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Nowojski updated FLINK-12998:
---

    Description:
Plugins mechanism must be documented before the release. We should write down:
* benefits
* how to use existing FileSystem plugins
* how to implement a custom FileSystem plugin
* potential issues of relying on {{Thread.currentThread().getContextClassLoader()}} (currently it's set only for {{FileSystemFactory}} class loading and the {{FileSystemFactory#create()}} method call; if a {{FileSystem}} accesses {{getContextClassLoader}} during runtime (writing/reading), it will not work properly as a plugin)

  was:
Plugins mechanism must be documented before the release. We should write down:
* benefits
* how to use existing FileSystem plugins
* how to implement a custom FileSystem plugin
* potential issues of relying on {{Thread.currentThread().getContextClassLoader()}}

> Document Plugins mechanism
> --
>
> Key: FLINK-12998
> URL: https://issues.apache.org/jira/browse/FLINK-12998
> Project: Flink
> Issue Type: Sub-task
> Components: FileSystems
> Affects Versions: 1.9.0
> Reporter: Piotr Nowojski
> Priority: Blocker
> Fix For: 1.9.0
>
>
> Plugins mechanism must be documented before the release. We should write down:
> * benefits
> * how to use existing FileSystem plugins
> * how to implement a custom FileSystem plugin
> * potential issues of relying on
> {{Thread.currentThread().getContextClassLoader()}} (currently it's set only
> for {{FileSystemFactory}} class loading and the {{FileSystemFactory#create()}}
> method call; if a {{FileSystem}} accesses {{getContextClassLoader}}
> during runtime (writing/reading), it will not work properly as a plugin)
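The context class loader pitfall in the last bullet can be shown with plain JDK calls. The sketch below is not Flink's plugin code: `ContextClassLoaderSketch` and `callWithContextClassLoader` are hypothetical names, and the empty `URLClassLoader` merely stands in for a plugin class loader. It shows that the plugin loader is visible through `getContextClassLoader()` only while the wrapper is active, which is why code calling it later (for example during reads or writes) would see a different loader.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

/** Hypothetical illustration of temporarily setting the thread context class loader. */
final class ContextClassLoaderSketch {

    /** Runs the action with the given context class loader and restores the previous one afterwards. */
    static <T> T callWithContextClassLoader(ClassLoader loader, Supplier<T> action) {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        // Stand-in for a plugin class loader; it has no URLs of its own.
        ClassLoader pluginLoader =
            new URLClassLoader(new URL[0], ContextClassLoaderSketch.class.getClassLoader());
        ClassLoader before = Thread.currentThread().getContextClassLoader();

        boolean seenInside = callWithContextClassLoader(
            pluginLoader,
            () -> Thread.currentThread().getContextClassLoader() == pluginLoader);

        System.out.println(seenInside); // prints "true": visible inside the wrapped call
        // prints "true": afterwards the original loader is back, so later calls no longer see the plugin loader
        System.out.println(Thread.currentThread().getContextClassLoader() == before);
    }
}
```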
[GitHub] [flink] knaufk commented on a change in pull request #8607: [FLINK-12652] [documentation] add first version of a glossary
knaufk commented on a change in pull request #8607: [FLINK-12652] [documentation] add first version of a glossary URL: https://github.com/apache/flink/pull/8607#discussion_r297635084 ## File path: docs/concepts/glossary.md ## @@ -0,0 +1,168 @@ +--- +title: Glossary +nav-pos: 3 +nav-title: Glossary +nav-parent_id: concepts +--- + + + Flink Application Cluster + +A Flink Application Cluster is a dedicated [Flink Cluster](./glossary#flink-cluster) that only +executes a single [Flink Job](./glossary#flink-job). The lifetime of the +[Flink Cluster](./glossary#flink-cluster) is bound to the lifetime of the Flink Job. Formerly +Flink Application Clusters were also known as Flink Clusters in *job mode*. Compare to +[Flink Session Cluster](./glossary#flink-session-cluster). + + Flink Cluster + +The distributed system consisting of (typically) one Flink Master process and one or more Flink +Taskmanagers processes. + + Event + +An event is a statement about a change of the state of the domain modelled by the +application. Events can be input and/or output of a stream or batch processing application. +Events are special types of [records](./glossary#Record). + + ExecutionGraph + +see [Physical Graph](./glossary#physical-graph) + + Function + +Functions are implemented by the user and encapsulate the +application logic of a Flink program. Most Functions are wrapped by a corresponding +[Operator](./glossary#operator). + + Instance + +The term *instance* is used to describe a specific instance of a specific type (usually +[Operator](./glossary#operator) or [Function](./glossary#function)) during runtime. As Apache Flink +is mostly written in Java, this corresponds to the definition of *Instance* or *Object* in Java. +In the context of Apache Flink, the term *parallel instance* is also frequently used to emphasize +that multiple instances of the same [Operator](./glossary#operator) or +[Function](./glossary#function) type are running in parallel. 
+ + Flink Job + +A Flink Job is the runtime representation of a Flink program. A Flink Job can either be submitted +to a long running [Flink Session Cluster](./glossary#flink-session-cluster) or it can be started as a +self-contained [Flink Application Cluster](./glossary#flink-application-cluster). + + JobGraph + +see [Logical Graph](./glossary#logical-graph) + + Flink JobManager + +JobManagers are one of the components running in the +[Flink JobManger Process](./glossary#flink-jobmanager-process). A JobManager is responsible for +supervising the execution of the [Tasks](./glossary#task) of a single job. + + Logical Graph + +A logical graph is a directed graph describing the high-level logic of a stream processing program. +The nodes are [Operators](./glossary#operator) and the edges indicate input/output-relationships or +data streams or data sets. + + Managed State + +Managed State describes application state which has been registered with the framework. For +Managed State, Apache Flink will take care about persistence and rescaling among other things. + + Flink JobManager Process + +The JobManager Process is the master of a [Flink Cluster](./glossary#flink-cluster). It is called +*JobManager* for historical reasons, but actually contains three distinct components: +Flink Resource Manager, Flink Dispatcher and one [Flink JobManager](./glossary#flink-jobmanager) +per running [Flink Job](./glossary#flink-job). + + Operator + +Node of a [Logical Graph](./glossary#logical-graph). An Operator performs a certain operation, +which is usually executed by a [Function](./glossary#function). Sources and Sinks are special +Operators for data ingestion and data egress. + + Operator Chain + +An Operator Chain consists of two or more consecutive [Operators](./glossary#operator) without any +repartitioning in between. Operators within the same Operation Chain forward records to each other +directly without going through serialization or Flink's network stack. 
+ + Partition + +A partition is an independent subset of the overall data stream or data set. A data stream or +data set is divided into partitions by assigning each [record](./glossary#Record) to one or more +partitions. Partitions of data streams or data sets are consumed by [Tasks](./glossary#task) during +runtime. A transformation which changes the way a data stream or data set is partitioned is often +called repartitioning. + + Physical Graph + +A physical graph is the result of translating a [Logical Graph](./glossary#logical-graph) for +execution in a distributed runtime. The nodes are [Tasks](./glossary#task) and the edges indicate +input/output-relationships or [partitions](./glossary#partition) of data streams or data sets. + + Record + +Records are the constituent elements of a data set or data stream. +[Operators](./glossary#operator) and [Functions](./glossary#Function) receive records as input
[GitHub] [flink] azagrebin commented on a change in pull request #8896: [FLINK-12993][runtime] Refactor forceReleaseOnConsumption to JM concept
azagrebin commented on a change in pull request #8896: [FLINK-12993][runtime] Refactor forceReleaseOnConsumption to JM concept URL: https://github.com/apache/flink/pull/8896#discussion_r297633733 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java ## @@ -39,23 +40,22 @@ public class ResultPartitionFactoryTest extends TestLogger { @Test - public void testForceConsumptionOnReleaseEnabled() { - testForceConsumptionOnRelease(true); + public void testConsumptionOnReleaseEnabled() { + testConsumptionOnRelease(true); Review comment: If `testConsumptionOnRelease` returned the `ResultPartition`, you could do the assertion here. Also, `testConsumptionOnRelease` could directly accept the `ReleaseType`; then there would be fewer ifs. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
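The suggestion in the review comment above can be illustrated with a minimal sketch. It is not the code from the pull request: `SketchReleaseType`, its values `AUTO` and `MANUAL`, `SketchPartition`, `SketchAutoReleasePartition` and `createPartition` are hypothetical stand-ins invented for this example, and plain Java checks replace the JUnit assertions the real test would use. The point is only the shape: the helper accepts the release type, returns what it created, and each test method asserts on the result itself.

```java
// Hypothetical stand-ins for illustration; these are not Flink's real classes.
enum SketchReleaseType { AUTO, MANUAL }

class SketchPartition { }

final class SketchAutoReleasePartition extends SketchPartition { }

final class ResultPartitionFactoryTestSketch {

    // The helper takes the release type directly and returns what it built,
    // so it needs neither a boolean flag nor assertion logic of its own.
    // The branch below only imitates the factory under test.
    static SketchPartition createPartition(SketchReleaseType releaseType) {
        return releaseType == SketchReleaseType.AUTO
            ? new SketchAutoReleasePartition()
            : new SketchPartition();
    }

    // Each test asserts on the returned partition.
    static void testReleaseOnConsumptionEnabled() {
        SketchPartition partition = createPartition(SketchReleaseType.AUTO);
        check(partition instanceof SketchAutoReleasePartition, "expected auto-release partition");
    }

    static void testReleaseOnConsumptionDisabled() {
        SketchPartition partition = createPartition(SketchReleaseType.MANUAL);
        check(!(partition instanceof SketchAutoReleasePartition), "expected plain partition");
    }

    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        testReleaseOnConsumptionEnabled();
        testReleaseOnConsumptionDisabled();
        System.out.println("both checks passed");
    }
}
```

With this shape the helper contains no `if (flag) assert ... else assert ...` branching; the expectation sits next to the input that produces it.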
[jira] [Commented] (FLINK-12176) Unify JobGraph creation in CliFrontend
[ https://issues.apache.org/jira/browse/FLINK-12176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873287#comment-16873287 ] TisonKun commented on FLINK-12176: -- [~till.rohrmann] I can see your concern now, because {{ContextExecutionEnvironment}} really executes all commands inside the main method of the user program, while {{OptimizerPlanEnvironment}} aborts control. I am going to draft a design doc on separating job compilation, cluster deployment and job submission, in which I will discuss this "unification of job compilation" in more detail. > Unify JobGraph creation in CliFrontend > -- > > Key: FLINK-12176 > URL: https://issues.apache.org/jira/browse/FLINK-12176 > Project: Flink > Issue Type: Improvement > Components: Command Line Client >Affects Versions: 1.9.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Attachments: patch.diff > > > Currently, we create {{JobGraph}} by the following process > * if the cluster starts in job mode, we create {{JobGraph}} by > {{PackagedProgramUtils#createJobGraph}} and deploy a job cluster > * if the cluster starts in session mode, we create {{JobGraph}} and submit it > by {{CliFrontend#executeProgram}}, which is internally the same as above but > uses {{ContextEnvironment}} instead of {{OptimizerPlanEnvironment}}. > {{ContextEnvironment}} not only creates the job graph but also submits it. > However, the processes of {{JobGraph}} creation in job mode and session mode > are similar. That means we can unify the process by always creating > {{JobGraph}} by {{PackagedProgramUtils#createJobGraph}}. And, > * in job mode, deploy a job cluster with the {{JobGraph}} > * in session mode, submit the {{JobGraph}} to the session cluster > From a higher view, this helps establish a common view of job submission in both > job and session mode and gives opportunities to refactor legacy client code. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
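The unification proposed in the issue above can be pictured with a small, self-contained sketch. It is only an illustration under stated assumptions: `SketchJobGraph`, `SketchMode`, `compile` and `run` are hypothetical names made up for this example and do not correspond to Flink's client classes or to the attached patch. The idea shown is that job graph creation becomes one shared step, and only the step that follows depends on the mode.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; none of these are Flink's real classes.
final class SketchJobGraph {
    final String jobName;

    SketchJobGraph(String jobName) {
        this.jobName = jobName;
    }
}

enum SketchMode { JOB, SESSION }

final class UnifiedSubmissionSketch {

    // Records each step so the resulting flow can be inspected.
    static final List<String> LOG = new ArrayList<>();

    // Shared step: turn the user program into a job graph without
    // deploying or submitting anything.
    static SketchJobGraph compile(String programName) {
        LOG.add("compile " + programName);
        return new SketchJobGraph(programName);
    }

    // Mode-specific step: what happens to the already-built job graph.
    static void run(String programName, SketchMode mode) {
        SketchJobGraph graph = compile(programName);
        switch (mode) {
            case JOB:
                LOG.add("deploy job cluster with " + graph.jobName);
                break;
            case SESSION:
                LOG.add("submit " + graph.jobName + " to session cluster");
                break;
        }
    }

    public static void main(String[] args) {
        run("wordcount", SketchMode.JOB);
        run("wordcount", SketchMode.SESSION);
        LOG.forEach(System.out::println);
    }
}
```

Both modes go through the same `compile` call, which is the property the issue asks for; how compilation, deployment and submission are actually separated in Flink is left to the design doc mentioned in the comment.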
[GitHub] [flink] aljoscha commented on a change in pull request #8852: [FLINK-12798][table-api][table-planner] Add a proper discover mechanism that will enable switching between Flink & Blink Planner/
aljoscha commented on a change in pull request #8852: [FLINK-12798][table-api][table-planner] Add a proper discover mechanism that will enable switching between Flink & Blink Planner/Executor URL: https://github.com/apache/flink/pull/8852#discussion_r297529895 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/PlannerDescriptor.java ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.table.delegation.Executor; +import org.apache.flink.table.delegation.Planner; + +import javax.annotation.Nullable; + +import java.util.Map; + +/** + * Common class that defines necessary properties to choose and create both + * {@link Executor} and {@link Planner}. + */ +public class PlannerDescriptor { Review comment: I didn't have any in mind, I just noticed it here This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] pnowojski commented on a change in pull request #8858: [hotfix][tests] Change some StreamTask tests to create a test task in the task's thread
pnowojski commented on a change in pull request #8858: [hotfix][tests] Change some StreamTask tests to create a test task in the task's thread URL: https://github.com/apache/flink/pull/8858#discussion_r297531416 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ## @@ -1023,25 +1014,63 @@ protected AbstractStateBackend createInnerBackend(Configuration config) { // // + private enum Event { + TASK_IS_RUNNING, + } + private static class EmptyStreamTask extends StreamTask> { Review comment: I'm not sure. I think that `EmptyStreamTask` is not the best, but I also don't think that my proposals are much better. I would lean towards `AwaitableStreamTask` or `AwaitableEmptyStreamTask`, but if you don't like them, I'm fine with leaving it as it is.
[GitHub] [flink] pnowojski commented on a change in pull request #8858: [hotfix][tests] Change some StreamTask tests to create a test task in the task's thread
pnowojski commented on a change in pull request #8858: [hotfix][tests] Change some StreamTask tests to create a test task in the task's thread URL: https://github.com/apache/flink/pull/8858#discussion_r297528668 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ## @@ -805,29 +723,102 @@ public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable new MockEnvironmentBuilder() .setUserCodeClassLoader(new TestUserCodeClassLoader()) .build()) { - TimeServiceTask timerServiceTask = new TimeServiceTask(mockEnvironment); + RunningTask task = runTask(() -> new TimeServiceTask(mockEnvironment)); + task.waitForTaskCompletion(false); - CompletableFuture invokeFuture = CompletableFuture.runAsync( - () -> { - try { - timerServiceTask.invoke(); - } catch (Exception e) { - throw new CompletionException(e); - } - }, - TestingUtils.defaultExecutor()); - - invokeFuture.get(); - - assertThat(timerServiceTask.getClassLoaders(), hasSize(greaterThanOrEqualTo(1))); - assertThat(timerServiceTask.getClassLoaders(), everyItem(instanceOf(TestUserCodeClassLoader.class))); + assertThat(task.streamTask.getClassLoaders(), hasSize(greaterThanOrEqualTo(1))); + assertThat(task.streamTask.getClassLoaders(), everyItem(instanceOf(TestUserCodeClassLoader.class))); } } // // Test Utilities // + private static StreamOperator streamOperatorWithSnapshot(OperatorSnapshotFutures operatorSnapshotResult) throws Exception { + StreamOperator operator = mock(StreamOperator.class); + when(operator.getOperatorID()).thenReturn(new OperatorID()); Review comment: I understand. Regarding `MockStreamOperatorBuilder`, we can leave that for later. Implementing a proper mock just for this (as a private static class in this file) should be super quick, but if it takes too much time we can also postpone it.