[jira] [Updated] (FLINK-7928) Extend the field in ResourceProfile for precisely calculating the resource of a task manager
[ https://issues.apache.org/jira/browse/FLINK-7928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-7928: Labels: flip-6 (was: ) > Extend the field in ResourceProfile for precisely calculating the resource of > a task manager > > > Key: FLINK-7928 > URL: https://issues.apache.org/jira/browse/FLINK-7928 > Project: Flink > Issue Type: Improvement > Components: JobManager, ResourceManager >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > ResourceProfile records all the resource requirements for a slot. It is > generated by the JobMaster and then passed to the ResourceManager with the slot > request. > A task in the slot needs three parts of resource: > 1. The resource for the operators, as specified by the user-defined ResourceSpec. > 2. The resource the operators need to communicate with their upstreams, for > example the resource for buffer pools. > 3. The resource the operators need to communicate with their downstreams, > same as above. > So ResourceProfile should contain three parts of resource: the first part > from the ResourceSpec, and the other two parts generated by the JobMaster. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7928) Extend the field in ResourceProfile for precisely calculating the resource of a task manager
shuai.xu created FLINK-7928: --- Summary: Extend the field in ResourceProfile for precisely calculating the resource of a task manager Key: FLINK-7928 URL: https://issues.apache.org/jira/browse/FLINK-7928 Project: Flink Issue Type: Improvement Components: JobManager, ResourceManager Reporter: shuai.xu Assignee: shuai.xu ResourceProfile records all the resource requirements for a slot. It is generated by the JobMaster and then passed to the ResourceManager with the slot request. A task in the slot needs three parts of resource: 1. The resource for the operators, as specified by the user-defined ResourceSpec. 2. The resource the operators need to communicate with their upstreams, for example the resource for buffer pools. 3. The resource the operators need to communicate with their downstreams, same as above. So ResourceProfile should contain three parts of resource: the first part from the ResourceSpec, and the other two parts generated by the JobMaster. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
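The three-part split described in FLINK-7928 could be sketched roughly as follows. This is a minimal illustration; the class and field names are hypothetical and do not reflect Flink's actual ResourceProfile API.

```java
// Hedged sketch of a three-part resource profile: one group of fields per
// resource part, so a task manager's slot resources can be computed as the
// sum of the operator part and the two network parts.
class ResourceProfileSketch {
    final double operatorCpuCores;  // part 1: from the user-defined ResourceSpec
    final int operatorHeapMB;       // part 1: from the user-defined ResourceSpec
    final int networkInputMB;       // part 2: buffers for reading from upstreams
    final int networkOutputMB;      // part 3: buffers for writing to downstreams

    ResourceProfileSketch(double cpuCores, int heapMB, int inMB, int outMB) {
        this.operatorCpuCores = cpuCores;
        this.operatorHeapMB = heapMB;
        this.networkInputMB = inMB;
        this.networkOutputMB = outMB;
    }

    /** Total memory the slot must provide: operator part plus both network parts. */
    int totalMemoryMB() {
        return operatorHeapMB + networkInputMB + networkOutputMB;
    }
}
```

Under this split, the JobMaster would fill in parts 2 and 3 when building the slot request, while part 1 comes straight from the user program.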
[jira] [Closed] (FLINK-7884) Port CheckpointConfigHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hai Zhou UTC+8 closed FLINK-7884. - Resolution: Fixed > Port CheckpointConfigHandler to new REST endpoint > - > > Key: FLINK-7884 > URL: https://issues.apache.org/jira/browse/FLINK-7884 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 > Fix For: 1.5.0 > > > Port *CheckpointConfigHandler* to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7893) Port CheckpointStatsDetailsSubtasksHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hai Zhou UTC+8 closed FLINK-7893. - Resolution: Fixed > Port CheckpointStatsDetailsSubtasksHandler to new REST endpoint > --- > > Key: FLINK-7893 > URL: https://issues.apache.org/jira/browse/FLINK-7893 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST >Reporter: Hai Zhou UTC+8 > Fix For: 1.5.0 > > > Port *CheckpointStatsDetailsSubtasksHandler* to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7885) Port CheckpointStatsDetailsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hai Zhou UTC+8 closed FLINK-7885. - Resolution: Fixed > Port CheckpointStatsDetailsHandler to new REST endpoint > --- > > Key: FLINK-7885 > URL: https://issues.apache.org/jira/browse/FLINK-7885 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 > Fix For: 1.5.0 > > > Port *CheckpointStatsDetailsHandler* to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7892) Port CheckpointStatsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hai Zhou UTC+8 closed FLINK-7892. - Resolution: Fixed > Port CheckpointStatsHandler to new REST endpoint > > > Key: FLINK-7892 > URL: https://issues.apache.org/jira/browse/FLINK-7892 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 > Fix For: 1.5.0 > > > Port *CheckpointStatsHandler* to new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7900) Add a Rich KeySelector
[ https://issues.apache.org/jira/browse/FLINK-7900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hai Zhou UTC+8 closed FLINK-7900. - Resolution: Won't Fix > Add a Rich KeySelector > -- > > Key: FLINK-7900 > URL: https://issues.apache.org/jira/browse/FLINK-7900 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 >Priority: Critical > > Currently we only have a `KeySelector` Function; perhaps we should add a > `RichKeySelector` RichFunction, so that users can read configuration > information when building the key selector they need. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
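The proposal above (closed as Won't Fix) can be sketched with simplified stand-ins. Both interfaces here are illustrative, assuming a plain `Map` in place of Flink's `Configuration`; `RichKeySelector` is not an actual Flink API.

```java
import java.util.Map;

// Minimal stand-in for Flink's KeySelector (hypothetical simplification).
interface KeySelector<IN, KEY> {
    KEY getKey(IN value);
}

// The proposed "rich" variant: an open() hook that receives configuration
// before any keys are extracted.
abstract class RichKeySelector<IN, KEY> implements KeySelector<IN, KEY> {
    public void open(Map<String, String> parameters) {}
}

// Example: the key field index is chosen from configuration at open() time.
class ConfiguredKeySelector extends RichKeySelector<String[], String> {
    private int keyIndex;

    @Override
    public void open(Map<String, String> parameters) {
        keyIndex = Integer.parseInt(parameters.getOrDefault("key.index", "0"));
    }

    @Override
    public String getKey(String[] record) {
        return record[keyIndex];
    }
}
```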
[jira] [Commented] (FLINK-7666) ContinuousFileReaderOperator swallows chained watermarks
[ https://issues.apache.org/jira/browse/FLINK-7666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219878#comment-16219878 ] ASF GitHub Bot commented on FLINK-7666: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/4900 I have one quick question, but maybe I miss something big here: wouldn't it be possible to simply run `quiesceAndAwaitPending()` before `Operator::close()` and avoid the locking/flag checking and not run into exceptions? If we ensure that the task is no longer in running state, the "forgotten" timers can no longer reflect in any checkpoint/savepoint and I think there is no guarantee of a consistent output after canceling a job anyways? > ContinuousFileReaderOperator swallows chained watermarks > > > Key: FLINK-7666 > URL: https://issues.apache.org/jira/browse/FLINK-7666 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.3.2 >Reporter: Ufuk Celebi >Assignee: Kostas Kloudas >Priority: Blocker > Fix For: 1.4.0 > > > I use event time and read from a (finite) file. I assign watermarks right > after the {{ContinuousFileReaderOperator}} with parallelism 1. > {code} > env > .readFile(new TextInputFormat(...), ...) > .setParallelism(1) > .assignTimestampsAndWatermarks(...) > .setParallelism(1) > .map()... > {code} > The watermarks I assign never progress through the pipeline. > I can work around this by inserting a {{shuffle()}} after the file reader or > starting a new chain at the assigner: > {code} > env > .readFile(new TextInputFormat(...), ...) > .setParallelism(1) > .shuffle() > .assignTimestampsAndWatermarks(...) > .setParallelism(1) > .map()... > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4900: [FLINK-7666] Close TimeService after closing operators.
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/4900 I have one quick question, but maybe I miss something big here: wouldn't it be possible to simply run `quiesceAndAwaitPending()` before `Operator::close()` and avoid the locking/flag checking and not run into exceptions? If we ensure that the task is no longer in running state, the "forgotten" timers can no longer reflect in any checkpoint/savepoint and I think there is no guarantee of a consistent output after canceling a job anyways? ---
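The ordering StefanRRichter suggests (quiesce the timer service and await pending timers before `Operator::close()`) can be sketched with a plain scheduled executor standing in for the timer service. This is illustrative only, not Flink's actual StreamTask code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for a processing-time service. The point of the suggestion:
// call quiesceAndAwaitPending() BEFORE closing operators, so no timer can
// fire into an already-closed operator and no locking/flag checks are needed.
class TimerServiceSketch {
    private final ScheduledExecutorService exec =
        Executors.newSingleThreadScheduledExecutor();

    void registerTimer(Runnable callback, long delayMillis) {
        exec.schedule(callback, delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Stop accepting new timers and wait for in-flight ones to finish. */
    void quiesceAndAwaitPending() throws InterruptedException {
        exec.shutdown();
        exec.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

After `quiesceAndAwaitPending()` returns, closing the operator is safe because no timer callback can still be running or be newly scheduled.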
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219854#comment-16219854 ] ASF GitHub Bot commented on FLINK-5823: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r147031787 --- Diff: flink-core/src/main/java/org/apache/flink/util/FileUtils.java --- @@ -243,11 +245,19 @@ else if (directory.exists()) { * @throws IOException if the delete operation fails */ public static boolean deletePathIfEmpty(FileSystem fileSystem, Path path) throws IOException { - FileStatus[] fileStatuses = null; + final FileStatus[] fileStatuses; try { fileStatuses = fileSystem.listStatus(path); - } catch (Exception ignored) {} + } + catch (FileNotFoundException e) { + // path already deleted + return true; + } + catch (Exception e) { + // could not access directory, cannot delete --- End diff -- should we log a warning here? > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r147031787 --- Diff: flink-core/src/main/java/org/apache/flink/util/FileUtils.java --- @@ -243,11 +245,19 @@ else if (directory.exists()) { * @throws IOException if the delete operation fails */ public static boolean deletePathIfEmpty(FileSystem fileSystem, Path path) throws IOException { - FileStatus[] fileStatuses = null; + final FileStatus[] fileStatuses; try { fileStatuses = fileSystem.listStatus(path); - } catch (Exception ignored) {} + } + catch (FileNotFoundException e) { + // path already deleted + return true; + } + catch (Exception e) { + // could not access directory, cannot delete --- End diff -- should we log a warning here? ---
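The behavior under review, including the warning bowenli86 asks about, might look like this when sketched against plain `java.nio.file` (a hypothetical rewrite, not Flink's actual `FileUtils`):

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

class DeletePathSketch {
    /** Deletes {@code path} if it is an empty directory; returns true if it no longer exists. */
    static boolean deletePathIfEmpty(Path path) {
        final boolean empty;
        try (DirectoryStream<Path> children = Files.newDirectoryStream(path)) {
            empty = !children.iterator().hasNext();
        } catch (NoSuchFileException e) {
            return true; // path already deleted
        } catch (IOException e) {
            // could not access the directory: log a warning instead of swallowing
            System.err.println("WARN: cannot list " + path + ": " + e.getMessage());
            return false;
        }
        if (!empty) {
            return false;
        }
        try {
            Files.deleteIfExists(path);
            return true;
        } catch (IOException e) {
            System.err.println("WARN: cannot delete " + path + ": " + e.getMessage());
            return false;
        }
    }
}
```

The key change mirrored from the diff: `FileNotFoundException`/`NoSuchFileException` counts as success (the path is already gone), while other I/O failures are surfaced rather than silently ignored.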
[jira] [Commented] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column
[ https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219771#comment-16219771 ] Shuyi Chen commented on FLINK-7923: --- We probably need to fix Calcite first (likely in Calcite release 1.15), then upgrade the Calcite dependency in Flink, and then fix this in Flink. > SQL parser exception when accessing subfields of a Composite element in an > Object Array type column > --- > > Key: FLINK-7923 > URL: https://issues.apache.org/jira/browse/FLINK-7923 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong > > Access type such as: > {code:SQL} > SELECT > a[1].f0 > FROM > MyTable > {code} > will cause a problem. > See following test sample for more details: > https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146993120 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java --- @@ -48,75 +53,249 @@ private final ClassLoader cl = getClass().getClassLoader(); - private final String backendKey = CoreOptions.STATE_BACKEND.key(); + private final String backendKey = CheckpointingOptions.STATE_BACKEND.key(); // + // defaults + // @Test public void testNoStateBackendDefined() throws Exception { - assertNull(AbstractStateBackend.loadStateBackendFromConfig(new Configuration(), cl, null)); + assertNull(StateBackendLoader.loadStateBackendFromConfig(new Configuration(), cl, null)); } @Test public void testInstantiateMemoryBackendByDefault() throws Exception { - StateBackend backend = AbstractStateBackend - .loadStateBackendFromConfigOrCreateDefault(new Configuration(), cl, null); + StateBackend backend = + StateBackendLoader.fromApplicationOrConfigOrDefault(null, new Configuration(), cl, null); assertTrue(backend instanceof MemoryStateBackend); } @Test - public void testLoadMemoryStateBackend() throws Exception { - // we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME) - // to guard against config-breaking changes of the name + public void testApplicationDefinedHasPrecedence() throws Exception { + final StateBackend appBackend = Mockito.mock(StateBackend.class); + final Configuration config = new Configuration(); config.setString(backendKey, "jobmanager"); - StateBackend backend = AbstractStateBackend - .loadStateBackendFromConfigOrCreateDefault(new Configuration(), cl, null); + StateBackend backend = StateBackendLoader.fromApplicationOrConfigOrDefault(appBackend, config, cl, null); + assertEquals(appBackend, backend); + } - assertTrue(backend instanceof MemoryStateBackend); + // + // Memory State Backend + // + + /** +* Validates loading a memory state backend from the cluster
configuration. +*/ + @Test + public void testLoadMemoryStateBackendNoParameters() throws Exception { + // we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME) + // to guard against config-breaking changes of the name + + final Configuration config1 = new Configuration(); + config1.setString(backendKey, "jobmanager"); + + final Configuration config2 = new Configuration(); + config2.setString(backendKey, MemoryStateBackendFactory.class.getName()); + + StateBackend backend1 = StateBackendLoader.loadStateBackendFromConfig(config1, cl, null); + StateBackend backend2 = StateBackendLoader.loadStateBackendFromConfig(config2, cl, null); + + assertTrue(backend1 instanceof MemoryStateBackend); + assertTrue(backend2 instanceof MemoryStateBackend); + } + + /** +* Validates loading a memory state backend with additional parameters from the cluster configuration. +*/ + @Test + public void testLoadMemoryStateWithParameters() throws Exception { + final String checkpointDir = new Path(tmp.newFolder().toURI()).toString(); + final String savepointDir = new Path(tmp.newFolder().toURI()).toString(); + final Path expectedCheckpointPath = new Path(checkpointDir); + final Path expectedSavepointPath = new Path(savepointDir); + + // we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME) + // to guard against config-breaking changes of the name + + final Configuration config1 = new Configuration(); + config1.setString(backendKey, "jobmanager"); + config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); + config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir); + + final Configuration config2 = new Configuration(); + config2.setString(backendKey,
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219561#comment-16219561 ] ASF GitHub Bot commented on FLINK-5823: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146988381 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ConfigurableStateBackend.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; + +/** + *An interface for state backends that pick up additional parameters from a configuration. + */ +public interface ConfigurableStateBackend { + + /** +* Creates a variant of the state backend that applies additional configuration parameters. +* +* Settings that were directly done on the original state backend object in the application +* program typically have precedence over setting picked up from the configuration.
+* +* If no configuration is applied, or if the method directly applies configuration values to +* the (mutable) state backend object, this method may return the original state backend object. +* Otherwise it typically returns a modified copy. +* +* @param config The configuration to pick the values from. +* @return A copy of th --- End diff -- Sentence is trailing off. > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
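The precedence rule described in that Javadoc (application-defined settings win, configuration fills the gaps, and `configure()` returns a copy) can be sketched as follows. Names here are hypothetical, and a plain `Map` stands in for Flink's `Configuration`.

```java
import java.util.Map;

// Sketch of the ConfigurableStateBackend pattern: configure() returns a copy
// in which values not set in the application are filled from the cluster config.
class FsBackendSketch {
    final String checkpointDir; // null means: not set in the application
    final String savepointDir;  // null means: not set in the application

    FsBackendSketch(String checkpointDir, String savepointDir) {
        this.checkpointDir = checkpointDir;
        this.savepointDir = savepointDir;
    }

    /** Application-defined values take precedence over configured ones. */
    FsBackendSketch configure(Map<String, String> config) {
        return new FsBackendSketch(
            checkpointDir != null ? checkpointDir : config.get("state.checkpoints.dir"),
            savepointDir != null ? savepointDir : config.get("state.savepoints.dir"));
    }
}
```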
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219564#comment-16219564 ] ASF GitHub Bot commented on FLINK-5823: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146985691 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +/** + * A collection of all configuration options that relate to checkpoints + * and savepoints. + */ +public class CheckpointingOptions { + + // + // general checkpoint and state backend options + // + + public static final ConfigOption STATE_BACKEND = ConfigOptions + .key("state.backend") + .noDefaultValue(); + + /** The maximum number of completed checkpoint instances to retain.*/ + public static final ConfigOption MAX_RETAINED_CHECKPOINTS = ConfigOptions + .key("state.checkpoints.num-retained") + .defaultValue(1); + + // + // Options specific to the file-system-based state backends + // + + /** The default directory for savepoints.
Used by the state backends that write +* savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */ + public static final ConfigOption SAVEPOINT_DIRECTORY = ConfigOptions + .key("state.savepoints.dir") + .noDefaultValue() + .withDeprecatedKeys("savepoints.state.backend.fs.dir"); + + /** The default directory used for checkpoints. Used by the state backends that write +* checkpoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */ + public static final ConfigOption CHECKPOINTS_DIRECTORY = ConfigOptions + .key("state.checkpoints.dir") + .noDefaultValue(); + + /** Option whether the heap-based key/value data structures should use an asynchronous +* snapshot method. Used by MemoryStateBackend and FsStateBackend. */ + public static final ConfigOption HEAP_KV_ASYNC_SNAPSHOTS = ConfigOptions + .key("state.backend.heap.async") --- End diff -- Off-topic: The fact that we have "FsStateBackend" and "MemoryStateBackend" is confusing every user. We should only have "HeapStateBackend" and "RocksDBStateBackend", which both checkpoint to a DFS. (And the current "MemoryStateBackend" behaviour could be a switch on "HeapStateBackend"). But I'm afraid it's too late for that. > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146990927 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.AbstractStateBackend; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.net.URI; + +/** + * A base class for all state backends that store their metadata (and data) in files. + * Examples that inherit from this are the {@link FsStateBackend}, the + * {@link org.apache.flink.runtime.state.memory.MemoryStateBackend MemoryStateBackend}, or the --- End diff -- Nit: is this true for `MemoryStateBackend`? At the very least it's weird. ---
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219562#comment-16219562 ] ASF GitHub Bot commented on FLINK-5823: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146987131 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java --- @@ -229,29 +229,31 @@ public static ExecutionGraph buildGraph( metrics); // The default directory for externalized checkpoints - String externalizedCheckpointsDir = jobManagerConfig.getString(CoreOptions.CHECKPOINTS_DIRECTORY); + String externalizedCheckpointsDir = jobManagerConfig.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY); - // load the state backend for checkpoint metadata. - // if specified in the application, use from there, otherwise load from configuration - final StateBackend metadataBackend; + // load the state backend from the application settings + final StateBackend applicationConfiguredBackend; + final SerializedValue serializedAppConfigured = snapshotSettings.getDefaultStateBackend(); - final SerializedValue applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend(); - if (applicationConfiguredBackend != null) { + if (serializedAppConfigured == null) { + applicationConfiguredBackend = null; + } + else { try { - metadataBackend = applicationConfiguredBackend.deserializeValue(classLoader); + applicationConfiguredBackend = serializedAppConfigured.deserializeValue(classLoader); } catch (IOException | ClassNotFoundException e) { - throw new JobExecutionException(jobId, "Could not instantiate configured state backend.", e); + throw new JobExecutionException(jobId, + "Could not deserialize application-defined state backend.", e); } + } - log.info("Using application-defined state backend for checkpoint/savepoint metadata: {}.", - metadataBackend); - } else { - try { - metadataBackend = AbstractStateBackend -
.loadStateBackendFromConfigOrCreateDefault(jobManagerConfig, classLoader, log); - } catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) { - throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e); - } + final StateBackend rootBackend; + try { + rootBackend = StateBackendLoader.fromApplicationOrConfigOrDefault( --- End diff -- nit: This is only defined in the later commit "[FLINK-5823] [checkpoints] State backends define checkpoint and savepoint directories, improved configuration", so technically the commit that adds this code is broken. > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146991686 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java --- @@ -18,54 +18,92 @@ package org.apache.flink.runtime.state.filesystem; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.ConfigurableStateBackend; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + import java.io.IOException; import java.net.URI; import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; /** - * The file state backend is a state backend that stores the state of streaming jobs in a file system. + * This state backend holds the working state in the memory (JVM heap) of the TaskManagers. + * The state backend checkpoints state as files to a file system (hence the backend's name).
+ * + * Each checkpoint individually will store all its files in a subdirectory that includes the + * checkpoint number, such as {@code hdfs://namenode:port/flink-checkpoints/chk-17/}. + * + * State Size Considerations + * + * Working state is kept on the TaskManager heap. If a TaskManager executes multiple + * tasks concurrently (if the TaskManager has multiple slots, or if slot-sharing is used) + * then the aggregate state of all tasks needs to fit into that TaskManager's memory. + * + * This state backend stores small state chunks directly with the metadata, to avoid creating + * many small files. The threshold for that is configurable. When increasing this threshold, the + * size of the checkpoint metadata increases. The checkpoint metadata of all retained completed + * checkpoints needs to fit into the JobManager's heap memory. This is typically not a problem, + * unless the threshold {@link #getMinFileSizeThreshold()} is increased significantly. + * + * Persistence Guarantees * - * The state backend has one core directory into which it puts all checkpoint data. Inside that - * directory, it creates a directory per job, inside which each checkpoint gets a directory, with - * files for each state, for example: + * Checkpoints from this state backend are as persistent and available as filesystem that is written to. + * If the file system is a persistent distributed file system, this state backend supports + * highly available setups. The backend additionally supports savepoints and externalized checkpoints. * - * {@code hdfs://namenode:port/flink-checkpoints//chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8 } + * Configuration + * + * As for all state backends, this backend can either be configured within the application (by creating + * the backend with the respective constructor parameters and setting it on the execution environment) + * or by specifying it in the Flink configuration.
+ * + * If the state backend was specified in the application, it may pick up additional configuration + * parameters from the Flink configuration. For example, if the backend if configured in the application + * without a default savepoint directory, it will pick up a default savepoint directory specified in the + * Flink configuration of the running job/cluster. That behavior is implemented via the + * {@link #configure(Configuration)} method. */ -public class FsStateBackend extends AbstractStateBackend { +@PublicEvolving +public class FsStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend { --- End diff -- Why isn't `AbstractFileStateBackend` configurable? ---
[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146992430 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java --- @@ -18,45 +18,41 @@ package org.apache.flink.runtime.state.filesystem; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StateBackendFactory; -import java.io.IOException; +import java.net.URI; /** - * A factory that creates an {@link org.apache.flink.runtime.state.filesystem.FsStateBackend} - * from a configuration. + * A factory that creates an {@link FsStateBackend} from a configuration. */ +@PublicEvolving public class FsStateBackendFactory implements StateBackendFactory { - - /** The key under which the config stores the directory where checkpoints should be stored */ - public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.fs.checkpointdir"; - - /** The key under which the config stores the threshold for state to be store in memory, -* rather than in files */ - public static final String MEMORY_THRESHOLD_CONF_KEY = "state.backend.fs.memory-threshold"; - @Override public FsStateBackend createFromConfig(Configuration config) throws IllegalConfigurationException { --- End diff -- Wouldn't it make sense to do all of this in `configure()`? The factories are essentially useless now and would only instantiate the backend and then call `configure()` with the config. ---
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219568#comment-16219568 ] ASF GitHub Bot commented on FLINK-5823: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146992430 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java --- @@ -18,45 +18,41 @@ package org.apache.flink.runtime.state.filesystem; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StateBackendFactory; -import java.io.IOException; +import java.net.URI; /** - * A factory that creates an {@link org.apache.flink.runtime.state.filesystem.FsStateBackend} - * from a configuration. + * A factory that creates an {@link FsStateBackend} from a configuration. */ +@PublicEvolving public class FsStateBackendFactory implements StateBackendFactory { - - /** The key under which the config stores the directory where checkpoints should be stored */ - public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.fs.checkpointdir"; - - /** The key under which the config stores the threshold for state to be store in memory, -* rather than in files */ - public static final String MEMORY_THRESHOLD_CONF_KEY = "state.backend.fs.memory-threshold"; - @Override public FsStateBackend createFromConfig(Configuration config) throws IllegalConfigurationException { --- End diff -- Wouldn't it make sense to do all of this in `configure()`? The factories are essentially useless now and would only instantiate the backend and then call `configure()` with the config. 
> Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219563#comment-16219563 ] ASF GitHub Bot commented on FLINK-5823: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146984727 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +/** + * A collection of all configuration options that relate to checkpoints + * and savepoints. + */ +public class CheckpointingOptions { + + // + // general checkpoint and state backend options + // + + public static final ConfigOption STATE_BACKEND = ConfigOptions + .key("state.backend") + .noDefaultValue(); + + /** The maximum number of completed checkpoint instances to retain.*/ + public static final ConfigOption MAX_RETAINED_CHECKPOINTS = ConfigOptions + .key("state.checkpoints.num-retained") + .defaultValue(1); + + // + // Options specific to the file-system-based state backends + // + + /** The default directory for savepoints. 
Used by the state backends that write +* savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */ --- End diff -- nit: is this true for `MemoryStateBackend`? Same for the options below. > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219565#comment-16219565 ] ASF GitHub Bot commented on FLINK-5823: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146991686 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java --- @@ -18,54 +18,92 @@ package org.apache.flink.runtime.state.filesystem; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.ConfigurableStateBackend; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + import java.io.IOException; import java.net.URI; import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; /** - * The file state backend is a state backend that stores the state of streaming jobs in a file system. + * This state backend holds the working state in the memory (JVM heap) of the TaskManagers. 
+ * The state backend checkpoints state as files to a file system (hence the backend's name). + * + * Each checkpoint individually will store all its files in a subdirectory that includes the + * checkpoint number, such as {@code hdfs://namenode:port/flink-checkpoints/chk-17/}. + * + * State Size Considerations + * + * Working state is kept on the TaskManager heap. If a TaskManager executes multiple + * tasks concurrently (if the TaskManager has multiple slots, or if slot-sharing is used) + * then the aggregate state of all tasks needs to fit into that TaskManager's memory. + * + * This state backend stores small state chunks directly with the metadata, to avoid creating + * many small files. The threshold for that is configurable. When increasing this threshold, the + * size of the checkpoint metadata increases. The checkpoint metadata of all retained completed + * checkpoints needs to fit into the JobManager's heap memory. This is typically not a problem, + * unless the threshold {@link #getMinFileSizeThreshold()} is increased significantly. + * + * Persistence Guarantees * - * The state backend has one core directory into which it puts all checkpoint data. Inside that - * directory, it creates a directory per job, inside which each checkpoint gets a directory, with - * files for each state, for example: + * Checkpoints from this state backend are as persistent and available as the file system that is written to. + * If the file system is a persistent distributed file system, this state backend supports + * highly available setups. The backend additionally supports savepoints and externalized checkpoints. 
* - * {@code hdfs://namenode:port/flink-checkpoints//chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8 } + * Configuration + * + * As for all state backends, this backend can either be configured within the application (by creating + * the backend with the respective constructor parameters and setting it on the execution environment) + * or by specifying it in the Flink configuration. + * + * If the state backend was specified in the application, it may pick up additional configuration + * parameters from the Flink configuration. For example, if the backend is configured in the application + * without a default savepoint directory, it will pick up a default savepoint directory specified in the + * Flink configuration of the running job/cluster. That behavior is implemented via the + * {@link #configure(Configuration)} method. */ -public class FsStateBackend extends AbstractStateBackend { +@PublicEvolving +public class FsStateBackend extends AbstractFileStateBackend implements
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219560#comment-16219560 ] ASF GitHub Bot commented on FLINK-5823: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146985767 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +/** + * A collection of all configuration options that relate to checkpoints + * and savepoints. + */ +public class CheckpointingOptions { + + // + // general checkpoint and state backend options + // + + public static final ConfigOption STATE_BACKEND = ConfigOptions + .key("state.backend") + .noDefaultValue(); + + /** The maximum number of completed checkpoint instances to retain.*/ + public static final ConfigOption MAX_RETAINED_CHECKPOINTS = ConfigOptions + .key("state.checkpoints.num-retained") + .defaultValue(1); + + // + // Options specific to the file-system-based state backends + // + + /** The default directory for savepoints. 
Used by the state backends that write +* savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */ + public static final ConfigOption SAVEPOINT_DIRECTORY = ConfigOptions + .key("state.savepoints.dir") + .noDefaultValue() + .withDeprecatedKeys("savepoints.state.backend.fs.dir"); + + /** The default directory used for checkpoints. Used by the state backends that write +* checkpoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */ + public static final ConfigOption CHECKPOINTS_DIRECTORY = ConfigOptions + .key("state.checkpoints.dir") + .noDefaultValue(); + + /** Option whether the heap-based key/value data structures should use an asynchronous +* snapshot method. Used by MemoryStateBackend and FsStateBackend. */ + public static final ConfigOption HEAP_KV_ASYNC_SNAPSHOTS = ConfigOptions + .key("state.backend.heap.async") --- End diff -- Mentioning it because we have no "heap" backend, technically. > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
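The `withDeprecatedKeys(...)` behavior quoted above — the current key wins, and deprecated keys serve only as fallbacks — can be illustrated with a self-contained sketch. `MiniOption` is a hypothetical stand-in, not Flink's actual `ConfigOption` class:

```java
public class DeprecatedKeySketch {

    /** Hypothetical mini config option with deprecated-key fallback. */
    static class MiniOption {
        final String key;
        final String[] deprecatedKeys;

        MiniOption(String key, String... deprecatedKeys) {
            this.key = key;
            this.deprecatedKeys = deprecatedKeys;
        }

        String resolve(java.util.Map<String, String> config) {
            if (config.containsKey(key)) {
                return config.get(key);          // the current key always wins
            }
            for (String dep : deprecatedKeys) {  // then deprecated keys, in order
                if (config.containsKey(dep)) {
                    return config.get(dep);
                }
            }
            return null;                         // no default value
        }
    }

    static final MiniOption SAVEPOINT_DIRECTORY =
            new MiniOption("state.savepoints.dir", "savepoints.state.backend.fs.dir");

    public static void main(String[] args) {
        // an old-style config keeps working via the deprecated key
        java.util.Map<String, String> oldStyle =
                java.util.Map.of("savepoints.state.backend.fs.dir", "hdfs:///savepoints");
        System.out.println(SAVEPOINT_DIRECTORY.resolve(oldStyle));

        // if both keys are present, the new key takes precedence
        java.util.Map<String, String> both = java.util.Map.of(
                "state.savepoints.dir", "hdfs:///new",
                "savepoints.state.backend.fs.dir", "hdfs:///old");
        System.out.println(SAVEPOINT_DIRECTORY.resolve(both));
    }
}
```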
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219566#comment-16219566 ] ASF GitHub Bot commented on FLINK-5823: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146990927 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.AbstractStateBackend; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.net.URI; + +/** + * A base class for all state backends that store their metadata (and data) in files. 
+ * Examples that inherit from this are the {@link FsStateBackend}, the + * {@link org.apache.flink.runtime.state.memory.MemoryStateBackend MemoryStateBackend}, or the --- End diff -- Nit: is this true for `MemoryStateBackend`? At the very least it's weird. > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219567#comment-16219567 ] ASF GitHub Bot commented on FLINK-5823: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146993120 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java --- @@ -48,75 +53,249 @@ private final ClassLoader cl = getClass().getClassLoader(); - private final String backendKey = CoreOptions.STATE_BACKEND.key(); + private final String backendKey = CheckpointingOptions.STATE_BACKEND.key(); // + // defaults + // @Test public void testNoStateBackendDefined() throws Exception { - assertNull(AbstractStateBackend.loadStateBackendFromConfig(new Configuration(), cl, null)); + assertNull(StateBackendLoader.loadStateBackendFromConfig(new Configuration(), cl, null)); } @Test public void testInstantiateMemoryBackendByDefault() throws Exception { - StateBackend backend = AbstractStateBackend - .loadStateBackendFromConfigOrCreateDefault(new Configuration(), cl, null); + StateBackend backend = + StateBackendLoader.fromApplicationOrConfigOrDefault(null, new Configuration(), cl, null); assertTrue(backend instanceof MemoryStateBackend); } @Test - public void testLoadMemoryStateBackend() throws Exception { - // we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME) - // to guard against config-breaking changes of the name + public void testApplicationDefinedHasPrecedence() throws Exception { + final StateBackend appBackend = Mockito.mock(StateBackend.class); + final Configuration config = new Configuration(); config.setString(backendKey, "jobmanager"); - StateBackend backend = AbstractStateBackend - .loadStateBackendFromConfigOrCreateDefault(new Configuration(), cl, null); + StateBackend backend = StateBackendLoader.fromApplicationOrConfigOrDefault(appBackend, config, cl, null); + 
assertEquals(appBackend, backend); + } - assertTrue(backend instanceof MemoryStateBackend); + // + // Memory State Backend + // + + /** +* Validates loading a memory state backend from the cluster configuration. +*/ + @Test + public void testLoadMemoryStateBackendNoParameters() throws Exception { + // we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME) + // to guard against config-breaking changes of the name + + final Configuration config1 = new Configuration(); + config1.setString(backendKey, "jobmanager"); + + final Configuration config2 = new Configuration(); + config2.setString(backendKey, MemoryStateBackendFactory.class.getName()); + + StateBackend backend1 = StateBackendLoader.loadStateBackendFromConfig(config1, cl, null); + StateBackend backend2 = StateBackendLoader.loadStateBackendFromConfig(config2, cl, null); + + assertTrue(backend1 instanceof MemoryStateBackend); + assertTrue(backend2 instanceof MemoryStateBackend); + } + + /** +* Validates loading a memory state backend with additional parameters from the cluster configuration. +*/ + @Test + public void testLoadMemoryStateWithParameters() throws Exception { + final String checkpointDir = new Path(tmp.newFolder().toURI()).toString(); + final String savepointDir = new Path(tmp.newFolder().toURI()).toString(); + final Path expectedCheckpointPath = new Path(checkpointDir); + final Path expectedSavepointPath = new Path(savepointDir); + + // we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME) + // to guard against config-breaking changes of the name + + final Configuration config1 = new Configuration(); + config1.setString(backendKey, "jobmanager"); +
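The precedence rule that `testApplicationDefinedHasPrecedence` exercises — application-defined backend over cluster configuration over the built-in default — can be sketched as follows. Backends are reduced to plain names here; this is not the actual `StateBackendLoader` implementation:

```java
public class LoaderSketch {

    // Simplified stand-in for StateBackendLoader.fromApplicationOrConfigOrDefault(...)
    static String fromApplicationOrConfigOrDefault(
            String appBackend, java.util.Map<String, String> config) {
        if (appBackend != null) {
            return appBackend;                  // 1. application-defined backend wins
        }
        String configured = config.get("state.backend");
        if (configured != null) {
            return configured;                  // 2. then the cluster configuration
        }
        return "jobmanager";                    // 3. finally the default (memory backend)
    }

    public static void main(String[] args) {
        // app setting shadows the config
        System.out.println(fromApplicationOrConfigOrDefault(
                "filesystem", java.util.Map.of("state.backend", "rocksdb")));
        // no app setting: the config is used
        System.out.println(fromApplicationOrConfigOrDefault(
                null, java.util.Map.of("state.backend", "rocksdb")));
        // neither: the default applies
        System.out.println(fromApplicationOrConfigOrDefault(
                null, java.util.Map.of()));
    }
}
```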
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219559#comment-16219559 ] ASF GitHub Bot commented on FLINK-5823: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146987476 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -116,6 +117,10 @@ * accessing this don't block the job manager actor and run asynchronously. */ private final CompletedCheckpointStore completedCheckpointStore; + /** The root checkpoint state backend, which is responsible for initializing the +* checkpoint, storing the metadata, and cleaning up the checkpoint */ + private final StateBackend checkpointStateBackend; --- End diff -- nit: Are there other backends than "checkpoint" backend? > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146985691 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +/** + * A collection of all configuration options that relate to checkpoints + * and savepoints. + */ +public class CheckpointingOptions { + + // + // general checkpoint and state backend options + // + + public static final ConfigOption STATE_BACKEND = ConfigOptions + .key("state.backend") + .noDefaultValue(); + + /** The maximum number of completed checkpoint instances to retain.*/ + public static final ConfigOption MAX_RETAINED_CHECKPOINTS = ConfigOptions + .key("state.checkpoints.num-retained") + .defaultValue(1); + + // + // Options specific to the file-system-based state backends + // + + /** The default directory for savepoints. Used by the state backends that write +* savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). 
*/ + public static final ConfigOption SAVEPOINT_DIRECTORY = ConfigOptions + .key("state.savepoints.dir") + .noDefaultValue() + .withDeprecatedKeys("savepoints.state.backend.fs.dir"); + + /** The default directory used for checkpoints. Used by the state backends that write +* checkpoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */ + public static final ConfigOption CHECKPOINTS_DIRECTORY = ConfigOptions + .key("state.checkpoints.dir") + .noDefaultValue(); + + /** Option whether the heap-based key/value data structures should use an asynchronous +* snapshot method. Used by MemoryStateBackend and FsStateBackend. */ + public static final ConfigOption HEAP_KV_ASYNC_SNAPSHOTS = ConfigOptions + .key("state.backend.heap.async") --- End diff -- Off-topic: The fact that we have "FsStateBackend" and "MemoryStateBackend" is confusing every user. We should only have "HeapStateBackend" and "RocksDBStateBackend", which both checkpoint to a DFS. (And the current "MemoryStateBackend" behaviour could be a switch on "HeapStateBackend"). But I'm afraid it's too late for that. 💩 ---
[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146987131 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java --- @@ -229,29 +229,31 @@ public static ExecutionGraph buildGraph( metrics); // The default directory for externalized checkpoints - String externalizedCheckpointsDir = jobManagerConfig.getString(CoreOptions.CHECKPOINTS_DIRECTORY); + String externalizedCheckpointsDir = jobManagerConfig.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY); - // load the state backend for checkpoint metadata. - // if specified in the application, use from there, otherwise load from configuration - final StateBackend metadataBackend; + // load the state backend from the application settings + final StateBackend applicationConfiguredBackend; + final SerializedValue serializedAppConfigured = snapshotSettings.getDefaultStateBackend(); - final SerializedValue applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend(); - if (applicationConfiguredBackend != null) { + if (serializedAppConfigured == null) { + applicationConfiguredBackend = null; + } + else { try { - metadataBackend = applicationConfiguredBackend.deserializeValue(classLoader); + applicationConfiguredBackend = serializedAppConfigured.deserializeValue(classLoader); } catch (IOException | ClassNotFoundException e) { - throw new JobExecutionException(jobId, "Could not instantiate configured state backend.", e); + throw new JobExecutionException(jobId, + "Could not deserialize application-defined state backend.", e); } + } - log.info("Using application-defined state backend for checkpoint/savepoint metadata: {}.", - metadataBackend); - } else { - try { - metadataBackend = AbstractStateBackend - .loadStateBackendFromConfigOrCreateDefault(jobManagerConfig, classLoader, log); - } catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) { - throw new 
JobExecutionException(jobId, "Could not instantiate configured state backend", e); - } + final StateBackend rootBackend; + try { + rootBackend = StateBackendLoader.fromApplicationOrConfigOrDefault( --- End diff -- nit: This is only defined in the later commit "[FLINK-5823] [checkpoints] State backends define checkpoint and savepoint directories, improved configuration", so technically the commit that adds this code is broken. ---
[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146984727 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +/** + * A collection of all configuration options that relate to checkpoints + * and savepoints. + */ +public class CheckpointingOptions { + + // + // general checkpoint and state backend options + // + + public static final ConfigOption STATE_BACKEND = ConfigOptions + .key("state.backend") + .noDefaultValue(); + + /** The maximum number of completed checkpoint instances to retain.*/ + public static final ConfigOption MAX_RETAINED_CHECKPOINTS = ConfigOptions + .key("state.checkpoints.num-retained") + .defaultValue(1); + + // + // Options specific to the file-system-based state backends + // + + /** The default directory for savepoints. Used by the state backends that write +* savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */ --- End diff -- nit: is this true for `MemoryStateBackend`? Same for the options below. ---
[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146987476 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -116,6 +117,10 @@ * accessing this don't block the job manager actor and run asynchronously. */ private final CompletedCheckpointStore completedCheckpointStore; + /** The root checkpoint state backend, which is responsible for initializing the +* checkpoint, storing the metadata, and cleaning up the checkpoint */ + private final StateBackend checkpointStateBackend; --- End diff -- nit: Are there other backends than "checkpoint" backend? ---
[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146988381 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ConfigurableStateBackend.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; + +/** + *An interface for state backends that pick up additional parameters from a configuration. + */ +public interface ConfigurableStateBackend { + + /** +* Creates a variant of the state backend that applies additional configuration parameters. +* +* Settings that were directly done on the original state backend object in the application +* program typically have precedence over setting picked up from the configuration. +* +* If no configuration is applied, or if the method directly applies configuration values to +* the (mutable) state backend object, this method may return the original state backend object. +* Otherwise it typically returns a modified copy. +* +* @param config The configuration to pick the values from. 
+* @return A copy of th --- End diff -- Sentence is trailing off. ---
[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4907#discussion_r146985767 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +/** + * A collection of all configuration options that relate to checkpoints + * and savepoints. + */ +public class CheckpointingOptions { + + // + // general checkpoint and state backend options + // + + public static final ConfigOption<String> STATE_BACKEND = ConfigOptions + .key("state.backend") + .noDefaultValue(); + + /** The maximum number of completed checkpoint instances to retain.*/ + public static final ConfigOption<Integer> MAX_RETAINED_CHECKPOINTS = ConfigOptions + .key("state.checkpoints.num-retained") + .defaultValue(1); + + // + // Options specific to the file-system-based state backends + // + + /** The default directory for savepoints. Used by the state backends that write +* savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). 
*/ + public static final ConfigOption<String> SAVEPOINT_DIRECTORY = ConfigOptions + .key("state.savepoints.dir") + .noDefaultValue() + .withDeprecatedKeys("savepoints.state.backend.fs.dir"); + + /** The default directory used for checkpoints. Used by the state backends that write +* checkpoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */ + public static final ConfigOption<String> CHECKPOINTS_DIRECTORY = ConfigOptions + .key("state.checkpoints.dir") + .noDefaultValue(); + + /** Option whether the heap-based key/value data structures should use an asynchronous +* snapshot method. Used by MemoryStateBackend and FsStateBackend. */ + public static final ConfigOption<Boolean> HEAP_KV_ASYNC_SNAPSHOTS = ConfigOptions + .key("state.backend.heap.async") --- End diff -- Mentioning it because we have no "heap" backend, technically. ---
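For readers unfamiliar with the pattern in the diff above: each ConfigOption pairs a string key with a typed default, and a lookup falls back to the default when the key is unset. A minimal self-contained sketch of that behavior (simplified stand-in classes, not Flink's actual ConfigOptions/Configuration API):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the ConfigOption/Configuration pattern shown in the
// diff above; names and behavior are illustrative, not Flink's implementation.
public class MiniConfig {
    static final class Option<T> {
        final String key;
        final T defaultValue; // null means "no default value"
        Option(String key, T defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    private final Map<String, Object> values = new HashMap<>();

    void set(String key, Object value) {
        values.put(key, value);
    }

    // Returns the configured value for the option, or its default if unset.
    @SuppressWarnings("unchecked")
    <T> T get(Option<T> option) {
        Object v = values.get(option.key);
        return v != null ? (T) v : option.defaultValue;
    }

    public static void main(String[] args) {
        Option<String> stateBackend = new Option<>("state.backend", null);
        Option<Integer> retained = new Option<>("state.checkpoints.num-retained", 1);

        MiniConfig cfg = new MiniConfig();
        System.out.println(cfg.get(retained));     // unset key: falls back to default 1
        cfg.set("state.backend", "rocksdb");
        System.out.println(cfg.get(stateBackend)); // explicitly set: rocksdb
    }
}
```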
[GitHub] flink pull request #4827: [FLINK-7840] [build] Shade netty in akka
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4827#discussion_r146986377 --- Diff: flink-runtime/pom.xml --- @@ -427,17 +427,42 @@ under the License. shade + + <promoteTransitiveDependencies>true</promoteTransitiveDependencies> --- End diff -- Why exactly do we need this? AFAIK you can shade transitive dependencies without promoting them. ---
[jira] [Commented] (FLINK-7840) Shade Akka's Netty Dependency
[ https://issues.apache.org/jira/browse/FLINK-7840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219519#comment-16219519 ] ASF GitHub Bot commented on FLINK-7840: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4827#discussion_r146986377 --- Diff: flink-runtime/pom.xml --- @@ -427,17 +427,42 @@ under the License. shade + + <promoteTransitiveDependencies>true</promoteTransitiveDependencies> --- End diff -- Why exactly do we need this? AFAIK you can shade transitive dependencies without promoting them. > Shade Akka's Netty Dependency > - > > Key: FLINK-7840 > URL: https://issues.apache.org/jira/browse/FLINK-7840 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.4.0 > > > In order to avoid clashes between different Netty versions we should shade > Akka's Netty away. > These dependency version clashes manifest themselves in very subtle ways, > like occasional deadlocks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7840) Shade Akka's Netty Dependency
[ https://issues.apache.org/jira/browse/FLINK-7840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219511#comment-16219511 ] ASF GitHub Bot commented on FLINK-7840: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4827#discussion_r146985560 --- Diff: flink-test-utils-parent/flink-test-utils/pom.xml --- @@ -117,6 +124,41 @@ under the License. true true + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-flink</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <artifactSet> + <includes> + <include>io.netty:netty</include> + </includes> + </artifactSet> + <relocations> + <relocation> + <pattern>org.jboss.netty</pattern> + <shadedPattern>org.apache.flink.shaded.testutils.org.jboss.netty</shadedPattern> + </relocation> + </relocations> + </configuration> + </execution> + </executions> + </plugin> --- End diff -- Could you explain more why we need to shade this here? Are we hiding some transitive netty dependency? > Shade Akka's Netty Dependency > - > > Key: FLINK-7840 > URL: https://issues.apache.org/jira/browse/FLINK-7840 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.4.0 > > > In order to avoid clashes between different Netty versions we should shade > Akka's Netty away. > These dependency version clashes manifest themselves in very subtle ways, > like occasional deadlocks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4827: [FLINK-7840] [build] Shade netty in akka
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4827#discussion_r146985560 --- Diff: flink-test-utils-parent/flink-test-utils/pom.xml --- @@ -117,6 +124,41 @@ under the License. true true + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-flink</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <artifactSet> + <includes> + <include>io.netty:netty</include> + </includes> + </artifactSet> + <relocations> + <relocation> + <pattern>org.jboss.netty</pattern> + <shadedPattern>org.apache.flink.shaded.testutils.org.jboss.netty</shadedPattern> + </relocation> + </relocations> + </configuration> + </execution> + </executions> + </plugin> --- End diff -- Could you explain more why we need to shade this here? Are we hiding some transitive netty dependency? ---
[jira] [Updated] (FLINK-7877) Fix compilation against the Hadoop 3 beta1 release
[ https://issues.apache.org/jira/browse/FLINK-7877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-7877: -- Labels: build (was: ) > Fix compilation against the Hadoop 3 beta1 release > -- > > Key: FLINK-7877 > URL: https://issues.apache.org/jira/browse/FLINK-7877 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu > Labels: build > > When compiling against hadoop 3.0.0-beta1, I got: > {code} > [ERROR] > /mnt/disk2/a/flink/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java:[224,16] > org.apache.flink.yarn.UtilsTest.TestingContainer is not abstract and does > not override abstract method > setExecutionType(org.apache.hadoop.yarn.api.records.ExecutionType) in > org.apache.hadoop.yarn.api.records.Container > {code} > There may be other Hadoop API(s) that need adjustment. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7927) Different Netty Versions in dependencies of flink-runtime make it impossible to use 3rd party libraries using netty
Claudius Eisele created FLINK-7927: -- Summary: Different Netty Versions in dependencies of flink-runtime make it impossible to use 3rd party libraries using netty Key: FLINK-7927 URL: https://issues.apache.org/jira/browse/FLINK-7927 Project: Flink Issue Type: Bug Affects Versions: 1.3.2 Environment: * Windows 10 x64 * Java 1.8 Reporter: Claudius Eisele I am trying to use Google PubSub (google-cloud-pubsub 0.26.0-beta) in a Flink streaming job, but I receive the following error when executing it, so unfortunately it is not possible to use PubSub in a Flink streaming job: {code:java} ... 10/25/2017 22:38:02 Source: Custom Source -> Map(1/1) switched to RUNNING 10/25/2017 22:38:03 Source: Custom Source -> Map(1/1) switched to FAILED java.lang.IllegalStateException: Expected the service InnerService [FAILED] to be RUNNING, but the service has FAILED at com.google.common.util.concurrent.AbstractService.checkCurrentState(AbstractService.java:328) at com.google.common.util.concurrent.AbstractService.awaitRunning(AbstractService.java:266) at com.google.api.core.AbstractApiService.awaitRunning(AbstractApiService.java:97) Caused by: java.lang.IllegalArgumentException: Jetty ALPN/NPN has not been properly configured. 
at io.grpc.netty.GrpcSslContexts.selectApplicationProtocolConfig(GrpcSslContexts.java:159) at io.grpc.netty.GrpcSslContexts.configure(GrpcSslContexts.java:136) at io.grpc.netty.GrpcSslContexts.configure(GrpcSslContexts.java:124) at io.grpc.netty.GrpcSslContexts.forClient(GrpcSslContexts.java:94) at io.grpc.netty.NettyChannelBuilder$NettyTransportFactory$DefaultNettyTransportCreationParamsFilterFactory.(NettyChannelBuilder.java:525) at io.grpc.netty.NettyChannelBuilder$NettyTransportFactory$DefaultNettyTransportCreationParamsFilterFactory.(NettyChannelBuilder.java:518) at io.grpc.netty.NettyChannelBuilder$NettyTransportFactory.(NettyChannelBuilder.java:457) at io.grpc.netty.NettyChannelBuilder.buildTransportFactory(NettyChannelBuilder.java:326) at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:315) at com.google.api.gax.grpc.InstantiatingChannelProvider.createChannel(InstantiatingChannelProvider.java:131) at com.google.api.gax.grpc.InstantiatingChannelProvider.getChannel(InstantiatingChannelProvider.java:116) at com.google.cloud.pubsub.v1.Subscriber.doStart(Subscriber.java:246) at com.google.api.core.AbstractApiService$InnerService.doStart(AbstractApiService.java:149) at com.google.common.util.concurrent.AbstractService.startAsync(AbstractService.java:211) at com.google.api.core.AbstractApiService.startAsync(AbstractApiService.java:121) at com.google.cloud.pubsub.v1.Subscriber.startAsync(Subscriber.java:235) ... 
7 more {code} I reported this problem to the Google Cloud Java library, but the problem seems rather to be in Flink or its dependencies (such as Akka), because they pull in several Netty dependencies with different versions: * Apache Zookeeper (flink-runtime dependency) has \--- io.netty:netty:3.7.0.Final -> 3.8.0.Final * Flakka (flink-runtime dependency) has io.netty:netty:3.8.0.Final * Flink-Runtime has io.netty:netty-all:4.0.27.Final In my case, Google Cloud PubSub has io.grpc:grpc-netty:1.6.1 Additional information on the issue in combination with Google Cloud PubSub can be found here: https://github.com/GoogleCloudPlatform/google-cloud-java/issues/2398 https://github.com/grpc/grpc-java/issues/3025 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
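A conflict like this is usually pinned down by checking which jar each Netty class family is actually loaded from at runtime. The following is a small diagnostic sketch (not part of Flink; the two class names are the standard Netty 3.x and 4.x package roots):

```java
import java.net.URL;
import java.security.CodeSource;

// Prints the jar each Netty class family was loaded from, to spot which
// Netty version wins on the classpath. Returns null for classes that are
// missing or loaded by the bootstrap class loader.
public class NettyVersionCheck {
    static URL locationOf(String className) {
        try {
            CodeSource cs = Class.forName(className)
                    .getProtectionDomain().getCodeSource();
            return cs == null ? null : cs.getLocation();
        } catch (ClassNotFoundException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Netty 3.x lives under org.jboss.netty, Netty 4.x under io.netty.
        System.out.println("Netty 3.x: " + locationOf("org.jboss.netty.channel.Channel"));
        System.out.println("Netty 4.x: " + locationOf("io.netty.channel.Channel"));
    }
}
```

Running this inside the failing job (or comparing with `mvn dependency:tree`) shows which of the clashing versions is actually resolved.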
[jira] [Resolved] (FLINK-6402) Consider removing annotation for REAPER_THREAD_LOCK in SafetyNetCloseableRegistry#doRegister()
[ https://issues.apache.org/jira/browse/FLINK-6402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu resolved FLINK-6402. --- Resolution: Later > Consider removing annotation for REAPER_THREAD_LOCK in > SafetyNetCloseableRegistry#doRegister() > -- > > Key: FLINK-6402 > URL: https://issues.apache.org/jira/browse/FLINK-6402 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Reporter: Ted Yu >Priority: Minor > > Here is related code: > {code} > PhantomDelegatingCloseableRef phantomRef = new > PhantomDelegatingCloseableRef( > wrappingProxyCloseable, > this, > REAPER_THREAD.referenceQueue); > {code} > Instantiation of REAPER_THREAD can be made visible by ctor. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
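As background on the lock-versus-annotation question above: a common way to sidestep explicit locking for lazy singleton instantiation altogether is the initialization-on-demand holder idiom. A generic sketch (illustrative only, not the actual SafetyNetCloseableRegistry code):

```java
// Initialization-on-demand holder idiom: the JVM guarantees the nested Holder
// class is initialized lazily, exactly once, on first access, without any
// explicit locking in user code.
public class ReaperHolder {
    private static final class Holder {
        static final Thread REAPER =
                new Thread(() -> { /* reap phantom-referenced closeables */ }, "reaper");
    }

    // Always returns the same lazily created instance.
    public static Thread reaper() {
        return Holder.REAPER;
    }
}
```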
[jira] [Updated] (FLINK-7775) Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs
[ https://issues.apache.org/jira/browse/FLINK-7775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-7775: -- Description: {code} public int getNumberOfCachedJobs() { return jobRefCounters.size(); } {code} The method is not used. We should remove it. was: {code} public int getNumberOfCachedJobs() { return jobRefCounters.size(); } {code} The method is not used. We should remove it. > Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs > --- > > Key: FLINK-7775 > URL: https://issues.apache.org/jira/browse/FLINK-7775 > Project: Flink > Issue Type: Task > Components: Local Runtime >Reporter: Ted Yu >Priority: Minor > > {code} > public int getNumberOfCachedJobs() { > return jobRefCounters.size(); > } > {code} > The method is not used. > We should remove it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7542) Some tests in AggregateITCase fail for some Time Zones
[ https://issues.apache.org/jira/browse/FLINK-7542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-7542. Resolution: Fixed Fix Version/s: 1.3.3 1.4.0 Fixed for 1.3.3 with 0891a6f5accd02557e4f00c3c473db1a9a473585 Fixed for 1.4.0 with babee277204b6b1edcfe9c7c76348254019b2dd3 > Some tests in AggregateITCase fail for some Time Zones > -- > > Key: FLINK-7542 > URL: https://issues.apache.org/jira/browse/FLINK-7542 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.2 >Reporter: Usman Younas >Assignee: Fabian Hueske > Fix For: 1.4.0, 1.3.3 > > > In {{org.apache.flink.table.runtime.batch.sql.AggregateITCase}} two tests > 1. testTumbleWindowAggregate and > 2. testHopWindowAggregate > are failing for some time zones. > The bug can be reproduced by changing the machine's time zone to > Time Zone: Central Daylight Time > Closest City: Houston-United States > I think the problem is with Timestamp. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
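Consistent with the reporter's guess, time-zone-dependent test failures of this kind typically stem from java.sql.Timestamp.valueOf, which interprets its argument in the JVM's default time zone, so the same literal yields different epoch millis in different zones. A small demonstration (illustrative, not taken from AggregateITCase):

```java
import java.sql.Timestamp;
import java.util.TimeZone;

// Shows that Timestamp.valueOf is sensitive to the JVM default time zone.
public class TzDemo {
    static long epochMillisIn(String tzId) {
        TimeZone.setDefault(TimeZone.getTimeZone(tzId));
        // Parsed as local wall-clock time in the current default zone.
        return Timestamp.valueOf("1970-01-01 00:00:00").getTime();
    }

    public static void main(String[] args) {
        System.out.println(epochMillisIn("UTC"));             // 0
        System.out.println(epochMillisIn("America/Chicago")); // 21600000 (CST is UTC-6)
    }
}
```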
[jira] [Closed] (FLINK-6697) Add batch multi-window support
[ https://issues.apache.org/jira/browse/FLINK-6697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-6697. Resolution: Implemented Implemented for 1.4.0 with babee277204b6b1edcfe9c7c76348254019b2dd3 > Add batch multi-window support > -- > > Key: FLINK-6697 > URL: https://issues.apache.org/jira/browse/FLINK-6697 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: Timo Walther >Assignee: Fabian Hueske >Priority: Critical > Fix For: 1.4.0 > > > Multiple consecutive windows on batch are not tested yet and I think they are > also not supported, because the syntax is not defined for batch yet. > The following should be supported: > {code} > val t = table > .window(Tumble over 2.millis on 'rowtime as 'w) > .groupBy('w) > .select('w.rowtime as 'rowtime, 'int.count as 'int) > .window(Tumble over 4.millis on 'rowtime as 'w2) > .groupBy('w2) > .select('w2.rowtime, 'w2.end, 'int.count) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-6584) Support multiple consecutive windows in SQL
[ https://issues.apache.org/jira/browse/FLINK-6584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-6584. Resolution: Implemented Implemented for 1.4.0 with 2ad8f7eb5690755e2a9e7e93181cd34e1093f23a > Support multiple consecutive windows in SQL > --- > > Key: FLINK-6584 > URL: https://issues.apache.org/jira/browse/FLINK-6584 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Blocker > Fix For: 1.4.0 > > > Right now, the Table API supports multiple consecutive windows as follows: > {code} > val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, > 'bigdec, 'string) > val t = table > .window(Tumble over 2.millis on 'rowtime as 'w) > .groupBy('w) > .select('w.rowtime as 'rowtime, 'int.count as 'int) > .window(Tumble over 4.millis on 'rowtime as 'w2) > .groupBy('w2) > .select('w2.rowtime, 'w2.end, 'int.count) > {code} > Similar behavior should be supported by the SQL API as well. We need to > introduce a new auxiliary group function, but this should happen in sync with > Apache Calcite. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6697) Add batch multi-window support
[ https://issues.apache.org/jira/browse/FLINK-6697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219429#comment-16219429 ] ASF GitHub Bot commented on FLINK-6697: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4796 > Add batch multi-window support > -- > > Key: FLINK-6697 > URL: https://issues.apache.org/jira/browse/FLINK-6697 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: Timo Walther >Assignee: Fabian Hueske >Priority: Critical > Fix For: 1.4.0 > > > Multiple consecutive windows on batch are not tested yet and I think they are > also not supported, because the syntax is not defined for batch yet. > The following should be supported: > {code} > val t = table > .window(Tumble over 2.millis on 'rowtime as 'w) > .groupBy('w) > .select('w.rowtime as 'rowtime, 'int.count as 'int) > .window(Tumble over 4.millis on 'rowtime as 'w2) > .groupBy('w2) > .select('w2.rowtime, 'w2.end, 'int.count) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4199 ---
[GitHub] flink pull request #4796: [FLINK-6697] [table] Add support for window.rowtim...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4796 ---
[jira] [Commented] (FLINK-6584) Support multiple consecutive windows in SQL
[ https://issues.apache.org/jira/browse/FLINK-6584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219428#comment-16219428 ] ASF GitHub Bot commented on FLINK-6584: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4199 > Support multiple consecutive windows in SQL > --- > > Key: FLINK-6584 > URL: https://issues.apache.org/jira/browse/FLINK-6584 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Blocker > Fix For: 1.4.0 > > > Right now, the Table API supports multiple consecutive windows as follows: > {code} > val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, > 'bigdec, 'string) > val t = table > .window(Tumble over 2.millis on 'rowtime as 'w) > .groupBy('w) > .select('w.rowtime as 'rowtime, 'int.count as 'int) > .window(Tumble over 4.millis on 'rowtime as 'w2) > .groupBy('w2) > .select('w2.rowtime, 'w2.end, 'int.count) > {code} > Similar behavior should be supported by the SQL API as well. We need to > introduce a new auxiliary group function, but this should happen in sync with > Apache Calcite. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7926) Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.
David Dreyfus created FLINK-7926: Summary: Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers. Key: FLINK-7926 URL: https://issues.apache.org/jira/browse/FLINK-7926 Project: Flink Issue Type: Bug Components: Distributed Coordination Affects Versions: 1.3.2 Environment: standalone execution on MacBook Pro in flink-conf.yaml, taskmanager.numberOfTaskSlots changed from 1 to 3. taskmanager.heap.mb = 1024 taskmanager.memory.preallocate = false taskmanager.numberOfTaskSlots = 3 Reporter: David Dreyfus The following exception is thrown as the number of tasks increases. {code:java} 10/25/2017 14:26:16 LeftOuterJoin(Join at with(JoinOperatorSetsBase.java:232))(1/1) switched to FAILED java.lang.RuntimeException: Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers. at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:302) at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1231) at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:1053) at org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:978) at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:938) at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631) at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666) at org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator.callWithNextKey(NonReusingBuildSecondHashJoinIterator.java:114) at org.apache.flink.runtime.operators.AbstractOuterJoinDriver.run(AbstractOuterJoinDriver.java:160) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:748) 10/25/2017 14:26:16 Job execution switched to status FAILING. java.lang.RuntimeException: Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers. at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:302) at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1231) at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:1053) at org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:978) at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:938) at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631) at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666) at org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator.callWithNextKey(NonReusingBuildSecondHashJoinIterator.java:114) at org.apache.flink.runtime.operators.AbstractOuterJoinDriver.run(AbstractOuterJoinDriver.java:160) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:748) {code} I run with the following command: {code:java} flink run -c com.northbay.union.Union3 ./FlinkWordCount/target/FlinkWordCount-1.0-SNAPSHOT.jar --left /Users/user/Documents/Flink/Quickstart/Files/manysmall --right /Users/user/Documents/Flink/Quickstart/Files/manysmall --output /tmp/test6d_nomatch --output2 /tmp/test6d --filecount 50 {code} The files submitted are all CSV (int, string, short) This is the code (break out into 3 separate files before using). 
The idea behind this test is to compare (hash-join) pairs of files and combine their results. {code:java} package com.northbay.hashcount; public class DeviceRecord1 { public int device; public String fingerprint; public short dma; public boolean match; public DeviceRecord1() { } public DeviceRecord1(DeviceRecord old) { this.device = old.device; this.fingerprint = old.fingerprint; this.dma = old.dma; this.match = false; } public DeviceRecord1(int device, String fingerprint, short dma) { this.device = device; this.fingerprint = fingerprint; this.dma = dma; this.match = false; } } {code}
[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219294#comment-16219294 ] ASF GitHub Bot commented on FLINK-5823: --- GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/4907 [FLINK-5823] [checkpoints] State Backends also handle Checkpoint Metadata (part 1) This is an incremental (first part) rebuild of #3522 on the latest master. For ease of review, broken down into small chunks. ## Part 1: Application-defined State Backends pick up additional values from the configuration We need to keep supporting the scenario of setting a state backend in the user program, but configuring parameters like the checkpoint directory in the cluster config. To support that, state backends may implement an additional interface which lets them pick up configuration values from the cluster configuration. This also makes testing the checkpoint / savepoint configuration much easier. You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink backend Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4907.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4907 commit 8055cc9d84e206abd00cbc339c6cc390c53f2afe Author: Stephan Ewen Date: 2017-10-24T17:46:07Z [hotfix] [hdfs] Avoid reparsing URIs in Hadoop File Status conversion commit 96068b93d4c332a9b8cfba06ecc9d42804a79f92 Author: Stephan Ewen Date: 2017-10-24T19:20:44Z [hotfix] [streaming] Move SerializedCheckpointData to proper scope for MessageAcknowledgingSourceBase commit e098cc6ac8d77a027aefb6c1bcee41f6214ed74c Author: Stephan Ewen Date: 2017-10-25T09:56:59Z [hotfix] [runtime] Minor optimization in CheckpointMetrics commit 8a6b78a8dacadb5476c6bf452fbdab352d0ab908 Author: Stephan Ewen Date: 2017-10-25T11:30:51Z [hotfix] 
[checkpoints] fix warnings and make minor improvements to CheckpointCoordinatorTest and SharedStateRegistry commit aa09d82fd57d2636f54ec2a56e457f19d8ec9490 Author: Stephan Ewen Date: 2017-10-25T11:46:34Z [hotfix] [core] Fix FileUtils.deletePathIfEmpty commit 15be2602d4d44e29e756cd558130f2303d82fe8b Author: Stephan Ewen Date: 2017-10-25T17:04:58Z [hotfix] [checkpoints] Remove incorrect 'Serializable' from StateBackendFactory commit 1438df297bd820c07787563ddcfd2fda8773387f Author: Stephan Ewen Date: 2017-10-25T12:16:37Z [FLINK-7924] [checkpoints] Fix incorrect names of checkpoint options Checkpoint options are incorrectly always called 'FULL_CHECKPOINT' when actually, the checkpoints may always be incremental and only savepoints have to be full and self contained. Initially, we planned to add options for multiple checkpoints, like checkpoints that were forced to be full, and checkpoints that were incremental. That is not necessary at this point. commit 6b792bf72fdb0c17625b1fe97c52791d550a74e4 Author: Stephan Ewen Date: 2017-10-25T15:30:14Z [FLINK-7925] [checkpoints] Add CheckpointingOptions The CheckpointingOptions consolidate all checkpointing and state backend-related settings that were previously split across different classes. 
commit c4ce5522d8052ae8af8134bac4ea74bb5a929027 Author: Stephan Ewen Date: 2017-10-25T11:23:46Z [FLINK-5823] [checkpoints] Pass state backend to checkpoint coordinator commit 337e354bbe198ce6ab68d23cec93bc0c81bbbfaf Author: Stephan Ewen Date: 2017-10-25T15:32:17Z [hotfix] [core] Fix broken JavaDoc links in ConfigConstants commit 0522ac34146aecfad2dbd9c806e6f7be182d00fd Author: Stephan Ewen Date: 2017-10-25T17:04:10Z [FLINK-5823] [checkpoints] State backends define checkpoint and savepoint directories, improved configuration > Store Checkpoint Root Metadata in StateBackend (not in HA custom store) > --- > > Key: FLINK-5823 > URL: https://issues.apache.org/jira/browse/FLINK-5823 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/4907 [FLINK-5823] [checkpoints] State Backends also handle Checkpoint Metadata (part 1) This is an incremental (first part) rebuild of #3522 on the latest master. For ease of review, broken down into small chunks. ## Part 1: Application-defined State Backends pick up additional values from the configuration We need to keep supporting the scenario of setting a state backend in the user program, but configuring parameters like the checkpoint directory in the cluster config. To support that, state backends may implement an additional interface which lets them pick up configuration values from the cluster configuration. This also makes testing the checkpoint / savepoint configuration much easier. You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink backend Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4907.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4907 commit 8055cc9d84e206abd00cbc339c6cc390c53f2afe Author: Stephan Ewen Date: 2017-10-24T17:46:07Z [hotfix] [hdfs] Avoid reparsing URIs in Hadoop File Status conversion commit 96068b93d4c332a9b8cfba06ecc9d42804a79f92 Author: Stephan Ewen Date: 2017-10-24T19:20:44Z [hotfix] [streaming] Move SerializedCheckpointData to proper scope for MessageAcknowledgingSourceBase commit e098cc6ac8d77a027aefb6c1bcee41f6214ed74c Author: Stephan Ewen Date: 2017-10-25T09:56:59Z [hotfix] [runtime] Minor optimization in CheckpointMetrics commit 8a6b78a8dacadb5476c6bf452fbdab352d0ab908 Author: Stephan Ewen Date: 2017-10-25T11:30:51Z [hotfix] [checkpoints] fix warnings and make minor improvements to CheckpointCoordinatorTest and SharedStateRegistry commit aa09d82fd57d2636f54ec2a56e457f19d8ec9490 Author: Stephan Ewen Date: 
2017-10-25T11:46:34Z [hotfix] [core] Fix FileUtils.deletePathIfEmpty commit 15be2602d4d44e29e756cd558130f2303d82fe8b Author: Stephan Ewen Date: 2017-10-25T17:04:58Z [hotfix] [checkpoints] Remove incorrect 'Serializable' from StateBackendFactory commit 1438df297bd820c07787563ddcfd2fda8773387f Author: Stephan Ewen Date: 2017-10-25T12:16:37Z [FLINK-7924] [checkpoints] Fix incorrect names of checkpoint options Checkpoint options are incorrectly always called 'FULL_CHECKPOINT' when actually, the checkpoints may always be incremental and only savepoints have to be full and self contained. Initially, we planned to add options for multiple checkpoints, like checkpoints that were forced to be full, and checkpoints that were incremental. That is not necessary at this point. commit 6b792bf72fdb0c17625b1fe97c52791d550a74e4 Author: Stephan Ewen Date: 2017-10-25T15:30:14Z [FLINK-7925] [checkpoints] Add CheckpointingOptions The CheckpointingOptions consolidate all checkpointing and state backend-related settings that were previously split across different classes. commit c4ce5522d8052ae8af8134bac4ea74bb5a929027 Author: Stephan Ewen Date: 2017-10-25T11:23:46Z [FLINK-5823] [checkpoints] Pass state backend to checkpoint coordinator commit 337e354bbe198ce6ab68d23cec93bc0c81bbbfaf Author: Stephan Ewen Date: 2017-10-25T15:32:17Z [hotfix] [core] Fix broken JavaDoc links in ConfigConstants commit 0522ac34146aecfad2dbd9c806e6f7be182d00fd Author: Stephan Ewen Date: 2017-10-25T17:04:10Z [FLINK-5823] [checkpoints] State backends define checkpoint and savepoint directories, improved configuration ---
[jira] [Created] (FLINK-7924) Fix incorrect names of checkpoint options
Stephan Ewen created FLINK-7924: --- Summary: Fix incorrect names of checkpoint options Key: FLINK-7924 URL: https://issues.apache.org/jira/browse/FLINK-7924 Project: Flink Issue Type: Sub-task Components: State Backends, Checkpointing Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.4.0 Checkpoint options are incorrectly always called 'FULL_CHECKPOINT' when actually, the checkpoints may always be incremental and only savepoints have to be full and self contained. Initially, we planned to add options for multiple checkpoints, like checkpoints that were forced to be full, and checkpoints that were incremental. That is not necessary at this point. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7925) Add CheckpointingOptions
Stephan Ewen created FLINK-7925: --- Summary: Add CheckpointingOptions Key: FLINK-7925 URL: https://issues.apache.org/jira/browse/FLINK-7925 Project: Flink Issue Type: Sub-task Components: State Backends, Checkpointing Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.4.0 The CheckpointingOptions should consolidate all checkpointing and state backend-related settings that were previously split across different classes. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4905: [FLINK-7920] Make MiniClusterConfiguration immutable
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4905 Looks good, +1 to merge this ---
[jira] [Commented] (FLINK-7920) Make MiniClusterConfiguration immutable
[ https://issues.apache.org/jira/browse/FLINK-7920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219268#comment-16219268 ] ASF GitHub Bot commented on FLINK-7920: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4905 Looks good, +1 to merge this > Make MiniClusterConfiguration immutable > --- > > Key: FLINK-7920 > URL: https://issues.apache.org/jira/browse/FLINK-7920 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > > The {{MiniClusterConfiguration}} should be made immutable. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
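The immutability change discussed here follows a standard pattern: all fields final and fixed at construction time, with a builder collecting the settings beforehand. A generic sketch of that pattern (illustrative names, not Flink's actual MiniClusterConfiguration):

```java
// Immutable configuration object: no setters, all fields final, values
// supplied once through a builder. Class and field names are illustrative.
public final class ClusterConfig {
    private final int numTaskManagers;
    private final int slotsPerTaskManager;

    private ClusterConfig(Builder b) {
        this.numTaskManagers = b.numTaskManagers;
        this.slotsPerTaskManager = b.slotsPerTaskManager;
    }

    public int getNumTaskManagers() { return numTaskManagers; }
    public int getSlotsPerTaskManager() { return slotsPerTaskManager; }

    public static final class Builder {
        private int numTaskManagers = 1;
        private int slotsPerTaskManager = 1;

        public Builder setNumTaskManagers(int n) { this.numTaskManagers = n; return this; }
        public Builder setSlotsPerTaskManager(int n) { this.slotsPerTaskManager = n; return this; }
        public ClusterConfig build() { return new ClusterConfig(this); }
    }
}
```

Once built, an instance can be shared freely across threads without defensive copying, which is the usual motivation for such a change.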
[jira] [Commented] (FLINK-7666) ContinuousFileReaderOperator swallows chained watermarks
[ https://issues.apache.org/jira/browse/FLINK-7666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219239#comment-16219239 ] ASF GitHub Bot commented on FLINK-7666: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/4900 Perfect! Merging this... > ContinuousFileReaderOperator swallows chained watermarks > > > Key: FLINK-7666 > URL: https://issues.apache.org/jira/browse/FLINK-7666 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.3.2 >Reporter: Ufuk Celebi >Assignee: Kostas Kloudas >Priority: Blocker > Fix For: 1.4.0 > > > I use event time and read from a (finite) file. I assign watermarks right > after the {{ContinuousFileReaderOperator}} with parallelism 1. > {code} > env > .readFile(new TextInputFormat(...), ...) > .setParallelism(1) > .assignTimestampsAndWatermarks(...) > .setParallelism(1) > .map()... > {code} > The watermarks I assign never progress through the pipeline. > I can work around this by inserting a {{shuffle()}} after the file reader or > starting a new chain at the assigner: > {code} > env > .readFile(new TextInputFormat(...), ...) > .setParallelism(1) > .shuffle() > .assignTimestampsAndWatermarks(...) > .setParallelism(1) > .map()... > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
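The swallowed-watermark behaviour in FLINK-7666 can be illustrated with a few lines of plain Java. This is a conceptual sketch only; the `Stage` interface and helper names below are made up and are not Flink's operator API:

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkChain {
    // Toy model of chained operators: each stage receives a watermark
    // and may or may not forward it downstream.
    interface Stage { void onWatermark(long watermark); }

    static final List<Long> sinkWatermarks = new ArrayList<>();

    // Terminal stage: records every watermark that reaches it.
    static Stage sink() {
        return sinkWatermarks::add;
    }

    // A well-behaved stage forwards every watermark to the next stage.
    static Stage forwarding(Stage next) {
        return next::onWatermark;
    }

    // A stage that swallows watermarks: downstream event time never advances.
    static Stage swallowing(Stage next) {
        return watermark -> { /* dropped, as the reader operator effectively did */ };
    }

    public static void main(String[] args) {
        Stage buggyChain = swallowing(sink());
        buggyChain.onWatermark(1L);      // never reaches the sink
        Stage fixedChain = forwarding(sink());
        fixedChain.onWatermark(2L);      // reaches the sink
        System.out.println(sinkWatermarks); // only the forwarded watermark: [2]
    }
}
```

Inserting `shuffle()` in the reported workaround plays the role of breaking the chain, so the assigner no longer sits behind the swallowing stage.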
[GitHub] flink issue #4900: [FLINK-7666] Close TimeService after closing operators.
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/4900 Perfect! Merging this... ---
[jira] [Updated] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column
[ https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-7923: - Component/s: Table API & SQL > SQL parser exception when accessing subfields of a Composite element in an > Object Array type column > --- > > Key: FLINK-7923 > URL: https://issues.apache.org/jira/browse/FLINK-7923 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong > > An access such as: > {code:SQL} > SELECT > a[1].f0 > FROM > MyTable > {code} > will cause a problem. > See the following test sample for more details: > https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column
Rong Rong created FLINK-7923: Summary: SQL parser exception when accessing subfields of a Composite element in an Object Array type column Key: FLINK-7923 URL: https://issues.apache.org/jira/browse/FLINK-7923 Project: Flink Issue Type: Bug Affects Versions: 1.4.0 Reporter: Rong Rong An access such as: {code:SQL} SELECT a[1].f0 FROM MyTable {code} will cause a problem. See the following test sample for more details: https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7922) leastRestrictive in FlinkTypeFactory does not resolve composite type correctly
[ https://issues.apache.org/jira/browse/FLINK-7922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-7922: - Description: FlinkTypeFactory does not override the following function correctly: {code:java} def leastRestrictive(types: util.List[RelDataType]): RelDataType = { //... } {code} dealing with SQL such as: {code:sql} CASE WHEN THEN ELSE NULL END {code} will trigger runtime exception. See following test sample for more details: https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170 was: FlinkTypeFactory does not override the following function correctly: {code:java} def leastRestrictive(types: util.List[RelDataType]): RelDataType = { //... } {code} to deal with situations like {code:sql} CASE WHEN THEN ELSE NULL END {code} will trigger runtime exception. See following test sample for more details: https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170 > leastRestrictive in FlinkTypeFactory does not resolve composite type correctly > -- > > Key: FLINK-7922 > URL: https://issues.apache.org/jira/browse/FLINK-7922 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong >Assignee: Rong Rong > > FlinkTypeFactory does not override the following function correctly: > {code:java} > def leastRestrictive(types: util.List[RelDataType]): RelDataType = { > //... > } > {code} > dealing with SQL such as: > {code:sql} > CASE > WHEN THEN > > ELSE > NULL > END > {code} > will trigger runtime exception. > See following test sample for more details: > https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7922) leastRestrictive in FlinkTypeFactory does not resolve composite type correctly
[ https://issues.apache.org/jira/browse/FLINK-7922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-7922: - Description: FlinkTypeFactory does not override the following function correctly: {code:scala} def leastRestrictive(types: util.List[RelDataType]): RelDataType = { //... } {code} to deal with situations like {code:sql} CASE WHEN THEN ELSE NULL END {code} will trigger runtime exception. See following test sample for more details: https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170 was: FlinkTypeFactory does not override the following function correctly: `leastRestrictive(types: util.List[RelDataType]): RelDataType = { //... }` to deal with situations like ``` CASE WHEN THEN ELSE NULL END ``` will trigger runtime exception > leastRestrictive in FlinkTypeFactory does not resolve composite type correctly > -- > > Key: FLINK-7922 > URL: https://issues.apache.org/jira/browse/FLINK-7922 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong > > FlinkTypeFactory does not override the following function correctly: > {code:scala} > def leastRestrictive(types: util.List[RelDataType]): RelDataType = { > //... > } > {code} > to deal with situations like > {code:sql} > CASE > WHEN THEN > > ELSE > NULL > END > {code} > will trigger runtime exception. > See following test sample for more details: > https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7922) leastRestrictive in FlinkTypeFactory does not resolve composite type correctly
[ https://issues.apache.org/jira/browse/FLINK-7922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-7922: - Description: FlinkTypeFactory does not override the following function correctly: {code:java} def leastRestrictive(types: util.List[RelDataType]): RelDataType = { //... } {code} to deal with situations like {code:sql} CASE WHEN THEN ELSE NULL END {code} will trigger runtime exception. See following test sample for more details: https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170 was: FlinkTypeFactory does not override the following function correctly: {code:scala} def leastRestrictive(types: util.List[RelDataType]): RelDataType = { //... } {code} to deal with situations like {code:sql} CASE WHEN THEN ELSE NULL END {code} will trigger runtime exception. See following test sample for more details: https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170 > leastRestrictive in FlinkTypeFactory does not resolve composite type correctly > -- > > Key: FLINK-7922 > URL: https://issues.apache.org/jira/browse/FLINK-7922 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong >Assignee: Rong Rong > > FlinkTypeFactory does not override the following function correctly: > {code:java} > def leastRestrictive(types: util.List[RelDataType]): RelDataType = { > //... > } > {code} > to deal with situations like > {code:sql} > CASE > WHEN THEN > > ELSE > NULL > END > {code} > will trigger runtime exception. > See following test sample for more details: > https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7922) leastRestrictive in FlinkTypeFactory does not resolve composite type correctly
[ https://issues.apache.org/jira/browse/FLINK-7922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-7922: - Affects Version/s: 1.4.0 > leastRestrictive in FlinkTypeFactory does not resolve composite type correctly > -- > > Key: FLINK-7922 > URL: https://issues.apache.org/jira/browse/FLINK-7922 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong >Assignee: Rong Rong > > FlinkTypeFactory does not override the following function correctly: > {code:scala} > def leastRestrictive(types: util.List[RelDataType]): RelDataType = { > //... > } > {code} > to deal with situations like > {code:sql} > CASE > WHEN THEN > > ELSE > NULL > END > {code} > will trigger runtime exception. > See following test sample for more details: > https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7922) leastRestrictive in FlinkTypeFactory does not resolve composite type correctly
[ https://issues.apache.org/jira/browse/FLINK-7922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-7922: - Component/s: Table API & SQL > leastRestrictive in FlinkTypeFactory does not resolve composite type correctly > -- > > Key: FLINK-7922 > URL: https://issues.apache.org/jira/browse/FLINK-7922 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong >Assignee: Rong Rong > > FlinkTypeFactory does not override the following function correctly: > {code:scala} > def leastRestrictive(types: util.List[RelDataType]): RelDataType = { > //... > } > {code} > to deal with situations like > {code:sql} > CASE > WHEN THEN > > ELSE > NULL > END > {code} > will trigger runtime exception. > See following test sample for more details: > https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7922) leastRestrictive in FlinkTypeFactory does not resolve composite type correctly
[ https://issues.apache.org/jira/browse/FLINK-7922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong reassigned FLINK-7922: Assignee: Rong Rong > leastRestrictive in FlinkTypeFactory does not resolve composite type correctly > -- > > Key: FLINK-7922 > URL: https://issues.apache.org/jira/browse/FLINK-7922 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong >Assignee: Rong Rong > > FlinkTypeFactory does not override the following function correctly: > {code:scala} > def leastRestrictive(types: util.List[RelDataType]): RelDataType = { > //... > } > {code} > to deal with situations like > {code:sql} > CASE > WHEN THEN > > ELSE > NULL > END > {code} > will trigger runtime exception. > See following test sample for more details: > https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7922) leastRestrictive in FlinkTypeFactory does not resolve composite type correctly
Rong Rong created FLINK-7922: Summary: leastRestrictive in FlinkTypeFactory does not resolve composite type correctly Key: FLINK-7922 URL: https://issues.apache.org/jira/browse/FLINK-7922 Project: Flink Issue Type: Bug Reporter: Rong Rong FlinkTypeFactory does not override the following function correctly: {code:scala} def leastRestrictive(types: util.List[RelDataType]): RelDataType = { //... } {code} to deal with situations like {code:sql} CASE WHEN THEN ELSE NULL END {code} will trigger a runtime exception -- This message was sent by Atlassian JIRA (v6.4.14#64029)
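The contract at stake in FLINK-7922 can be sketched in plain Java. `SimpleType` below is a hypothetical stand-in for Calcite's `RelDataType`; this is an illustration of least-restrictive type resolution, not the actual FlinkTypeFactory logic:

```java
import java.util.Arrays;
import java.util.List;

public class LeastRestrictiveSketch {
    // Hypothetical stand-in for Calcite's RelDataType: a type name plus nullability.
    static final class SimpleType {
        final String name;
        final boolean nullable;
        SimpleType(String name, boolean nullable) {
            this.name = name;
            this.nullable = nullable;
        }
    }

    // Sketch of the contract: the common type of the CASE branches must keep
    // the non-NULL branch's structure (e.g. a composite ROW type) while
    // absorbing the nullability contributed by an ELSE NULL branch.
    static SimpleType leastRestrictive(List<SimpleType> types) {
        String name = null;
        boolean nullable = false;
        for (SimpleType t : types) {
            if ("NULL".equals(t.name)) {
                nullable = true;                  // NULL only adds nullability
            } else if (name == null || name.equals(t.name)) {
                name = t.name;
            } else {
                return null;                      // no common type exists
            }
        }
        return name == null ? null : new SimpleType(name, nullable);
    }

    public static void main(String[] args) {
        // CASE WHEN ... THEN myCompositeValue ELSE NULL END
        SimpleType common = leastRestrictive(Arrays.asList(
                new SimpleType("ROW(f0, f1)", false),
                new SimpleType("NULL", true)));
        System.out.println(common.name + " nullable=" + common.nullable);
    }
}
```

If the composite structure were discarded here, subsequent code generation for the CASE expression would fail at runtime, which matches the symptom the ticket describes.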
[jira] [Commented] (FLINK-7666) ContinuousFileReaderOperator swallows chained watermarks
[ https://issues.apache.org/jira/browse/FLINK-7666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219203#comment-16219203 ] ASF GitHub Bot commented on FLINK-7666: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4900 I see, okay, the status checking in `trigger()` together with the tight locking contract with StreamTask fixes that. My feeling is that this is a workaround to support another non-ideal design, but it should work as a temporary fix, because it is not possible to rework the file monitoring source before the next release. Good with me to merge this... > ContinuousFileReaderOperator swallows chained watermarks > > > Key: FLINK-7666 > URL: https://issues.apache.org/jira/browse/FLINK-7666 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.3.2 >Reporter: Ufuk Celebi >Assignee: Kostas Kloudas >Priority: Blocker > Fix For: 1.4.0 > > > I use event time and read from a (finite) file. I assign watermarks right > after the {{ContinuousFileReaderOperator}} with parallelism 1. > {code} > env > .readFile(new TextInputFormat(...), ...) > .setParallelism(1) > .assignTimestampsAndWatermarks(...) > .setParallelism(1) > .map()... > {code} > The watermarks I assign never progress through the pipeline. > I can work around this by inserting a {{shuffle()}} after the file reader or > starting a new chain at the assigner: > {code} > env > .readFile(new TextInputFormat(...), ...) > .setParallelism(1) > .shuffle() > .assignTimestampsAndWatermarks(...) > .setParallelism(1) > .map()... > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4900: [FLINK-7666] Close TimeService after closing operators.
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4900 I see, okay, the status checking in `trigger()` together with the tight locking contract with StreamTask fixes that. My feeling is that this is a workaround to support another non-ideal design, but it should work as a temporary fix, because it is not possible to rework the file monitoring source before the next release. Good with me to merge this... ---
[jira] [Commented] (FLINK-7846) Remove guava shading from ES2 connector
[ https://issues.apache.org/jira/browse/FLINK-7846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219200#comment-16219200 ] ASF GitHub Bot commented on FLINK-7846: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4902 Looks good, +1 to merge this > Remove guava shading from ES2 connector > --- > > Key: FLINK-7846 > URL: https://issues.apache.org/jira/browse/FLINK-7846 > Project: Flink > Issue Type: Bug > Components: Build System, ElasticSearch Connector >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Minor > Fix For: 1.4.0 > > > The ElasticSearch 2 connector pom has a shading configuration for guava. The > only user of guava is the elasticsearch dependency, which is not included in > the jar, making the shading pointless. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7914) Expose Akka gated interval as user option
[ https://issues.apache.org/jira/browse/FLINK-7914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219197#comment-16219197 ] ASF GitHub Bot commented on FLINK-7914: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4903 Change looks good. Is `50 ms` also akka's default value? Out of curiosity, what triggered the need to introduce this option. > Expose Akka gated interval as user option > - > > Key: FLINK-7914 > URL: https://issues.apache.org/jira/browse/FLINK-7914 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > > Whenever Akka loses its connection to a remote {{ActorSystem}} it gates the > corresponding address. The default value is {{5 s}}. Especially for tests > this can be too high. Therefore, I propose to expose this option to the user > via the {{AkkaOptions}} and setting it to {{50 ms}} per default. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4902: [FLINK-7846] [elasticsearch] Remove unnecessary guava sha...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4902 Looks good, +1 to merge this ---
[GitHub] flink issue #4903: [FLINK-7914] Introduce AkkaOptions.RETRY_GATE_CLOSED_FOR
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4903 Change looks good. Is `50 ms` also Akka's default value? Out of curiosity, what triggered the need to introduce this option? ---
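The gated-address setting discussed in FLINK-7914 takes duration values like `5 s` and `50 ms`. A minimal sketch of parsing such values (illustrative only; this is not Flink's actual configuration parser, and the two-token format is an assumption from the ticket text):

```java
public class GateDuration {
    // Parse values of the form "<amount> <unit>", e.g. "5 s" or "50 ms",
    // into milliseconds. Units other than ms/s are rejected.
    static long toMillis(String value) {
        String[] parts = value.trim().split("\\s+");
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected '<amount> <unit>': " + value);
        }
        long amount = Long.parseLong(parts[0]);
        switch (parts[1]) {
            case "ms": return amount;
            case "s":  return amount * 1000L;
            default:   throw new IllegalArgumentException("unknown unit: " + parts[1]);
        }
    }

    public static void main(String[] args) {
        System.out.println(toMillis("5 s"));   // previous default gate interval: 5000
        System.out.println(toMillis("50 ms")); // proposed default: 50
    }
}
```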
[jira] [Updated] (FLINK-7921) Flink downloads link redirect to spark downloads page
[ https://issues.apache.org/jira/browse/FLINK-7921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anil Kumar updated FLINK-7921: -- Description: On the Quickstart [page|https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/setup_quickstart.html] of flink, there`s a download page link under *Download and Unpack tab* which is redirecting to spark downloads page instead of flink. Issue : redirection is happening to port 80(http) instead of port 443(https). Once the download link is changed to https, it works fine. Current download link : http://flink.apache.org/downloads.html Required link : https://flink.apache.org/downloads.html was: On the Quickstart [page|https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/setup_quickstart.html] of flink, there`s a download page link under *Download and Unpack tab* which is redirecting to spark downloads page instead of flink. Problem is redirection is happening to port 80(http) instead of port 443(https). Once the download link is changed to https, it works fine. Current download link : http://flink.apache.org/downloads.html Required link : https://flink.apache.org/downloads.html > Flink downloads link redirect to spark downloads page > - > > Key: FLINK-7921 > URL: https://issues.apache.org/jira/browse/FLINK-7921 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.3.2 >Reporter: Anil Kumar > > On the Quickstart > [page|https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/setup_quickstart.html] > of flink, there`s a download page link under *Download and Unpack tab* which > is redirecting to spark downloads page instead of flink. > Issue : redirection is happening to port 80(http) instead of port 443(https). > Once the download link is changed to https, it works fine. 
> Current download link : http://flink.apache.org/downloads.html > Required link : https://flink.apache.org/downloads.html -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7921) Flink downloads link redirect to spark downloads page
Anil Kumar created FLINK-7921: - Summary: Flink downloads link redirect to spark downloads page Key: FLINK-7921 URL: https://issues.apache.org/jira/browse/FLINK-7921 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.3.2 Reporter: Anil Kumar On the Quickstart [page|https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/setup_quickstart.html] of Flink, there's a download page link under the *Download and Unpack tab* which redirects to the Spark downloads page instead of Flink's. The problem is that the redirection happens on port 80 (http) instead of port 443 (https). Once the download link is changed to https, it works fine. Current download link : http://flink.apache.org/downloads.html Required link : https://flink.apache.org/downloads.html -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7502) PrometheusReporter improvements
[ https://issues.apache.org/jira/browse/FLINK-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219154#comment-16219154 ] ASF GitHub Bot commented on FLINK-7502: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4586 I can't think of a better solution either. > PrometheusReporter improvements > --- > > Key: FLINK-7502 > URL: https://issues.apache.org/jira/browse/FLINK-7502 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.0 >Reporter: Maximilian Bode >Assignee: Maximilian Bode >Priority: Minor > Fix For: 1.4.0 > > > * do not throw exceptions on metrics being registered for second time > * allow port ranges for setups where multiple reporters are on same host > (e.g. one TaskManager and one JobManager) > * do not use nanohttpd anymore, there is now a minimal http server included > in [Prometheus JVM client|https://github.com/prometheus/client_java] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4586: [FLINK-7502] [metrics] Improve PrometheusReporter
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4586 I can't think of a better solution either. ---
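The port-range option proposed in FLINK-7502 (so that a TaskManager and a JobManager reporter on the same host do not collide) could be parsed along these lines. This is a hedged sketch; the `"9249-9259"` syntax is an assumption, not the reporter's documented format:

```java
public class PortRangeSketch {
    // Parse either a single port ("9249") or an inclusive range ("9249-9259").
    // Each reporter instance on a host would then try ports from..to in turn
    // and bind the first free one.
    static int[] parseRange(String spec) {
        int dash = spec.indexOf('-');
        if (dash < 0) {
            int single = Integer.parseInt(spec.trim());
            return new int[] {single, single};
        }
        int from = Integer.parseInt(spec.substring(0, dash).trim());
        int to = Integer.parseInt(spec.substring(dash + 1).trim());
        if (from > to) {
            throw new IllegalArgumentException("invalid range: " + spec);
        }
        return new int[] {from, to};
    }

    public static void main(String[] args) {
        int[] range = parseRange("9249-9259");
        System.out.println(range[0] + ".." + range[1]);
    }
}
```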
[jira] [Commented] (FLINK-7824) Put the queryable state jars in the opt folder.
[ https://issues.apache.org/jira/browse/FLINK-7824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219075#comment-16219075 ] Kostas Kloudas commented on FLINK-7824: --- PR https://github.com/apache/flink/pull/4906. > Put the queryable state jars in the opt folder. > --- > > Key: FLINK-7824 > URL: https://issues.apache.org/jira/browse/FLINK-7824 > Project: Flink > Issue Type: Sub-task > Components: Queryable State >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.4.0 > > > In 1.4, to enable the queryable state, the user has to put the adequate jars > in the lib folder. The first step before *putting* the jars, is to *find* the > jars in the distribution. So the location of these jars can be in the `opt` > folder. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7540) Akka hostnames are not normalised consistently
[ https://issues.apache.org/jira/browse/FLINK-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219056#comment-16219056 ] ASF GitHub Bot commented on FLINK-7540: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4812 Thanks for the review @aljoscha. Rebasing and once Travis gives green light, I'll merge this PR. > Akka hostnames are not normalised consistently > -- > > Key: FLINK-7540 > URL: https://issues.apache.org/jira/browse/FLINK-7540 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, YARN >Affects Versions: 1.3.1, 1.4.0, 1.3.2 >Reporter: Tong Yan Ou >Assignee: Till Rohrmann >Priority: Blocker > Labels: patch > Fix For: 1.4.0, 1.3.3 > > Original Estimate: 336h > Remaining Estimate: 336h > > In {{NetUtils.unresolvedHostToNormalizedString()}} we lowercase hostnames, > Akka seems to preserve the uppercase/lowercase distinctions when starting the > Actor. This leads to problems because other parts (for example > {{JobManagerRetriever}}) cannot find the actor leading to a nonfunctional > cluster. > h1. Original Issue Text > Hostnames in my hadoop cluster are like these: “DSJ-RTB-4T-177”,” > DSJ-signal-900G-71” > When using the following command: > ./bin/flink run -m yarn-cluster -yn 1 -yqu xl_trip -yjm 1024 > ~/flink-1.3.1/examples/batch/WordCount.jar --input > /user/all_trip_dev/test/testcount.txt --output /user/all_trip_dev/test/result > > Or > ./bin/yarn-session.sh -d -jm 6144 -tm 12288 -qu xl_trip -s 24 -n 5 -nm > "flink-YarnSession-jm6144-tm12288-s24-n5-xl_trip" > There will be some exceptions at Command line interface: > java.lang.RuntimeException: Unable to get ClusterClient status from > Application Client > at > org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243) > … > Caused by: org.apache.flink.util.FlinkException: Could not connect to the > leading JobManager. Please check that the JobManager is running. > h4. 
Then the job fails , starting the yarn-session is the same. > The exceptions of the application log: > 2017-08-10 17:36:10,334 WARN > org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to > retrieve leader gateway and port. > akka.actor.ActorNotFound: Actor not found for: > ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), > Path(/user/jobmanager)] > … > 2017-08-10 17:36:10,837 ERROR org.apache.flink.yarn.YarnFlinkResourceManager > - Resource manager could not register at JobManager > akka.pattern.AskTimeoutException: Ask timed out on > [ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), > Path(/user/jobmanager)]] after [1 ms] > And I found some differences in actor System: > 2017-08-10 17:35:56,791 INFO org.apache.flink.yarn.YarnJobManager > - Starting JobManager at > akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager. > 2017-08-10 17:35:56,880 INFO org.apache.flink.yarn.YarnJobManager > - JobManager > akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager was granted > leadership with leader session ID Some(----). > 2017-08-10 17:36:00,312 INFO > org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Web frontend > listening at 0:0:0:0:0:0:0:0:54921 > 2017-08-10 17:36:00,312 INFO > org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Starting with > JobManager akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager on port > 54921 > 2017-08-10 17:36:00,313 INFO > org.apache.flink.runtime.webmonitor.JobManagerRetriever - New leader > reachable under > akka.tcp://flink@dsj-signal-4t-248:65082/user/jobmanager:----. > The JobManager is “akka.tcp://flink@DSJ-signal-4T-248:65082” and the > JobManagerRetriever is “akka.tcp://flink@dsj-signal-4t-248:65082” > The hostname of JobManagerRetriever’s actor is lowercase. 
> And I read the source code: > In class NetUtils, the unresolvedHostToNormalizedString(String host) method at line 127: > public static String unresolvedHostToNormalizedString(String host) { > > // Return loopback interface address if host is null > // This represents the behavior of {@code InetAddress.getByName} and RFC 3330 > if (host == null) { > host = InetAddress.getLoopbackAddress().getHostAddress(); > } else { > host = host.trim().toLowerCase(); > } > ... > } > It turns the host name into lowercase. > Therefore, JobManagerRetriever certainly cannot find the JobManager's > ActorSystem. > Then I removed the call to the toLowerCase() method in the source code. >
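The mismatch reported in FLINK-7540 comes down to one side lowercasing the hostname while the other uses it verbatim. A small self-contained sketch (helper names here are made up; the real normalization lives in Flink's NetUtils):

```java
import java.util.Locale;

public class HostNormalizationSketch {
    // Normalization as described in the ticket: trim and lowercase the host.
    static String normalize(String host) {
        return host.trim().toLowerCase(Locale.ROOT);
    }

    // Build the actor path the way both sides do, from host and port.
    static String jobManagerPath(String host, int port) {
        return "akka.tcp://flink@" + host + ":" + port + "/user/jobmanager";
    }

    public static void main(String[] args) {
        String rawHost = "DSJ-signal-4T-248";
        String registered = jobManagerPath(rawHost, 65082);           // path as registered
        String lookedUp = jobManagerPath(normalize(rawHost), 65082);  // path as looked up
        // Akka actor paths are case-sensitive, so the lookup misses:
        System.out.println(registered.equals(lookedUp));
    }
}
```

The fix tracked by the ticket is to apply the same normalization consistently on both sides, rather than removing the lowercasing.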
[GitHub] flink issue #4812: [FLINK-7540] Apply consistent hostname normalization
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4812 Thanks for the review @aljoscha. Rebasing and once Travis gives green light, I'll merge this PR. ---
[jira] [Commented] (FLINK-7908) Restructure the QS module to reduce client deps.
[ https://issues.apache.org/jira/browse/FLINK-7908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219054#comment-16219054 ] ASF GitHub Bot commented on FLINK-7908: --- GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/4906 [FLINK-7908][FLINK-7824][QS] Restructure QS packages and put QS jars in opt/. ## What is the purpose of the change Make the Queryable State more usable by: 1) Reducing the dependencies the `client` jar (used by the user) transitively brings. 2) Putting the core and client jars in the `opt/` folder. From there, to activate QS the user has to put the core jar in the lib/ folder before starting the cluster, and use the client jar as a dependency to his/her program. ## Brief change log Creating the `core` and `client` modules in the queryable state dir and moving classes around. In addition, now the `client` module becomes a dependency of the `flink-runtime`. Places the jars corresponding the aforementioned modules to the `opt` dir. ## Verifying this change This change is a simple restructuring of the queryable state module, no classes are introduced or deleted. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes, adds the `flink-queryable-state-client` dependency to `flink-runtime`. ## Documentation - Does this pull request introduce a new feature? NO, but it needs documentation, which is PENDING. 
R @aljoscha You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink qs-restructure Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4906.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4906 commit 057f14d833e56fa731262384a636af13f08623d5 Author: kkloudasDate: 2017-10-24T10:16:08Z [FLINK-7908][QS] Restructure the queryable state module. The QS module is split into core and client. The core should be put in the lib folder to enable queryable state, while the client is the one that the user will program against. The reason for the restructuring in mainly to remove the dependency on the flink-runtime from the user's program. commit dc9d5964bcdd3269fe01e8fe2ab7be74d90dd22c Author: kkloudas Date: 2017-10-24T14:12:27Z [FLINK-7824][QS] Put the QS modules in the opt folder. Now the user can find the jars in the opt/ folder and he can activate QS by putting the core jar in the lib/ folder and program against the client jar. > Restructure the QS module to reduce client deps. > > > Key: FLINK-7908 > URL: https://issues.apache.org/jira/browse/FLINK-7908 > Project: Flink > Issue Type: Sub-task > Components: Queryable State >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4906: [FLINK-7908][FLINK-7824][QS] Restructure QS packag...
GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/4906 [FLINK-7908][FLINK-7824][QS] Restructure QS packages and put QS jars in opt/. ## What is the purpose of the change Make the Queryable State more usable by: 1) Reducing the dependencies the `client` jar (used by the user) transitively brings. 2) Putting the core and client jars in the `opt/` folder. From there, to activate QS the user has to put the core jar in the lib/ folder before starting the cluster, and use the client jar as a dependency to his/her program. ## Brief change log Creating the `core` and `client` modules in the queryable state dir and moving classes around. In addition, now the `client` module becomes a dependency of the `flink-runtime`. Places the jars corresponding the aforementioned modules to the `opt` dir. ## Verifying this change This change is a simple restructuring of the queryable state module, no classes are introduced or deleted. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes, adds the `flink-queryable-state-client` dependency to `flink-runtime`. ## Documentation - Does this pull request introduce a new feature? NO, but it needs documentation, which is PENDING. R @aljoscha You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink qs-restructure Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4906.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4906 commit 057f14d833e56fa731262384a636af13f08623d5 Author: kkloudasDate: 2017-10-24T10:16:08Z [FLINK-7908][QS] Restructure the queryable state module. The QS module is split into core and client. The core should be put in the lib folder to enable queryable state, while the client is the one that the user will program against. 
The reason for the restructuring is mainly to remove the dependency on flink-runtime from the user's program. commit dc9d5964bcdd3269fe01e8fe2ab7be74d90dd22c Author: kkloudas Date: 2017-10-24T14:12:27Z [FLINK-7824][QS] Put the QS modules in the opt folder. Now the user can find the jars in the opt/ folder and can activate QS by putting the core jar in the lib/ folder and programming against the client jar. ---
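Per the PR description, the user programs against the client jar. A dependency declaration might look like the following sketch — the artifactId `flink-queryable-state-client` is taken from the PR text, while the version placeholder is an assumption:

```xml
<!-- Sketch: artifactId from the PR text; version placeholder is an assumption -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-queryable-state-client</artifactId>
  <version>${flink.version}</version>
</dependency>
```

The core jar, by contrast, is not a program dependency: it is copied from `opt/` to `lib/` before the cluster starts.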
[jira] [Commented] (FLINK-7920) Make MiniClusterConfiguration immutable
[ https://issues.apache.org/jira/browse/FLINK-7920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219041#comment-16219041 ] ASF GitHub Bot commented on FLINK-7920: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4905 [FLINK-7920] Make MiniClusterConfiguration immutable ## What is the purpose of the change Makes the `MiniClusterConfiguration` immutable. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink makeMiniClusterConfigurationImmutable Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4905.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4905 commit 4c614d988599d3223d81a04f80a8ceb41a0b9e48 Author: Till Rohrmann Date: 2017-10-25T15:31:45Z [FLINK-7920] Make MiniClusterConfiguration immutable > Make MiniClusterConfiguration immutable > --- > > Key: FLINK-7920 > URL: https://issues.apache.org/jira/browse/FLINK-7920 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > > The {{MiniClusterConfiguration}} should be made immutable. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4905: [FLINK-7920] Make MiniClusterConfiguration immutab...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4905 [FLINK-7920] Make MiniClusterConfiguration immutable ## What is the purpose of the change Makes the `MiniClusterConfiguration` immutable. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink makeMiniClusterConfigurationImmutable Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4905.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4905 commit 4c614d988599d3223d81a04f80a8ceb41a0b9e48 Author: Till Rohrmann Date: 2017-10-25T15:31:45Z [FLINK-7920] Make MiniClusterConfiguration immutable ---
[jira] [Created] (FLINK-7920) Make MiniClusterConfiguration immutable
Till Rohrmann created FLINK-7920: Summary: Make MiniClusterConfiguration immutable Key: FLINK-7920 URL: https://issues.apache.org/jira/browse/FLINK-7920 Project: Flink Issue Type: Improvement Components: Distributed Coordination Affects Versions: 1.4.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Priority: Minor The {{MiniClusterConfiguration}} should be made immutable. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
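The change above is a pure refactoring toward immutability. As a hedged illustration of the general pattern (final fields plus a builder), the sketch below shows how such a configuration class could look; the class and field names are hypothetical, not the actual Flink `MiniClusterConfiguration`:

```java
// Hypothetical sketch of an immutable configuration with a builder.
// Names and defaults are assumptions; this is NOT Flink's MiniClusterConfiguration.
final class MiniConfig {
    private final int numTaskManagers;
    private final int numSlotsPerTaskManager;

    // Private constructor: instances are only created through the builder.
    private MiniConfig(int numTaskManagers, int numSlotsPerTaskManager) {
        this.numTaskManagers = numTaskManagers;
        this.numSlotsPerTaskManager = numSlotsPerTaskManager;
    }

    int getNumTaskManagers() { return numTaskManagers; }
    int getNumSlotsPerTaskManager() { return numSlotsPerTaskManager; }

    static final class Builder {
        private int numTaskManagers = 1;        // default values are assumptions
        private int numSlotsPerTaskManager = 1;

        Builder setNumTaskManagers(int n) { this.numTaskManagers = n; return this; }
        Builder setNumSlotsPerTaskManager(int n) { this.numSlotsPerTaskManager = n; return this; }

        // All mutation happens before build(); the result has only final fields.
        MiniConfig build() { return new MiniConfig(numTaskManagers, numSlotsPerTaskManager); }
    }
}
```

Once built, the object cannot change, so it can be shared freely between cluster components without defensive copies.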
[jira] [Updated] (FLINK-7919) Join with Solution Set fails with NPE if Solution Set has no entry
[ https://issues.apache.org/jira/browse/FLINK-7919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-7919: - Description: A job with a delta iteration fails hard with an NPE in the solution set join if the solution set has no entry for the join key of the probe side. The following program reproduces the problem:
{code}
DataSet<Tuple2<Long, Integer>> values = env.fromElements(
  Tuple2.of(1L, 1), Tuple2.of(2L, 1), Tuple2.of(3L, 1));

DeltaIteration<Tuple2<Long, Integer>, Tuple2<Long, Integer>> di = values
  .iterateDelta(values, 5, 0);

DataSet<Tuple2<Long, Integer>> loop = di.getWorkset()
  .map(new MapFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
    @Override
    public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
      // modifying the key to join on a non existing solution set key
      return Tuple2.of(value.f0 + 1, 1);
    }
  })
  .join(di.getSolutionSet()).where(0).equalTo(0)
  .with(new JoinFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
    @Override
    public Tuple2<Long, Integer> join(
        Tuple2<Long, Integer> first, Tuple2<Long, Integer> second) throws Exception {
      return Tuple2.of(first.f0, first.f1 + second.f1);
    }
  });

DataSet<Tuple2<Long, Integer>> result = di.closeWith(loop, loop);
result.print();
{code}
It doesn't matter whether the solution set is managed or not. The problem is caused by the solution set hash table prober returning a {{null}} value if the solution set does not contain a value for the probe-side key. The join operator does not check whether the returned value is {{null}} but immediately tries to create a copy using a {{TypeSerializer}}. This copy fails with an NPE. I propose to check for {{null}} and call the join function with {{null}} on the solution set side. This gives OUTER JOIN semantics for the join. Since the code was previously failing with an NPE, it is safe to forward the {{null}} into the {{JoinFunction}}. However, users must be aware that the solution set value may be {{null}}, and we need to update the documentation (JavaDocs + website) to describe this behavior. 
was: A job with a delta iteration fails hard with an NPE in the solution set join if the solution set has no entry for the join key of the probe side. The following program reproduces the problem:
{code}
DataSet<Tuple2<Long, Integer>> values = env.fromElements(
  Tuple2.of(1L, 1), Tuple2.of(2L, 1), Tuple2.of(3L, 1));

DeltaIteration<Tuple2<Long, Integer>, Tuple2<Long, Integer>> di = values
  .iterateDelta(values, 5, 0);

DataSet<Tuple2<Long, Integer>> loop = di.getWorkset()
  .map(new MapFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
    @Override
    public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
      // modifying the key to join on a non existing solution set key
      return Tuple2.of(value.f0 + 1, 1);
    }
  })
  .join(di.getSolutionSet()).where(0).equalTo(0)
  .with(new JoinFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
    @Override
    public Tuple2<Long, Integer> join(
        Tuple2<Long, Integer> first, Tuple2<Long, Integer> second) throws Exception {
      return Tuple2.of(first.f0, first.f1 + second.f1);
    }
  });

DataSet<Tuple2<Long, Integer>> result = di.closeWith(loop, loop);
result.print();
{code}
It doesn't matter whether the solution set is managed or not. The problem is caused by the solution set hash table prober returning a {{null}} value if the solution set does not contain a value for the probe-side key. The join operator does not check whether the returned value is {{null}} but immediately tries to create a copy using a {{TypeSerializer}}. This copy fails with an NPE. There are two solutions: 1. Check for {{null}} and do not call the join function (INNER join semantics) 2. Check for {{null}} and call the join function with {{null}} on the solution set side (OUTER join semantics) Either way, the chosen behavior should be documented. > Join with Solution Set fails with NPE if Solution Set has no entry > -- > > Key: FLINK-7919 > URL: https://issues.apache.org/jira/browse/FLINK-7919 > Project: Flink > Issue Type: Bug > Components: DataSet API, Local Runtime >Affects Versions: 1.4.0, 1.3.2 >Reporter: Fabian Hueske > > A job with a delta iteration fails hard with a NPE in the solution set join, > if the solution set has no entry for the join key of the probe side. 
> The following program reproduces the problem: > {code} > DataSet<Tuple2<Long, Integer>> values = env.fromElements( > Tuple2.of(1L, 1), Tuple2.of(2L, 1), Tuple2.of(3L, 1)); > DeltaIteration<Tuple2<Long, Integer>, Tuple2<Long, Integer>> di = values >
[jira] [Commented] (FLINK-7502) PrometheusReporter improvements
[ https://issues.apache.org/jira/browse/FLINK-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16218989#comment-16218989 ] ASF GitHub Bot commented on FLINK-7502: --- Github user mbode commented on the issue: https://github.com/apache/flink/pull/4586 I mean in general it is probably not the best thing to just rely on the port not being available as a consensus algorithm of who should claim which port. Then again, I could not think of a straightforward way to coordinate across Job/TaskManagers without using something external, which would probably get too involved for a metrics-related component – maybe there is something I am missing? > PrometheusReporter improvements > --- > > Key: FLINK-7502 > URL: https://issues.apache.org/jira/browse/FLINK-7502 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.0 >Reporter: Maximilian Bode >Assignee: Maximilian Bode >Priority: Minor > Fix For: 1.4.0 > > > * do not throw exceptions on metrics being registered for second time > * allow port ranges for setups where multiple reporters are on same host > (e.g. one TaskManager and one JobManager) > * do not use nanohttpd anymore, there is now a minimal http server included > in [Prometheus JVM client|https://github.com/prometheus/client_java] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4586: [FLINK-7502] [metrics] Improve PrometheusReporter
Github user mbode commented on the issue: https://github.com/apache/flink/pull/4586 I mean in general it is probably not the best thing to just rely on the port not being available as a consensus algorithm of who should claim which port. Then again, I could not think of a straightforward way to coordinate across Job/TaskManagers without using something external, which would probably get too involved for a metrics-related component – maybe there is something I am missing? ---
[jira] [Created] (FLINK-7919) Join with Solution Set fails with NPE if Solution Set has no entry
Fabian Hueske created FLINK-7919: Summary: Join with Solution Set fails with NPE if Solution Set has no entry Key: FLINK-7919 URL: https://issues.apache.org/jira/browse/FLINK-7919 Project: Flink Issue Type: Bug Components: DataSet API, Local Runtime Affects Versions: 1.3.2, 1.4.0 Reporter: Fabian Hueske A job with a delta iteration fails hard with an NPE in the solution set join if the solution set has no entry for the join key of the probe side. The following program reproduces the problem:
{code}
DataSet<Tuple2<Long, Integer>> values = env.fromElements(
  Tuple2.of(1L, 1), Tuple2.of(2L, 1), Tuple2.of(3L, 1));

DeltaIteration<Tuple2<Long, Integer>, Tuple2<Long, Integer>> di = values
  .iterateDelta(values, 5, 0);

DataSet<Tuple2<Long, Integer>> loop = di.getWorkset()
  .map(new MapFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
    @Override
    public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
      // modifying the key to join on a non existing solution set key
      return Tuple2.of(value.f0 + 1, 1);
    }
  })
  .join(di.getSolutionSet()).where(0).equalTo(0)
  .with(new JoinFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
    @Override
    public Tuple2<Long, Integer> join(
        Tuple2<Long, Integer> first, Tuple2<Long, Integer> second) throws Exception {
      return Tuple2.of(first.f0, first.f1 + second.f1);
    }
  });

DataSet<Tuple2<Long, Integer>> result = di.closeWith(loop, loop);
result.print();
{code}
It doesn't matter whether the solution set is managed or not. The problem is caused by the solution set hash table prober returning a {{null}} value if the solution set does not contain a value for the probe-side key. The join operator does not check whether the returned value is {{null}} but immediately tries to create a copy using a {{TypeSerializer}}. This copy fails with an NPE. There are two solutions: 1. Check for {{null}} and do not call the join function (INNER join semantics) 2. Check for {{null}} and call the join function with {{null}} on the solution set side (OUTER join semantics) Either way, the chosen behavior should be documented. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
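The null-handling proposal in the issue above — probe the solution set, and if there is no match, still invoke the join function with {{null}} on the solution-set side — can be sketched outside of Flink with a plain HashMap standing in for the solution set hash table. All names here are hypothetical illustrations, not actual Flink APIs:

```java
import java.util.Map;
import java.util.function.BiFunction;

// Minimal sketch of the null-tolerant probe (OUTER JOIN semantics):
// if the solution set has no entry for the probe key, the join function
// is still called, with null as the solution-set value, instead of the
// runtime unconditionally copying the (null) match and hitting an NPE.
// Hypothetical names, not Flink's actual solution set join code.
final class SolutionSetJoinSketch {
    static <K, V, R> R probeAndJoin(
            Map<K, V> solutionSet, K probeKey, V probeValue,
            BiFunction<V, V, R> joinFunction) {
        V match = solutionSet.get(probeKey); // may be null: no entry for this key
        // Forward null into the join function rather than copying it.
        return joinFunction.apply(probeValue, match);
    }
}
```

The join function then decides what a missing match means, which is exactly why the issue notes that users must be made aware the solution-set value may be {{null}}.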
[GitHub] flink pull request #4904: [hotfix] reorder the methods so they conform to th...
GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/4904 [hotfix] reorder the methods so they conform to their order in the interface ## What is the purpose of the change As discussed with @aljoscha in https://github.com/apache/flink/pull/4881, I'm moving those unmerged changes to this PR. The purpose is to reorder the impl methods so they conform to their order in the interface ## Brief change log - reorder the impl methods ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: none ## Documentation none You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink hotfix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4904.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4904 commit 7c16e92ca67f7d6fb59136ba908959d3deaef058 Author: Bowen Li Date: 2017-10-25T15:01:22Z [hotfix] reorder the methods so they conform to the order in the interface ---
[GitHub] flink issue #4881: [FLINK-7864] [DataStream API] Support side-outputs in CoP...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4881 Sounds good. Thanks! ---
[jira] [Commented] (FLINK-7864) Support side-outputs in CoProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-7864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16218907#comment-16218907 ] ASF GitHub Bot commented on FLINK-7864: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4881 Sounds good. Thanks! > Support side-outputs in CoProcessFunction > - > > Key: FLINK-7864 > URL: https://issues.apache.org/jira/browse/FLINK-7864 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Aljoscha Krettek >Assignee: Bowen Li > Fix For: 1.4.0 > > > We forgot to add support for side-outputs when we added that to > {{ProcessFunction}}. Should be as easy as adding it to the {{Context}} and > wiring it in. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (FLINK-7838) Kafka011ProducerExactlyOnceITCase do not finish
[ https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16218850#comment-16218850 ] Piotr Nowojski edited comment on FLINK-7838 at 10/25/17 3:20 PM: - Added log with another dead lock example at org.apache.kafka.clients.producer.KafkaProducer.initTransactions, called in restoring state. was (Author: pnowojski): Add log with another dead lock example at org.apache.kafka.clients.producer.KafkaProducer.initTransactions, called in restoring state. > Kafka011ProducerExactlyOnceITCase do not finish > --- > > Key: FLINK-7838 > URL: https://issues.apache.org/jira/browse/FLINK-7838 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0 > > Attachments: initTransactions_deadlock.txt, log.txt > > > See attached log -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7880) flink-queryable-state-java fails with core-dump
[ https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16218876#comment-16218876 ] Till Rohrmann commented on FLINK-7880: -- Apparently, there are also other QS tests affected by this. Can we either fix this or make sure that only tests are affected but not the real usage of the QS client? https://travis-ci.org/tillrohrmann/flink/jobs/292608461 > flink-queryable-state-java fails with core-dump > --- > > Key: FLINK-7880 > URL: https://issues.apache.org/jira/browse/FLINK-7880 > Project: Flink > Issue Type: Bug > Components: Queryable State, Tests >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Kostas Kloudas >Priority: Blocker > Labels: test-stability > Fix For: 1.4.0 > > > The {{flink-queryable-state-java}} module fails on Travis with a core dump. > https://travis-ci.org/tillrohrmann/flink/jobs/289949829 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7916) Remove NetworkStackThroughputITCase
[ https://issues.apache.org/jira/browse/FLINK-7916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16218859#comment-16218859 ] Ufuk Celebi commented on FLINK-7916: We also have {{flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java}}. Agree that this is an anti-pattern and vote to either remove or establish a "benchmark" module. > Remove NetworkStackThroughputITCase > --- > > Key: FLINK-7916 > URL: https://issues.apache.org/jira/browse/FLINK-7916 > Project: Flink > Issue Type: Task > Components: Tests >Affects Versions: 1.4.0 >Reporter: Till Rohrmann > Fix For: 1.5.0 > > > Flink's code base contains the {{NetworkStackThroughputITCase}} which is not > really a test. Moreover it is marked as {{Ignored}}. I propose to remove this > test because it is more of a benchmark. We could think about creating a > benchmark project where we move these kind of "tests". > In general I think we should remove ignored tests if they won't be fixed > immediately. The danger is far too high that we forget about them and then we > only keep the maintenance burden of it. This is especially true for the above > mentioned test case. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7542) Some tests in AggregateITCase fail for some Time Zones
[ https://issues.apache.org/jira/browse/FLINK-7542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske reassigned FLINK-7542: Assignee: Fabian Hueske > Some tests in AggregateITCase fail for some Time Zones > -- > > Key: FLINK-7542 > URL: https://issues.apache.org/jira/browse/FLINK-7542 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.2 >Reporter: Usman Younas >Assignee: Fabian Hueske > > In {{org.apache.flink.table.runtime.batch.sql.AggregateITCase}} two tests > 1. testTumbleWindowAggregate and > 2. testHopWindowAggregate > are failing for some time zones. > The bug can be reproduced by changing the time zone of the machine to > Time Zone: Central Daylight Time > Closest City: Houston-United States > I think the problem is with Timestamp. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7380) Limit usage of Row type
[ https://issues.apache.org/jira/browse/FLINK-7380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16218854#comment-16218854 ] Joshua Griffith commented on FLINK-7380: I use GenericTypeInfo to build trees using delta iterations because RowTypeInfo doesn't support recursive definitions. I have to use Rows because I only have type information at runtime when I'm building the job. Perhaps this could be a warning rather than an exception? Alternatively, it would be useful if there was an easier way to dynamically build recursive type information. > Limit usage of Row type > --- > > Key: FLINK-7380 > URL: https://issues.apache.org/jira/browse/FLINK-7380 > Project: Flink > Issue Type: Improvement > Components: DataSet API, DataStream API >Reporter: Timo Walther >Assignee: Timo Walther > > The recently introduced {{Row}} type causes a lot of confusion for users. By > default they are serialized using Kryo. We should not allow to use > {{GenericTypeInfo}}. The TypeExtractor should throw an exception and > encourage to provide proper field types. The Row class should be final and > only targeted for intended use cases. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (FLINK-7838) Kafka011ProducerExactlyOnceITCase do not finish
[ https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16218850#comment-16218850 ] Piotr Nowojski edited comment on FLINK-7838 at 10/25/17 3:07 PM: - Add log with another dead lock example at org.apache.kafka.clients.producer.KafkaProducer.initTransactions, called in restoring state. was (Author: pnowojski): Deadlock on another phase. > Kafka011ProducerExactlyOnceITCase do not finish > --- > > Key: FLINK-7838 > URL: https://issues.apache.org/jira/browse/FLINK-7838 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0 > > Attachments: initTransactions_deadlock.txt, log.txt > > > See attached log -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7918) Run AbstractTestBase tests on Flip-6 MiniCluster
Till Rohrmann created FLINK-7918: Summary: Run AbstractTestBase tests on Flip-6 MiniCluster Key: FLINK-7918 URL: https://issues.apache.org/jira/browse/FLINK-7918 Project: Flink Issue Type: New Feature Components: Tests Affects Versions: 1.4.0 Reporter: Till Rohrmann Assignee: Till Rohrmann In order to extend our test coverage for Flip-6, we should enable all {{AbstractTestBase}} tests to be executed via Flip-6's {{MiniCluster}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7838) Kafka011ProducerExactlyOnceITCase do not finish
[ https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-7838: -- Attachment: initTransactions_deadlock.txt Deadlock on another phase. > Kafka011ProducerExactlyOnceITCase do not finish > --- > > Key: FLINK-7838 > URL: https://issues.apache.org/jira/browse/FLINK-7838 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0 > > Attachments: initTransactions_deadlock.txt, log.txt > > > See attached log -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7913) Add support for Kafka default partitioner
[ https://issues.apache.org/jira/browse/FLINK-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16218776#comment-16218776 ] Konstantin Lalafaryan commented on FLINK-7913: -- Thanks for your comment. Yes, you are right. But I have just found out that you can use Kafka's default partitioner by doing the following: {code:java} outputStream.addSink(new FlinkKafkaProducer010<>(producerProperties.getProperty(TOPIC), new EventSerializationSchema(), producerProperties, null)); {code} Basically you have to pass a null value for the customPartitioner. > Add support for Kafka default partitioner > - > > Key: FLINK-7913 > URL: https://issues.apache.org/jira/browse/FLINK-7913 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.0 >Reporter: Konstantin Lalafaryan >Assignee: Konstantin Lalafaryan > Fix For: 1.5.0 > > > Currently in Apache Flink only *FlinkKafkaPartitioner* is available, with > just one implementation, *FlinkFixedPartitioner*. > In order to use Kafka's default partitioner you have to create a new > implementation of *FlinkKafkaPartitioner* and fork the code from Kafka. > It would be really good to be able to define the partitioner without > implementing a new class. > Thanks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
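The workaround quoted above relies on the producer leaving the record's partition unset when no custom partitioner is supplied, so the Kafka client applies its own default partitioner. A minimal sketch of that dispatch decision (hypothetical names, not the actual Flink producer code):

```java
import java.util.function.BiFunction;

// Sketch of the fallback discussed above: with a null custom partitioner,
// the chosen partition is left null so the Kafka client's default
// partitioner decides. Hypothetical names, not Flink's actual code.
final class PartitionDispatchSketch {
    static Integer choosePartition(
            BiFunction<byte[], Integer, Integer> customPartitioner,
            byte[] key, int numPartitions) {
        if (customPartitioner == null) {
            return null; // no partition set: Kafka's default partitioner takes over
        }
        return customPartitioner.apply(key, numPartitions);
    }
}
```

This is why passing `null` in the constructor call above is enough: the producer never overrides the partition, and Kafka's default key-based partitioning applies.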
[jira] [Commented] (FLINK-7669) Always load Flink classes through parent ClassLoader
[ https://issues.apache.org/jira/browse/FLINK-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16218670#comment-16218670 ] ASF GitHub Bot commented on FLINK-7669: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4891 Merged > Always load Flink classes through parent ClassLoader > > > Key: FLINK-7669 > URL: https://issues.apache.org/jira/browse/FLINK-7669 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.4.0 > Environment: - OS: macOS Sierra > - Oracle JDK 1.8 > - Scala 2.11.11 > - sbt 0.13.16 > - Build from trunk code at commit hash > {{42cc3a2a9c41dda7cf338db36b45131db9150674}} > -- started a local flink node >Reporter: Raymond Tay >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0 > > > Latest code pulled from trunk threw errors at runtime when i ran a job > against it; but when i ran the JAR against the stable version {{1.3.2}} it > was OK. Here is the stacktrace. > An exception is being thrown : > {noformat} > Cluster configuration: Standalone cluster with JobManager at > localhost/127.0.0.1:6123 > Using address localhost:6123 to connect to JobManager. > JobManager web interface address http://localhost:8081 > Starting execution of > program > Submitting job with JobID: 05dd8e60c6fda3b96fc22ef6cf389a23. Waiting for job > completion. > Connected to JobManager at > Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-234825544] with leader > session id ----. 
> > The program finished with the following exception: > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Failed to submit job 05dd8e60c6fda3b96fc22ef6cf389a23 > (Flink Streaming Job) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:479) > at > org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:443) > at > org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1501) > at > org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:629) > at > org.example.streams.split.SimpleSplitStreams$.main(04splitstreams.scala:53) > at > org.example.streams.split.SimpleSplitStreams.main(04splitstreams.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:383) > at > org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:840) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:285) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1088) > at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1135) > at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1132) > at > 
org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:44) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1132) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to > submit job 05dd8e60c6fda3b96fc22ef6cf389a23 (Flink Streaming Job) > at > org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1358) > at >