[GitHub] [flink] StefanRRichter commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService
StefanRRichter commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService URL: https://github.com/apache/flink/pull/8692#discussion_r298170108 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java ## @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks.mailbox; + +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Optional; + +/** + * This class encapsulates the logic of the mailbox-based execution model. At the core of this model is + * {@link #runMailboxLoop()}, which continuously executes the provided {@link MailboxDefaultAction} in a loop. On each + * iteration, the method also checks if there are pending actions in the mailbox and executes such actions. This model + * ensures single-threaded execution between the default action (e.g. record processing) and mailbox actions (e.g. + * checkpoint trigger, timer firing, ...).
+ * + * The {@link MailboxDefaultAction} interacts with this class through the {@link MailboxDefaultActionContext} to + * communicate control flow changes to the mailbox loop, e.g. that invocations of the default action are temporarily + * or permanently exhausted. + * + * The design of {@link #runMailboxLoop()} is centered around the idea of keeping the expected hot path + * (default action, no mail) as fast as possible, with just a single volatile read per iteration in + * {@link Mailbox#hasMail}. This means that all checking of mail and other control flags (mailboxLoopRunning, + * suspendedDefaultAction) is always connected to #hasMail indicating true. This means that control flag changes in + * the mailbox thread can be done directly, but we must ensure that there is at least one action in the mailbox so that + * the change is picked up. For control flag changes by all other threads, which must happen through mailbox actions, + * this is automatically the case. + * + * This class has an open-prepareClose-close lifecycle that is connected with and maps to the lifecycle of the + * encapsulated {@link Mailbox} (which is open-quiesce-close). + * + * The method {@link #switchToLegacySourceCompatibilityMailboxLoop(Object)} exists to run the current sources + * (see {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask}) with the mailbox model in a compatibility + * mode. Once we drop the old source interface for the new one (FLIP-27) this method can eventually go away. + */ +public class MailboxProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(MailboxProcessor.class); + + /** The mailbox data-structure that manages requests for special actions, like timers, checkpoints, ... */ + private final Mailbox mailbox; + + /** Executor-style facade for client code to submit actions to the mailbox.
*/ + private final TaskMailboxExecutorService taskMailboxExecutor; + + /** Action that is repeatedly executed if no action request is in the mailbox. Typically record processing. */ + private final MailboxDefaultAction mailboxDefaultAction; + + /** Control flag to terminate the mailbox loop. Must only be accessed from mailbox thread. */ + private boolean mailboxLoopRunning; + + /** +* Remembers a currently active suspension of the default action. Serves as flag to indicate a suspended +* default action (suspended if not-null) and to reuse the object as return value in consecutive suspend attempts. +* Must only be accessed from mailbox thread. +*/ + private MailboxDefaultAction.SuspendedDefaultAction suspendedDefaultAction; + + /** Special action that is used to terminate the mailbox loop. */ + private final Runnable mailboxPoisonLetter; + + public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction) { + this.mailboxDefaultAction = Preconditions.checkNotNull(mailboxDefaultAction); + this.mailbox =
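The loop structure described in the Javadoc above can be sketched in isolation. The following is a hypothetical, simplified model (class and method names are invented, not Flink's actual MailboxProcessor): the hot path performs one mail check per iteration, and a "poison letter" terminates the loop by flipping a flag that only the mailbox thread reads directly.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of the mailbox loop described above; not Flink's actual code.
public class MiniMailboxLoop {
    private final Queue<Runnable> mailbox = new ConcurrentLinkedQueue<>();

    // Control flag: mutated only from the mailbox thread. Other threads request
    // changes by enqueueing a letter, which also guarantees the change is noticed.
    private boolean mailboxLoopRunning = true;

    /** Enqueue a letter; this is how other threads get work onto the mailbox thread. */
    public void sendLetter(Runnable letter) {
        mailbox.add(letter);
    }

    /** Letter that terminates the loop once it runs on the mailbox thread. */
    public void sendPoisonLetter() {
        sendLetter(() -> mailboxLoopRunning = false);
    }

    /** Hot path: one mail check per iteration, then the default action. */
    public void runMailboxLoop(Runnable defaultAction) {
        while (mailboxLoopRunning) {
            Runnable letter;
            while ((letter = mailbox.poll()) != null) {
                letter.run(); // may flip mailboxLoopRunning (e.g. the poison letter)
            }
            if (mailboxLoopRunning) {
                defaultAction.run();
            }
        }
    }
}
```

For instance, a default action that sends the poison letter on its third invocation runs exactly three times before the loop exits.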
[GitHub] [flink] sjwiesman commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough
sjwiesman commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough URL: https://github.com/apache/flink/pull/8903#discussion_r298169905 ## File path: pom.xml ## @@ -82,11 +81,13 @@ under the License. flink-mesos flink-metrics flink-yarn + flink-shaded-yarn-tests flink-yarn-tests flink-fs-tests flink-docs flink-python flink-ml-parent + flink-walkthroughs Review comment: Rebased on fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-13019) IT test for fine-grained recovery
Andrey Zagrebin created FLINK-13019: --- Summary: IT test for fine-grained recovery Key: FLINK-13019 URL: https://issues.apache.org/jira/browse/FLINK-13019 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Andrey Zagrebin Assignee: Andrey Zagrebin Fix For: 1.9.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-13018) Serving docs locally with jekyll fails with inotify limit
Nico Kruber created FLINK-13018: --- Summary: Serving docs locally with jekyll fails with inotify limit Key: FLINK-13018 URL: https://issues.apache.org/jira/browse/FLINK-13018 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.9.0 Reporter: Nico Kruber Assignee: Nico Kruber Both {{build_docs.sh -i}} and {{build_docs.sh -p}} currently fail (also in the dockerized builds in {{docs/docker}}): {code} $ ./build_docs.sh -p Fetching gem metadata from https://rubygems.org/.. ... Bundle complete! 8 Gemfile dependencies, 36 gems now installed. Bundled gems are installed into `./.rubydeps` Configuration file: /home/nico/Projects/flink/docs/_config.yml Source: /home/nico/Projects/flink/docs Destination: /home/nico/Projects/flink/docs/content Incremental build: disabled. Enable with --incremental Generating... done in 167.943 seconds. jekyll 3.7.2 | Error: Too many open files - Failed to initialize inotify: the user limit on the total number of inotify instances has been reached. {code} {{inotify}} is probably used to monitor individual files rather than just directories, but I could not verify that or find a way to change how jekyll uses inotify. Raising the inotify limit would only work around the problem, and upgrading jekyll did not solve it. So far there are three options: # disable watching files via {{--no-watch}} # use polling instead of inotify via {{--force_polling}} # try to reduce the set of watched files by adding excludes for (expected) static files
[GitHub] [flink] wuchong commented on a change in pull request #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser
wuchong commented on a change in pull request #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser URL: https://github.com/apache/flink/pull/8850#discussion_r298160404 ## File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateView.java ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.ExtendedSqlNode; +import org.apache.flink.sql.parser.error.SqlParseException; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; + +import org.apache.calcite.sql.SqlCharStringLiteral; +import org.apache.calcite.sql.SqlCreate; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +import java.util.List; + +/** + * CREATE VIEW DDL sql call. 
+ */ +public class SqlCreateView extends SqlCreate implements ExtendedSqlNode { + public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE_VIEW", SqlKind.CREATE_VIEW); + + private final SqlIdentifier viewName; + private final SqlNodeList fieldList; + private final SqlNode query; + private final SqlCharStringLiteral comment; + + public SqlCreateView( + SqlParserPos pos, + SqlIdentifier viewName, + SqlNodeList fieldList, + SqlNode query, + boolean replace, + SqlCharStringLiteral comment) { + super(OPERATOR, pos, replace, false); + this.viewName = viewName; + this.fieldList = fieldList; + this.query = query; + this.comment = comment; + } + + @Override + public List getOperandList() { + List ops = Lists.newArrayList(); + ops.add(viewName); + ops.add(fieldList); Review comment: Do we want to support a field list in the first version? If not, I would suggest removing it to keep it compatible.
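To make the point about the optional field list concrete, here is a hypothetical, stand-alone rendering helper (not Calcite's actual unparse logic; class and method names are invented) showing how the output of CREATE [OR REPLACE] VIEW changes when a field list is present:

```java
import java.util.List;

// Hypothetical illustration of the DDL shape parsed above; not the real SqlCreateView.
public class CreateViewRenderer {

    /** Renders a CREATE [OR REPLACE] VIEW statement with an optional column list. */
    public static String render(String viewName, List<String> fieldList, String query, boolean replace) {
        StringBuilder sb = new StringBuilder("CREATE ");
        if (replace) {
            sb.append("OR REPLACE ");
        }
        sb.append("VIEW ").append(viewName);
        if (!fieldList.isEmpty()) {
            // The optional column list under discussion in the review comment.
            sb.append(" (").append(String.join(", ", fieldList)).append(')');
        }
        sb.append(" AS ").append(query);
        return sb.toString();
    }
}
```

With an empty field list the statement degrades to the plain `CREATE VIEW name AS query` form, which is why dropping the list in a first version stays forward compatible.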
[GitHub] [flink] wuchong commented on a change in pull request #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser
wuchong commented on a change in pull request #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser URL: https://github.com/apache/flink/pull/8850#discussion_r298130713 ## File path: flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java ## @@ -583,6 +583,25 @@ public void testInvalidUpsertOverwrite() { "OVERWRITE expression is only used with INSERT mode"); } + @Test + public void testCreateViewWithProperty() { Review comment: Please rename the method. Please also add some tests for create view without a comment, and for drop view.
[GitHub] [flink] StefanRRichter commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService
StefanRRichter commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService URL: https://github.com/apache/flink/pull/8692#discussion_r298162033 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ## @@ -226,13 +219,18 @@ protected StreamTask( this.accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap(); this.recordWriters = createRecordWriters(configuration, environment); this.syncSavepointLatch = new SynchronousSavepointLatch(); - this.mailbox = new MailboxImpl(); + this.mailboxProcessor = new MailboxProcessor(this); Review comment: I did this to avoid changes to all the subclasses - but I guess it's ok then, and I will simply have a couple more touched files.
[GitHub] [flink] wuchong commented on a change in pull request #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser
wuchong commented on a change in pull request #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser URL: https://github.com/apache/flink/pull/8850#discussion_r298161597 ## File path: flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl ## @@ -334,6 +317,50 @@ void PartitionSpecCommaList(SqlNodeList list) : } +/** +* Parses a create view or replace existing view statement. +* CREATE [OR REPLACE] VIEW view_name [ (field1, field2 ...) ] AS select_statement +*/ +SqlCreate SqlCreateView(Span s, boolean replace) : { +boolean replaceView = false; +SqlIdentifier viewName; +SqlCharStringLiteral comment = null; +SqlNode query; +SqlNodeList fieldList = SqlNodeList.EMPTY; +} +{ +[ { replaceView = true; } ] Review comment: We can remove this; the parameter `replace` already does this. `SqlCreateTable` is the same.
[GitHub] [flink] lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis
lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis URL: https://github.com/apache/flink/pull/8911#issuecomment-506333942 cc @bowenli86 @xuefuz @zjuwangg
[GitHub] [flink] lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis
lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis URL: https://github.com/apache/flink/pull/8911#issuecomment-506333597 Here's the [build result](https://travis-ci.org/lirui-apache/flink/builds/551257514) from my own repo. The connector_hive_1 profile failed as expected.
[GitHub] [flink] flinkbot commented on issue #8917: [FLINK-13017][docs] do not mount local $HOME into docker
flinkbot commented on issue #8917: [FLINK-13017][docs] do not mount local $HOME into docker URL: https://github.com/apache/flink/pull/8917#issuecomment-506330683 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] NicoK opened a new pull request #8917: [FLINK-13017][docs] do not mount local $HOME into docker
NicoK opened a new pull request #8917: [FLINK-13017][docs] do not mount local $HOME into docker URL: https://github.com/apache/flink/pull/8917 ## What is the purpose of the change Remove the (writable!) mount of a user's $HOME into the dockerized documentation build container in order to - make the builds independent from the host system (making them reproducible) - not have the commands in the container affect the host ## Brief change log - remove mounting user $HOME ## Verifying this change I verified building the docs inside the new environment. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? **not applicable**
[jira] [Updated] (FLINK-13017) Broken and irreproducible dockerized docs build
[ https://issues.apache.org/jira/browse/FLINK-13017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-13017: --- Labels: pull-request-available (was: ) > Broken and irreproducible dockerized docs build > --- > > Key: FLINK-13017 > URL: https://issues.apache.org/jira/browse/FLINK-13017 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.6.4, 1.7.2, 1.8.0, 1.9.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Critical > Labels: pull-request-available > > The build tools around {{docs/docker}} seem broken and (on my machine) give > errors like the following while it is working on a colleague's machine: > {code} > bash: /etc/bash_completion.d/git-prompt.sh: No such file or directory > bash: __git_ps1: command not found > {code} > {code} > /usr/bin/env: 'ruby.ruby2.5': No such file or directory > bash: __git_ps1: command not found > {code} > Reason seems to be that your whole user's $HOME is mounted (writable!) into > the docker container. We should just mount the docs directory to get > # builds which are independent from the host system (making them reproducible) > # not have the commands in the container affect the host(!)
[GitHub] [flink] 1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService
1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService URL: https://github.com/apache/flink/pull/8692#discussion_r298150038 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxExecutorServiceImpl.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks.mailbox; + +import javax.annotation.Nonnull; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +/** + * Implementation of an executor service built around a mailbox-based execution model. + */ +public class TaskMailboxExecutorServiceImpl extends AbstractExecutorService implements TaskMailboxExecutorService { + + /** Reference to the thread that executes the mailbox letters. */ + @Nonnull + private final Thread taskMailboxThread; + + /** The mailbox that manages the submitted runnable objects.
*/ + @Nonnull + private final Mailbox mailbox; + + public TaskMailboxExecutorServiceImpl(@Nonnull Mailbox mailbox) { + this(mailbox, Thread.currentThread()); + } + + public TaskMailboxExecutorServiceImpl(@Nonnull Mailbox mailbox, @Nonnull Thread taskMailboxThread) { + this.mailbox = mailbox; + this.taskMailboxThread = taskMailboxThread; + } + + @Override + public void execute(@Nonnull Runnable command) { + checkIsNotMailboxThread(); + try { + mailbox.putMail(command); + } catch (InterruptedException irex) { + Thread.currentThread().interrupt(); + throw new RejectedExecutionException("Sender thread was interrupted while blocking on mailbox.", irex); + } catch (MailboxStateException mbex) { + throw new RejectedExecutionException(mbex); + } + } + + @Override + public boolean tryExecute(Runnable command) { + try { + return mailbox.tryPutMail(command); + } catch (MailboxStateException e) { + throw new RejectedExecutionException(e); + } + } + + @Override + public void yield() throws InterruptedException, IllegalStateException { + checkIsMailboxThread(); + try { + Runnable runnable = mailbox.takeMail(); + runnable.run(); + } catch (MailboxStateException e) { + throw new IllegalStateException("Mailbox can no longer supply runnables for yielding.", e); + } + } + + @Override + public boolean tryYield() throws IllegalStateException { + checkIsMailboxThread(); + try { + Optional runnableOptional = mailbox.tryTakeMail(); + if (runnableOptional.isPresent()) { + runnableOptional.get().run(); + return true; + } else { + return false; + } + } catch (MailboxStateException e) { + throw new IllegalStateException("Mailbox can no longer supply runnables for yielding.", e); + } + } + + @Override + public boolean isMailboxThread() { + return Thread.currentThread() == taskMailboxThread; + } + + @Override + public void shutdown() { + mailbox.quiesce(); + } + + @Nonnull + @Override + public List shutdownNow() { + return mailbox.close(); + } + + @Override + public boolean isShutdown() { + 
return mailbox.getState() != Mailbox.State.OPEN; + } + + @Override + public boolean isTerminated() { + return mailbox.getState() == Mailbox.State.CLOSED; +
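The yield/tryYield pair in the class above can be reduced to a small, single-threaded sketch (hypothetical names, not the real TaskMailboxExecutorServiceImpl): tryYield runs at most one queued letter and reports whether anything was executed.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical single-threaded sketch of the tryYield pattern; not Flink's actual class.
public class MiniMailboxExecutor {
    private final Queue<Runnable> mailbox = new ArrayDeque<>();

    /** Submit a letter for later execution on the mailbox thread. */
    public void execute(Runnable letter) {
        mailbox.add(letter);
    }

    /** Runs one pending letter, if any; returns whether a letter was executed. */
    public boolean tryYield() {
        Runnable letter = mailbox.poll();
        if (letter == null) {
            return false;
        }
        letter.run();
        return true;
    }
}
```

The blocking `yield` variant differs only in that it waits for mail (e.g. on a `BlockingQueue#take`) instead of returning false when the mailbox is empty.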
[GitHub] [flink] 1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService
1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService URL: https://github.com/apache/flink/pull/8692#discussion_r298146026 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java ## @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks.mailbox; + +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Optional; + +/** + * This class encapsulates the logic of the mailbox-based execution model. At the core of this model is + * {@link #runMailboxLoop()}, which continuously executes the provided {@link MailboxDefaultAction} in a loop. On each + * iteration, the method also checks if there are pending actions in the mailbox and executes such actions. This model + * ensures single-threaded execution between the default action (e.g. record processing) and mailbox actions (e.g. + * checkpoint trigger, timer firing, ...).
+ * + * The {@link MailboxDefaultAction} interacts with this class through the {@link MailboxDefaultActionContext} to + * communicate control flow changes to the mailbox loop, e.g. that invocations of the default action are temporarily + * or permanently exhausted. + * + * The design of {@link #runMailboxLoop()} is centered around the idea of keeping the expected hot path + * (default action, no mail) as fast as possible, with just a single volatile read per iteration in + * {@link Mailbox#hasMail}. This means that all checking of mail and other control flags (mailboxLoopRunning, + * suspendedDefaultAction) is always connected to #hasMail indicating true. This means that control flag changes in + * the mailbox thread can be done directly, but we must ensure that there is at least one action in the mailbox so that + * the change is picked up. For control flag changes by all other threads, which must happen through mailbox actions, + * this is automatically the case. + * + * This class has an open-prepareClose-close lifecycle that is connected with and maps to the lifecycle of the + * encapsulated {@link Mailbox} (which is open-quiesce-close). + * + * The method {@link #switchToLegacySourceCompatibilityMailboxLoop(Object)} exists to run the current sources + * (see {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask}) with the mailbox model in a compatibility + * mode. Once we drop the old source interface for the new one (FLIP-27) this method can eventually go away. + */ +public class MailboxProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(MailboxProcessor.class); + + /** The mailbox data-structure that manages requests for special actions, like timers, checkpoints, ... */ + private final Mailbox mailbox; + + /** Executor-style facade for client code to submit actions to the mailbox. */ + private final TaskMailboxExecutorService taskMailboxExecutor; + + /** Action that is repeatedly executed if no action request is in the mailbox.
Typically record processing. */ + private final MailboxDefaultAction mailboxDefaultAction; + + /** Control flag to terminate the mailbox loop. Must only be accessed from mailbox thread. */ + private boolean mailboxLoopRunning; + + /** +* Remembers a currently active suspension of the default action. Serves as flag to indicate a suspended +* default action (suspended if not-null) and to reuse the object as return value in consecutive suspend attempts. +* Must only be accessed from mailbox thread. +*/ + private MailboxDefaultAction.SuspendedDefaultAction suspendedDefaultAction; + + /** Special action that is used to terminate the mailbox loop. */ + private final Runnable mailboxPoisonLetter; + + public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction) { + this.mailboxDefaultAction = Preconditions.checkNotNull(mailboxDefaultAction); + this.mailbox = new
[GitHub] [flink] 1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService
1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService URL: https://github.com/apache/flink/pull/8692#discussion_r298135855 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java ## @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks.mailbox; + +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Optional; + +/** + * This class encapsulates the logic of the mailbox-based execution model. At the core of this model is + * {@link #runMailboxLoop()}, which continuously executes the provided {@link MailboxDefaultAction} in a loop. On each + * iteration, the method also checks if there are pending actions in the mailbox and executes such actions. This model + * ensures single-threaded execution between the default action (e.g. record processing) and mailbox actions (e.g. + * checkpoint trigger, timer firing, ...).
+ * + * The {@link MailboxDefaultAction} interacts with this class through the {@link MailboxDefaultActionContext} to + * communicate control flow changes to the mailbox loop, e.g. that invocations of the default action are temporarily + * or permanently exhausted. + * + * The design of {@link #runMailboxLoop()} is centered around the idea of keeping the expected hot path + * (default action, no mail) as fast as possible, with just a single volatile read per iteration in + * {@link Mailbox#hasMail}. This means that all checking of mail and other control flags (mailboxLoopRunning, + * suspendedDefaultAction) is always connected to #hasMail indicating true. This means that control flag changes in + * the mailbox thread can be done directly, but we must ensure that there is at least one action in the mailbox so that + * the change is picked up. For control flag changes by all other threads, which must happen through mailbox actions, + * this is automatically the case. + * + * This class has an open-prepareClose-close lifecycle that is connected with and maps to the lifecycle of the + * encapsulated {@link Mailbox} (which is open-quiesce-close). + * + * The method {@link #switchToLegacySourceCompatibilityMailboxLoop(Object)} exists to run the current sources + * (see {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask}) with the mailbox model in a compatibility + * mode. Once we drop the old source interface for the new one (FLIP-27) this method can eventually go away. + */ +public class MailboxProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(MailboxProcessor.class); + + /** The mailbox data-structure that manages requests for special actions, like timers, checkpoints, ... */ + private final Mailbox mailbox; + + /** Executor-style facade for client code to submit actions to the mailbox. */ + private final TaskMailboxExecutorService taskMailboxExecutor; + + /** Action that is repeatedly executed if no action request is in the mailbox.
Typically record processing. */ + private final MailboxDefaultAction mailboxDefaultAction; + + /** Control flag to terminate the mailbox loop. Must only be accessed from mailbox thread. */ + private boolean mailboxLoopRunning; + + /** +* Remembers a currently active suspension of the default action. Serves as flag to indicate a suspended +* default action (suspended if not-null) and to reuse the object as return value in consecutive suspend attempts. +* Must only be accessed from mailbox thread. +*/ + private MailboxDefaultAction.SuspendedDefaultAction suspendedDefaultAction; + + /** Special action that is used to terminate the mailbox loop. */ + private final Runnable mailboxPoisonLetter; + + public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction) { + this.mailboxDefaultAction = Preconditions.checkNotNull(mailboxDefaultAction); + this.mailbox = new
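The hot-path design described in the Javadoc above can be sketched in simplified form. The following is a hypothetical, self-contained illustration, not the actual Flink implementation: `SimpleMailbox` and the field names are stand-ins mirroring the quoted snippet, and the real suspension and lifecycle handling are omitted. A "poison letter" is pre-seeded so the loop terminates:

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class MailboxLoopSketch {

    // Stand-in for Mailbox: hasMail is the single volatile read on the hot path.
    static final class SimpleMailbox {
        private final Queue<Runnable> letters = new ArrayDeque<>();
        private volatile boolean hasMail = false;

        synchronized void put(Runnable letter) {
            letters.add(letter);
            hasMail = true; // makes the change visible to the mailbox thread
        }

        synchronized Runnable tryTake() {
            Runnable letter = letters.poll();
            hasMail = !letters.isEmpty();
            return letter;
        }

        boolean hasMail() {
            return hasMail;
        }
    }

    final SimpleMailbox mailbox = new SimpleMailbox();
    boolean mailboxLoopRunning = true; // only touched from the mailbox thread or via letters
    int defaultActionInvocations = 0;

    void runMailboxLoop() {
        while (mailboxLoopRunning) {
            // Hot path: one volatile read per iteration; letters and control
            // flags are only inspected when hasMail reports true.
            if (mailbox.hasMail()) {
                Runnable letter;
                while ((letter = mailbox.tryTake()) != null) {
                    letter.run();
                }
            } else {
                defaultActionInvocations++; // stands in for record processing
            }
        }
    }

    public static void main(String[] args) {
        MailboxLoopSketch processor = new MailboxLoopSketch();
        // The poison letter: a mail action that flips the control flag, which
        // the loop picks up because the letter also set hasMail to true.
        processor.mailbox.put(() -> processor.mailboxLoopRunning = false);
        processor.runMailboxLoop();
    }
}
```

Note how this honors the invariant from the Javadoc: a thread that wants to change `mailboxLoopRunning` from outside must do so via a letter, which guarantees `hasMail` becomes true and the change is picked up.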
[GitHub] [flink] 1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService
1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService URL: https://github.com/apache/flink/pull/8692#discussion_r298130215 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java ##
[jira] [Updated] (FLINK-13017) Broken and irreproducible dockerized docs build
[ https://issues.apache.org/jira/browse/FLINK-13017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-13017: Description: The build tools around {{docs/docker}} seem broken and (on my machine) give errors like the following while it is working on a colleague's machine: {code} bash: /etc/bash_completion.d/git-prompt.sh: No such file or directory bash: __git_ps1: command not found {code} {code} /usr/bin/env: 'ruby.ruby2.5': No such file or directory bash: __git_ps1: command not found {code} Reason seems to be that your whole user's $HOME is mounted (writable!) into the docker container. We should just mount the docs directory to get # builds which are independent from the host system (making them reproducible) # not have the commands in the container affect the host(!) was: The build tools around {{docs/docker}} seem broken and (on my machine) give errors like the following while it is working on a colleague's machine: {code} bash: /etc/bash_completion.d/git-prompt.sh: No such file or directory bash: __git_ps1: command not found {code} {code} ```/usr/bin/env: 'ruby.ruby2.5': No such file or directory bash: __git_ps1: command not found``` {code} Reason seems to be that your whole user's $HOME is mounted (writable!) into the docker container. We should just mount the docs directory to get # builds which are independent from the host system (making them reproducible) # not have the commands in the container affect the host(!) 
> Broken and irreproducible dockerized docs build > --- > > Key: FLINK-13017 > URL: https://issues.apache.org/jira/browse/FLINK-13017 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.6.4, 1.7.2, 1.8.0, 1.9.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Critical -- This message was sent by Atlassian JIRA (v7.6.3#76005)
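The fix the issue proposes can be illustrated with a hypothetical invocation; the image name, container paths, and build script below are placeholders, not the actual Flink docs tooling:

```shell
# Instead of bind-mounting the whole $HOME read-write into the container
# (which exposes host dotfiles like git-prompt.sh and ruby shims, and lets
# the container write back to the host), mount only the docs tree:
docker run --rm -it \
  -v "$(pwd)/docs:/flink/docs" \
  -w /flink/docs \
  flink-docs-builder:latest \
  ./build_docs.sh
```

With only the docs directory visible, the build can no longer pick up host-specific shell or Ruby configuration, and commands run in the container cannot affect files outside that directory.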
[GitHub] [flink] 1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService
1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService URL: https://github.com/apache/flink/pull/8692#discussion_r298133778 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java ##
[GitHub] [flink] 1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService
1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService URL: https://github.com/apache/flink/pull/8692#discussion_r298148314 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java ##
[GitHub] [flink] 1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService
1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService URL: https://github.com/apache/flink/pull/8692#discussion_r298153219 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java ##
[GitHub] [flink] 1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService
1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService URL: https://github.com/apache/flink/pull/8692#discussion_r298125132 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ## @@ -226,13 +219,18 @@ protected StreamTask( this.accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap(); this.recordWriters = createRecordWriters(configuration, environment); this.syncSavepointLatch = new SynchronousSavepointLatch(); - this.mailbox = new MailboxImpl(); + this.mailboxProcessor = new MailboxProcessor(this); Review comment: Currently the `MailboxDefaultAction` interface has only one abstract method. You can write `new MailboxProcessor(this::performDefaultAction)` instead and remove `MailboxDefaultAction` from `StreamTask implements`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
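The reviewer's suggestion relies on the fact that a single-abstract-method interface can be satisfied with a method reference. A sketch with simplified, hypothetical stand-in names (`DefaultAction`, `ProcessorSketch`, `TaskSketch`, not the real Flink types):

```java
public class MethodReferenceSketch {

    // A single-abstract-method interface is a functional interface, so any
    // matching method reference or lambda can implement it.
    @FunctionalInterface
    interface DefaultAction {
        void runDefaultAction();
    }

    static class ProcessorSketch {
        final DefaultAction defaultAction;

        ProcessorSketch(DefaultAction defaultAction) {
            this.defaultAction = defaultAction;
        }
    }

    static class TaskSketch {
        int invocations = 0;
        final ProcessorSketch processor;

        TaskSketch() {
            // Instead of `class TaskSketch implements DefaultAction` plus
            // `new ProcessorSketch(this)`, pass a method reference directly:
            this.processor = new ProcessorSketch(this::performDefaultAction);
        }

        void performDefaultAction() {
            invocations++; // record processing would go here
        }
    }

    public static void main(String[] args) {
        TaskSketch task = new TaskSketch();
        task.processor.defaultAction.runDefaultAction();
    }
}
```

This keeps the task's public type surface smaller: the class no longer advertises the callback interface, and the wiring between task and processor is explicit at the construction site.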
[GitHub] [flink] 1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService
1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService URL: https://github.com/apache/flink/pull/8692#discussion_r298128321 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java ##
[GitHub] [flink] WeiZhong94 commented on issue #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config
WeiZhong94 commented on issue #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config URL: https://github.com/apache/flink/pull/8910#issuecomment-506327614 LGTM, +1
[GitHub] [flink] flinkbot commented on issue #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples
flinkbot commented on issue #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples URL: https://github.com/apache/flink/pull/8916#issuecomment-506327371 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-12897) Improve the Python Table API docs by adding more examples
[ https://issues.apache.org/jira/browse/FLINK-12897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12897: --- Labels: pull-request-available (was: ) > Improve the Python Table API docs by adding more examples > - > > Key: FLINK-12897 > URL: https://issues.apache.org/jira/browse/FLINK-12897 > Project: Flink > Issue Type: Sub-task > Components: API / Python, Documentation >Reporter: Dian Fu >Assignee: Wei Zhong >Priority: Major > Labels: pull-request-available > > As discussed in [https://github.com/apache/flink/pull/8774], we need to > improve the Python Table API docs by adding more examples. Currently, a few > APIs have no examples.
[GitHub] [flink] flinkbot commented on issue #8915: [FLINK-13015][table-api-java] Create validators, strategies and transformations required for porting logical expressions
flinkbot commented on issue #8915: [FLINK-13015][table-api-java] Create validators, strategies and transformations required for porting logical expressions URL: https://github.com/apache/flink/pull/8915#issuecomment-506326938 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] WeiZhong94 opened a new pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples
WeiZhong94 opened a new pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples URL: https://github.com/apache/flink/pull/8916 ## What is the purpose of the change *This pull request adds more Python Table API and related DataStream/DataSet configuration examples to the Flink docs and the Python Sphinx docs.* ## Brief change log - *Add examples for the obscure APIs in the Python Sphinx docs.* - *Add examples for the supported Python APIs in the Flink docs.* - *Fix some typos and style issues in the docs.* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (docs) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Resolved] (FLINK-9728) Wrong ruby version for building flink docs in docker container
[ https://issues.apache.org/jira/browse/FLINK-9728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber resolved FLINK-9728. Resolution: Duplicate > Wrong ruby version for building flink docs in docker container > -- > > Key: FLINK-9728 > URL: https://issues.apache.org/jira/browse/FLINK-9728 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Florian Schmidt >Priority: Minor > > When trying to use the dockerized jekyll build I get the following error > > {code:java} > $ ./build_docs.sh -p > Your Ruby version is 2.0.0, but your Gemfile specified >= 2.1.0{code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] zentol merged pull request #8902: [hotfix][docs] fix wrong link in streamfile_sink
zentol merged pull request #8902: [hotfix][docs] fix wrong link in streamfile_sink URL: https://github.com/apache/flink/pull/8902 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-13015) Create validators, strategies and transformations required for porting logical expressions
[ https://issues.apache.org/jira/browse/FLINK-13015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-13015: --- Labels: pull-request-available (was: ) > Create validators, strategies and transformations required for porting > logical expressions > -- > > Key: FLINK-13015 > URL: https://issues.apache.org/jira/browse/FLINK-13015 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Affects Versions: 1.9.0 >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > > The goal of this task is to implement : > InputTypeValidator: > * by type root > TypeStrategies: > * cascade > * explicit > TypeTransformations: > * to_nullable > This set of classes will enable porting > AND/OR/NOT/IS_TRUE/IS_FALSE/IS_NOT_TRUE/IS_NOT_FALSE to new type inference > stack. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] dawidwys opened a new pull request #8915: [FLINK-13015][table-api-java] Create validators, strategies and transformations required for porting logical expressions
dawidwys opened a new pull request #8915: [FLINK-13015][table-api-java] Create validators, strategies and transformations required for porting logical expressions URL: https://github.com/apache/flink/pull/8915 ## What is the purpose of the change Create validators, strategies and transformations required for porting logical expressions ## Brief change log See commit messages ## Verifying this change This change is already covered by existing tests. Additionally added tests for the newly introduced classes. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-13017) Broken and irreproducible dockerized docs build
Nico Kruber created FLINK-13017: --- Summary: Broken and irreproducible dockerized docs build Key: FLINK-13017 URL: https://issues.apache.org/jira/browse/FLINK-13017 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.8.0, 1.7.2, 1.6.4, 1.9.0 Reporter: Nico Kruber Assignee: Nico Kruber The build tools around {{docs/docker}} seem broken and (on my machine) give errors like the following, while working fine on a colleague's machine: {code} bash: /etc/bash_completion.d/git-prompt.sh: No such file or directory bash: __git_ps1: command not found {code} {code} /usr/bin/env: 'ruby.ruby2.5': No such file or directory bash: __git_ps1: command not found {code} The reason seems to be that the whole user's $HOME is mounted (writable!) into the docker container. We should just mount the docs directory to get # builds which are independent from the host system (making them reproducible) # commands in the container which do not affect the host(!) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-12980) Getting Started - Add Top-Level Section to Existing Documentation
[ https://issues.apache.org/jira/browse/FLINK-12980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber resolved FLINK-12980. - Resolution: Fixed Fix Version/s: 1.9.0 Merged in master via e377bf669cb8a12fe9eec241236421698198ea1e (unfortunately with the wrong jira ticket tag) > Getting Started - Add Top-Level Section to Existing Documentation > - > > Key: FLINK-12980 > URL: https://issues.apache.org/jira/browse/FLINK-12980 > Project: Flink > Issue Type: Sub-task >Reporter: Konstantin Knauf >Assignee: Konstantin Knauf >Priority: Major > Fix For: 1.9.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] NicoK merged pull request #8873: [FLINK-12639] [docs] add "getting started" section to documentation
NicoK merged pull request #8873: [FLINK-12639] [docs] add "getting started" section to documentation URL: https://github.com/apache/flink/pull/8873 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sjwiesman commented on a change in pull request #8861: [FLINK-12963][state-processor-api] Add savepoint writer for bootstrapping new savepoints
sjwiesman commented on a change in pull request #8861: [FLINK-12963][state-processor-api] Add savepoint writer for bootstrapping new savepoints URL: https://github.com/apache/flink/pull/8861#discussion_r298141523 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java ## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.OperatorState; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.state.api.output.OperatorStateReducer; +import org.apache.flink.state.api.output.OperatorSubtaskStateReducer; +import org.apache.flink.state.api.output.SavepointOutputFormat; +import org.apache.flink.state.api.runtime.metadata.SavepointMetadata; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Any savepoint that can be written to from a batch context. + * @param The implementation type. 
+ */ +@PublicEvolving +@SuppressWarnings("WeakerAccess") +public abstract class WritableSavepoint<F extends WritableSavepoint> { + + protected final Map<String, BootstrapTransformation<?>> transformations; + + protected final List<String> droppedOperators; + + WritableSavepoint() { + this.transformations = new HashMap<>(); + this.droppedOperators = new ArrayList<>(); + } + + /** +* Drop an existing operator from the savepoint. +* @param uid The uid of the operator. +* @return A modified savepoint. +*/ + @SuppressWarnings("unchecked") + public F removeOperator(String uid) { + droppedOperators.add(uid); + transformations.remove(uid); + return (F) this; + } + + /** +* Adds a new operator to the savepoint. +* @param uid The uid of the operator. +* @param transformation The operator to be included. +* @return The modified savepoint. +*/ + @SuppressWarnings("unchecked") + public F withOperator(String uid, BootstrapTransformation<?> transformation) { + if (transformations.containsKey(uid)) { + throw new IllegalArgumentException("The savepoint already contains uid " + uid + ". All uid's must be unique"); + } + + transformations.put(uid, transformation); + return (F) this; + } + + /** +* Write out a new or updated savepoint. +* @param path The path to where the savepoint should be written. 
+*/ + public abstract void write(String path); + + protected static void write( + Path savepointPath, + Map<String, BootstrapTransformation<?>> transformations, + StateBackend stateBackend, + SavepointMetadata metadata, + @Nullable DataSet<OperatorState> existingOperators) { + + DataSet<OperatorState> newOperatorStates = transformations + .entrySet() + .stream() + .map(entry -> getOperatorStates(savepointPath, entry.getKey(), stateBackend, entry.getValue(), metadata)) + .reduce(DataSet::union) + .orElseThrow(() -> new IllegalStateException("Savepoint's must contain at least one operator")); + + DataSet<OperatorState> finalOperatorStates; + if (existingOperators == null) { + finalOperatorStates = newOperatorStates; + } else { + finalOperatorStates = newOperatorStates.union(existingOperators); Review comment: Currently no, it is just a shallow copy of pointers. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
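The `return (F) this;` calls in the diff above are an F-bounded (self-typed) fluent-builder pattern: each subclass of `WritableSavepoint` keeps its own concrete type through the call chain. Below is a minimal standalone sketch of the same pattern; all names here (`FluentSavepoint`, `NewSavepoint`) are hypothetical, and Flink's `BootstrapTransformation` payload is reduced to a plain `String` for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal self-typed fluent builder in the style of WritableSavepoint<F>;
// the transformation payload is simplified to a String.
abstract class FluentSavepoint<F extends FluentSavepoint<F>> {
    protected final Map<String, String> transformations = new HashMap<>();
    protected final List<String> droppedOperators = new ArrayList<>();

    @SuppressWarnings("unchecked")
    public F removeOperator(String uid) {
        droppedOperators.add(uid);
        transformations.remove(uid);
        return (F) this;  // safe as long as F is the concrete subclass type
    }

    @SuppressWarnings("unchecked")
    public F withOperator(String uid, String transformation) {
        if (transformations.containsKey(uid)) {
            throw new IllegalArgumentException("uid " + uid + " already present; uids must be unique");
        }
        transformations.put(uid, transformation);
        return (F) this;
    }
}

final class NewSavepoint extends FluentSavepoint<NewSavepoint> {
    public void write(String path) {
        System.out.println("writing " + transformations.size() + " operator(s) to " + path);
    }
}

public class SavepointDemo {
    public static void main(String[] args) {
        new NewSavepoint()
                .withOperator("uid-1", "bootstrap-1")
                .withOperator("uid-2", "bootstrap-2")
                .removeOperator("uid-1")
                .write("/tmp/new-savepoint");  // prints: writing 1 operator(s) to /tmp/new-savepoint
    }
}
```

The unchecked cast stays confined to the abstract base class; callers chaining on `NewSavepoint` never see a raw type.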
[GitHub] [flink] NicoK commented on issue #8873: [FLINK-12639] [docs] add "getting started" section to documentation
NicoK commented on issue #8873: [FLINK-12639] [docs] add "getting started" section to documentation URL: https://github.com/apache/flink/pull/8873#issuecomment-506317642 @flinkbot approve all This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sjwiesman commented on a change in pull request #8861: [FLINK-12963][state-processor-api] Add savepoint writer for bootstrapping new savepoints
sjwiesman commented on a change in pull request #8861: [FLINK-12963][state-processor-api] Add savepoint writer for bootstrapping new savepoints URL: https://github.com/apache/flink/pull/8861#discussion_r298141103 ## File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java ## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.api; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.OperatorState; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.state.api.output.OperatorStateReducer; +import org.apache.flink.state.api.output.OperatorSubtaskStateReducer; +import org.apache.flink.state.api.output.SavepointOutputFormat; +import org.apache.flink.state.api.runtime.metadata.SavepointMetadata; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Any savepoint that can be written to from a batch context. + * @param The implementation type. 
+ */ +@PublicEvolving +@SuppressWarnings("WeakerAccess") +public abstract class WritableSavepoint<F extends WritableSavepoint> { + + protected final Map<String, BootstrapTransformation<?>> transformations; + + protected final List<String> droppedOperators; + + WritableSavepoint() { + this.transformations = new HashMap<>(); + this.droppedOperators = new ArrayList<>(); + } + + /** +* Drop an existing operator from the savepoint. +* @param uid The uid of the operator. +* @return A modified savepoint. +*/ + @SuppressWarnings("unchecked") + public F removeOperator(String uid) { + droppedOperators.add(uid); Review comment: Yes, that is much better than what I have. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin edited a comment on issue #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager
azagrebin edited a comment on issue #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager URL: https://github.com/apache/flink/pull/8646#issuecomment-506313023 @StephanEwen Thanks for bringing this up. As @zhijiangW already described, true, the idea was to drop the `IOManager` dependency in the Shuffle Service API because `NettyShuffleEnvironment` basically uses only a narrow subset of `IOManager`, namely switching between temporary files. The valid concern is that if we decide to use `IOManagerAsync` with its threads, then the threads should (or at least could) be shared by all IO components which use them. In that case the Shuffle Service API has to accept it as a dependency created outside. As discussed offline, if we decide later that we need `IOManagerAsync` for shuffling, then we can revert the change and add the abstract/interface `IOManager` back to `ShuffleEnvironmentContext`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin edited a comment on issue #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager
azagrebin edited a comment on issue #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager URL: https://github.com/apache/flink/pull/8646#issuecomment-506313023 @StephanEwen Thanks for bringing this up. As @zhijiangW already described, true, the idea was to drop the `IOManager` dependency in the Shuffle Service API because `NettyShuffleEnvironment` basically uses only a narrow subset of `IOManager`, namely switching between temporary files. The valid concern is that if we decide to use `IOManagerAsync` with its threads, then the threads should be shared by all IO components which use them. In that case the Shuffle Service API has to accept it as a dependency created outside. As discussed offline, if we decide later that we need `IOManagerAsync` for shuffling, then we can revert the change and add the abstract/interface `IOManager` back to `ShuffleEnvironmentContext`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on issue #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager
azagrebin commented on issue #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager URL: https://github.com/apache/flink/pull/8646#issuecomment-506313023 @StephanEwen Thanks for bringing this up. True, the idea was to drop the `IOManager` dependency in the Shuffle Service API because `NettyShuffleEnvironment` basically uses only a narrow subset of `IOManager`, namely switching between temporary files. That narrowed concern of switching between temporary files is moved into `FileChannelManager`, which is also used by the `IOManager`. `NettyShuffleEnvironment` can create its own light-weight `FileChannelManager` with its own directory prefix. The valid concern is that if we decide to use `IOManagerAsync` with its threads, then the threads should be shared by all IO components which use them. In that case the Shuffle Service API has to accept it as a dependency created outside. As discussed offline, if we decide later that we need `IOManagerAsync` for shuffling, then we can revert the change and add the abstract/interface `IOManager` back to `ShuffleEnvironmentContext`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
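The narrowing described in this thread can be sketched in isolation: the shuffle environment only needs a way to obtain temporary channel files (round-robin over temp directories, with its own prefix), not the full `IOManager` with its reader/writer threads. The names and details below are illustrative assumptions, not Flink's actual `FileChannelManager` code.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

// Narrow interface: only temp-file (channel) creation, no async IO threads.
interface ChannelManager {
    File createChannel() throws IOException;
}

// Round-robin channel creation over a set of temp directories, with a
// per-component file prefix, mirroring the "own directory prefix" idea above.
class SimpleFileChannelManager implements ChannelManager {
    private final File[] tempDirs;
    private final String prefix;
    private int next;

    SimpleFileChannelManager(File[] tempDirs, String prefix) {
        this.tempDirs = tempDirs;
        this.prefix = prefix;
    }

    @Override
    public File createChannel() throws IOException {
        File dir = tempDirs[next++ % tempDirs.length];
        return File.createTempFile(prefix + "-", ".channel", dir);
    }
}

public class ChannelManagerDemo {
    public static void main(String[] args) throws IOException {
        File dir = Files.createTempDirectory("netty-shuffle").toFile();
        ChannelManager manager = new SimpleFileChannelManager(new File[]{dir}, "shuffle");
        System.out.println(manager.createChannel().exists());  // prints: true
    }
}
```

A shuffle implementation depending only on such an interface stays decoupled from whether the rest of the task uses `IOManagerAsync` or something lighter.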
[jira] [Updated] (FLINK-11662) Discarded checkpoint can cause Tasks to fail
[ https://issues.apache.org/jira/browse/FLINK-11662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11662: --- Labels: pull-request-available (was: ) > Discarded checkpoint can cause Tasks to fail > > > Key: FLINK-11662 > URL: https://issues.apache.org/jira/browse/FLINK-11662 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.7.0, 1.8.0 >Reporter: madong >Assignee: Yun Tang >Priority: Critical > Labels: pull-request-available > Attachments: jobmanager.log, taskmanager.log > > > Flink's {{CheckpointCoordinator}} discards an ongoing checkpoint as soon as > it receives the first decline message. Part of the discard operation is the > deletion of the checkpointing directory. Depending on the underlying > {{FileSystem}} implementation, concurrent write and read operation to files > in the checkpoint directory can then fail (e.g. this is the case with HDFS). > If there is still a local checkpointing operation running for some {{Task}} > and belonging to the discarded checkpoint, then it can happen that the > checkpointing operation fails (e.g. an {{AsyncCheckpointRunnable}}). > Depending on the configuration of the {{CheckpointExceptionHandler}}, this > can lead to a task failure and a job recovery which is caused by an already > discarded checkpoint. > {code:java} > 2019-02-16 11:26:29.378 [Checkpoint Timer] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 1389046 @ 1550287589373 for job 599a6ac3c371874d12ebf024978cadbc. > 2019-02-16 11:26:29.630 [flink-akka.actor.default-dispatcher-68] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline > checkpoint 1389046 by task 7239e5d29203c4c720ed2db6f5db33fc of job > 599a6ac3c371874d12ebf024978cadbc. 
> 2019-02-16 11:26:29.630 [flink-akka.actor.default-dispatcher-68] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding > checkpoint 1389046 of job 599a6ac3c371874d12ebf024978cadbc. > org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: > Task Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (3/3) was > not running > at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 2019-02-16 11:26:29.697 [flink-akka.actor.default-dispatcher-68] INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource > -> mapOperate -> Timestamps/Watermarks (1/3) > (a5657b784d235731cd468164e85d0b50) switched from RUNNING to FAILED. > org.apache.flink.streaming.runtime.tasks.AsynchronousException: > java.lang.Exception: Could not materialize checkpoint 1389046 for operator > Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3). 
> at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.Exception: Could not materialize checkpoint 1389046 for > operator Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3). > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942) > ... 6 common frames omitted > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not flush and close the file system output stream to > hdfs://.../flink/checkpoints/599a6ac3c371874d12ebf024978cadbc/chk-1389046/84631771-01e2-41bc-950d-c9e39eac26f9 > in order to obtain the stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53) > at >
[jira] [Closed] (FLINK-11662) Discarded checkpoint can cause Tasks to fail
[ https://issues.apache.org/jira/browse/FLINK-11662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-11662. -- Resolution: Fixed Fix Version/s: 1.9.0 Merged in: master: b760d55 > Discarded checkpoint can cause Tasks to fail > > > Key: FLINK-11662 > URL: https://issues.apache.org/jira/browse/FLINK-11662 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.7.0, 1.8.0 >Reporter: madong >Assignee: Yun Tang >Priority: Critical > Labels: pull-request-available > Fix For: 1.9.0 > > Attachments: jobmanager.log, taskmanager.log > > Time Spent: 10m > Remaining Estimate: 0h > > Flink's {{CheckpointCoordinator}} discards an ongoing checkpoint as soon as > it receives the first decline message. Part of the discard operation is the > deletion of the checkpointing directory. Depending on the underlying > {{FileSystem}} implementation, concurrent write and read operation to files > in the checkpoint directory can then fail (e.g. this is the case with HDFS). > If there is still a local checkpointing operation running for some {{Task}} > and belonging to the discarded checkpoint, then it can happen that the > checkpointing operation fails (e.g. an {{AsyncCheckpointRunnable}}). > Depending on the configuration of the {{CheckpointExceptionHandler}}, this > can lead to a task failure and a job recovery which is caused by an already > discarded checkpoint. > {code:java} > 2019-02-16 11:26:29.378 [Checkpoint Timer] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 1389046 @ 1550287589373 for job 599a6ac3c371874d12ebf024978cadbc. > 2019-02-16 11:26:29.630 [flink-akka.actor.default-dispatcher-68] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline > checkpoint 1389046 by task 7239e5d29203c4c720ed2db6f5db33fc of job > 599a6ac3c371874d12ebf024978cadbc. 
> 2019-02-16 11:26:29.630 [flink-akka.actor.default-dispatcher-68] INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding > checkpoint 1389046 of job 599a6ac3c371874d12ebf024978cadbc. > org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: > Task Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (3/3) was > not running > at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 2019-02-16 11:26:29.697 [flink-akka.actor.default-dispatcher-68] INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource > -> mapOperate -> Timestamps/Watermarks (1/3) > (a5657b784d235731cd468164e85d0b50) switched from RUNNING to FAILED. > org.apache.flink.streaming.runtime.tasks.AsynchronousException: > java.lang.Exception: Could not materialize checkpoint 1389046 for operator > Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3). 
> at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.Exception: Could not materialize checkpoint 1389046 for > operator Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3). > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942) > ... 6 common frames omitted > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not flush and close the file system output stream to > hdfs://.../flink/checkpoints/599a6ac3c371874d12ebf024978cadbc/chk-1389046/84631771-01e2-41bc-950d-c9e39eac26f9 > in order to obtain the stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at
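The fix merged for this ticket ("Disable task to fail on checkpoint errors", PR #8745) means a failure from an already-discarded checkpoint no longer brings the task down by itself. The core idea can be sketched as a failure-tolerance counter; the class and method names below are simplified assumptions for illustration, not Flink's actual implementation.

```java
// Simplified sketch: count consecutive checkpoint failures and only fail the
// task once a configured tolerance is exceeded; a completed checkpoint resets
// the streak. A single declined/discarded checkpoint is then survivable.
class FailureTolerance {
    private final int tolerableFailures;
    private int consecutiveFailures;

    FailureTolerance(int tolerableFailures) {
        this.tolerableFailures = tolerableFailures;
    }

    /** Record a failed or declined checkpoint; returns true if the task should fail. */
    boolean onCheckpointFailure() {
        consecutiveFailures++;
        return consecutiveFailures > tolerableFailures;
    }

    /** A completed checkpoint resets the failure streak. */
    void onCheckpointSuccess() {
        consecutiveFailures = 0;
    }
}

public class FailureToleranceDemo {
    public static void main(String[] args) {
        FailureTolerance t = new FailureTolerance(2);
        System.out.println(t.onCheckpointFailure());  // prints: false
        System.out.println(t.onCheckpointFailure());  // prints: false
        System.out.println(t.onCheckpointFailure());  // prints: true
    }
}
```

With a tolerance of zero this degrades to the old behavior (first failure fails the task), which is why decoupling the decision from the task was needed for the race described in the ticket.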
[GitHub] [flink] StefanRRichter merged pull request #8745: [FLINK-11662] Disable task to fail on checkpoint errors
StefanRRichter merged pull request #8745: [FLINK-11662] Disable task to fail on checkpoint errors URL: https://github.com/apache/flink/pull/8745 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on issue #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager
zhijiangW commented on issue #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager URL: https://github.com/apache/flink/pull/8646#issuecomment-506310994 @StephanEwen thanks for sharing the background and bringing up this concern! The abstract `IOManager` actually has two roles: - One is as a utility for obtaining file channels, currently used by some streaming components such as `StateBackend`, `BarrierBuffer`, and `ShuffleService`. - The other is the set of abstract methods for handling read/write operations, mainly used for batch sort-merge. If we assume that one component should focus on a single function, it is reasonable to break `IOManager` down into two sub-components, as done here. Other components can then rely on `FileChannelManager` alone instead of the heavy-weight `IOManager`. Currently the shuffle service only needs the internal `FileChannelManager`; I agree that it might rely on the write-behind or pre-fetching threads provided by `IOManager` in the future. If so, other shuffle service implementations could implement a new `IOManager` instance, or use the current `IOManagerAsync` directly if needed. Another option, as you suggested, is implementing a new `IOManager` instance without spawning threads, which also makes sense; but if the shuffle service later needs the read/write functions, it would have to switch to another `IOManager` implementation. I still prefer the current way in this PR, because the two sub-components are both light-weight and can be used together or separately.
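The split described in the comment above can be sketched roughly as follows. All interface and method names here are illustrative assumptions for the sake of the example, not Flink's actual API: a light-weight component that only hands out channel files, separated from the heavier read/write machinery.

```java
import java.io.File;
import java.nio.file.Paths;

// Rough sketch of the IOManager decomposition discussed above (names are
// illustrative, not Flink's real interfaces): FileChannelManager covers the
// "utility" role, IOManager layers the read/write role on top of it.
public class IOManagerSplitSketch {

    /** Utility role: creating/locating channel files (used by streaming components). */
    interface FileChannelManager {
        File createChannelFile(String name);
    }

    /** Read/write role: heavier I/O operations (used by batch sort-merge). */
    interface IOManager extends FileChannelManager {
        void writeBehind(File channel, byte[] data);
    }

    static class SimpleFileChannelManager implements FileChannelManager {
        private final String tmpDir;

        SimpleFileChannelManager(String tmpDir) {
            this.tmpDir = tmpDir;
        }

        @Override
        public File createChannelFile(String name) {
            // Resolve a channel file path under the configured temp directory.
            return Paths.get(tmpDir, name + ".channel").toFile();
        }
    }

    public static void main(String[] args) {
        // A shuffle service that only needs channel files can depend on the
        // light-weight FileChannelManager alone, not the full IOManager.
        FileChannelManager fcm =
            new SimpleFileChannelManager(System.getProperty("java.io.tmpdir"));
        System.out.println(fcm.createChannelFile("shuffle-0").getName());
    }
}
```

The point of the sketch is the dependency direction: components that only need file channels depend on the small interface, while a future shuffle implementation wanting write-behind threads can depend on the larger one.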
[jira] [Created] (FLINK-13016) StreamTaskNetworkInput can linger records for longer period of times
Piotr Nowojski created FLINK-13016: -- Summary: StreamTaskNetworkInput can linger records for longer period of times Key: FLINK-13016 URL: https://issues.apache.org/jira/browse/FLINK-13016 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.9.0 Reporter: Piotr Nowojski Assignee: Piotr Nowojski Fix For: 1.9.0 There is a bug in {{StreamTaskNetworkInput#isAvailable}}: it ignores the case where some data/records are currently buffered in the {{currentRecordDeserializer}} field. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
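The availability bug described in the issue above can be illustrated with a simplified model. The field and method names mirror the JIRA description, but this is a toy sketch, not Flink's actual `StreamTaskNetworkInput` class:

```java
// Simplified model of the bug described in FLINK-13016 (not Flink's real
// classes): isAvailable() must report "available" when records are still
// buffered in currentRecordDeserializer, even if the input gate currently
// has no new network buffers to hand out.
public class AvailabilitySketch {
    static Object currentRecordDeserializer; // non-null => buffered records remain
    static boolean inputGateAvailable;       // new network data available?

    // Buggy version: consults only the input gate and ignores buffered records.
    static boolean isAvailableBuggy() {
        return inputGateAvailable;
    }

    // Fixed version: records buffered in the deserializer also count as input.
    static boolean isAvailableFixed() {
        return currentRecordDeserializer != null || inputGateAvailable;
    }

    public static void main(String[] args) {
        currentRecordDeserializer = new Object(); // records still buffered
        inputGateAvailable = false;               // but no fresh network data
        System.out.println(isAvailableBuggy());   // false: records would linger
        System.out.println(isAvailableFixed());   // true: records get processed
    }
}
```

With the buggy check, buffered records sit unprocessed until the next network buffer arrives, which is exactly the "linger records for longer period of times" symptom in the issue title.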
[GitHub] [flink] flinkbot commented on issue #8914: [FLINK-12929] Pass TypeInformation in addSource
flinkbot commented on issue #8914: [FLINK-12929] Pass TypeInformation in addSource URL: https://github.com/apache/flink/pull/8914#issuecomment-506309273 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-12929) scala.StreamExecutionEnvironment.addSource does not propagate TypeInformation
[ https://issues.apache.org/jira/browse/FLINK-12929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-12929:
---
Labels: pull-request-available (was: )

> scala.StreamExecutionEnvironment.addSource does not propagate TypeInformation
> -
>
> Key: FLINK-12929
> URL: https://issues.apache.org/jira/browse/FLINK-12929
> Project: Flink
> Issue Type: Bug
> Components: API / Scala, API / Type Serialization System
> Reporter: Fabio Lombardelli
> Priority: Critical
> Labels: pull-request-available
>
> In {{scala.StreamExecutionEnvironment.addSource}} I would expect that {{typeInfo}} is also passed to {{javaEnv.addSource}} as the second parameter and not only passed to the {{returns}} method:
> {code:java}
> def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
>   require(function != null, "Function must not be null.")
>
>   val cleanFun = scalaClean(function)
>   val typeInfo = implicitly[TypeInformation[T]]
>   asScalaStream(javaEnv.addSource(cleanFun, typeInfo).returns(typeInfo))
> }
> {code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-12784) Support retention policy for InfluxDB metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-12784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber resolved FLINK-12784. - Resolution: Fixed Fix Version/s: 1.9.0 Merged in master via 0004056b494c42f5aa2b2cab5876eed7cfc20875 > Support retention policy for InfluxDB metrics reporter > -- > > Key: FLINK-12784 > URL: https://issues.apache.org/jira/browse/FLINK-12784 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics >Affects Versions: 1.8.0 >Reporter: Mans Singh >Assignee: Mans Singh >Priority: Minor > Labels: influxdb, metrics, pull-request-available > Fix For: 1.9.0 > > Original Estimate: 12h > Time Spent: 20m > Remaining Estimate: 11h 40m > > The InfluxDB metrics reporter uses the default retention policy for saving metrics to > InfluxDB. This enhancement will allow users to specify a retention policy. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] lombarde opened a new pull request #8914: [FLINK-12929] Pass TypeInformation in addSource
lombarde opened a new pull request #8914: [FLINK-12929] Pass TypeInformation in addSource URL: https://github.com/apache/flink/pull/8914 Co-authored-by: Georg Rollinger ## What is the purpose of the change Fixes https://issues.apache.org/jira/browse/FLINK-12929: changed scala.StreamExecutionEnvironment.addSource to pass the type info for a SourceFunction that does not extend ResultTypeQueryable. ## Brief change log see above ## Verifying this change No test added, as scala/StreamExecutionEnvironment.scala seems not to be covered by tests yet ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable
[GitHub] [flink] flinkbot commented on issue #8913: [FLINK-12968][table-common] Preparation for more casting utilities
flinkbot commented on issue #8913: [FLINK-12968][table-common] Preparation for more casting utilities URL: https://github.com/apache/flink/pull/8913#issuecomment-506308460
[GitHub] [flink] NicoK merged pull request #8668: [FLINK-12784][metrics] Support retention policy for InfluxDB metrics …
NicoK merged pull request #8668: [FLINK-12784][metrics] Support retention policy for InfluxDB metrics … URL: https://github.com/apache/flink/pull/8668
[GitHub] [flink] ifndef-SleePy edited a comment on issue #8870: [FLINK-12974][checkstyle] Bump checkstyle to 8.14
ifndef-SleePy edited a comment on issue #8870: [FLINK-12974][checkstyle] Bump checkstyle to 8.14 URL: https://github.com/apache/flink/pull/8870#issuecomment-506306235 @hmcl You mean downgrade to an earlier version? I don't think using an older version is a good choice, because I believe 8.12 was well tested for the Flink project by @zentol, and an earlier version might not cover the features of 8.12. The reason I propose upgrading to 8.14 is that the commit history of the checkstyle plugin shows there is no breaking change between 8.12 and 8.14; it mapped 8.11/8.12/8.13 to 8.14. Actually I think it's better to upgrade to the newest stable version, provided it is well tested for the Flink project. I think we should separate this into two steps: a quick fix to 8.14 with quick testing, and a big upgrade with thorough testing. Many people have read the guide "Adopting a Code Style and Quality Guide" written by Stephan, and some of them have put it into practice just like me, so they may be confused by the checkstyle plugin setting. The big upgrade is not urgent; we can do it later.
[GitHub] [flink] twalthr opened a new pull request #8913: [FLINK-12968][table-common] Preparation for more casting utilities
twalthr opened a new pull request #8913: [FLINK-12968][table-common] Preparation for more casting utilities URL: https://github.com/apache/flink/pull/8913 ## What is the purpose of the change Adds some utilities in preparation for more casting support and fixes a bug. ## Brief change log See commit messages. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? JavaDocs
[GitHub] [flink] flinkbot commented on issue #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
flinkbot commented on issue #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy… URL: https://github.com/apache/flink/pull/8912#issuecomment-506306353
[jira] [Updated] (FLINK-12709) Implement RestartBackoffTimeStrategyFactoryLoader
[ https://issues.apache.org/jira/browse/FLINK-12709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12709: --- Labels: pull-request-available (was: ) > Implement RestartBackoffTimeStrategyFactoryLoader > - > > Key: FLINK-12709 > URL: https://issues.apache.org/jira/browse/FLINK-12709 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.9.0 >Reporter: Zhu Zhu >Assignee: Zhu Zhu >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > > We need to implement a RestartBackoffTimeStrategyFactoryLoader to instantiate > RestartBackoffTimeStrategyFactory. > In order to be backwards compatible, the loader is responsible for converting > *RestartStrategy* configurations ([https://ci.apache.org/projects/flink/flink-docs-stable/dev/restart_strategies.html]) and > *RestartStrategyConfiguration* to the latest *RestartBackoffTimeStrategy* > configurations. > The converted configurations will be used to create > *RestartBackoffTimeStrategy.Factory* via > *RestartBackoffTimeStrategy#createFactory(Configuration)*. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] zhuzhurk opened a new pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…
zhuzhurk opened a new pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy… URL: https://github.com/apache/flink/pull/8912 … restart strategy configs ## What is the purpose of the change *A loader is needed to load factories of RestartBackoffTimeStrategy. In order to be backwards compatible, the loader should respect legacy RestartStrategy configurations. The legacy configuration can be in cluster config format (https://ci.apache.org/projects/flink/flink-docs-stable/dev/restart_strategies.html) or in RestartStrategies.RestartStrategyConfiguration format.* ## Brief change log - *The loader converts legacy configuration into the new generation format* - *The loader creates a RestartBackoffTimeStrategy factory from new-format configs* ## Verifying this change - *Added unit tests for RestartBackoffTimeStrategyFactoryLoader* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not documented)
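The backwards-compatible conversion the PR describes can be sketched like this. The config keys and class name below are illustrative assumptions, not Flink's actual option names: legacy restart-strategy settings are first translated into the new-generation format, and a factory is then created from the converted configuration.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the loader's conversion step (keys and names are
// hypothetical, not Flink's real options): legacy restart-strategy settings
// are mapped onto new-format settings before a factory is created from them.
public class RestartConfigConversionSketch {

    static Map<String, String> convertLegacy(Map<String, String> legacy) {
        Map<String, String> converted = new HashMap<>();
        String strategy = legacy.getOrDefault("restart-strategy", "none");
        if (strategy.equals("fixed-delay")) {
            // Carry over the legacy delay/attempts settings under new keys.
            converted.put("restart-backoff-time-strategy", "fixed-delay");
            converted.put("backoff-time",
                legacy.getOrDefault("restart-strategy.fixed-delay.delay", "1 s"));
            converted.put("max-attempts",
                legacy.getOrDefault("restart-strategy.fixed-delay.attempts", "1"));
        } else {
            // Unknown or absent legacy strategy: fall back to no restarts.
            converted.put("restart-backoff-time-strategy", "no-restart");
        }
        return converted;
    }

    public static void main(String[] args) {
        Map<String, String> legacy = new HashMap<>();
        legacy.put("restart-strategy", "fixed-delay");
        legacy.put("restart-strategy.fixed-delay.attempts", "3");
        System.out.println(convertLegacy(legacy).get("max-attempts")); // 3
    }
}
```

The value of keeping the conversion in a dedicated loader, as the PR does, is that the rest of the runtime only ever sees new-format configuration.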
[GitHub] [flink] asfgit closed pull request #8530: [FLINK-12611][table-planner-blink] Make time indicator nullable in blink
asfgit closed pull request #8530: [FLINK-12611][table-planner-blink] Make time indicator nullable in blink URL: https://github.com/apache/flink/pull/8530
[jira] [Closed] (FLINK-12611) Make time indicator nullable in blink
[ https://issues.apache.org/jira/browse/FLINK-12611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-12611. --- Resolution: Fixed Fix Version/s: 1.9.0 Fixed in 1.9.0: dac9f78658efda3339977e05591989cc363f3722 > Make time indicator nullable in blink > - > > Key: FLINK-12611 > URL: https://issues.apache.org/jira/browse/FLINK-12611 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > SQL: select max(rowtime), count(a) from T > There will be an AssertionError: type mismatch: > aggCall type: > TIMESTAMP(3) NOT NULL > inferred type: > TIMESTAMP(3) > Agg type checking is done before the TimeIndicator materializes, so there is an exception. > And before introducing nullability of LogicalType, we should fix this to > avoid more potential type-check problems. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-13015) Create validators, strategies and transformations required for porting logical expressions
Dawid Wysakowicz created FLINK-13015: Summary: Create validators, strategies and transformations required for porting logical expressions Key: FLINK-13015 URL: https://issues.apache.org/jira/browse/FLINK-13015 Project: Flink Issue Type: New Feature Components: Table SQL / API Affects Versions: 1.9.0 Reporter: Dawid Wysakowicz Assignee: Dawid Wysakowicz Fix For: 1.9.0 The goal of this task is to implement: InputTypeValidator: * by type root TypeStrategies: * cascade * explicit TypeTransformations: * to_nullable This set of classes will enable porting AND/OR/NOT/IS_TRUE/IS_FALSE/IS_NOT_TRUE/IS_NOT_FALSE to the new type inference stack. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-13014) ChainBreakTest failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-13014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Haibo Sun updated FLINK-13014:
--
Description:
Log: [https://api.travis-ci.org/v3/job/551094138/log.txt]

07:12:52.246 [ERROR] Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 16.872 s <<< FAILURE! - in org.apache.flink.test.state.operator.restore.unkeyed.ChainBreakTest
07:12:52.247 [ERROR] testMigrationAndRestore[Migrate Savepoint: 1.3](org.apache.flink.test.state.operator.restore.unkeyed.ChainBreakTest) Time elapsed: 1.545 s <<< ERROR!
java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs

was:
07:12:52.246 [ERROR] Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 16.872 s <<< FAILURE! - in org.apache.flink.test.state.operator.restore.unkeyed.ChainBreakTest
07:12:52.247 [ERROR] testMigrationAndRestore[Migrate Savepoint: 1.3](org.apache.flink.test.state.operator.restore.unkeyed.ChainBreakTest) Time elapsed: 1.545 s <<< ERROR!
java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs

> ChainBreakTest failed on Travis
> ---
>
> Key: FLINK-13014
> URL: https://issues.apache.org/jira/browse/FLINK-13014
> Project: Flink
> Issue Type: Bug
> Reporter: Haibo Sun
> Priority: Major
>
> Log: [https://api.travis-ci.org/v3/job/551094138/log.txt]
>
> 07:12:52.246 [ERROR] Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 16.872 s <<< FAILURE! - in org.apache.flink.test.state.operator.restore.unkeyed.ChainBreakTest
> 07:12:52.247 [ERROR] testMigrationAndRestore[Migrate Savepoint: 1.3](org.apache.flink.test.state.operator.restore.unkeyed.ChainBreakTest) Time elapsed: 1.545 s <<< ERROR!
> java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-13014) ChainBreakTest failed on Travis
Haibo Sun created FLINK-13014: - Summary: ChainBreakTest failed on Travis Key: FLINK-13014 URL: https://issues.apache.org/jira/browse/FLINK-13014 Project: Flink Issue Type: Bug Reporter: Haibo Sun

07:12:52.246 [ERROR] Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 16.872 s <<< FAILURE! - in org.apache.flink.test.state.operator.restore.unkeyed.ChainBreakTest
07:12:52.247 [ERROR] testMigrationAndRestore[Migrate Savepoint: 1.3](org.apache.flink.test.state.operator.restore.unkeyed.ChainBreakTest) Time elapsed: 1.545 s <<< ERROR!
java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] StephanEwen commented on issue #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.
StephanEwen commented on issue #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure. URL: https://github.com/apache/flink/pull/8880#issuecomment-506298639 @zhijiangW Thanks, these are very good comments. Some answers inline.

> Maybe we should not always use the FILE_MMAP type as default. It could give an option to config, or the system auto-selects the proper type internally based on some factors. E.g. via automatically checking whether a 64 bit JRE is used; if not, only small files fitting into the virtual address space could use MMAP. For a 32 bit JRE, FILE_FILE might be a better choice.

That is a fair point. We could do that. Just out of curiosity: do you know how many people still use 32 bit JVMs?

> Java currently has the limitation of not being able to unmap files from user code. Due to this bug (http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4724038) in Sun's JRE, the files are unmapped when GC releases the byte buffers. We could supply the workaround mentioned in the bug report (see {@link #setUseUnmap}), which may fail on non-Oracle/OpenJDK JVMs. It forcefully unmaps the buffer on close by using an undocumented internal cleanup functionality. We could also check automatically whether this is supported; then it might be another factor in deciding whether to use FILE_MMAP by default.

The code explicitly unmaps files using a utility from Netty, see https://github.com/apache/flink/pull/8880/files#diff-94d133764660d5d8372ca24c08257e25R138 This should work; the JVM actually segfaults if you use the buffer after that.

> Currently we use Integer.MAX_VALUE as every mmap region size. If we do not want to make it configurable, because users might not know how to tune it, we might automatically select a different size based on whether the JRE is 32 or 64 bit. On a 32 bit platform the address space can be very fragmented, so a large size might not be mappable. Using a lower region size makes the implementation a little bit slower, but the chance is lower that mmap fails. As far as I know, Lucene uses a size of 1 << 30 for 64 bit and 1 << 28 for 32 bit.

If we use `FILE` mode for 32 bit JVMs, it seems this would be solved already.

> In the current FILE_MMAP implementation, we would always mmap a region after writing Integer.MAX_VALUE bytes of data. This is a simple approach because we do not need to consider the partial data margin. But if the produced partitions cannot be consumed in time due to limited resources, I am not sure whether this pre-mmap has other downsides. E.g. one map produces 5000 subpartitions, but only one reduce task can be deployed at a time due to limited resources. Another option is lazy mmap, which means establishing the mmap after the partition is requested. But it seems complicated to handle the region margin to avoid a partial header or data.

Do we know if the eager mapping has downsides? There is no eager pre-fetching, so there should be no wasted I/O. It would keep the virtual memory page table occupied for longer (is that an issue?) and it might incentivize the kernel to keep that data cached in memory longer and evict other non-mapped files from the OS file cache instead (sounds like an advantage; could it have a downside?). Eager mapping simplifies the implementation because it avoids the need to synchronize the lazy mapping when creating multiple readers concurrently.
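The region-based mapping discussed above can be sketched as follows. This is a hypothetical illustration, not Flink's actual `BoundedBlockingSubpartition` code: a file is covered eagerly by a sequence of mapped regions whose size is capped (e.g. at `Integer.MAX_VALUE`, since `FileChannel.map()` cannot map more than 2 GB in a single call).

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of eager region-based mmap (not Flink's real code):
// the whole file is mapped up front as a list of fixed-size regions.
public class RegionMappingSketch {

    /** Number of regions needed to cover a file of the given size. */
    static int numRegions(long fileSize, long regionSize) {
        return (int) ((fileSize + regionSize - 1) / regionSize);
    }

    /** Eagerly map the whole file as READ_ONLY regions of at most regionSize bytes. */
    static List<MappedByteBuffer> mapInRegions(FileChannel ch, long regionSize) throws IOException {
        List<MappedByteBuffer> regions = new ArrayList<>();
        long size = ch.size();
        for (long pos = 0; pos < size; pos += regionSize) {
            // The last region may be shorter than regionSize.
            regions.add(ch.map(FileChannel.MapMode.READ_ONLY, pos, Math.min(regionSize, size - pos)));
        }
        return regions;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("regions", ".bin");
        Files.write(tmp, new byte[10]);
        try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.READ)) {
            // A 10-byte file with 4-byte regions needs 3 mappings.
            System.out.println(mapInRegions(ch, 4).size());
        }
        Files.deleteIfExists(tmp);
    }
}
```

Note that eagerly mapping every region as above avoids synchronizing lazy mapping across concurrent readers, which is the simplification argued for in the comment; unmapping still has to go through an explicit cleaner (e.g. the Netty utility mentioned above), since the JVM offers no public unmap API.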
[jira] [Comment Edited] (FLINK-11774) IllegalArgumentException in HeapPriorityQueueSet
[ https://issues.apache.org/jira/browse/FLINK-11774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873124#comment-16873124 ] Xulang Wan edited comment on FLINK-11774 at 6/27/19 10:55 AM: -- I also encountered this issue by using a string as key. The key is randomly generated between [0,1000] per element. It seems the error happened because the KeyGroup is out of the KeyGroupRange, which doesn't make sense to me since none of them should change once the application has started. After some investigation, I found why I hit such an exception in my case: the key was set by an RNG function, and under the hood Flink associates this function with the stream's KeyExtractor, which is invoked whenever a key is queried for an element. So what actually happened is: when an element is partitioned by key, it used a random number K1. Later, when we do some other operation related to the key (in this case we set a timer by key), Flink checks whether the key's KeyGroup (basically a hash of the key) is in the range of the operator's KeyGroups. But since the KeyExtractor is invoked again, we may get a different random number as the key, and the exception is thrown because an element with this new key is not supposed to be handled by this operator instance. As for this ticket's case, I strongly suspect that the KeyExtractor also returns different values for the same element, which caused the issue. was (Author: wanren192): I also encountered this issue by using a string as key. The key is randomly generated between [0,1000] per element. It seems the error happened because the KeyGroup is out of the KeyGroupRange, which doesn't make sense to me since none of them should change once the application has started.
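The failure mode described in the comment can be reproduced without Flink. The sketch below uses a hypothetical `badKeyOf` selector whose result changes between the partitioning step and the timer registration, which is exactly what a random key does:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class NonDeterministicKey {

    private static final AtomicInteger calls = new AtomicInteger();

    // BROKEN key selector: a deterministic stand-in for the RNG-based selector
    // described above. It returns a different "key" every time it is asked
    // about the same element.
    static int badKeyOf(String element) {
        return calls.getAndIncrement() % 1000;
    }

    public static void main(String[] args) {
        String element = "event-1";
        int keyAtPartitioning = badKeyOf(element);   // key used to route the element
        int keyAtTimerFiring = badKeyOf(element);    // key recomputed when the timer is set
        // The two keys disagree, so the key-group range check fails with
        // the IllegalArgumentException reported in this ticket.
        System.out.println(keyAtPartitioning == keyAtTimerFiring); // prints false
    }
}
```

The fix on the user side is simply to make the key selector a pure function of the element.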
> IllegalArgumentException in HeapPriorityQueueSet > > > Key: FLINK-11774 > URL: https://issues.apache.org/jira/browse/FLINK-11774 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.7.2 > Environment: Can reproduce on the following configurations: > > OS: macOS 10.14.3 > Java: 1.8.0_202 > > OS: CentOS 7.2.1511 > Java: 1.8.0_102 >Reporter: Kirill Vainer >Priority: Major > Attachments: flink-bug-dist.zip, flink-bug-src.zip > > > Hi, > I encountered the following exception: > {code} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) > at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647) > at > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510) > at flink.bug.App.main(App.java:21) > Caused by: java.lang.IllegalArgumentException > at > org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123) > at > org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158) > at > org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147) > at > org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154) > at > org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121) > at > org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49) > at > org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerProcessingTimeTimer(InternalTimerServiceImpl.java:197) > at > 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerProcessingTimeTimer(WindowOperator.java:876) > at > org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:36) > at > org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:28) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:895) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:396) > at >
[jira] [Updated] (FLINK-12996) Add predefined type validators, strategies, and transformations
[ https://issues.apache.org/jira/browse/FLINK-12996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz updated FLINK-12996: - Affects Version/s: 1.9.0 > Add predefined type validators, strategies, and transformations > --- > > Key: FLINK-12996 > URL: https://issues.apache.org/jira/browse/FLINK-12996 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.9.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Fix For: 1.9.0 > > > FLINK-12924 introduced new interfaces for performing type inference. We need > a set of predefined type validators, strategies, and transformations > implemented with the new interfaces in order to represent the old type > checking logic. Those interfaces can be inspired by Calcite's > {{org.apache.calcite.sql.type.ReturnTypes}}, > {{org.apache.calcite.sql.type.OperandTypes}}, and > {{org.apache.calcite.sql.type.SqlTypeTransforms}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12996) Add predefined type validators, strategies, and transformations
[ https://issues.apache.org/jira/browse/FLINK-12996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz updated FLINK-12996: - Fix Version/s: 1.9.0 > Add predefined type validators, strategies, and transformations > --- > > Key: FLINK-12996 > URL: https://issues.apache.org/jira/browse/FLINK-12996 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Fix For: 1.9.0 > > > FLINK-12924 introduced new interfaces for performing type inference. We need > a set of predefined type validators, strategies, and transformations > implemented with the new interfaces in order to represent the old type > checking logic. Those interfaces can be inspired by Calcite's > {{org.apache.calcite.sql.type.ReturnTypes}}, > {{org.apache.calcite.sql.type.OperandTypes}}, and > {{org.apache.calcite.sql.type.SqlTypeTransforms}}.
[GitHub] [flink] mans2singh commented on a change in pull request #8668: [FLINK-12784][metrics] Support retention policy for InfluxDB metrics …
mans2singh commented on a change in pull request #8668: [FLINK-12784][metrics] Support retention policy for InfluxDB metrics … URL: https://github.com/apache/flink/pull/8668#discussion_r298117923 ## File path: flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/InfluxdbReporterTest.java ## @@ -111,7 +111,7 @@ public void testMetricReporting() throws Exception { reporter.report(); verify(postRequestedFor(urlPathEqualTo("/write")) - .withQueryParam("db", equalTo(TEST_INFLUXDB_DB)) + .withQueryParam(InfluxdbReporterOptions.DB.key(), equalTo(TEST_INFLUXDB_DB)) Review comment: @NicoK I've changed the query param name 'db' back to its original value. I apologize for my misunderstanding. Please let me know if there is any other recommendation. Thanks again for your time/feedback.
[jira] [Comment Edited] (FLINK-12848) Method equals() in RowTypeInfo should consider fieldsNames
[ https://issues.apache.org/jira/browse/FLINK-12848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873996#comment-16873996 ] Jark Wu edited comment on FLINK-12848 at 6/27/19 10:40 AM: --- Thanks [~aloyszhang] for the debugging. So let's target this issue to 1.7 and 1.8. I think one way to quickly fix this is to not cache the RowTypeInfo in {{seenTypes}}. What do you think [~twalthr]? was (Author: jark): Thanks [~aloyszhang] for the debugging. So let's target this issue to 1.7 and 1.8. I think one way to quickly fix this is to not cache the RowTypeInfo in {{seenTypes}}, i.e. change [this line|https://github.com/apache/flink/blob/release-1.7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala#L66] to {{val relType = if (isSimple(typeInfo) || typeInfo.isInstanceOf[RowTypeInfo[_]]) {}} What do you think [~twalthr]? > Method equals() in RowTypeInfo should consider fieldsNames > -- > > Key: FLINK-12848 > URL: https://issues.apache.org/jira/browse/FLINK-12848 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.7.2 >Reporter: aloyszhang >Assignee: aloyszhang >Priority: Major > Labels: pull-request-available > Time Spent: 0.5h > Remaining Estimate: 0h > > Since {{RowTypeInfo#equals()}} does not consider the fieldNames, when > processing data with the RowTypeInfo type an error in the field names may occur.
> {code:java} > String [] fields = new String []{"first", "second"}; > TypeInformation[] types = new TypeInformation[]{ > Types.ROW_NAMED(new String[]{"first001"}, Types.INT), > Types.ROW_NAMED(new String[]{"second002"}, Types.INT) }; > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment env = > StreamTableEnvironment.getTableEnvironment(execEnv); > SimpleProcessionTimeSource streamTableSource = new > SimpleProcessionTimeSource(fields, types); > env.registerTableSource("testSource", streamTableSource); > Table sourceTable = env.scan("testSource"); > System.out.println("Source table schema : "); > sourceTable.printSchema(); > {code} > The table schema will be > {code:java} > Source table schema : > root > |-- first: Row(first001: Integer) > |-- second: Row(first001: Integer) > |-- timestamp: TimeIndicatorTypeInfo(proctime) > {code} > the second field has the same name as the first field. > So, we should consider the fieldNames in RowTypeInfo#equals()
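The effect of an `equals()` that ignores field names can be demonstrated with a small stand-in class. `RowType` below is a hypothetical simplification, not Flink's `RowTypeInfo`, and the cache mimics the role of `seenTypes` in `FlinkTypeFactory`:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for RowTypeInfo: equals() compares only the types, not the field names. */
class RowType {
    final String[] fieldNames;
    final Class<?>[] fieldTypes;

    RowType(String[] fieldNames, Class<?>[] fieldTypes) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }

    @Override
    public boolean equals(Object o) {
        // The bug under discussion: field names are ignored here.
        return o instanceof RowType && Arrays.equals(fieldTypes, ((RowType) o).fieldTypes);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(fieldTypes);
    }

    public static void main(String[] args) {
        RowType first = new RowType(new String[]{"first001"}, new Class<?>[]{Integer.class});
        RowType second = new RowType(new String[]{"second002"}, new Class<?>[]{Integer.class});

        // A cache keyed by the type collapses both schemas into one entry,
        // so the second field ends up reusing the first field's names.
        Map<RowType, String> seenTypes = new HashMap<>();
        seenTypes.putIfAbsent(first, "Row(first001: Integer)");
        seenTypes.putIfAbsent(second, "Row(second002: Integer)");
        System.out.println(seenTypes.get(second)); // prints Row(first001: Integer)
    }
}
```

Either making `equals()` include the field names or not caching row types in `seenTypes` avoids the collision.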
[GitHub] [flink] StephanEwen commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.
StephanEwen commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure. URL: https://github.com/apache/flink/pull/8880#discussion_r298114249 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java ## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.util.IOUtils; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayDeque; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * An implementation of {@link BoundedData} that writes directly into a File Channel. 
+ * The readers are simple file channel readers using a simple dedicated buffer pool. + */ +final class FileChannelBoundedData implements BoundedData { + + private final Path filePath; + + private final FileChannel fileChannel; + + private final ByteBuffer[] headerAndBufferArray; + + private long size; + + private final int memorySegmentSize; + + FileChannelBoundedData( + Path filePath, + FileChannel fileChannel, + int memorySegmentSize) { + + this.filePath = checkNotNull(filePath); + this.fileChannel = checkNotNull(fileChannel); + this.memorySegmentSize = memorySegmentSize; + this.headerAndBufferArray = BufferReaderWriterUtil.allocatedWriteBufferArray(); + } + + @Override + public void writeBuffer(Buffer buffer) throws IOException { + size += BufferReaderWriterUtil.writeToByteChannel(fileChannel, buffer, headerAndBufferArray); + } + + @Override + public void finishWrite() throws IOException { + fileChannel.close(); + } + + @Override + public Reader createReader() throws IOException { + checkState(!fileChannel.isOpen()); + + final FileChannel fc = FileChannel.open(filePath, StandardOpenOption.READ); + return new FileBufferReader(fc, memorySegmentSize); + } + + @Override + public long getSize() { + return size; + } + + @Override + public void close() throws IOException { + IOUtils.closeQuietly(fileChannel); + Files.delete(filePath); + } + + // + + public static FileChannelBoundedData create(Path filePath, int memorySegmentSize) throws IOException { + final FileChannel fileChannel = FileChannel.open( + filePath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE); + + return new FileChannelBoundedData( + filePath, + fileChannel, + memorySegmentSize); + } + + // + + static final class FileBufferReader implements BoundedData.Reader, BufferRecycler { + + private static final int NUM_BUFFERS = 2; + + private final FileChannel fileChannel; + + private final ByteBuffer headerBuffer; + + private final ArrayDeque buffers; + + FileBufferReader(FileChannel fileChannel, 
int bufferSize) { + this.fileChannel = checkNotNull(fileChannel); + this.headerBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer(); +
[jira] [Commented] (FLINK-12848) Method equals() in RowTypeInfo should consider fieldsNames
[ https://issues.apache.org/jira/browse/FLINK-12848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873996#comment-16873996 ] Jark Wu commented on FLINK-12848: - Thanks [~aloyszhang] for the debugging. So let's target this issue to 1.7 and 1.8. I think one way to quickly fix this is to not cache the RowTypeInfo in {{seenTypes}}, i.e. change [this line|https://github.com/apache/flink/blob/release-1.7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala#L66] to {{val relType = if (isSimple(typeInfo) || typeInfo.isInstanceOf[RowTypeInfo[_]]) {}} What do you think [~twalthr]? > Method equals() in RowTypeInfo should consider fieldsNames > -- > > Key: FLINK-12848 > URL: https://issues.apache.org/jira/browse/FLINK-12848 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.7.2 >Reporter: aloyszhang >Assignee: aloyszhang >Priority: Major > Labels: pull-request-available > Time Spent: 0.5h > Remaining Estimate: 0h > > Since {{RowTypeInfo#equals()}} does not consider the fieldNames, when > processing data with the RowTypeInfo type an error in the field names may occur.
> {code:java} > String [] fields = new String []{"first", "second"}; > TypeInformation[] types = new TypeInformation[]{ > Types.ROW_NAMED(new String[]{"first001"}, Types.INT), > Types.ROW_NAMED(new String[]{"second002"}, Types.INT) }; > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment env = > StreamTableEnvironment.getTableEnvironment(execEnv); > SimpleProcessionTimeSource streamTableSource = new > SimpleProcessionTimeSource(fields, types); > env.registerTableSource("testSource", streamTableSource); > Table sourceTable = env.scan("testSource"); > System.out.println("Source table schema : "); > sourceTable.printSchema(); > {code} > The table schema will be > {code:java} > Source table schema : > root > |-- first: Row(first001: Integer) > |-- second: Row(first001: Integer) > |-- timestamp: TimeIndicatorTypeInfo(proctime) > {code} > the second field has the same name as the first field. > So, we should consider the fieldNames in RowTypeInfo#equals()
[GitHub] [flink] StephanEwen commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.
StephanEwen commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure. URL: https://github.com/apache/flink/pull/8880#discussion_r298113422 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java ## @@ -157,8 +159,9 @@ private static void initializeBoundedBlockingPartitions( int i = 0; try { for (; i < subpartitions.length; i++) { - subpartitions[i] = new BoundedBlockingSubpartition( - i, parent, ioManager.createChannel().getPathFile().toPath()); + final File spillFile = ioManager.createChannel().getPathFile(); + subpartitions[i] = + BoundedBlockingSubpartition.createWithFileAndMemoryMappedReader(i, parent, spillFile); Review comment: I am not confident enough in any other implementation, yet. I only added them for our testing and benchmarking. I am also always unsure about adding too many configurable parts for users. For API / behavior, we need customizability to fit the application and setup needs. For runtime and performance, I tend to favor one opinionated way that works well across the board. If there is another option that is 10% faster in 10% of the cases, I am not sure users benefit too much from that, but the added configuration makes it overall harder to understand and predict.
[GitHub] [flink] StephanEwen commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.
StephanEwen commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure. URL: https://github.com/apache/flink/pull/8880#discussion_r298112193 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java ## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.util.IOUtils; + +import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkState; + +final class FileChannelMemoryMappedBoundedData implements BoundedData { + + /** The file channel backing the memory mapped file. 
*/ + private final FileChannel fileChannel; + + /** The reusable array with header buffer and data buffer, to use gathering writes on the +* file channel ({@link java.nio.channels.GatheringByteChannel#write(ByteBuffer[])}). */ + private final ByteBuffer[] headerAndBufferArray; + + /** All memory mapped regions. */ + private final ArrayList memoryMappedRegions; + + /** The path of the memory mapped file. */ + private final Path filePath; + + /** The position in the file channel. Cached for efficiency, because an actual position +* lookup in the channel involves various locks and checks. */ + private long pos; + + /** The position where the current memory mapped region must end. */ + private long endOfCurrentRegion; + + /** The position where the current memory mapped started. */ + private long startOfCurrentRegion; + + /** The maximum size of each mapped region. */ + private final long maxRegionSize; + + FileChannelMemoryMappedBoundedData( Review comment: True, could be private at the moment. My idea was to have it accessible for tests.
[GitHub] [flink] StephanEwen commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.
StephanEwen commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure. URL: https://github.com/apache/flink/pull/8880#discussion_r298111597 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; + +/** + * Putting and getting of a sequence of buffers to/from a FileChannel or a ByteBuffer. + * This class handles the headers, length encoding, memory slicing. 
+ * + * The encoding is the same across FileChannel and ByteBuffer, so this class can + * write to a file and read from the byte buffer that results from mapping this file to memory. + */ +final class BufferReaderWriterUtil { + + static final int HEADER_LENGTH = 8; + + static final int HEADER_VALUE_IS_BUFFER = 0; + + static final int HEADER_VALUE_IS_EVENT = 1; + + // + // ByteBuffer read / write + // + + static boolean writeBuffer(Buffer buffer, ByteBuffer memory) { + final int bufferSize = buffer.getSize(); + + if (memory.remaining() < bufferSize + HEADER_LENGTH) { + return false; + } + + memory.putInt(buffer.isBuffer() ? HEADER_VALUE_IS_BUFFER : HEADER_VALUE_IS_EVENT); + memory.putInt(bufferSize); + memory.put(buffer.getNioBufferReadable()); + return true; + } + + @Nullable + static Buffer sliceNextBuffer(ByteBuffer memory) { + final int remaining = memory.remaining(); + + // we only check the correct case where data is exhausted + // all other cases can only occur if our write logic is wrong and will already throw + // buffer underflow exceptions which will cause the read to fail. + if (remaining == 0) { + return null; + } + + final int header = memory.getInt(); + final int size = memory.getInt(); + + memory.limit(memory.position() + size); + ByteBuffer buf = memory.slice(); + memory.position(memory.limit()); + memory.limit(memory.capacity()); + + MemorySegment memorySegment = MemorySegmentFactory.wrapOffHeapMemory(buf); + + return bufferFromMemorySegment( + memorySegment, + FreeingBufferRecycler.INSTANCE, + size, + header == HEADER_VALUE_IS_EVENT); + } + + // + // ByteChannel read / write + // + + static long writeToByteChannel( + FileChannel channel, + Buffer buffer, + ByteBuffer[] arrayWithHeaderBuffer) throws IOException { + + final ByteBuffer headerBuffer = arrayWithHeaderBuffer[0]; + headerBuffer.clear(); + headerBuffer.putInt(buffer.isBuffer() ? HEADER_VALUE_IS_BUFFER : HEADER_VALUE_IS_EVENT); + headerBuffer.putInt(buffer.getSize()); +
[jira] [Commented] (FLINK-13004) Correct the logic of needToCleanupState in KeyedProcessFunctionWithCleanupState
[ https://issues.apache.org/jira/browse/FLINK-13004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873994#comment-16873994 ] Jark Wu commented on FLINK-13004: - [~yunta] (y) > Correct the logic of needToCleanupState in > KeyedProcessFunctionWithCleanupState > --- > > Key: FLINK-13004 > URL: https://issues.apache.org/jira/browse/FLINK-13004 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > The current implementation of needToCleanupState in > KeyedProcessFunctionWithCleanupState actually has a potential bug: > {code:java} > protected Boolean needToCleanupState(Long timestamp) throws IOException { >if (stateCleaningEnabled) { > Long cleanupTime = cleanupTimeState.value(); > // check that the triggered timer is the last registered processing > time timer. > return null != cleanupTime && timestamp == cleanupTime; >} else { > return false; >} > } > {code} > Please note that it directly uses "==" to compare the boxed *Long* timestamp > and cleanupTime for equality. However, if the value is larger than 127L, the > comparison actually returns false instead of the expected true.
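The boxing pitfall behind this bug is easy to demonstrate in isolation. `referenceEquals` below is a hypothetical helper mirroring the `==` comparison in the quoted code:

```java
public class LongEqualityPitfall {

    // The pattern from the quoted needToCleanupState: reference comparison of boxed Longs.
    static boolean referenceEquals(Long timestamp, Long cleanupTime) {
        return timestamp == cleanupTime;
    }

    public static void main(String[] args) {
        // Long.valueOf caches the values -128..127, so small boxed longs share instances...
        System.out.println(referenceEquals(127L, 127L)); // prints true
        // ...but anything larger is a fresh object, and == compares references, not values.
        System.out.println(referenceEquals(128L, 128L)); // prints false
        // The fix: compare values, e.g. with equals() or by unboxing explicitly.
        System.out.println(Long.valueOf(128L).equals(128L)); // prints true
    }
}
```

So a cleanup timestamp of, say, 1561629300000L would never compare equal under `==`, silently disabling the state cleanup.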
[jira] [Commented] (FLINK-12536) Make BufferOrEventSequence#getNext() non-blocking
[ https://issues.apache.org/jira/browse/FLINK-12536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873993#comment-16873993 ] Stephan Ewen commented on FLINK-12536: -- Can you guys share why you believe this is important to change? - As far as I understand, this affects only the non-credit-based model, which is supported but meant to be used only in rare cases and to be removed at some point. - This adds quite a bit of extra complexity, like adding dependencies on the I/O manager in the alignment stack. - In any healthy setup, we have extremely few buffers to read after alignment, which should not be so performance-critical that it would warrant a bigger addition in code complexity. - The new implementation means that this I/O is now queued behind other potentially large I/O ops (like hash-table reading or sort-spill reading). That means we can introduce delays in that I/O and stall the streaming pipeline. - While the mailbox thread should generally not block, I think that these few occasional I/O ops of a legacy mode are fine. They happen only once in a while (after a checkpoint) in a legacy mode. Do we ever expect these ops to block for a significant enough time? If yes, would we expect that everything else is still low-latency when the I/O ops block for long, or could we actually tolerate this? tl;dr - this change seems a lot like premature optimization to me. We introduce complexity for an assumed problem where I am skeptical that it is a problem in practice. I am happy to be convinced of the opposite, but checking whether this is premature optimization would be good before merging this.
> Make BufferOrEventSequence#getNext() non-blocking > - > > Key: FLINK-12536 > URL: https://issues.apache.org/jira/browse/FLINK-12536 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Piotr Nowojski >Assignee: Congxian Qiu(klion26) >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Currently it is non-blocking in the case of credit-based flow control (the default); however, for {{SpilledBufferOrEventSequence}} it is blocking on reading from file. We might want to consider reimplementing it to be non-blocking with a {{CompletableFuture isAvailable()}} method. > > Otherwise we will block mailbox processing for the duration of reading from file - for example we will block processing-time timers and potentially, in the future, network flushes. > > This is not a high-priority change, since it affects a non-default configuration option AND at the moment only processing-time timers are planned to be moved to the mailbox for 1.9.
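For reference, the `CompletableFuture isAvailable()` idea mentioned in the ticket description can be sketched as follows. This is a hypothetical single-threaded illustration of the pattern, not the interface Flink actually uses:

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of a non-blocking getNext() with an isAvailable() future. */
class NonBlockingSequence<T> {

    private final ArrayDeque<T> queue = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    void add(T element) {
        queue.add(element);
        available.complete(null); // signal any caller waiting on isAvailable()
    }

    /** Returns the next element, or null instead of blocking when nothing is ready. */
    T pollNext() {
        T next = queue.poll();
        if (queue.isEmpty()) {
            available = new CompletableFuture<>(); // re-arm for the next add()
        }
        return next;
    }

    /** Completed exactly when pollNext() would return a non-null element. */
    CompletableFuture<Void> isAvailable() {
        return available;
    }

    public static void main(String[] args) {
        NonBlockingSequence<String> sequence = new NonBlockingSequence<>();
        System.out.println(sequence.isAvailable().isDone()); // prints false
        sequence.add("buffer-1");
        System.out.println(sequence.isAvailable().isDone()); // prints true
        System.out.println(sequence.pollNext());             // prints buffer-1
    }
}
```

Instead of blocking inside `getNext()`, the caller (e.g. a mailbox loop) polls and, when `null` comes back, schedules a continuation on the returned future.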
[GitHub] [flink] wuchong commented on a change in pull request #8909: [FLINK-13004][table-runtime-blink] Correct the logic of needToCleanupState in KeyedProcessFunctionWithCleanupState
wuchong commented on a change in pull request #8909: [FLINK-13004][table-runtime-blink] Correct the logic of needToCleanupState in KeyedProcessFunctionWithCleanupState URL: https://github.com/apache/flink/pull/8909#discussion_r298108636 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/AbstractRowTimeUnboundedPrecedingOver.java ## @@ -157,7 +157,7 @@ public void onTimer( if (needToCleanupState(timestamp)) { // we check whether there are still records which have not been processed yet - boolean noRecordsToProcess = !inputState.contains(timestamp); + boolean noRecordsToProcess = !inputState.keys().iterator().hasNext(); Review comment: good catch!
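The difference between the two checks in this diff can be illustrated with a plain map. `EmptinessVsContains` below is a hypothetical stand-in for the operator's `MapState` of pending records:

```java
import java.util.TreeMap;

/** Plain-Java illustration of the check changed in the review comment above. */
public class EmptinessVsContains {

    // Stand-in for the operator's per-key MapState of pending records, keyed by timestamp.
    private final TreeMap<Long, String> inputState = new TreeMap<>();

    void put(long timestamp, String record) {
        inputState.put(timestamp, record);
    }

    // Old check: only asks whether records exist at this one timestamp.
    boolean oldNoRecordsToProcess(long cleanupTimestamp) {
        return !inputState.containsKey(cleanupTimestamp);
    }

    // Fixed check: asks whether any records remain at all.
    boolean newNoRecordsToProcess() {
        return !inputState.keySet().iterator().hasNext();
    }

    public static void main(String[] args) {
        EmptinessVsContains state = new EmptinessVsContains();
        state.put(42L, "pending record");
        // A cleanup timer fires at t=100; there is still an unprocessed record at t=42.
        System.out.println(state.oldNoRecordsToProcess(100L)); // prints true  (would wrongly clean up)
        System.out.println(state.newNoRecordsToProcess());     // prints false (state is kept)
    }
}
```

Checking one key answers "are there records at this timestamp?", while the cleanup decision needs "are there records at all?" - hence the iterator-based emptiness check.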
[GitHub] [flink] zjuwangg commented on issue #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
zjuwangg commented on issue #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8636#issuecomment-506284860 @bowenli86 Sorry for the delay in updating this PR; please review again when you have time.
[GitHub] [flink] zjuwangg commented on issue #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog
zjuwangg commented on issue #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8636#issuecomment-506284600 Rebased and force-pushed.
[GitHub] [flink] StephanEwen commented on a change in pull request #8886: [FLINK-12987][metrics] fix DescriptiveStatisticsHistogram#getCount() not returning the number of elements seen
StephanEwen commented on a change in pull request #8886: [FLINK-12987][metrics] fix DescriptiveStatisticsHistogram#getCount() not returning the number of elements seen URL: https://github.com/apache/flink/pull/8886#discussion_r298101578 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java ## @@ -22,25 +22,30 @@ import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; +import java.util.concurrent.atomic.AtomicLong; + /** * The {@link DescriptiveStatisticsHistogram} use a DescriptiveStatistics {@link DescriptiveStatistics} as a Flink {@link Histogram}. */ public class DescriptiveStatisticsHistogram implements org.apache.flink.metrics.Histogram { private final DescriptiveStatistics descriptiveStatistics; + private final AtomicLong elementsSeen = new AtomicLong(); Review comment: Is this class assumed to be used in a multi-threaded context?
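The motivation for the separate counter in the diff above: `DescriptiveStatistics` keeps only a bounded window of samples, so its own element count reports the window size rather than the total number of updates. A simplified sketch of the idea (not Flink's actual class; the window handling is elided):

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Sketch: a histogram backed by a bounded statistics window keeps a separate
 * AtomicLong so getCount() reflects all elements ever seen, even after the
 * window rolls over. Illustrative names only.
 */
class WindowedHistogram {
    private final long windowSize;
    private final AtomicLong elementsSeen = new AtomicLong();

    WindowedHistogram(long windowSize) {
        this.windowSize = windowSize;
    }

    void update(long value) {
        elementsSeen.incrementAndGet();
        // ... the value would also be added to the bounded statistics window here
    }

    /** Total elements ever seen, independent of the window size. */
    long getCount() {
        return elementsSeen.get();
    }

    /** What the bounded window alone would report once it is full. */
    long windowedCount() {
        return Math.min(elementsSeen.get(), windowSize);
    }
}
```

Whether the counter needs to be an `AtomicLong` at all is exactly the reviewer's question: if updates never race, a plain `long` would do.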
[jira] [Updated] (FLINK-13013) Make sure that SingleInputGate can always request partitions
[ https://issues.apache.org/jira/browse/FLINK-13013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-13013: --- Description: Currently {{SingleInputGate}} requests partitions only on the first attempt to fetch the data. Before requesting partitions, no data availability notifications can get through. This doesn't work well with the new non-blocking {{InputGate}} interface, since on a newly created {{SingleInputGate}}, {{InputGate#isAvailable()}} might return not available, and it will only be able to switch to available after the first call to {{SingleInputGate#pollNext()}}. However, this might never happen, since the caller could be waiting indefinitely on {{SingleInputGate#isAvailable()}}. (was: Currently {{SingleInputGate}} requests partitions only on the first attempt to fetch the data. Before requesting partitions, no data availability notifications can get through. This doesn't work well with the new non-blocking {{InputGate}} interface, since on a newly created {{SingleInputGate}}, {{InputGate#isAvailable()}} might return not available, and it will only be able to switch to available after the first call to {{SingleInputGate#pollNext()}}.) > Make sure that SingleInputGate can always request partitions > > > Key: FLINK-13013 > URL: https://issues.apache.org/jira/browse/FLINK-13013 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > > Currently {{SingleInputGate}} requests partitions only on the first attempt > to fetch the data. Before requesting partitions, no data availability > notifications can get through. This doesn't work well with the new non-blocking > {{InputGate}} interface, since on a newly created {{SingleInputGate}}, > {{InputGate#isAvailable()}} might return not available, and it will only be > able to switch to available after the first call to > {{SingleInputGate#pollNext()}}. However, this might never happen, since the caller > could be waiting indefinitely on {{SingleInputGate#isAvailable()}}.
[GitHub] [flink] aljoscha commented on issue #8371: [FLINK-9679] - Add AvroSerializationSchema
aljoscha commented on issue #8371: [FLINK-9679] - Add AvroSerializationSchema URL: https://github.com/apache/flink/pull/8371#issuecomment-506282454 @Wosin could you please rebase and squash the various commits into some commits that make sense, i.e. separate pre-requisites like version upgrade into a separate commit and then have the actual implementation commit? I'd like to review this.
[GitHub] [flink] StephanEwen commented on issue #8877: [FLINK-12984][metrics] only call Histogram#getStatistics() once where possible
StephanEwen commented on issue #8877: [FLINK-12984][metrics] only call Histogram#getStatistics() once where possible URL: https://github.com/apache/flink/pull/8877#issuecomment-506281800 Looks good in general. What is a bit strange is that the dropwizard metrics now have dependencies on flink-runtime. Is that because `DescriptiveStatisticsHistogram` is in the wrong place (`flink-runtime`)? Should that implementation be in the metrics projects instead?
[jira] [Created] (FLINK-13013) Make sure that SingleInputGate can always request partitions
Piotr Nowojski created FLINK-13013: -- Summary: Make sure that SingleInputGate can always request partitions Key: FLINK-13013 URL: https://issues.apache.org/jira/browse/FLINK-13013 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Piotr Nowojski Assignee: Piotr Nowojski Currently {{SingleInputGate}} requests partitions only on the first attempt to fetch the data. Before requesting partitions, no data availability notifications can get through. This doesn't work well with the new non-blocking {{InputGate}} interface, since on a newly created {{SingleInputGate}}, {{InputGate#isAvailable()}} might return not available, and it will only be able to switch to available after the first call to {{SingleInputGate#pollNext()}}.
[jira] [Commented] (FLINK-13013) Make sure that SingleInputGate can always request partitions
[ https://issues.apache.org/jira/browse/FLINK-13013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873978#comment-16873978 ] Piotr Nowojski commented on FLINK-13013: As a hotfix we are attempting to always wake up {{SingleInputGate#isAvailable()}} upon creation/registration of an {{InputChannel}}, so that {{requestPartitions()}} can happen. For a long-term solution, the life cycle of the partitions/input channels/input gate might need to be changed. > Make sure that SingleInputGate can always request partitions > > > Key: FLINK-13013 > URL: https://issues.apache.org/jira/browse/FLINK-13013 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > > Currently {{SingleInputGate}} requests partitions only on the first attempt > to fetch the data. Before requesting partitions, no data availability > notifications can get through. This doesn't work well with the new non-blocking > {{InputGate}} interface, since on a newly created {{SingleInputGate}}, > {{InputGate#isAvailable()}} might return not available, and it will only be > able to switch to available after the first call to > {{SingleInputGate#pollNext()}}.
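The hotfix described in the comment can be sketched like this: completing the gate's availability future when an input channel is registered wakes up any caller parked on `isAvailable()`, giving it a chance to call `pollNext()`, which in turn lazily requests partitions. This is an illustrative sketch under assumed names, not Flink's actual `SingleInputGate` code.

```java
import java.util.concurrent.CompletableFuture;

/**
 * Sketch: a gate whose availability future starts incomplete and is completed
 * on channel registration, so the caller waiting on isAvailable() cannot be
 * stuck forever before the first pollNext().
 */
class Gate {
    private final CompletableFuture<?> availability = new CompletableFuture<>();
    private boolean partitionsRequested;
    private int channels;

    CompletableFuture<?> isAvailable() {
        return availability;
    }

    /** Hotfix: registering a channel wakes up anyone blocked on isAvailable(). */
    synchronized void registerChannel() {
        channels++;
        availability.complete(null);
    }

    /** The first poll lazily requests partitions, mirroring SingleInputGate. */
    synchronized boolean pollNext() {
        if (!partitionsRequested) {
            partitionsRequested = true;
            // requestPartitions() would be triggered here
        }
        return channels > 0;
    }
}
```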
[GitHub] [flink] StephanEwen commented on issue #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager
StephanEwen commented on issue #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager URL: https://github.com/apache/flink/pull/8646#issuecomment-506278154 Sorry for jumping in late. Initially, we added this dependency to make it possible for spilling partitions to use write-behind and separate pre-fetching threads. Do we want to drop this completely? Also, when it comes to reducing dependencies in the network shuffle environment, what is the gain of replacing IOManager with FileChannelManager? If this is mainly about the threads that the IOManagerAsync spawns, another option could be to have an IOManagerSync without threads.
[GitHub] [flink] knaufk commented on a change in pull request #8741: [FLINK-12752] Add Option to Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint
knaufk commented on a change in pull request #8741: [FLINK-12752] Add Option to Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint URL: https://github.com/apache/flink/pull/8741#discussion_r298093759 ## File path: flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java ## @@ -92,9 +105,23 @@ public StandaloneJobClusterConfiguration createResult(@Nonnull CommandLine comma restPort, savepointRestoreSettings, jobId, + jobIdSeed, jobClassName); } + private String getJobIdSeed(@Nonnull final CommandLine commandLine) throws FlinkParseException { + + boolean isJobIdSeedConfigured = commandLine.hasOption(JOB_ID_SEED_OPTION.getOpt()); + boolean isJobIdConfigured = commandLine.hasOption(JOB_ID_OPTION.getOpt()); + + if (isJobIdSeedConfigured && isJobIdConfigured) { Review comment: I would rather change the README so that when you specify the `--job-id` explicitly, you have to remove the `--job-id-seed` parameter. The templates are an opinionated starting point to run Job/Application Clusters on Kubernetes, as such they should contain the `--job-id-seed`. What do you think?
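The mutual-exclusion check in the diff above can be illustrated in isolation: a job ID may be given either directly or derived from a seed, but never both. Plain strings stand in for the commons-cli option handling; the hashing scheme and method names are assumptions for illustration, not the PR's actual implementation.

```java
/**
 * Sketch: resolve a job ID from two mutually exclusive CLI inputs.
 * Passing both is a configuration error; a seed is hashed into a
 * deterministic ID so restarts of the same cluster reuse the same job ID.
 */
class JobIdArgs {
    static String resolve(String jobId, String jobIdSeed) {
        if (jobId != null && jobIdSeed != null) {
            throw new IllegalArgumentException(
                "Pass either --job-id or --job-id-seed, not both.");
        }
        if (jobIdSeed != null) {
            // Derive a stable ID from the seed (illustrative hash only).
            return Integer.toHexString(jobIdSeed.hashCode());
        }
        return jobId; // may be null: a default would be chosen elsewhere
    }
}
```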
[GitHub] [flink] NicoK commented on a change in pull request #8668: [FLINK-12784][metrics] Support retention policy for InfluxDB metrics …
NicoK commented on a change in pull request #8668: [FLINK-12784][metrics] Support retention policy for InfluxDB metrics … URL: https://github.com/apache/flink/pull/8668#discussion_r298092960 ## File path: flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/InfluxdbReporterTest.java ## @@ -111,7 +111,7 @@ public void testMetricReporting() throws Exception { reporter.report(); verify(postRequestedFor(urlPathEqualTo("/write")) - .withQueryParam("db", equalTo(TEST_INFLUXDB_DB)) + .withQueryParam(InfluxdbReporterOptions.DB.key(), equalTo(TEST_INFLUXDB_DB)) Review comment: Please revert this change here, since the query param is actually unrelated to how we name our options. In my last review, I was only referring to the `createMetricRegistry` method below.
[GitHub] [flink] zhijiangW commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.
zhijiangW commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure. URL: https://github.com/apache/flink/pull/8880#discussion_r298093109 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java ## @@ -220,28 +225,29 @@ private static int alignSize(int maxRegionSize) { * The "reader" for the memory region. It slices a sequence of buffers from the * sequence of mapped ByteBuffers. */ - static final class BufferSlicer { + static final class BufferSlicer implements BoundedData.Reader { - /** The reader/decoder to the memory mapped region with the data we currently read from. + /** The memory mapped region we currently read from. * Max 2GB large. Further regions may be in the {@link #furtherData} field. */ - private BufferToByteBuffer.Reader data; + private ByteBuffer data; Review comment: it might be better to name this `currentData`, to match `furtherData`
[GitHub] [flink] flinkbot edited a comment on issue #8779: [FLINK-12882][network] Remove ExecutionAttemptID argument from ResultPartitionFactory#create
flinkbot edited a comment on issue #8779: [FLINK-12882][network] Remove ExecutionAttemptID argument from ResultPartitionFactory#create URL: https://github.com/apache/flink/pull/8779#issuecomment-503009589 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @azagrebin * ✅ 2. There is [consensus] that the contribution should go into Flink. - Approved by @azagrebin * ❗ 3. Needs [attention] from. - Needs attention by @azagrebin * ✅ 4. The change fits into the overall [architecture]. - Approved by @azagrebin * ✅ 5. Overall code [quality] is good. - Approved by @azagrebin Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] azagrebin commented on issue #8779: [FLINK-12882][network] Remove ExecutionAttemptID argument from ResultPartitionFactory#create
azagrebin commented on issue #8779: [FLINK-12882][network] Remove ExecutionAttemptID argument from ResultPartitionFactory#create URL: https://github.com/apache/flink/pull/8779#issuecomment-506270426 @flinkbot approve all
[GitHub] [flink] dianfu commented on issue #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config
dianfu commented on issue #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config URL: https://github.com/apache/flink/pull/8910#issuecomment-506268938 @WeiZhong94 Thanks a lot for the review. Updated.
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config
WeiZhong94 commented on a change in pull request #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config URL: https://github.com/apache/flink/pull/8910#discussion_r298081292 ## File path: flink-python/pyflink/shell.py ## @@ -84,7 +84,7 @@ * os.remove(sink_path) * else: * shutil.rmtree(sink_path) -* bt_env.exec_env().set_parallelism(1) +* b_env.set_parallelism(1) Review comment: I think we can add "b_env" at the beginning of the example to tell users which variables are predefined. "Use the 'bt_env' variable" -> "Use the 'b_env', 'bt_env' variable"
[GitHub] [flink] Wosin commented on issue #8371: [FLINK-9679] - Add AvroSerializationSchema
Wosin commented on issue #8371: [FLINK-9679] - Add AvroSerializationSchema URL: https://github.com/apache/flink/pull/8371#issuecomment-506265879 Hey, any update on this?
[GitHub] [flink] flinkbot commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis
flinkbot commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis URL: https://github.com/apache/flink/pull/8911#issuecomment-506262493 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-12995) Add Hive-1.2.1 build to Travis
[ https://issues.apache.org/jira/browse/FLINK-12995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12995: --- Labels: pull-request-available (was: ) > Add Hive-1.2.1 build to Travis > -- > > Key: FLINK-12995 > URL: https://issues.apache.org/jira/browse/FLINK-12995 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: Rui Li >Assignee: Rui Li >Priority: Major > Labels: pull-request-available >
[GitHub] [flink] lirui-apache opened a new pull request #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis
lirui-apache opened a new pull request #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis URL: https://github.com/apache/flink/pull/8911 ## What is the purpose of the change To add Hive-1.2.1 build to Travis. ## Brief change log - Added a Travis stage to build and test against Hive-1.2.1 ## Verifying this change Existing tests ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? NA
[jira] [Commented] (FLINK-12751) Create file based HA support
[ https://issues.apache.org/jira/browse/FLINK-12751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873929#comment-16873929 ] Boris Lublinsky commented on FLINK-12751: - Although the kubelet can die, if the node is alive it will be immediately restarted, as it is a service. The race condition is possible, but not probable. If you want to ensure uniqueness of the job manager, we can add a lock file with the job manager's IP. See [https://events.linuxfoundation.org/wp-content/uploads/2017/11/Leader-Election-Machanism-in-Distributed-System-Using-a-Globally-Accessible-Filesystem-OSS-Dharmendra-Kushwaha.pdf] for example > Create file based HA support > > > Key: FLINK-12751 > URL: https://issues.apache.org/jira/browse/FLINK-12751 > Project: Flink > Issue Type: Improvement > Components: FileSystems >Affects Versions: 1.8.0, 1.9.0, 2.0.0 > Environment: Flink on k8 and Mini cluster >Reporter: Boris Lublinsky >Priority: Major > Labels: features, pull-request-available > Original Estimate: 168h > Time Spent: 10m > Remaining Estimate: 167h 50m > > In the current Flink implementation, HA support can be implemented either > using Zookeeper or a custom factory class. > Add an HA implementation based on PVC. The idea behind this implementation > is as follows: > * Because the implementation assumes a single instance of the Job manager (Job > manager selection and restarts are handled by a K8s Deployment of 1), > URL management is done using the StandaloneHaServices implementation (in the case > of a cluster) and the EmbeddedHaServices implementation (in the case of a mini > cluster) > * For management of the submitted job graphs, checkpoint counter and > completed checkpoints, the implementation leverages the following file > system layout > {code} > ha -> root of the HA data > checkpointcounter -> checkpoint counter folder > -> job id folder > -> counter file > -> another job id folder > ...
> completedCheckpoint -> completed checkpoint folder > -> job id folder > -> checkpoint file > -> checkpoint file > ... > -> another job id folder > ... > submittedJobGraph -> submitted graph folder > -> job id folder > -> graph file > -> another job id folder > ... > {code} > The implementation overrides two existing Flink classes: > * HighAvailabilityServicesUtils - added a `FILESYSTEM` option for picking the HA > service > * HighAvailabilityMode - added `FILESYSTEM` to the available HA options. > The actual implementation adds the following classes: > * `FileSystemHAServices` - an implementation of `HighAvailabilityServices` > for file systems > * `FileSystemUtils` - support class for creation of runtime components. > * `FileSystemStorageHelper` - file system operations implementation for > filesystem-based HA > * `FileSystemCheckpointRecoveryFactory` - an implementation of > `CheckpointRecoveryFactory` for file systems > * `FileSystemCheckpointIDCounter` - an implementation of > `CheckpointIDCounter` for file systems > * `FileSystemCompletedCheckpointStore` - an implementation of > `CompletedCheckpointStore` for file systems > * `FileSystemSubmittedJobGraphStore` - an implementation of > `SubmittedJobGraphStore` for file systems
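The directory layout in the {code} block above can be sketched as a small path builder. This only constructs the paths; the real HA services would read and write recovery data under them. The leaf file names ("counter", "graph") follow the layout in the description, but the class and method names here are illustrative, not the actual PR's API.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/**
 * Sketch of the file-based HA directory layout: one subfolder per concern
 * (checkpoint counter, completed checkpoints, submitted job graphs), each
 * keyed by job ID.
 */
class FileSystemHaLayout {
    private final Path root;

    FileSystemHaLayout(String haRoot) {
        this.root = Paths.get(haRoot);
    }

    Path checkpointCounterFile(String jobId) {
        return root.resolve("checkpointcounter").resolve(jobId).resolve("counter");
    }

    Path completedCheckpointDir(String jobId) {
        return root.resolve("completedCheckpoint").resolve(jobId);
    }

    Path submittedJobGraphFile(String jobId) {
        return root.resolve("submittedJobGraph").resolve(jobId).resolve("graph");
    }
}
```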