[jira] [Assigned] (FLINK-11608) Translate the "Local Setup Tutorial" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shengjk1 reassigned FLINK-11608: Assignee: shengjk1 > Translate the "Local Setup Tutorial" page into Chinese > -- > > Key: FLINK-11608 > URL: https://issues.apache.org/jira/browse/FLINK-11608 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: shengjk1 >Priority: Major > > The page url is > https://ci.apache.org/projects/flink/flink-docs-master/tutorials/local_setup.html > The markdown file is located in flink/docs/tutorials/local_setup.zh.md > The markdown file will be created once FLINK-11529 is merged. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] carp84 commented on a change in pull request #7674: [FLINK-10043] [State Backends] Refactor RocksDBKeyedStateBackend object construction/initialization/restore code
carp84 commented on a change in pull request #7674: [FLINK-10043] [State Backends] Refactor RocksDBKeyedStateBackend object construction/initialization/restore code URL: https://github.com/apache/flink/pull/7674#discussion_r256719361 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendBuilder.java ## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import java.io.IOException; +import java.util.Collection; + +/** + * A mutable builder to build a state backend instance. + * + * @param <T> The type of the state backend instance + * @param <S> The type of the state handle + */ +public interface StateBackendBuilder<T, S> { + T build() throws IOException; + + StateBackendBuilder<T, S> setRestoreStateHandles(Collection<S> restoreStateHandles); Review comment: Let me make `stateHandles` a parameter of the builder constructor and remove the setter from the interface. I agree that we could just use `SupplierWithException`, but that would force the method name to be `get` instead of `build` and make it harder to identify the caller of the builder. So I suggest still creating the builder interface. What's your opinion?
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
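The trade-off discussed in the review comment above (a dedicated builder interface with a named `build()` method versus reusing a generic supplier whose method is `get()`) can be sketched with a toy, self-contained model. The types below (`ToyBackend`, `ToyBackendBuilder`) are illustrative stand-ins, not Flink's actual classes; the sketch also shows the suggested change of passing the restore handles through the constructor instead of a setter.

```java
import java.util.Collection;
import java.util.Collections;

// Hypothetical, simplified version of the proposed interface; the named
// build() method makes call sites easy to find, unlike Supplier.get().
interface StateBackendBuilder<T, S> {
    T build() throws Exception;
}

// A toy backend that remembers which state handles it was restored from.
class ToyBackend {
    final Collection<String> restoredHandles;

    ToyBackend(Collection<String> handles) {
        this.restoredHandles = handles;
    }
}

// Restore handles are a constructor parameter, not a setter, so a builder
// can never be used without them having been supplied.
class ToyBackendBuilder implements StateBackendBuilder<ToyBackend, String> {
    private final Collection<String> restoreStateHandles;

    ToyBackendBuilder(Collection<String> restoreStateHandles) {
        this.restoreStateHandles = restoreStateHandles;
    }

    @Override
    public ToyBackend build() {
        return new ToyBackend(restoreStateHandles);
    }
}

public class BuilderSketch {
    public static void main(String[] args) throws Exception {
        ToyBackend backend =
            new ToyBackendBuilder(Collections.singletonList("handle-1")).build();
        System.out.println(backend.restoredHandles.size()); // prints 1
    }
}
```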
[jira] [Commented] (FLINK-11159) Allow configuration whether to fall back to savepoints for restore
[ https://issues.apache.org/jira/browse/FLINK-11159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767938#comment-16767938 ] vinoyang commented on FLINK-11159: -- Just finished my vacation and saw an improved design document named "Terminate/Suspend Job with Savepoint" on the mailing list, from [~kkl0u]. Before I provide a design document for this issue, I would like to know whether this idea is considered valuable. > Allow configuration whether to fall back to savepoints for restore > -- > > Key: FLINK-11159 > URL: https://issues.apache.org/jira/browse/FLINK-11159 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Nico Kruber >Assignee: vinoyang >Priority: Major > > Ever since FLINK-3397, upon failure, Flink would restart from the latest > checkpoint/savepoint, whichever is more recent. With the introduction of > local recovery and the knowledge that a RocksDB checkpoint restore would just > copy the files, it may be time to reconsider and make this configurable: > In certain situations, it may be faster to restore from the latest checkpoint > only (even if there is a more recent savepoint) and reprocess the data > in between. On the downside, though, that may not be correct, because it might > break side effects if the savepoint was the latest one, e.g. consider this > chain: {{chk1 -> chk2 -> sp … restore chk2 -> …}}. Then all side effects > between {{chk2 -> sp}} would be reproduced. > Making this configurable will allow users to set whatever they need to get > the lowest recovery time in Flink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
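The recovery-ordering question in the issue above can be modeled with a small, self-contained sketch. `Snapshot`, `pickRestorePoint`, and the `preferCheckpointsOnly` flag are hypothetical names chosen for illustration only; they are not an actual Flink API or configuration option. The example reuses the `chk1 -> chk2 -> sp` chain from the issue description.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class RestoreChoiceSketch {
    // Hypothetical snapshot record: id, logical timestamp, and kind.
    static class Snapshot {
        final String id;
        final long ts;
        final boolean savepoint;

        Snapshot(String id, long ts, boolean savepoint) {
            this.id = id;
            this.ts = ts;
            this.savepoint = savepoint;
        }
    }

    // Current behaviour: restore from the most recent snapshot of either kind.
    // Proposed option: ignore savepoints and take the latest checkpoint only,
    // trading reprocessed (and possibly re-produced) side effects for speed.
    static Optional<Snapshot> pickRestorePoint(List<Snapshot> snapshots,
                                               boolean preferCheckpointsOnly) {
        return snapshots.stream()
            .filter(s -> !preferCheckpointsOnly || !s.savepoint)
            .max(Comparator.comparingLong(s -> s.ts));
    }

    public static void main(String[] args) {
        // The chain from the issue: chk1 -> chk2 -> sp.
        List<Snapshot> history = Arrays.asList(
            new Snapshot("chk1", 1, false),
            new Snapshot("chk2", 2, false),
            new Snapshot("sp", 3, true));

        System.out.println(pickRestorePoint(history, false).get().id); // prints sp
        System.out.println(pickRestorePoint(history, true).get().id);  // prints chk2
    }
}
```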
[GitHub] carp84 commented on a change in pull request #7674: [FLINK-10043] [State Backends] Refactor RocksDBKeyedStateBackend object construction/initialization/restore code
carp84 commented on a change in pull request #7674: [FLINK-10043] [State Backends] Refactor RocksDBKeyedStateBackend object construction/initialization/restore code URL: https://github.com/apache/flink/pull/7674#discussion_r256718752 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java ## @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation; +import org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestorePrepareResult; +import org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase; +import org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy; +import org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackendBuilder; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.LocalRecoveryConfig; +import org.apache.flink.runtime.state.PriorityQueueSetFactory; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; +import org.apache.flink.util.StateMigrationException; + +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import 
org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.UUID; + +import static org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME; + +/** + * Builder class for {@link RocksDBKeyedStateBackend} which handles all necessary initializations and cleanups. + * + * @param <K> The data type that the key serializer serializes. + */ +public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBackendBuilder<K> { + + private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackendBuilder.class); + public static final String DB_INSTANCE_DIR_STRING = "db"; + + /** String that identifies the operator that owns this backend. */ + private final String operatorIdentifier; + private final RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType; + /** The configuration of local recovery. */ + private final LocalRecoveryConfig localRecoveryConfig; + + //-- + + /** The column family options from the options factory. */ + private final ColumnFamilyOptions columnFamilyOptions; + + /** The DB options from the options factory. */ + private final DBOptions dbOptions; + + /** Path where this configured instance stores
[jira] [Assigned] (FLINK-11607) Translate the "DataStream API Tutorial" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Benchao Li reassigned FLINK-11607: -- Assignee: Benchao Li > Translate the "DataStream API Tutorial" page into Chinese > - > > Key: FLINK-11607 > URL: https://issues.apache.org/jira/browse/FLINK-11607 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: Benchao Li >Priority: Major > > The page url is > https://ci.apache.org/projects/flink/flink-docs-master/tutorials/datastream_api.html > The markdown file is located in flink/docs/tutorials/datastream_api.zh.md > The markdown file will be created once FLINK-11529 is merged. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11614) Translate the "Configuring Dependencies" page into Chinese
Jark Wu created FLINK-11614: --- Summary: Translate the "Configuring Dependencies" page into Chinese Key: FLINK-11614 URL: https://issues.apache.org/jira/browse/FLINK-11614 Project: Flink Issue Type: Sub-task Components: chinese-translation, Documentation Reporter: Jark Wu The page url is https://ci.apache.org/projects/flink/flink-docs-master/dev/projectsetup/dependencies.html The markdown file is located in flink/docs/dev/projectsetup/dependencies.zh.md The markdown file will be created once FLINK-11529 is merged. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11605) Translate the "Dataflow Programming Model" page into Chinese
Jark Wu created FLINK-11605: --- Summary: Translate the "Dataflow Programming Model" page into Chinese Key: FLINK-11605 URL: https://issues.apache.org/jira/browse/FLINK-11605 Project: Flink Issue Type: Sub-task Components: chinese-translation, Documentation Reporter: Jark Wu The page url is https://ci.apache.org/projects/flink/flink-docs-master/concepts/programming-model.html The markdown file is located in flink/docs/concepts/programming-model.zh.md The markdown file will be created once FLINK-11529 is merged. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-11610) Translate the "Examples" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shenlei reassigned FLINK-11610: --- Assignee: shenlei > Translate the "Examples" page into Chinese > -- > > Key: FLINK-11610 > URL: https://issues.apache.org/jira/browse/FLINK-11610 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: shenlei >Priority: Major > > The page url is > https://ci.apache.org/projects/flink/flink-docs-master/examples/ > The markdown file is located in flink/docs/examples/index.zh.md > The markdown file will be created once FLINK-11529 is merged. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-11605) Translate the "Dataflow Programming Model" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xueyu reassigned FLINK-11605: - Assignee: xueyu > Translate the "Dataflow Programming Model" page into Chinese > > > Key: FLINK-11605 > URL: https://issues.apache.org/jira/browse/FLINK-11605 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: xueyu >Priority: Major > > The page url is > https://ci.apache.org/projects/flink/flink-docs-master/concepts/programming-model.html > The markdown file is located in flink/docs/concepts/programming-model.zh.md > The markdown file will be created once FLINK-11529 is merged. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11613) Translate the "Project Template for Scala" page into Chinese
Jark Wu created FLINK-11613: --- Summary: Translate the "Project Template for Scala" page into Chinese Key: FLINK-11613 URL: https://issues.apache.org/jira/browse/FLINK-11613 Project: Flink Issue Type: Sub-task Components: chinese-translation, Documentation Reporter: Jark Wu The page url is https://ci.apache.org/projects/flink/flink-docs-master/dev/projectsetup/scala_api_quickstart.html The markdown file is located in flink/docs/dev/projectsetup/scala_api_quickstart.zh.md The markdown file will be created once FLINK-11529 is merged. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11608) Translate the "Local Setup Tutorial" page into Chinese
Jark Wu created FLINK-11608: --- Summary: Translate the "Local Setup Tutorial" page into Chinese Key: FLINK-11608 URL: https://issues.apache.org/jira/browse/FLINK-11608 Project: Flink Issue Type: Sub-task Components: chinese-translation, Documentation Reporter: Jark Wu The page url is https://ci.apache.org/projects/flink/flink-docs-master/tutorials/local_setup.html The markdown file is located in flink/docs/concepts/local_setup.zh.md The markdown file will be created once FLINK-11529 is merged. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11612) Translate the "Project Template for Java" page into Chinese
Jark Wu created FLINK-11612: --- Summary: Translate the "Project Template for Java" page into Chinese Key: FLINK-11612 URL: https://issues.apache.org/jira/browse/FLINK-11612 Project: Flink Issue Type: Sub-task Components: chinese-translation, Documentation Reporter: Jark Wu The page url is https://ci.apache.org/projects/flink/flink-docs-master/dev/projectsetup/java_api_quickstart.html The markdown file is located in flink/docs/dev/projectsetup/java_api_quickstart.zh.md The markdown file will be created once FLINK-11529 is merged. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11611) Translate the "Batch Examples" page into Chinese
Jark Wu created FLINK-11611: --- Summary: Translate the "Batch Examples" page into Chinese Key: FLINK-11611 URL: https://issues.apache.org/jira/browse/FLINK-11611 Project: Flink Issue Type: Sub-task Components: chinese-translation, Documentation Reporter: Jark Wu The page url is https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/examples.html The markdown file is located in flink/docs/dev/batch/examples.zh.md The markdown file will be created once FLINK-11529 is merged. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11607) Translate the "DataStream API Tutorial" page into Chinese
Jark Wu created FLINK-11607: --- Summary: Translate the "DataStream API Tutorial" page into Chinese Key: FLINK-11607 URL: https://issues.apache.org/jira/browse/FLINK-11607 Project: Flink Issue Type: Sub-task Components: chinese-translation, Documentation Reporter: Jark Wu The page url is https://ci.apache.org/projects/flink/flink-docs-master/tutorials/datastream_api.html The markdown file is located in flink/docs/tutorials/datastream_api.zh.md The markdown file will be created once FLINK-11529 is merged. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11610) Translate the "Examples" page into Chinese
Jark Wu created FLINK-11610: --- Summary: Translate the "Examples" page into Chinese Key: FLINK-11610 URL: https://issues.apache.org/jira/browse/FLINK-11610 Project: Flink Issue Type: Sub-task Components: chinese-translation, Documentation Reporter: Jark Wu The page url is https://ci.apache.org/projects/flink/flink-docs-master/examples/ The markdown file is located in flink/docs/examples/index.zh.md The markdown file will be created once FLINK-11529 is merged. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11608) Translate the "Local Setup Tutorial" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-11608: Description: The page url is https://ci.apache.org/projects/flink/flink-docs-master/tutorials/local_setup.html The markdown file is located in flink/docs/tutorials/local_setup.zh.md The markdown file will be created once FLINK-11529 is merged. was: The page url is https://ci.apache.org/projects/flink/flink-docs-master/tutorials/local_setup.html The markdown file is located in flink/docs/concepts/local_setup.zh.md The markdown file will be created once FLINK-11529 is merged. > Translate the "Local Setup Tutorial" page into Chinese > -- > > Key: FLINK-11608 > URL: https://issues.apache.org/jira/browse/FLINK-11608 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Priority: Major > > The page url is > https://ci.apache.org/projects/flink/flink-docs-master/tutorials/local_setup.html > The markdown file is located in flink/docs/tutorials/local_setup.zh.md > The markdown file will be created once FLINK-11529 is merged. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11609) Translate the "Running Flink on Windows" page into Chinese
Jark Wu created FLINK-11609: --- Summary: Translate the "Running Flink on Windows" page into Chinese Key: FLINK-11609 URL: https://issues.apache.org/jira/browse/FLINK-11609 Project: Flink Issue Type: Sub-task Components: chinese-translation, Documentation Reporter: Jark Wu The page url is https://ci.apache.org/projects/flink/flink-docs-master/tutorials/flink_on_windows.html The markdown file is located in flink/docs/tutorials/flink_on_windows.zh.md The markdown file will be created once FLINK-11529 is merged. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11055) Allow Queryable State to be transformed on the TaskManager before being returned to the client
[ https://issues.apache.org/jira/browse/FLINK-11055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767922#comment-16767922 ] vinoyang commented on FLINK-11055: -- Hi [~galenwarren], I think your proposal is valuable. However, it duplicates FLINK-10118. > Allow Queryable State to be transformed on the TaskManager before being > returned to the client > -- > > Key: FLINK-11055 > URL: https://issues.apache.org/jira/browse/FLINK-11055 > Project: Flink > Issue Type: New Feature > Components: Queryable State >Reporter: Galen Warren >Priority: Major > Fix For: 1.7.3 > > > The proposal here is to enhance the way Queryable State works to allow for > the state object to be transformed on the TaskManager before being returned > to the client. As an example, if some MapState were made queryable, such > a transform might look up a specific key in the map and return its > corresponding value, resulting in only that value being returned to the > client instead of the entire map. This could be useful in cases where the > client only wants a portion of the state and the state is large (this is my > use case). > At a high level, I think this could be accomplished by adding an (optional) > serializable Function into KvStateRequest (and related > classes?) and having that transform be applied in the QueryableStateServer > (or QueryableStateClientProxy?). I expect some additional TypeInformation > would also have to be supplied/used in places. It should be doable in a > backwards-compatible way such that if the client does not specify a transform > it works exactly as it does now. > Would there be any interest in a PR for this? This would help me for > something I'm currently working on and I'd be willing to take a crack at it. > If there is interest, I'll be happy to do some more research to come up with > a more concrete proposal. > Thanks for Flink - it's great! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
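The server-side transform idea from the proposal above can be illustrated with a minimal, self-contained sketch: instead of shipping the whole map state back, the server applies a user-supplied function and returns only the result. `answerQuery` is a hypothetical stand-in for the proposed behavior, not Flink's queryable-state API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class TransformSketch {
    // Hypothetical server-side step: apply the client's transform to the
    // state object before anything is serialized back over the wire.
    static <S, R> R answerQuery(S state, Function<S, R> transform) {
        return transform.apply(state);
    }

    public static void main(String[] args) {
        // Stand-in for a queryable MapState on the TaskManager.
        Map<String, Integer> mapState = new HashMap<>();
        mapState.put("clicks", 42);
        mapState.put("views", 7);

        // The client asks only for the value under "clicks"; only that
        // value crosses the network, not the entire map.
        Integer result = answerQuery(mapState, m -> m.get("clicks"));
        System.out.println(result); // prints 42
    }
}
```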
[jira] [Assigned] (FLINK-11606) Translate the "Distributed Runtime Environment" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huadong Sun reassigned FLINK-11606: --- Assignee: Huadong Sun > Translate the "Distributed Runtime Environment" page into Chinese > - > > Key: FLINK-11606 > URL: https://issues.apache.org/jira/browse/FLINK-11606 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: Huadong Sun >Priority: Major > > The page url is > https://ci.apache.org/projects/flink/flink-docs-master/concepts/runtime.html > The markdown file is located in flink/docs/concepts/runtime.zh.md > The markdown file will be created once FLINK-11529 is merged. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11606) Translate the "Distributed Runtime Environment" page into Chinese
Jark Wu created FLINK-11606: --- Summary: Translate the "Distributed Runtime Environment" page into Chinese Key: FLINK-11606 URL: https://issues.apache.org/jira/browse/FLINK-11606 Project: Flink Issue Type: Sub-task Components: chinese-translation, Documentation Reporter: Jark Wu The page url is https://ci.apache.org/projects/flink/flink-docs-master/concepts/runtime.html The markdown file is located in flink/docs/concepts/runtime.zh.md The markdown file will be created once FLINK-11529 is merged. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10418) Add COTH math function supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aleksei Izmalkin closed FLINK-10418. Resolution: Won't Fix > Add COTH math function supported in Table API and SQL > - > > Key: FLINK-10418 > URL: https://issues.apache.org/jira/browse/FLINK-10418 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Aleksei Izmalkin >Assignee: Aleksei Izmalkin >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Inspired by FLINK-10398 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10472) Add CBRT math function supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767910#comment-16767910 ] Aleksei Izmalkin commented on FLINK-10472: -- The PR is closed. So I close the JIRA. > Add CBRT math function supported in Table API and SQL > - > > Key: FLINK-10472 > URL: https://issues.apache.org/jira/browse/FLINK-10472 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Aleksei Izmalkin >Assignee: Aleksei Izmalkin >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Implement the function to calculate the cube root. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10418) Add COTH math function supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767911#comment-16767911 ] Aleksei Izmalkin commented on FLINK-10418: -- The PR is closed. So I close the JIRA. > Add COTH math function supported in Table API and SQL > - > > Key: FLINK-10418 > URL: https://issues.apache.org/jira/browse/FLINK-10418 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Aleksei Izmalkin >Assignee: Aleksei Izmalkin >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Inspired by FLINK-10398 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-10472) Add CBRT math function supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aleksei Izmalkin reopened FLINK-10472: -- > Add CBRT math function supported in Table API and SQL > - > > Key: FLINK-10472 > URL: https://issues.apache.org/jira/browse/FLINK-10472 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Aleksei Izmalkin >Assignee: Aleksei Izmalkin >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Implement the function to calculate the cube root. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-10418) Add COTH math function supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aleksei Izmalkin reopened FLINK-10418: -- > Add COTH math function supported in Table API and SQL > - > > Key: FLINK-10418 > URL: https://issues.apache.org/jira/browse/FLINK-10418 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Aleksei Izmalkin >Assignee: Aleksei Izmalkin >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Inspired by FLINK-10398 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10472) Add CBRT math function supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aleksei Izmalkin closed FLINK-10472. Resolution: Won't Fix > Add CBRT math function supported in Table API and SQL > - > > Key: FLINK-10472 > URL: https://issues.apache.org/jira/browse/FLINK-10472 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Aleksei Izmalkin >Assignee: Aleksei Izmalkin >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Implement the function to calculate the cube root. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10418) Add COTH math function supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aleksei Izmalkin closed FLINK-10418. Resolution: Fixed > Add COTH math function supported in Table API and SQL > - > > Key: FLINK-10418 > URL: https://issues.apache.org/jira/browse/FLINK-10418 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Aleksei Izmalkin >Assignee: Aleksei Izmalkin >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Inspired by FLINK-10398 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10472) Add CBRT math function supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aleksei Izmalkin closed FLINK-10472. Resolution: Fixed > Add CBRT math function supported in Table API and SQL > - > > Key: FLINK-10472 > URL: https://issues.apache.org/jira/browse/FLINK-10472 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Aleksei Izmalkin >Assignee: Aleksei Izmalkin >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Implement the function to calculate the cube root. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] igalshilman edited a comment on issue #7695: [FLINK-11591][core] Support legacy TypeSerializerSnapshot transformations
igalshilman edited a comment on issue #7695: [FLINK-11591][core] Support legacy TypeSerializerSnapshot transformations URL: https://github.com/apache/flink/pull/7695#issuecomment-463507844 Thanks for the review @tzulitai.
[GitHub] igalshilman commented on issue #7695: [FLINK-11591][core] Support legacy TypeSerializerSnapshot transformations
igalshilman commented on issue #7695: [FLINK-11591][core] Support legacy TypeSerializerSnapshot transformations URL: https://github.com/apache/flink/pull/7695#issuecomment-463507844 Thanks for the review @tzulitai.
[GitHub] zhijiangW commented on issue #7593: [FLINK-11441][network] Remove the schedule mode property from RPDD to TDD
zhijiangW commented on issue #7593: [FLINK-11441][network] Remove the schedule mode property from RPDD to TDD URL: https://github.com/apache/flink/pull/7593#issuecomment-463504624 Considering that future extensions of the scheduling module might result in different `sendScheduleOrUpdateConsumers` values for different edges in the graph, we keep the current logic and close this PR.
[GitHub] zhijiangW closed pull request #7593: [FLINK-11441][network] Remove the schedule mode property from RPDD to TDD
zhijiangW closed pull request #7593: [FLINK-11441][network] Remove the schedule mode property from RPDD to TDD URL: https://github.com/apache/flink/pull/7593
[jira] [Updated] (FLINK-11604) Extend the necessary methods in ResultPartitionWriter interface
[ https://issues.apache.org/jira/browse/FLINK-11604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11604: --- Labels: pull-request-available (was: ) > Extend the necessary methods in ResultPartitionWriter interface > --- > > Key: FLINK-11604 > URL: https://issues.apache.org/jira/browse/FLINK-11604 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > Fix For: 1.8.0 > > > This is a preparation work for future creating {{ResultPartitionWriter}} via > proposed {{ShuffleService}}. > Currently there exists only one {{ResultPartition}} implementation for > {{ResultPartitionWriter}} interface, so the specific {{ResultPartition}} > instance is easily referenced in many other classes such as {{Task}}, > {{NetworkEnvironment}}, etc. Even some private methods in {{ResultPartition}} > would be called directly in these reference classes. > Considering {{ShuffleService}} might create multiple different > {{ResultPartitionWriter}} implementations future, then all the other classes > should only reference with the interface and call the common methods. > Therefore we extend the related methods in {{ResultPartitionWriter}} > interface in order to cover existing logics in {{ResultPartition}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
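The refactoring described above, having callers such as `Task` depend only on the writer interface so that a future `ShuffleService` can supply different implementations, can be sketched in a much-simplified, self-contained form. The interface and classes below are illustrative stand-ins, not Flink's actual `ResultPartitionWriter` API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, minimal writer contract: callers see only these methods.
interface ResultPartitionWriterSketch {
    void setup();
    void emit(String record);
    void finish();
}

// One possible implementation; a ShuffleService could provide others
// without any change to the code that uses the interface.
class InMemoryWriter implements ResultPartitionWriterSketch {
    final List<String> buffer = new ArrayList<>();

    public void setup() {
        buffer.clear();
    }

    public void emit(String record) {
        buffer.add(record);
    }

    public void finish() {
        // Nothing to flush in this toy implementation.
    }
}

public class WriterSketch {
    // A stand-in "Task" that is programmed against the interface only,
    // never against a concrete ResultPartition implementation.
    static void runTask(ResultPartitionWriterSketch writer) {
        writer.setup();
        writer.emit("record-1");
        writer.finish();
    }

    public static void main(String[] args) {
        InMemoryWriter writer = new InMemoryWriter();
        runTask(writer);
        System.out.println(writer.buffer.size()); // prints 1
    }
}
```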
[jira] [Commented] (FLINK-11567) Translate "How to Review a Pull Request" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767892#comment-16767892 ] xulinjie commented on FLINK-11567: -- Hello, would you mind reassigning the issue? > Translate "How to Review a Pull Request" page into Chinese > -- > > Key: FLINK-11567 > URL: https://issues.apache.org/jira/browse/FLINK-11567 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Project Website >Reporter: Jark Wu >Assignee: Congxian Qiu >Priority: Major > > Translate "How to Review a Pull Request" page into Chinese. > The markdown file is located in: flink-web/reviewing-prs.zh.md > The url link is: https://flink.apache.org/zh/reviewing-prs.html > Please adjust the links in the page to Chinese pages when translating.
[GitHub] flinkbot commented on issue #7704: [FLINK-11604][network] Extend the necessary methods in ResultPartitionWriter interface
flinkbot commented on issue #7704: [FLINK-11604][network] Extend the necessary methods in ResultPartitionWriter interface URL: https://github.com/apache/flink/pull/7704#issuecomment-463504080 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❌ 1. The [description] looks good. * ❌ 2. There is [consensus] that the contribution should go into Flink. * ❔ 3. Needs [attention] from. * ❌ 4. The change fits into the overall [architecture]. * ❌ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords) - `@flinkbot approve all` to approve all aspects - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval
[GitHub] zhijiangW opened a new pull request #7704: [FLINK-11604][network] Extend the necessary methods in ResultPartitionWriter interface
zhijiangW opened a new pull request #7704: [FLINK-11604][network] Extend the necessary methods in ResultPartitionWriter interface URL: https://github.com/apache/flink/pull/7704 ## What is the purpose of the change *This is preparation work for creating `ResultPartitionWriter` via the proposed `ShuffleService` in the future.* *Currently there exists only one `ResultPartition` implementation of the `ResultPartitionWriter` interface, so the specific `ResultPartition` instance is easily referenced in many other classes such as `Task`, `NetworkEnvironment`, etc. Even some private methods in `ResultPartition` are called directly from these referencing classes.* *Considering that `ShuffleService` might create multiple different `ResultPartitionWriter` implementations in the future, all the other classes should only reference the interface and call its common methods. Therefore we extend the related methods in the `ResultPartitionWriter` interface in order to cover the existing logic in `ResultPartition`.* ## Brief change log - *Extend more methods in `ResultPartitionWriter` interface* - *Reference `ResultPartitionWriter` instead of `ResultPartition` in related classes* - *Introduce `TestResultPartitionWriter` for reusing and simplifying related tests* ## Verifying this change This change is already covered by existing tests, such as *RecordWriterTest*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented?
(not applicable)
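The refactoring described in this PR, classes such as `Task` holding only the `ResultPartitionWriter` interface rather than the concrete `ResultPartition`, can be sketched in plain Java. The methods below are a hypothetical simplification for illustration, not Flink's actual interface:

```java
// Hypothetical, trimmed-down writer interface: callers depend only on this
// type, never on a concrete ResultPartition.
interface ResultPartitionWriter {
    void setup();
    void flushAll();
}

// One concrete implementation; a ShuffleService could later supply others.
class ResultPartition implements ResultPartitionWriter {
    private int flushCount;

    @Override
    public void setup() { /* register buffers, announce to the environment */ }

    @Override
    public void flushAll() { flushCount++; }

    int getFlushCount() { return flushCount; }
}

// Task-like code references only the interface and its common methods, so it
// works unchanged with any future ResultPartitionWriter implementation.
class TaskLike {
    private final ResultPartitionWriter writer;

    TaskLike(ResultPartitionWriter writer) { this.writer = writer; }

    void run() {
        writer.setup();
        writer.flushAll();
    }
}
```

A `ShuffleService` could then hand `TaskLike` any other `ResultPartitionWriter` implementation without changing the task code.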
[GitHub] flinkbot commented on issue #7703: [FLINK-11589][Security] Service provider for security module and context discovery
flinkbot commented on issue #7703: [FLINK-11589][Security] Service provider for security module and context discovery URL: https://github.com/apache/flink/pull/7703#issuecomment-463486243 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❌ 1. The [description] looks good. * ❌ 2. There is [consensus] that the contribution should go into Flink. * ❔ 3. Needs [attention] from. * ❌ 4. The change fits into the overall [architecture]. * ❌ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords) - `@flinkbot approve all` to approve all aspects - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval
[jira] [Updated] (FLINK-11589) Introduce service provider pattern for user to dynamically load SecurityFactory classes
[ https://issues.apache.org/jira/browse/FLINK-11589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11589: --- Labels: pull-request-available (was: ) > Introduce service provider pattern for user to dynamically load > SecurityFactory classes > --- > > Key: FLINK-11589 > URL: https://issues.apache.org/jira/browse/FLINK-11589 > Project: Flink > Issue Type: Sub-task > Components: Security >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > Labels: pull-request-available > > Currently there are only 3 security modules in Flink - Hadoop, Zookeeper and > JaaS, all of which are pre-loaded to the Flink security runtime with one > hard-coded path for instantiating SecurityContext, which is used to invoke user > code with PrivilegedExceptionAction. > We propose to introduce a [service provider > pattern|https://docs.oracle.com/javase/tutorial/ext/basics/spi.html] to allow > users to dynamically load {{SecurityModuleFactory}} or even introduce a new > {{SecurityContextFactory}} so that security runtime modules/context can be > set by dynamically loading any 3rd party JAR. The discovery of these modules > is currently designed to go through property configurations. > This is especially useful in a corporate environment where proprietary > security technologies are involved.
[GitHub] walterddr opened a new pull request #7703: [FLINK-11589][Security] Service provider for security module and context discovery
walterddr opened a new pull request #7703: [FLINK-11589][Security] Service provider for security module and context discovery URL: https://github.com/apache/flink/pull/7703 ## What is the purpose of the change This PR refactors `SecurityUtils.class` in `flink-runtime` and replaces it with an extendable module based on the [service provider pattern](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html). With this extension, users can [create extendable security modules](https://docs.oracle.com/javase/tutorial/ext/basics/spi.html) with 3rd-party implementations. ## Brief change log - Added SecurityFactoryService class for discovering installed SecurityFactory(s) in the classpath. - Extended into SecurityModuleFactory and SecurityContextFactory. - Replaced SecurityUtils with a SecurityEnvironment class that holds all installed security-related objects. - Replaced the current security installation process with a 2-step installation - first install security module(s) per user-defined properties, then install the security context based on security modules and properties - Refactored current Hadoop/JaaS/Zookeeper modules into extendable classes. ## Verifying this change This change is already covered by existing tests in `flink-runtime` and ITCases in YARN and Kafka modules, also added to the tests: - Modified YARN and Kafka test modules to install modules/context through service provider discovery - Included test module factories and a default security context factory in the test path.
## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: yes (affects security installation) - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? not yet, awaits review.
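The service provider pattern this PR relies on is Java's built-in `ServiceLoader`. A minimal sketch of the discovery mechanism (the `SecurityModuleFactory` interface here is a hypothetical stand-in for the PR's real factory class):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical stand-in for the PR's SecurityModuleFactory interface.
interface SecurityModuleFactory {
    String moduleName();
}

class SecurityFactoryDiscovery {
    // ServiceLoader scans the classpath for provider-configuration files named
    // META-INF/services/<fully.qualified.InterfaceName> inside any JAR, so
    // 3rd-party security modules are picked up without code changes.
    static List<SecurityModuleFactory> discover() {
        List<SecurityModuleFactory> factories = new ArrayList<>();
        for (SecurityModuleFactory factory : ServiceLoader.load(SecurityModuleFactory.class)) {
            factories.add(factory);
        }
        return factories;
    }
}
```

A provider JAR registers itself by shipping a one-line `META-INF/services` file containing the fully qualified name of its implementation class; with no such file on the classpath the discovered list is simply empty.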
[GitHub] flinkbot edited a comment on issue #7695: [FLINK-11591][core] Support legacy TypeSerializerSnapshot transformations
flinkbot edited a comment on issue #7695: [FLINK-11591][core] Support legacy TypeSerializerSnapshot transformations URL: https://github.com/apache/flink/pull/7695#issuecomment-463298582 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @tzulitai [PMC] * ✅ 2. There is [consensus] that the contribution should go into Flink. - Approved by @tzulitai [PMC] * ❔ 3. Needs [attention] from. * ✅ 4. The change fits into the overall [architecture]. - Approved by @tzulitai [PMC] * ✅ 5. Overall code [quality] is good. - Approved by @tzulitai [PMC] Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords) - `@flinkbot approve all` to approve all aspects - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval
[GitHub] tzulitai commented on a change in pull request #7695: [FLINK-11591][core] Support legacy TypeSerializerSnapshot transformations
tzulitai commented on a change in pull request #7695: [FLINK-11591][core] Support legacy TypeSerializerSnapshot transformations URL: https://github.com/apache/flink/pull/7695#discussion_r256686727 ## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/LegacySerializerSnapshotTransformer.java ## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.annotation.Internal; + +/** + * Provide a way for {@link TypeSerializer}s to transform a legacy {@link TypeSerializerSnapshot}. + * + * This interface is provided for {@link TypeSerializer}s to implement, that would like to transform an + * associated snapshot class during deserialization from previous Flink versions. + */ +@Internal +public interface LegacySerializerSnapshotTransformer<T> { + + /** +* Transform a {@link TypeSerializerSnapshot} that was previously associated with {@code this} {@link TypeSerializer}. +* +* @param legacySnapshot the snapshot to transform +* @param <U> the snapshot element type Review comment: nit: to be more exact, this is the "legacy" snapshot's element type
[GitHub] tzulitai commented on a change in pull request #7695: [FLINK-11591][core] Support legacy TypeSerializerSnapshot transformations
tzulitai commented on a change in pull request #7695: [FLINK-11591][core] Support legacy TypeSerializerSnapshot transformations URL: https://github.com/apache/flink/pull/7695#discussion_r256686830 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java ## @@ -190,26 +191,35 @@ public boolean canEqual(Object obj) { return new LockableTypeSerializerSnapshot<>(this); } - /** -* This cannot be removed until {@link TypeSerializerConfigSnapshot} is no longer supported. -*/ - @Override - public CompatibilityResult<Lockable<E>> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) { - // backwards compatibility path - CompatibilityResult<E> inputCompatibilityResult = CompatibilityUtil.resolveCompatibilityResult( - configSnapshot.restoreSerializer(), - UnloadableDummyTypeSerializer.class, - configSnapshot, - elementSerializer); - - return (inputCompatibilityResult.isRequiresMigration()) - ? CompatibilityResult.requiresMigration() - : CompatibilityResult.compatible(); - } - @VisibleForTesting TypeSerializer<E> getElementSerializer() { return elementSerializer; } + + @Override + @SuppressWarnings("unchecked") + public <U> TypeSerializerSnapshot<Lockable<E>> transformLegacySerializerSnapshot(TypeSerializerSnapshot<U> legacySnapshot) { + if (legacySnapshot instanceof LockableTypeSerializerSnapshot) { + return (TypeSerializerSnapshot<Lockable<E>>) legacySnapshot; + } + // In flink 1.6, this serializer was directly returning the elementSerializer's snapshot Review comment: nit: capital F for Flink
[GitHub] tzulitai commented on issue #7695: [FLINK-11591][core] Support legacy TypeSerializerSnapshot transformations
tzulitai commented on issue #7695: [FLINK-11591][core] Support legacy TypeSerializerSnapshot transformations URL: https://github.com/apache/flink/pull/7695#issuecomment-463484676 @flinkbot approve quality @flinkbot approve architecture
[GitHub] tzulitai commented on a change in pull request #7695: [FLINK-11591][core] Support legacy TypeSerializerSnapshot transformations
tzulitai commented on a change in pull request #7695: [FLINK-11591][core] Support legacy TypeSerializerSnapshot transformations URL: https://github.com/apache/flink/pull/7695#discussion_r256686616 ## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtil.java ## @@ -48,4 +48,19 @@ checkArgument(legacyNestedSnapshots.length > 0); return newCompositeSnapshot.internalResolveSchemaCompatibility(newSerializer, legacyNestedSnapshots); } + Review comment: nit: unnecessary extra empty line
[GitHub] becketqin commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer
becketqin commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer URL: https://github.com/apache/flink/pull/7679#discussion_r256687206 ## File path: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ## @@ -482,6 +502,49 @@ void reassignPartitions(List> newPartit return new KafkaConsumer<>(kafkaProperties); } + @VisibleForTesting + RateLimiter getRateLimiter() { + return rateLimiter; + } + + // --- + // Rate limiting methods + // --- + /** +* +* @param records List of ConsumerRecords. +* @return Total batch size in bytes, including key and value. +*/ + private int getRecordBatchSize(ConsumerRecords records) { Review comment: The `AbstractFetcher` has two subclasses - `Kafka09Fetcher` and `Kafka08Fetcher`. The serialization is done inside them. `Kafka09Fetcher` is sort of an abstract fetcher for Kafka 0.9+. `Kafka08Fetcher` is more of a legacy. Maybe we can just do that in `Kafka09Fetcher`?
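The `getRecordBatchSize` computation above feeds a byte-based rate limiter: after each poll the consumer thread acquires the batch size in bytes as permits. The PR uses the (shaded) Guava `RateLimiter` for this; the class below is a simplified, deterministic token-bucket stand-in with an injected clock, so the arithmetic can be checked without sleeping. It is an illustration of the technique, not the PR's code.

```java
// Deterministic token-bucket stand-in for Guava's RateLimiter, with bytes as
// permits. The clock (nanoseconds) is passed in so behavior is testable.
class ByteRateLimiter {
    private final double bytesPerSecond;
    private double storedBytes;   // tokens currently available (burst capacity)
    private long lastRefillNanos;

    ByteRateLimiter(double bytesPerSecond, long nowNanos) {
        this.bytesPerSecond = bytesPerSecond;
        this.storedBytes = bytesPerSecond; // allow roughly one second of burst
        this.lastRefillNanos = nowNanos;
    }

    /** Returns how long (in seconds) the caller would have to wait for this batch. */
    double acquire(int batchSizeBytes, long nowNanos) {
        // Refill tokens for the time elapsed since the last call, capped at capacity.
        double elapsedSeconds = (nowNanos - lastRefillNanos) / 1e9;
        storedBytes = Math.min(bytesPerSecond, storedBytes + elapsedSeconds * bytesPerSecond);
        lastRefillNanos = nowNanos;
        // Spend tokens for this batch; a deficit translates into wait time.
        storedBytes -= batchSizeBytes;
        return storedBytes >= 0 ? 0.0 : -storedBytes / bytesPerSecond;
    }
}
```

With the real Guava limiter the consumer thread simply calls `rateLimiter.acquire(bytesInBatch)` on a limiter built via `RateLimiter.create(bytesPerSecond)`, which blocks instead of returning the wait.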
[GitHub] flinkbot commented on issue #7702: [FLINK-11088][Security][YARN] Allow YARN to discover pre-installed keytab files
flinkbot commented on issue #7702: [FLINK-11088][Security][YARN] Allow YARN to discover pre-installed keytab files URL: https://github.com/apache/flink/pull/7702#issuecomment-463484443 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❌ 1. The [description] looks good. * ❌ 2. There is [consensus] that the contribution should go into Flink. * ❔ 3. Needs [attention] from. * ❌ 4. The change fits into the overall [architecture]. * ❌ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords) - `@flinkbot approve all` to approve all aspects - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval
[GitHub] becketqin commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer
becketqin commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer URL: https://github.com/apache/flink/pull/7679#discussion_r256686663 ## File path: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ## @@ -22,12 +22,16 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.connectors.kafka.config.RateLimiterFactory; import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue; import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel; import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper; +import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter; Review comment: @zentol @tweise Just for my own curiosity. I agree that we should shade the jar to avoid interference with other user imports. What I am wondering is how should that be done. Whether it should be done in the pom.xml or from the import explicitly. I am not sure about the difference between those two ways. But I did not see such explicit imports in Flink anywhere else. Am I missing something?
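On becketqin's question above: the shading itself happens in the build, not at the import site. The `org.apache.flink.shaded.guava18...` import is simply the package name produced by a maven-shade-plugin relocation (Flink consumes a pre-relocated `flink-shaded-guava` artifact rather than shading Guava in each module). A relocation of that shape looks roughly like this (illustrative pom fragment, not Flink's exact build file):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <!-- rewrites com.google.common.* class files and bytecode references -->
            <pattern>com.google.common</pattern>
            <shadedPattern>org.apache.flink.shaded.guava18.com.google.common</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

After relocation, code depending on the shaded artifact must import the relocated package explicitly, which is why the import above spells out the shaded path.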
[GitHub] walterddr opened a new pull request #7702: [FLINK-11088][Security][YARN] Allow YARN to discover pre-installed keytab files
walterddr opened a new pull request #7702: [FLINK-11088][Security][YARN] Allow YARN to discover pre-installed keytab files URL: https://github.com/apache/flink/pull/7702 ## What is the purpose of the change * This PR introduces two new configuration keys in the YARN configuration to allow Flink to load pre-installed Kerberos keytab files directly from the local file system instead of having the Flink client upload Kerberos keytab files through the YARN local resource bucket. ## Brief change log - Added two new keys with default values - `yarn.security.kerberos.keytab.path` which defaults to local resource bucket: `krb.keytab` - `yarn.security.kerberos.require.localize.keytab` which defaults to true. If set to false, Flink will not upload the client keytab used in its own section to the YARN local resource bucket. Instead, whatever path is configured in `yarn.security.kerberos.keytab.path` will be used. - Changed AbstractYarnClusterDescriptor to conform with the 2 options above. - Changed the YarnTaskExecutorRunner to load keytab configurations differently according to the 2 options above. ## Verifying this change This change is already covered by existing tests in the `flink-yarn-test` component. This change also added tests: - added additional test config parsing in YarnTaskExecutorRunnerTest. - Modified YARNSessionFIFOITCase and YARNSessionFIFOSecuredITCase to allow dynamic properties loading during test sections. - Added specific section for pre-installed YARN Kerberos keytab file.
## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: yes - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? document regenerated
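Assuming the keys behave as described in the change log above, pointing Flink at a node-local keytab would look roughly like this in `flink-conf.yaml` (the path value is illustrative, and the key names are those proposed in this PR, which may have changed before merging):

```yaml
# Do not ship the client keytab through the YARN local resource bucket...
yarn.security.kerberos.require.localize.keytab: false
# ...and instead read the keytab pre-installed on every YARN node at this path.
yarn.security.kerberos.keytab.path: /etc/security/keytabs/flink.keytab
```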
[jira] [Updated] (FLINK-11088) Allow pre-install Kerberos authentication keytab discovery on YARN
[ https://issues.apache.org/jira/browse/FLINK-11088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11088: --- Labels: pull-request-available (was: ) > Allow pre-install Kerberos authentication keytab discovery on YARN > -- > > Key: FLINK-11088 > URL: https://issues.apache.org/jira/browse/FLINK-11088 > Project: Flink > Issue Type: Sub-task > Components: Security, YARN >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > Labels: pull-request-available > > Currently flink-yarn assumes keytab is shipped as application master > environment local resource on client side and will be distributed to all the > TMs. This does not work for YARN proxy user mode [1] since proxy user or > super user might not have access to actual users' keytab, but can request > delegation tokens on users' behalf. > Based on the type of security options for long-living YARN service[2], we > propose to have the keytab file path discovery configurable depending on the > launch mode of the YARN client. > Reference: > [1] > https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html > [2] > https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YarnApplicationSecurity.html#Securing_Long-lived_YARN_Services
[jira] [Updated] (FLINK-11589) Introduce service provider pattern for user to dynamically load SecurityFactory classes
[ https://issues.apache.org/jira/browse/FLINK-11589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-11589: -- Description: Currently there are only 3 security modules in Flink - Hadoop, Zookeeper and JaaS, all of which are pre-loaded to the Flink security runtime with one hard-coded path for instantiating SecurityContext, which is used to invoke user code with PrivilegedExceptionAction. We propose to introduce a [service provider pattern|https://docs.oracle.com/javase/tutorial/ext/basics/spi.html] to allow users to dynamically load {{SecurityModuleFactory}} or even introduce a new {{SecurityContextFactory}} so that security runtime modules/context can be set by dynamically loading any 3rd party JAR. The discovery of these modules is currently designed to go through property configurations. This is especially useful in a corporate environment where proprietary security technologies are involved. was: Currently there are only 3 security modules in Flink - Hadoop, Zookeeper and JaaS, all of which are pre-loaded to the Flink security runtime with one hard-coded path for instantiating SecurityContext, which is used to invoke user code with PrivilegedExceptionAction. We propose to introduce a [service provider pattern|https://docs.oracle.com/javase/tutorial/ext/basics/spi.html] to allow users to dynamically load {{SecurityModuleFactory}} or even introduce a new {{SecurityContextFactory}} so that all the security runtime context can be set by dynamically loading any 3rd party JAR, and discover them through property configurations. This is especially useful in a corporate environment where proprietary security technologies are involved.
> Introduce service provider pattern for user to dynamically load > SecurityFactory classes > --- > > Key: FLINK-11589 > URL: https://issues.apache.org/jira/browse/FLINK-11589 > Project: Flink > Issue Type: Sub-task > Components: Security >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Currently there are only 3 security modules in Flink - Hadoop, Zookeeper and > JaaS, all of which are pre-loaded to the Flink security runtime with one > hard-coded path for instantiating SecurityContext, which is used to invoke user > code with PrivilegedExceptionAction. > We propose to introduce a [service provider > pattern|https://docs.oracle.com/javase/tutorial/ext/basics/spi.html] to allow > users to dynamically load {{SecurityModuleFactory}} or even introduce a new > {{SecurityContextFactory}} so that security runtime modules/context can be > set by dynamically loading any 3rd party JAR. The discovery of these modules > is currently designed to go through property configurations. > This is especially useful in a corporate environment where proprietary > security technologies are involved.
[GitHub] flinkbot edited a comment on issue #7695: [FLINK-11591][core] Support legacy TypeSerializerSnapshot transformations
flinkbot edited a comment on issue #7695: [FLINK-11591][core] Support legacy TypeSerializerSnapshot transformations URL: https://github.com/apache/flink/pull/7695#issuecomment-463298582 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @tzulitai [PMC] * ✅ 2. There is [consensus] that the contribution should go into Flink. - Approved by @tzulitai [PMC] * ❔ 3. Needs [attention] from. * ❌ 4. The change fits into the overall [architecture]. * ❌ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords) - `@flinkbot approve all` to approve all aspects - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval
[GitHub] tzulitai commented on issue #7695: [FLINK-11591][core] Support legacy TypeSerializerSnapshot transformations
tzulitai commented on issue #7695: [FLINK-11591][core] Support legacy TypeSerializerSnapshot transformations URL: https://github.com/apache/flink/pull/7695#issuecomment-463479601 @flinkbot approve description @flinkbot approve consensus
[GitHub] carp84 commented on a change in pull request #7674: [FLINK-10043] [State Backends] Refactor RocksDBKeyedStateBackend object construction/initialization/restore code
carp84 commented on a change in pull request #7674: [FLINK-10043] [State Backends] Refactor RocksDBKeyedStateBackend object construction/initialization/restore code URL: https://github.com/apache/flink/pull/7674#discussion_r256683639 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java ## @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation; +import org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestorePrepareResult; +import org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase; +import org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy; +import org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackendBuilder; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.LocalRecoveryConfig; +import org.apache.flink.runtime.state.PriorityQueueSetFactory; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; +import org.apache.flink.util.StateMigrationException; + +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import 
org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.UUID; + +import static org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME; + +/** + * Builder class for {@link RocksDBKeyedStateBackend} which handles all necessary initializations and clean ups. + * + * @param The data type that the key serializer serializes. + */ +public class RocksDBKeyedStateBackendBuilder extends AbstractKeyedStateBackendBuilder { + + private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackendBuilder.class); + public static final String DB_INSTANCE_DIR_STRING = "db"; + + /** String that identifies the operator that owns this backend. */ + private final String operatorIdentifier; + private final RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType; + /** The configuration of local recovery. */ + private final LocalRecoveryConfig localRecoveryConfig; + + //-- + + /** The column family options from the options factory. */ + private final ColumnFamilyOptions columnFamilyOptions; + + /** The DB options from the options factory. */ + private final DBOptions dbOptions; + + /** Path where this configured instance stores
[jira] [Created] (FLINK-11604) Extend the necessary methods in ResultPartitionWriter interface
zhijiang created FLINK-11604: Summary: Extend the necessary methods in ResultPartitionWriter interface Key: FLINK-11604 URL: https://issues.apache.org/jira/browse/FLINK-11604 Project: Flink Issue Type: Sub-task Components: Network Reporter: zhijiang Assignee: zhijiang Fix For: 1.8.0 This is preparation work for creating {{ResultPartitionWriter}} via the proposed {{ShuffleService}} in the future. Currently there exists only one {{ResultPartition}} implementation of the {{ResultPartitionWriter}} interface, so the specific {{ResultPartition}} instance is easily referenced in many other classes such as {{Task}}, {{NetworkEnvironment}}, etc. Even some private methods in {{ResultPartition}} are called directly by these referencing classes. Since {{ShuffleService}} might create multiple different {{ResultPartitionWriter}} implementations in the future, all the other classes should only reference the interface and call its common methods. Therefore we extend the related methods in the {{ResultPartitionWriter}} interface in order to cover the existing logic in {{ResultPartition}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
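The refactoring described above — callers such as {{Task}} depending only on the {{ResultPartitionWriter}} interface rather than the concrete {{ResultPartition}} — can be sketched in plain Java. Everything below is a hypothetical stand-in: the names and methods are illustrative, not Flink's actual API.

```java
import java.util.ArrayList;
import java.util.List;

public class WriterInterfaceSketch {

    /** Hypothetical writer interface carrying the methods callers need. */
    interface ResultPartitionWriterLike {
        void setup();                  // previously only on the concrete class
        void addRecord(String record);
        void finish();
    }

    /** One possible implementation; Task-like code never names this type. */
    static class InMemoryWriter implements ResultPartitionWriterLike {
        final List<String> records = new ArrayList<>();
        boolean finished;
        public void setup() { records.clear(); }
        public void addRecord(String record) { records.add(record); }
        public void finish() { finished = true; }
    }

    /** A Task-like caller that only sees the interface, so a future
     *  ShuffleService could hand it any implementation. */
    static int runTask(ResultPartitionWriterLike writer) {
        writer.setup();
        writer.addRecord("a");
        writer.addRecord("b");
        writer.finish();
        return 2;
    }

    public static void main(String[] args) {
        System.out.println("records written: " + runTask(new InMemoryWriter()));
    }
}
```

The point of the sketch is the `runTask` signature: once it accepts the interface, adding a second implementation requires no change to the caller.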
[GitHub] carp84 commented on a change in pull request #7674: [FLINK-10043] [State Backends] Refactor RocksDBKeyedStateBackend object construction/initialization/restore code
carp84 commented on a change in pull request #7674: [FLINK-10043] [State Backends] Refactor RocksDBKeyedStateBackend object construction/initialization/restore code URL: https://github.com/apache/flink/pull/7674#discussion_r256681305 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
[GitHub] carp84 commented on a change in pull request #7674: [FLINK-10043] [State Backends] Refactor RocksDBKeyedStateBackend object construction/initialization/restore code
carp84 commented on a change in pull request #7674: [FLINK-10043] [State Backends] Refactor RocksDBKeyedStateBackend object construction/initialization/restore code URL: https://github.com/apache/flink/pull/7674#discussion_r256681092 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SupplierWithInputAndException.java ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Public; + +import javax.annotation.Nonnull; + +/** + * A functional interface for a {@link java.util.function.Supplier} that may + * throw exceptions. + * + * @param The type of the input for the supplier + * @param The type of the result of the supplier. + * @param The type of Exceptions thrown by this function. + */ +@Public +@FunctionalInterface +public interface SupplierWithInputAndException { Review comment: Agreed, will use `FunctionWithException` instead
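The replacement agreed on above can be sketched with a throwing functional interface modeled on Flink's `FunctionWithException`. The interface copy and the surrounding factory method below are illustrative stand-ins, not Flink's actual code:

```java
import java.io.IOException;

public class ThrowingFunctionSketch {

    /** Modeled on Flink's FunctionWithException: a Function that may throw E. */
    @FunctionalInterface
    interface FunctionWithException<T, R, E extends Throwable> {
        R apply(T value) throws E;
    }

    /** A backend-factory-like caller: the input plays the role of the
     *  restore state handed to the supplier in the diff above. */
    static String restoreBackend(
            FunctionWithException<String, String, IOException> factory,
            String restoreState) throws IOException {
        return factory.apply(restoreState);
    }

    public static void main(String[] args) throws IOException {
        // The lambda both receives input AND may throw a checked exception,
        // which is exactly what SupplierWithInputAndException was added for.
        System.out.println(restoreBackend(state -> "backend[" + state + "]", "handle-1"));
    }
}
```

Since a function with one input already covers "supplier with input", reusing the existing interface avoids a near-duplicate type in the public API.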
[GitHub] carp84 commented on a change in pull request #7674: [FLINK-10043] [State Backends] Refactor RocksDBKeyedStateBackend object construction/initialization/restore code
carp84 commented on a change in pull request #7674: [FLINK-10043] [State Backends] Refactor RocksDBKeyedStateBackend object construction/initialization/restore code URL: https://github.com/apache/flink/pull/7674#discussion_r256681007 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
[GitHub] carp84 commented on a change in pull request #7674: [FLINK-10043] [State Backends] Refactor RocksDBKeyedStateBackend object construction/initialization/restore code
carp84 commented on a change in pull request #7674: [FLINK-10043] [State Backends] Refactor RocksDBKeyedStateBackend object construction/initialization/restore code URL: https://github.com/apache/flink/pull/7674#discussion_r256675562 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java ## @@ -141,7 +140,7 @@ public T createAndRestore(@Nonnull List> restoreOptions) private T attemptCreateAndRestore(Collection restoreState) throws Exception { // create a new, empty backend. - final T backendInstance = instanceSupplier.get(); + final T backendInstance = instanceSupplier.get(restoreState); Review comment: I also agree to do the restore in the builder, though I'm not sure whether we need to reserve the restore method for the dry-run solution. I'm also concerned that with the current change the whole incremental restore logic is scattered across the builder and `RocksDBIncrementalRestoreOperation`, and I'm thinking about reinforcing this part. What's your opinion? Thanks.
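The direction discussed in this thread — pass the restore state handles to the builder's constructor and let `build()` perform the restore, rather than handing restore state to a supplier — can be sketched in plain Java. All names below are hypothetical stand-ins, not the actual Flink classes:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class RestoringBuilderSketch {

    /** Stand-in for a keyed state backend; only restored handles are modeled. */
    static class Backend {
        final List<String> restoredHandles;
        Backend(List<String> restoredHandles) { this.restoredHandles = restoredHandles; }
    }

    /** Builder that receives the restore handles up front (no setter),
     *  matching the constructor-injection idea from the review. */
    static class BackendBuilder {
        private final Collection<String> restoreStateHandles;

        BackendBuilder(Collection<String> restoreStateHandles) {
            this.restoreStateHandles = restoreStateHandles;
        }

        Backend build() throws IOException {
            // The restore happens inside build(): callers never observe a
            // half-restored backend, and on failure the builder (not the
            // caller) is responsible for releasing acquired resources.
            return new Backend(new ArrayList<>(restoreStateHandles));
        }
    }

    public static void main(String[] args) throws IOException {
        Backend backend = new BackendBuilder(Arrays.asList("handle-1", "handle-2")).build();
        System.out.println(backend.restoredHandles.size());
    }
}
```

With this shape, `BackendRestorerProcedure` only needs a `build()`-style call per restore attempt instead of threading the state through a supplier.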
[GitHub] carp84 commented on a change in pull request #7674: [FLINK-10043] [State Backends] Refactor RocksDBKeyedStateBackend object construction/initialization/restore code
carp84 commented on a change in pull request #7674: [FLINK-10043] [State Backends] Refactor RocksDBKeyedStateBackend object construction/initialization/restore code URL: https://github.com/apache/flink/pull/7674#discussion_r256675319 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java ## @@ -458,7 +460,8 @@ public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, - MetricGroup metricGroup) { + MetricGroup metricGroup, + Collection stateHandles) { Review comment: I agree that all backends should follow the builder way in the end, and that's why `AbstractKeyedStateBackendBuilder` is introduced in the commit here. Maybe we could focus on RocksDB here and handle the others in a separate JIRA/PR? Wdyt?
[GitHub] wujinhu commented on issue #7599: [FLINK-11442] upgrade oss sdk version
wujinhu commented on issue #7599: [FLINK-11442] upgrade oss sdk version URL: https://github.com/apache/flink/pull/7599#issuecomment-463467815 @StefanRRichter Please help review this PR, thanks :)
[GitHub] FaxianZhao commented on issue #7675: [FLINK-8297] [flink-rocksdb] A plan store elements of ListState as multiple key-values in rocksdb
FaxianZhao commented on issue #7675: [FLINK-8297] [flink-rocksdb] A plan store elements of ListState as multiple key-values in rocksdb URL: https://github.com/apache/flink/pull/7675#issuecomment-463467316 @klion26 thanks for your help. It looks good to me.
[jira] [Closed] (FLINK-9700) Document FlinkKafkaProducer behaviour for Kafka versions > 0.11
[ https://issues.apache.org/jira/browse/FLINK-9700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] leesf closed FLINK-9700. Resolution: Fixed > Document FlinkKafkaProducer behaviour for Kafka versions > 0.11 > -- > > Key: FLINK-9700 > URL: https://issues.apache.org/jira/browse/FLINK-9700 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.5.0 >Reporter: Ufuk Celebi >Assignee: leesf >Priority: Minor > > FlinkKafkaProducer for Kafka 0.11 uses reflection to work around API > limitations of the Kafka client. Using reflection breaks with newer versions > of the Kafka client (due to internal changes of the client). > The documentation does not mention newer Kafka versions. We should add the > following notes: > - Only package Kafka connector with kafka.version property set to 0.11.*.* > - Mention that it is possible to use the 0.11 connector with newer versions > of Kafka as the protocol seems to be backwards compatible (double check that > this is correct)
[jira] [Commented] (FLINK-11430) Incorrect Akka timeout syntax
[ https://issues.apache.org/jira/browse/FLINK-11430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767800#comment-16767800 ] leesf commented on FLINK-11430: --- [~sinadoom], hi Sina, the config seems to work fine with spaces in the Akka timeouts. > Incorrect Akka timeout syntax > - > > Key: FLINK-11430 > URL: https://issues.apache.org/jira/browse/FLINK-11430 > Project: Flink > Issue Type: Task > Components: Documentation >Affects Versions: 1.7.0, 1.7.1 >Reporter: Sina Madani >Assignee: leesf >Priority: Trivial > > The current > [documentation|https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html] > specifies the syntax for Akka timeouts to be in the form "[0-9]+ > ms|s|min|h|d". However this doesn't work, leading to NumberFormatException or > similar. Reading through the [Akka > documentation|https://doc.akka.io/docs/akka/2.5/general/configuration.html] > however, it seems the correct format is [0-9]+ms|s|min|h|d (note the lack of > spaces and quotes).
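The two spellings under dispute can be illustrated with a small self-contained check. The regex below is only an illustration of the documented format with optional whitespace; Flink's actual parsing of these values is delegated to Scala's `scala.concurrent.duration.Duration`, which, as the comment above suggests, accepts both the spaced and unspaced forms:

```java
import java.util.regex.Pattern;

public class DurationFormatSketch {

    // Hypothetical illustration only: digits, optional whitespace, then a
    // unit. This regex is NOT the parser Flink uses; it just shows that both
    // "100 s" and "100s" fit a lenient reading of the documented format.
    static final Pattern DURATION = Pattern.compile("[0-9]+\\s*(ms|s|min|h|d)");

    static boolean isValid(String s) {
        return DURATION.matcher(s).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("100 s")); // spaced form from the docs
        System.out.println(isValid("100s"));  // unspaced form from Akka docs
        System.out.println(isValid("fast"));  // not a duration
    }
}
```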
[GitHub] liu-zhaokun commented on issue #7257: [FLINK-11089]Log filecache directory removed messages
liu-zhaokun commented on issue #7257: [FLINK-11089]Log filecache directory removed messages URL: https://github.com/apache/flink/pull/7257#issuecomment-463450318 @rmetzger Thanks for your reply. I have updated the patch, could you help me review it?
[jira] [Commented] (FLINK-11334) Migrate enum serializers to use new serialization compatibility abstractions
[ https://issues.apache.org/jira/browse/FLINK-11334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767771#comment-16767771 ] Congxian Qiu commented on FLINK-11334: -- Hi [~kisimple], what's the status of this issue? I have an almost-done patch; if you don't mind, could I take over this issue? > Migrate enum serializers to use new serialization compatibility abstractions > > > Key: FLINK-11334 > URL: https://issues.apache.org/jira/browse/FLINK-11334 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing, Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Assignee: boshu Zheng >Priority: Major > > This subtask covers migration of: > * EnumSerializerConfigSnapshot > * ScalaEnumSerializerConfigSnapshot > to use the new serialization compatibility APIs ({{TypeSerializerSnapshot}} > and {{TypeSerializerSchemaCompatibility}}). > The enum serializer snapshots should be implemented so that on restore the > order of Enum constants can be reordered (a case for serializer > reconfiguration), as well as adding new Enum constants. > Serializers are only considered to have completed migration according to the > defined list of things to check in FLINK-11327.
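The reorder/add requirement in the description can be illustrated with a minimal standalone sketch of the underlying idea: snapshot the enum constant NAMES in their old order, then remap written ordinals against the current enum class on restore. This is not Flink's actual `EnumSerializer` code; all names here are illustrative:

```java
import java.util.Arrays;
import java.util.List;

public class EnumSnapshotSketch {

    enum Color { RED, GREEN, BLUE }

    /**
     * The snapshot stores the constant names in their old declaration order,
     * so data written with old ordinals can still be decoded even if
     * constants were reordered or new ones appended since the snapshot.
     */
    static Color restore(List<String> snapshotNames, int writtenOrdinal) {
        // Resolve the old ordinal to a name, then look the name up
        // against the CURRENT enum class.
        return Color.valueOf(snapshotNames.get(writtenOrdinal));
    }

    public static void main(String[] args) {
        // Pretend the enum used to be declared as BLUE, RED, GREEN.
        List<String> oldOrder = Arrays.asList("BLUE", "RED", "GREEN");
        // Ordinal 0 was written under the old order -> must restore to BLUE,
        // even though BLUE's current ordinal is 2.
        System.out.println(restore(oldOrder, 0)); // prints BLUE
    }
}
```

Storing names rather than ordinals is what makes constant reordering a reconfiguration case instead of a migration case.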
[jira] [Updated] (FLINK-11421) Add compilation options to allow compiling generated code with JDK compiler
[ https://issues.apache.org/jira/browse/FLINK-11421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chunhui Shi updated FLINK-11421: Summary: Add compilation options to allow compiling generated code with JDK compiler (was: Providing more compilation options for code-generated operators) > Add compilation options to allow compiling generated code with JDK compiler > > > Key: FLINK-11421 > URL: https://issues.apache.org/jira/browse/FLINK-11421 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Liya Fan >Assignee: Liya Fan >Priority: Major > Labels: pull-request-available > Original Estimate: 240h > Time Spent: 10m > Remaining Estimate: 239h 50m > > Flink supports some operators (like Calc, Hash Agg, Hash Join, etc.) by code > generation. That is, Flink generates their source code dynamically, and then > compiles it into Java Byte Code, which is loaded and executed at runtime. > > By default, Flink compiles the generated source code with Janino. This is fast, > as the compilation often finishes in hundreds of milliseconds. The generated > Java Byte Code, however, is of poor quality. To illustrate, we use the Java > Compiler API (JCA) to compile the generated code. Experiments on TPC-H (1 TB) > queries show that the E2E time can be more than 10% shorter when operators > are compiled by JCA, despite that it takes more time (a few seconds) to > compile with JCA. > > Therefore, we believe it is beneficial to compile generated code with JCA in > the following scenarios: 1) For batch jobs, the E2E time is relatively long, > so it is worth spending more time compiling and generating high-quality > Java Byte Code. 2) For repeated stream jobs, the generated code will be > compiled once and run many times. Therefore, it pays to spend more time > compiling for the first time, and enjoy the high byte code quality for > later runs. 
> > According to the above observations, we want to provide a compilation option > (Janino, JCA, or dynamic) for Flink, so that the user can choose the one > suitable for their specific scenario and obtain better performance whenever > possible.
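Compiling generated source with the Java Compiler API, as proposed, boils down to `javax.tools.ToolProvider`. Below is a minimal hedged sketch (the class and method names are hypothetical; a real integration would likely compile in memory via `JavaFileManager` rather than temp files, and it only works when running on a JDK, since a bare JRE ships no compiler):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;

public class JdkCompileSketch {

    /** Writes the generated source to a temp dir and compiles it with javac. */
    static boolean compileGenerated(String className, String source) throws IOException {
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        if (compiler == null) {
            return false; // running on a JRE without the system compiler
        }
        Path dir = Files.createTempDirectory("codegen");
        Path file = dir.resolve(className + ".java");
        Files.write(file, source.getBytes());
        // run() returns 0 on success; null streams default to stdin/out/err.
        return compiler.run(null, null, null, file.toString()) == 0;
    }

    public static void main(String[] args) throws IOException {
        // A stand-in for a code-generated operator.
        String src = "public class GeneratedCalc { public static int calc() { return 42; } }";
        System.out.println(compileGenerated("GeneratedCalc", src));
    }
}
```

The few seconds this path costs relative to Janino is exactly the trade-off the issue describes: worthwhile when the compiled classes run long or often.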
[jira] [Updated] (FLINK-11601) Remove legacy JobManagerGateway
[ https://issues.apache.org/jira/browse/FLINK-11601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TisonKun updated FLINK-11601: - Summary: Remove legacy JobManagerGateway (was: Remove legacy AkkaJobManagerGateway) > Remove legacy JobManagerGateway > --- > > Key: FLINK-11601 > URL: https://issues.apache.org/jira/browse/FLINK-11601 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Affects Versions: 1.8.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Fix For: 1.8.0
[jira] [Created] (FLINK-11603) Ported the MetricQueryService to the new RpcEndpoint
TisonKun created FLINK-11603: Summary: Ported the MetricQueryService to the new RpcEndpoint Key: FLINK-11603 URL: https://issues.apache.org/jira/browse/FLINK-11603 Project: Flink Issue Type: Improvement Components: Metrics Reporter: TisonKun Assignee: TisonKun Given that a series of TODOs mention {{This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint}}, I'd like to give it a try and implement the RpcEndpoint version of MetricQueryService. Basically, port {{onReceive}} to 1. {{addMetric(metricName, metric, group)}} 2. {{removeMetric(metric)}} 3. {{createDump()}} And then adjust the tests and replace {{metricServiceQueryPath}} with a corresponding {{RpcGateway}}. I'd also like to confirm whether the following statement is true: when we call a Runnable/Callable with runAsync/callAsync, the Runnable/Callable runs in the main thread of the underlying RPC service, specifically, in the actor thread?
[jira] [Updated] (FLINK-11603) Port the MetricQueryService to the new RpcEndpoint
[ https://issues.apache.org/jira/browse/FLINK-11603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TisonKun updated FLINK-11603: - Summary: Port the MetricQueryService to the new RpcEndpoint (was: Ported the MetricQueryService to the new RpcEndpoint) > Port the MetricQueryService to the new RpcEndpoint > -- > > Key: FLINK-11603 > URL: https://issues.apache.org/jira/browse/FLINK-11603 > Project: Flink > Issue Type: Improvement > Components: Metrics >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > > Given that a series of TODOs mention {{This is a temporary hack until we have > ported the MetricQueryService to the new RpcEndpoint}}, I'd like to give it a > try and implement the RpcEndpoint version of MetricQueryService. > Basically, port {{onReceive}} to > 1. {{addMetric(metricName, metric, group)}} > 2. {{removeMetric(metric)}} > 3. {{createDump()}} > And then adjust the tests and replace {{metricServiceQueryPath}} with a > corresponding {{RpcGateway}}. > I'd also like to confirm whether the following statement is true: when we call a > Runnable/Callable with runAsync/callAsync, the Runnable/Callable runs in > the main thread of the underlying RPC service, specifically, in the actor > thread?
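Regarding the question at the end: in Flink's Akka-based RPC service, work submitted via `runAsync`/`callAsync` is executed in the endpoint's main thread, i.e. the actor thread. That single-threaded guarantee can be modeled in plain Java with a single-threaded executor (a standalone model for illustration, not Flink code):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MainThreadModel {

    // Single-threaded executor standing in for the endpoint's actor thread:
    // everything submitted via runAsync/callAsync lands on this one thread,
    // which is why endpoint state needs no extra synchronization.
    static final ExecutorService MAIN_THREAD = Executors.newSingleThreadExecutor();

    /** Model of callAsync: schedules work onto the "main thread". */
    static Future<String> callAsync() {
        return MAIN_THREAD.submit(() -> Thread.currentThread().getName());
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Two calls from the caller's thread still run on the same thread.
        String first = callAsync().get();
        String second = callAsync().get();
        System.out.println(first.equals(second)); // prints "true"
        MAIN_THREAD.shutdown();
    }
}
```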
[jira] [Updated] (FLINK-11602) Remove legacy AkkaJobManagerRetriever
[ https://issues.apache.org/jira/browse/FLINK-11602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11602: --- Labels: pull-request-available (was: ) > Remove legacy AkkaJobManagerRetriever > - > > Key: FLINK-11602 > URL: https://issues.apache.org/jira/browse/FLINK-11602 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.8.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flinkbot commented on issue #7701: [FLINK-11602] Remove legacy AkkaJobManagerRetriever
flinkbot commented on issue #7701: [FLINK-11602] Remove legacy AkkaJobManagerRetriever URL: https://github.com/apache/flink/pull/7701#issuecomment-463426863 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❌ 1. The [description] looks good. * ❌ 2. There is [consensus] that the contribution should go into Flink. * ❔ 3. Needs [attention] from. * ❌ 4. The change fits into the overall [architecture]. * ❌ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords) - `@flinkbot approve all` to approve all aspects - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-10495) Add HYPOT math function supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng closed FLINK-10495. --- Resolution: Won't Fix The master code has changed and the PR is closed, so I am closing this JIRA. > Add HYPOT math function supported in Table API and SQL > -- > > Key: FLINK-10495 > URL: https://issues.apache.org/jira/browse/FLINK-10495 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Aleksei Izmalkin >Assignee: Aleksei Izmalkin >Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Implement the function to calculate sqrt(x^2 + y^2). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
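For reference, the semantics of the proposed HYPOT function, sqrt(x^2 + y^2), can be sketched in plain Java. This is not the closed PR's code; it only contrasts `java.lang.Math.hypot` with the naive formula, which overflows for large magnitudes even when the true result is representable:

```java
// Hedged sketch of HYPOT semantics, not the Flink Table API implementation.
class HypotDemo {
    // naive sqrt(x^2 + y^2): x*x overflows to Infinity for |x| around 1e200
    static double naiveHypot(double x, double y) {
        return Math.sqrt(x * x + y * y);
    }

    // Math.hypot computes the same value without intermediate overflow/underflow
    static double hypot(double x, double y) {
        return Math.hypot(x, y);
    }
}
```

For 1e200 inputs the naive version returns Infinity while `Math.hypot` returns roughly 1.414e200, which is why built-in SQL HYPOT functions are typically defined in terms of the library routine.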
[GitHub] TisonKun opened a new pull request #7701: [FLINK-11602] Remove legacy AkkaJobManagerRetriever
TisonKun opened a new pull request #7701: [FLINK-11602] Remove legacy AkkaJobManagerRetriever URL: https://github.com/apache/flink/pull/7701 ## What is the purpose of the change Remove legacy `AkkaJobManagerRetriever` ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector:(no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) cc @tillrohrmann @zentol This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-11602) Remove legacy AkkaJobManagerRetriever
TisonKun created FLINK-11602: Summary: Remove legacy AkkaJobManagerRetriever Key: FLINK-11602 URL: https://issues.apache.org/jira/browse/FLINK-11602 Project: Flink Issue Type: Sub-task Affects Versions: 1.8.0 Reporter: TisonKun Assignee: TisonKun Fix For: 1.8.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] sunjincheng121 commented on issue #6797: [FLINK-10495] [table] Add HYPOT math function supported in Table API and SQL
sunjincheng121 commented on issue #6797: [FLINK-10495] [table] Add HYPOT math function supported in Table API and SQL URL: https://github.com/apache/flink/pull/6797#issuecomment-463426141 Thanks for checking the code and closing the PR. @aai95 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-11601) Remove legacy AkkaJobManagerGateway
TisonKun created FLINK-11601: Summary: Remove legacy AkkaJobManagerGateway Key: FLINK-11601 URL: https://issues.apache.org/jira/browse/FLINK-11601 Project: Flink Issue Type: Sub-task Components: JobManager Affects Versions: 1.8.0 Reporter: TisonKun Assignee: TisonKun Fix For: 1.8.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11600) Remove legacy JobListeningContext
TisonKun created FLINK-11600: Summary: Remove legacy JobListeningContext Key: FLINK-11600 URL: https://issues.apache.org/jira/browse/FLINK-11600 Project: Flink Issue Type: Sub-task Components: Client Affects Versions: 1.8.0 Reporter: TisonKun Assignee: TisonKun Fix For: 1.8.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11599) Remove legacy JobClientActor
TisonKun created FLINK-11599: Summary: Remove legacy JobClientActor Key: FLINK-11599 URL: https://issues.apache.org/jira/browse/FLINK-11599 Project: Flink Issue Type: Sub-task Components: Client Affects Versions: 1.8.0 Reporter: TisonKun Assignee: TisonKun Fix For: 1.8.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11598) Remove legacy JobSubmissionClientActor
TisonKun created FLINK-11598: Summary: Remove legacy JobSubmissionClientActor Key: FLINK-11598 URL: https://issues.apache.org/jira/browse/FLINK-11598 Project: Flink Issue Type: Sub-task Components: Client Affects Versions: 1.8.0 Reporter: TisonKun Assignee: TisonKun Fix For: 1.8.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] TisonKun opened a new pull request #7700: [FILNK-11597][test] Remove legacy JobManagerActorTestUtils
TisonKun opened a new pull request #7700: [FILNK-11597][test] Remove legacy JobManagerActorTestUtils URL: https://github.com/apache/flink/pull/7700 ## What is the purpose of the change Remove legacy `JobManagerActorTestUtils` ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector:(no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) cc @tillrohrmann @zentol This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] flinkbot commented on issue #7700: [FILNK-11597][test] Remove legacy JobManagerActorTestUtils
flinkbot commented on issue #7700: [FILNK-11597][test] Remove legacy JobManagerActorTestUtils URL: https://github.com/apache/flink/pull/7700#issuecomment-463419212 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.
[jira] [Created] (FLINK-11597) Remove legacy JobManagerActorTestUtils
TisonKun created FLINK-11597: Summary: Remove legacy JobManagerActorTestUtils Key: FLINK-11597 URL: https://issues.apache.org/jira/browse/FLINK-11597 Project: Flink Issue Type: Sub-task Components: Tests Affects Versions: 1.8.0 Reporter: TisonKun Assignee: TisonKun Fix For: 1.8.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flinkbot commented on issue #7699: [hotfix] Remove legacy ResourceManagerRunner
flinkbot commented on issue #7699: [hotfix] Remove legacy ResourceManagerRunner URL: https://github.com/apache/flink/pull/7699#issuecomment-463418201 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.
[GitHub] TisonKun opened a new pull request #7699: [hotfix] Remove legacy ResourceManagerRunner
TisonKun opened a new pull request #7699: [hotfix] Remove legacy ResourceManagerRunner URL: https://github.com/apache/flink/pull/7699 ## What is the purpose of the change A ResourceManager now uses the FatalErrorHandler implementation in ClusterEntrypoint. There is no remaining use of `ResourceManagerRunner`, and it is hard to believe that we will use it in the future. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) cc @tillrohrmann This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-11596) Check & port ResourceManagerTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-11596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11596: --- Labels: pull-request-available (was: ) > Check & port ResourceManagerTest to new code base > - > > Key: FLINK-11596 > URL: https://issues.apache.org/jira/browse/FLINK-11596 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.8.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Check & port {{ResourceManagerTest}} to new code base -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flinkbot commented on issue #7698: [FLINK-11596][test] Remove legacy ResourceManagerTest
flinkbot commented on issue #7698: [FLINK-11596][test] Remove legacy ResourceManagerTest URL: https://github.com/apache/flink/pull/7698#issuecomment-463410663 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.
[GitHub] TisonKun opened a new pull request #7698: [FLINK-11596][test] Remove legacy ResourceManagerTest
TisonKun opened a new pull request #7698: [FLINK-11596][test] Remove legacy ResourceManagerTest URL: https://github.com/apache/flink/pull/7698 ## What is the purpose of the change Remove legacy `ResourceManagerTest` `testJobManagerRegistrationAndReconciliation` → Invalid: a JobMaster now connects to and registers at a ResourceManager, which is covered by dedicated tests in `JobMasterTest` and `ResourceManagerJobMasterTest`. `testDelayedJobManagerRegistration` → The same as above. `testTriggerReconnect` → Similar: a JM should now reconnect to the RM, which is tested by `JobMasterTest#testReconnectionAfterDisconnect` `testTaskManagerRegistration` → `ResourceManagerTaskExecutorTest#testRegisterTaskExecutor*` `testResourceRemoval` → No such message anymore; see `MesosResourceManagerTest#testStopWorker` and `YarnResourceManagerTest#testStopWorker`. `testResourceFailureNotification` → Invalid in the new code base. `testHeartbeatTimeoutWithTaskExecutor` → `TaskExecutorTest#testHeartbeatTimeoutWithResourceManager` `testHeartbeatTimeoutWithJobManager` → `JobMasterTest#testHeartbeatTimeoutWithResourceManager` ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) cc @tillrohrmann @GJL This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (FLINK-11596) Check & port ResourceManagerTest to new code base
[ https://issues.apache.org/jira/browse/FLINK-11596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TisonKun reassigned FLINK-11596: Assignee: TisonKun > Check & port ResourceManagerTest to new code base > - > > Key: FLINK-11596 > URL: https://issues.apache.org/jira/browse/FLINK-11596 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.8.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Fix For: 1.8.0 > > > Check & port {{ResourceManagerTest}} to new code base -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11596) Check & port ResourceManagerTest to new code base
TisonKun created FLINK-11596: Summary: Check & port ResourceManagerTest to new code base Key: FLINK-11596 URL: https://issues.apache.org/jira/browse/FLINK-11596 Project: Flink Issue Type: Sub-task Components: Tests Affects Versions: 1.8.0 Reporter: TisonKun Fix For: 1.8.0 Check & port {{ResourceManagerTest}} to new code base -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11595) Gelly addEdge in certain circumstances still include duplicate vertices.
Calvin Han created FLINK-11595: -- Summary: Gelly addEdge in certain circumstances still include duplicate vertices. Key: FLINK-11595 URL: https://issues.apache.org/jira/browse/FLINK-11595 Project: Flink Issue Type: Bug Components: Gelly Affects Versions: 1.7.1 Environment: MacOS, IntelliJ Reporter: Calvin Han Assuming a base graph constructed by:
```
public class GraphCorn {
    public static Graph<String, VertexLabel, EdgeLabel> gc;

    public GraphCorn(String filename) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple6<String, String, String, String, String, String>> csvInput =
            env.readCsvFile(filename)
               .types(String.class, String.class, String.class, String.class, String.class, String.class);

        DataSet<Vertex<String, VertexLabel>> srcTuples = csvInput.project(0, 2)
            .map(new MapFunction<Tuple, Vertex<String, VertexLabel>>() {
                @Override
                public Vertex<String, VertexLabel> map(Tuple tuple) throws Exception {
                    VertexLabel lb = new VertexLabel(Util.hash(tuple.getField(1)));
                    return new Vertex<>(tuple.getField(0), lb);
                }
            }).returns(new TypeHint<Vertex<String, VertexLabel>>(){});

        DataSet<Vertex<String, VertexLabel>> dstTuples = csvInput.project(1, 3)
            .map(new MapFunction<Tuple, Vertex<String, VertexLabel>>() {
                @Override
                public Vertex<String, VertexLabel> map(Tuple tuple) throws Exception {
                    VertexLabel lb = new VertexLabel(Util.hash(tuple.getField(1)));
                    return new Vertex<>(tuple.getField(0), lb);
                }
            }).returns(new TypeHint<Vertex<String, VertexLabel>>(){});

        DataSet<Vertex<String, VertexLabel>> vertexTuples = srcTuples.union(dstTuples).distinct(0);

        DataSet<Edge<String, EdgeLabel>> edgeTuples = csvInput.project(0, 1, 4, 5)
            .map(new MapFunction<Tuple, Edge<String, EdgeLabel>>() {
                @Override
                public Edge<String, EdgeLabel> map(Tuple tuple) throws Exception {
                    EdgeLabel lb = new EdgeLabel(Util.hash(tuple.getField(2)), Long.parseLong(tuple.getField(3)));
                    return new Edge<>(tuple.getField(0), tuple.getField(1), lb);
                }
            }).returns(new TypeHint<Edge<String, EdgeLabel>>(){});

        this.gc = Graph.fromDataSet(vertexTuples, edgeTuples, env);
    }
}
```
Base graph CSV:
```
0,1,a,b,c,0
0,2,a,d,e,1
1,2,b,d,f,2
```
Attempt to add edges using the following function:
```
try (BufferedReader br = new BufferedReader(new FileReader(this.fileName))) {
    for (String line; (line = br.readLine()) != null; ) {
        String[] attributes = line.split(",");
        assert (attributes.length == 6);
        String srcID = attributes[0];
        String dstID = attributes[1];
        String srcLb = attributes[2];
        String dstLb = attributes[3];
        String edgeLb = attributes[4];
        String ts = attributes[5];
        Vertex<String, VertexLabel> src = new Vertex<>(srcID, new VertexLabel(Util.hash(srcLb)));
        Vertex<String, VertexLabel> dst = new Vertex<>(dstID, new VertexLabel(Util.hash(dstLb)));
        EdgeLabel edge = new EdgeLabel(Util.hash(edgeLb), Long.parseLong(ts));
        GraphCorn.gc = GraphCorn.gc.addEdge(src, dst, edge);
    }
} catch (Exception e) {
    System.err.println(e.getMessage());
}
```
The graph components to add are:
```
0,4,a,d,k,3
1,3,b,a,g,3
2,3,d,a,h,4
```
GraphCorn.gc will contain duplicate nodes 0, 1, and 2 (those that already exist in the base graph), which should not be the case according to the documentation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
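A minimal, non-Gelly sketch of the semantics the reporter expects: the endpoint vertices of a newly added edge are unioned with the existing vertex set and de-duplicated by vertex ID, the same effect as the `union(...).distinct(0)` in the constructor above. All names here are illustrative stand-ins, not Flink API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hedged sketch of expected addEdge vertex semantics, not Gelly code.
class VertexUnionSketch {
    // keep the first-seen label per vertex ID, mirroring union(...).distinct(0);
    // re-adding an existing ID must not create a second copy of that vertex
    static Map<String, String> addEdgeEndpoints(Map<String, String> vertices,
                                                String srcId, String srcLabel,
                                                String dstId, String dstLabel) {
        Map<String, String> result = new LinkedHashMap<>(vertices);
        result.putIfAbsent(srcId, srcLabel);
        result.putIfAbsent(dstId, dstLabel);
        return result;
    }
}
```

With the base graph's vertices {0, 1, 2}, adding edge (0, 4) should yield exactly four vertices; a fifth entry would be the duplicate the issue reports.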
[jira] [Updated] (FLINK-11595) Gelly's addEdge() in certain circumstances still includes duplicate vertices.
[ https://issues.apache.org/jira/browse/FLINK-11595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Han updated FLINK-11595: --- Summary: Gelly's addEdge() in certain circumstances still includes duplicate vertices. (was: Gelly addEdge in certain circumstances still include duplicate vertices.) > Gelly's addEdge() in certain circumstances still includes duplicate vertices. > - > > Key: FLINK-11595 > URL: https://issues.apache.org/jira/browse/FLINK-11595 > Project: Flink > Issue Type: Bug > Components: Gelly >Affects Versions: 1.7.1 > Environment: MacOS, intelliJ >Reporter: Calvin Han >Priority: Minor > Original Estimate: 2h > Remaining Estimate: 2h > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6966) Add maxParallelism and UIDs to all operators generated by the Table API / SQL
[ https://issues.apache.org/jira/browse/FLINK-6966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16767618#comment-16767618 ] Hao Dang commented on FLINK-6966: - Hi [~hequn8128], I understand this work hadn't started as of last September, so I want to check in and see whether there is any news or a plan to implement this feature. Thanks! > Add maxParallelism and UIDs to all operators generated by the Table API / SQL > - > > Key: FLINK-6966 > URL: https://issues.apache.org/jira/browse/FLINK-6966 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Priority: Major > > At the moment, the Table API does not assign UIDs and the max parallelism to > operators (except for operators with parallelism 1). > We should do that to avoid problems when rescaling or restarting jobs from > savepoints. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] azagrebin commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor
azagrebin commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor URL: https://github.com/apache/flink/pull/7631#discussion_r256595763 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java ## @@ -52,30 +52,38 @@ /** The ID of the partition the input channel is going to consume. */ private final ResultPartitionID consumedPartitionId; - /** The location of the partition the input channel is going to consume. */ - private final ResultPartitionLocation consumedPartitionLocation; + /** The location type of the partition the input channel is going to consume. */ + private final LocationType locationType; + + /** The connection to use to request the remote partition. */ + private final Optional connectionId; Review comment: ok, I think I see the problem now, thanks for the explanation. I will put my thoughts in a different order :) 2. During the design, I thought `ShuffleDeploymentDescriptor` was supposed to contain shuffle-specific info generated by `ShuffleMaster` as a central point and used eventually by `ShuffleService` in the producer and consumer Task to set up readers/writers. An example could be some partition identification or connection inside an external shuffle system. The existing connection id/location is also an example of it for the existing netty stack, but might not be relevant for other shuffle systems. For example, let's say the partition is stored remotely (not in the producer), the batch job is restored, and some of the partitions are finished; then we do not even need to deploy the producer, just connect the consumer to the existing 'done' external partition. The existing connection id then does not make sense; the consumer needs some kind of internal shuffle id of the partition. That is why I thought: PSD(ProducerResourceId,ProducerConnection,...)
-> `ShuffleMaster` -> SDD(Internal) -> ICDD(SDD) -> Task -> ICDD,ConsumerResourceId -> `ShuffleService` -> InputGate -> read records. I think even `ShuffleService` itself can decide what to do with ProducerResourceId/ConsumerResourceId and calculate LocationType internally in the case of the existing netty. For other shuffle services, LocationType might not be relevant (like an external partition); then maybe ICDD=SDD=PartitionInfo and we could leave only one of them, not sure. I thought of `UnknownShuffleDeploymentDescriptor` as a replacement for `LocationType.Unknown\ConnectionId=null` based on the above arguments. It is just a singleton stub to signal that the SDD will be updated later with the `sendUpdatePartitionInfoRpcCall` in case of lazy scheduling. True, it is not generated by `ShuffleMaster`; what could be an alternative to this approach? 1. In case of eager deployment (lazyScheduling=false), currently, we can already deploy the consumer when the slot is assigned to the producer but its deployment has not started yet, and we planned to generate the SDD during producer deployment. If we agree on 2., it seems that the consumer needs the SDD to consume, and it has to be known. Thinking more about the `ShuffleMaster` interface, depending on its nature, it might be an asynchronous API, like registering with and talking to an external system. This means that ideally its partition register method should return a `CompletableFuture`. Then the producer execution life cycle should be: created -> scheduled -> slot assigned -> register partition (get and cache SDD) -> deploying (generate TDD with previously acquired SDD). Everything happens on the main thread of the Job Master. In eager scheduling, the consumer has to be deployed not after the producer's slot is assigned but after the partition is registered. In lazy scheduling, we have the `sendUpdatePartitionInfoRpcCall` to send the SDD later.
I would suggest we do the partition registering and SDD caching in `allocateAndAssignSlotForExecution`, right after slot assignment (needs a rebase on the latest master):
```
return FutureUtils.handleAsyncIfNotDone(..tryAssignResource..)
    .thenComposeAsync(
        slot -> {..ShuffleMaster.register(PSD), cache SDDs..},
        mainThreadExecutor);
```
Maybe with the steps refactored into separate functions :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
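The ordering proposed above (slot assigned, then asynchronously register the partition with the ShuffleMaster to obtain the SDD, and only then generate the deployment descriptor, all chained on the main-thread executor) can be sketched with plain `CompletableFuture` composition. The names are placeholders for this discussion, not actual Flink classes:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hedged sketch of the proposed eager-deployment chain, with placeholder names.
class EagerDeploymentSketch {
    // slotFuture stands in for allocateAndAssignSlotForExecution's result;
    // the inner supplyAsync stands in for the async ShuffleMaster registration
    static CompletableFuture<String> deploy(CompletableFuture<String> slotFuture,
                                            Executor mainThreadExecutor) {
        return slotFuture
            .thenComposeAsync(
                slot -> CompletableFuture.supplyAsync(() -> "sdd-for-" + slot),
                mainThreadExecutor)
            // only after the SDD is known can the TDD be generated
            .thenApplyAsync(sdd -> "tdd(" + sdd + ")", mainThreadExecutor);
    }
}
```

Using `thenComposeAsync`/`thenApplyAsync` with an explicit executor keeps every continuation on the Job Master's main thread, which is the single-threaded model the comment relies on.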
[GitHub] azagrebin commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor
azagrebin commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor URL: https://github.com/apache/flink/pull/7631#discussion_r256598926 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ShuffleDeploymentDescriptor.java ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.deployment; + +import org.apache.flink.runtime.io.network.ConnectionID; + +import java.io.Serializable; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Deployment descriptor for shuffle specific information. + */ +public class ShuffleDeploymentDescriptor implements Serializable { Review comment: True, for existing netty shuffle, it has to have more methods. 
Internally, I would suggest, the future NettyShuffleService will cast SDD to KnownNettySDD if it is not an UnknownSDD:

```
interface SDD { }

enum UnknownSDD implements SDD { INSTANCE; } // special singleton stub

class KnownNettySDD implements SDD { /* ProducerResourceId, ProducerConnection, etc. */ }

// later:
class AnyOtherSDD implements SDD { /* other specific shuffle identification */ }
```
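As a stand-alone sketch of the cast-or-stub pattern proposed in this comment: all type names below (`SddSketch`, the fields inside `KnownNettySDD`, the `describe` helper) are hypothetical placeholders following the thread's naming, not the merged Flink API.

```java
// Hypothetical sketch: a generic descriptor interface, a singleton "unknown"
// stub, and a netty-specific implementation the shuffle service casts to.
public class SddSketch {

    public interface ShuffleDeploymentDescriptor {}

    // Singleton stub signalling "descriptor not known yet, will be updated
    // later via sendUpdatePartitionInfoRpcCall" (lazy scheduling case).
    public enum UnknownSDD implements ShuffleDeploymentDescriptor { INSTANCE }

    // Netty-specific descriptor carrying producer location/connection info.
    public static final class KnownNettySDD implements ShuffleDeploymentDescriptor {
        public final String producerResourceId;
        public final String producerConnection;

        public KnownNettySDD(String resourceId, String connection) {
            this.producerResourceId = resourceId;
            this.producerConnection = connection;
        }
    }

    // What a NettyShuffleService could do internally: accept the generic
    // descriptor and cast, unless it is the unknown stub.
    public static String describe(ShuffleDeploymentDescriptor sdd) {
        if (sdd == UnknownSDD.INSTANCE) {
            return "unknown";
        }
        KnownNettySDD netty = (KnownNettySDD) sdd;
        return "netty@" + netty.producerConnection;
    }

    public static void main(String[] args) {
        System.out.println(describe(UnknownSDD.INSTANCE));
        System.out.println(describe(new KnownNettySDD("res-1", "host:6121")));
    }
}
```

The enum singleton keeps the "unknown" marker serialization-safe and identity-comparable, which is why the check above uses `==` rather than `instanceof`.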
[GitHub] azagrebin commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor
azagrebin commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor URL: https://github.com/apache/flink/pull/7631#discussion_r256595763 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java ## @@ -52,30 +52,38 @@ /** The ID of the partition the input channel is going to consume. */ private final ResultPartitionID consumedPartitionId; - /** The location of the partition the input channel is going to consume. */ - private final ResultPartitionLocation consumedPartitionLocation; + /** The location type of the partition the input channel is going to consume. */ + private final LocationType locationType; + + /** The connection to use to request the remote partition. */ + private final Optional connectionId; Review comment: ok, I think I see the problem now, thanks for the explanation. I will put my thoughts in a different order :) 2. During the design, I thought `ShuffleDeploymentDescriptor` was supposed to contain shuffle-specific info generated by `ShuffleMaster` as a central point and used eventually by `ShuffleService` in the producer and consumer Task to set up readers/writers. The example could be some partition identification or connection inside an external shuffle system. The existing connection id/location is also an example of it for the existing netty stack, but might not be relevant for other shuffle systems. For example, let's say the partition is stored remotely (not in the producer), the batch job is restored and some of the partitions are finished; we do not even need to deploy the producer, just connect the consumer to the existing 'done' external partition. Then the existing connection id does not make sense; the consumer needs some kind of internal shuffle id of the partition. That is why I thought: PSD(ProducerResourceId,ProducerConnection,...) 
-> `ShuffleMaster` -> SDD(Internal) -> ICDD(SDD) -> Task -> ICDD,ConsumerResourceId -> `ShuffleService` -> InputGate -> read records. I think even `ShuffleService` itself can decide what to do with ProducerResourceId/ConsumerResourceId and calculate the LocationType internally in case of the existing netty stack. For other shuffle services, LocationType might not be relevant (like an external partition); then maybe ICDD=SDD=PartitionInfo and we could keep only one of them, not sure. I thought of `UnknownShuffleDeploymentDescriptor` as a replacement for `LocationType.Unknown\ConnectionId=null` based on the above arguments. It is just a singleton stub to signal that the SDD will be updated later with the `sendUpdatePartitionInfoRpcCall` in case of lazy scheduling. True, it is not generated by `ShuffleMaster`; what could be an alternative to this approach? 1. In case of eager deployment (lazyScheduling=false), currently, we can already deploy the consumer when the slot is assigned to the producer but its deployment has not started yet, and we planned to generate the SDD during producer deployment. If we agree on 2., it seems that we need the SDD for the consumer to consume, and it has to be known. Thinking more about the `ShuffleMaster` interface, depending on its nature, it might be an asynchronous API, like registering with and talking to an external system. This means that ideally its partition register method should return a `CompletableFuture`. Then the producer execution life cycle should be: created -> scheduled -> slot assigned -> register partition (get and cache SDD) -> deploying (generate TDD with previously acquired SDD). Everything happens on the main thread of the Job Master. In eager scheduling, the consumer has to be deployed not after the producer slot is assigned but after the partition is registered. In lazy scheduling, we have the `sendUpdatePartitionInfoRpcCall` to send the SDD later. 
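The life cycle described above (slot assigned -> register partition and cache SDD -> generate TDD from the cached SDD) could be sketched as follows. All names here (`RegisterLifecycle`, `registerPartition`, the `PSD`/`SDD` stand-ins) are hypothetical placeholders for the interfaces being discussed, not real Flink classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hedged sketch of the asynchronous producer life cycle: the shuffle
// master's register method returns a future SDD (it may talk to an
// external system), the SDD is cached, and deployment later builds the
// TDD from the cache.
public class RegisterLifecycle {

    public static class PSD {
        public final String partitionId;
        public PSD(String id) { this.partitionId = id; }
    }

    public static class SDD {
        public final String info;
        public SDD(String info) { this.info = info; }
    }

    public interface ShuffleMaster {
        // Ideally asynchronous: registering may involve an external system.
        CompletableFuture<SDD> registerPartition(PSD psd);
    }

    private final ShuffleMaster shuffleMaster;
    private final ConcurrentHashMap<String, SDD> sddCache = new ConcurrentHashMap<>();

    public RegisterLifecycle(ShuffleMaster master) { this.shuffleMaster = master; }

    // created -> scheduled -> slot assigned -> register partition (cache SDD)
    public CompletableFuture<SDD> onSlotAssigned(PSD psd) {
        return shuffleMaster.registerPartition(psd)
            .thenApply(sdd -> {
                sddCache.put(psd.partitionId, sdd); // cache for TDD generation
                return sdd;
            });
    }

    // deploying: generate the TDD with the previously acquired SDD,
    // falling back to the "unknown" stub if registration has not completed
    public String generateTdd(String partitionId) {
        SDD sdd = sddCache.get(partitionId);
        return sdd == null ? "TDD(UnknownSDD)" : "TDD(" + sdd.info + ")";
    }

    public static void main(String[] args) {
        ShuffleMaster master = psd ->
            CompletableFuture.completedFuture(new SDD("netty:" + psd.partitionId));
        RegisterLifecycle lc = new RegisterLifecycle(master);
        lc.onSlotAssigned(new PSD("p1")).join();
        System.out.println(lc.generateTdd("p1"));
    }
}
```

This also illustrates why the consumer must wait for partition registration, not just slot assignment: before the future completes, the cache only yields the unknown stub.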
I would suggest we do the partitions registering and SDD caching in `allocateAndAssignSlotForExecution`, right after slot assignment:

```
return logicalSlotFuture
    .thenApply(..tryAssignResource..)
    .thenCompose(..ShuffleMaster.register(PSD), cache SDDs..);
```

Just maybe with refactoring the steps into different functions :)
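A minimal stand-alone illustration of this composition shape: `thenApply` for the synchronous slot-assignment step, `thenCompose` for the potentially asynchronous registration that itself returns a future. The stage names (`tryAssignResource`, `registerPartitions`) are placeholders, not the real `Execution` methods.

```java
import java.util.concurrent.CompletableFuture;

public class SlotChain {

    // synchronous step: assign the resource to the execution
    public static String tryAssignResource(String slot) {
        return slot + ":assigned";
    }

    // asynchronous step: stands in for ShuffleMaster.register(PSD) + SDD caching
    public static CompletableFuture<String> registerPartitions(String assignedSlot) {
        return CompletableFuture.completedFuture(assignedSlot + ":registered");
    }

    public static void main(String[] args) {
        CompletableFuture<String> logicalSlotFuture =
            CompletableFuture.completedFuture("slot-1");

        String result = logicalSlotFuture
            .thenApply(SlotChain::tryAssignResource)      // sync transform
            .thenCompose(SlotChain::registerPartitions)   // flatten nested future
            .join();

        System.out.println(result); // slot-1:assigned:registered
    }
}
```

`thenCompose` (rather than a second `thenApply`) keeps the result a flat `CompletableFuture<String>` instead of a nested `CompletableFuture<CompletableFuture<String>>`.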
[GitHub] flinkbot commented on issue #7697: [FLINK-11578][tests] Port BackPressureStatsTrackerImplITCase to new code base
flinkbot commented on issue #7697: [FLINK-11578][tests] Port BackPressureStatsTrackerImplITCase to new code base URL: https://github.com/apache/flink/pull/7697#issuecomment-463356048

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❌ 1. The [description] looks good.
* ❌ 2. There is [consensus] that the contribution should go into Flink.
* ❔ 3. Needs [attention] from.
* ❌ 4. The change fits into the overall [architecture].
* ❌ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval
[GitHub] GJL opened a new pull request #7697: [FLINK-11578][tests] Port BackPressureStatsTrackerImplITCase to new code base
GJL opened a new pull request #7697: [FLINK-11578][tests] Port BackPressureStatsTrackerImplITCase to new code base URL: https://github.com/apache/flink/pull/7697

## What is the purpose of the change

*Port BackPressureStatsTrackerImplITCase to new code base*

## Brief change log

- *See commits*

## Verifying this change

This change added tests and can be verified as follows:

- *This change is a test.*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
[GitHub] flinkbot commented on issue #7696: [FLINK-11588][core] Migrating the CopyableValueSerializer
flinkbot commented on issue #7696: [FLINK-11588][core] Migrating the CopyableValueSerializer URL: https://github.com/apache/flink/pull/7696#issuecomment-463303562

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❌ 1. The [description] looks good.
* ❌ 2. There is [consensus] that the contribution should go into Flink.
* ❔ 3. Needs [attention] from.
* ❌ 4. The change fits into the overall [architecture].
* ❌ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval
[jira] [Updated] (FLINK-11588) Migrate CopyableValueSerializer to use new serialization compatibility abstractions
[ https://issues.apache.org/jira/browse/FLINK-11588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11588: --- Labels: pull-request-available (was: ) > Migrate CopyableValueSerializer to use new serialization compatibility > abstractions > --- > > Key: FLINK-11588 > URL: https://issues.apache.org/jira/browse/FLINK-11588 > Project: Flink > Issue Type: Sub-task > Components: Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Assignee: Congxian Qiu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > This subtask covers migration of the {{CopyableValueSerializer}} to use the > new serialization compatibility APIs {{TypeSerializerSnapshot}} and > {{TypeSerializerSchemaCompatibility}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)