[GitHub] [flink] StefanRRichter commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService

2019-06-27 Thread GitBox
StefanRRichter commented on a change in pull request #8692: [FLINK-12804] 
Introduce mailbox-based ExecutorService
URL: https://github.com/apache/flink/pull/8692#discussion_r298170108
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
 ##
 @@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This class encapsulates the logic of the mailbox-based execution model. At the core of this model is
+ * {@link #runMailboxLoop()}, which continuously executes the provided {@link MailboxDefaultAction} in a loop. On each
+ * iteration, the method also checks if there are pending actions in the mailbox and executes them. This model
+ * ensures single-threaded execution between the default action (e.g. record processing) and mailbox actions (e.g.
+ * checkpoint trigger, timer firing, ...).
+ *
+ * The {@link MailboxDefaultAction} interacts with this class through the {@link MailboxDefaultActionContext} to
+ * communicate control flow changes to the mailbox loop, e.g. that invocations of the default action are temporarily
+ * or permanently exhausted.
+ *
+ * The design of {@link #runMailboxLoop()} is centered around the idea of keeping the expected hot path
+ * (default action, no mail) as fast as possible, with just a single volatile read per iteration in
+ * {@link Mailbox#hasMail}. This means that all checking of mail and other control flags (mailboxLoopRunning,
+ * suspendedDefaultAction) is tied to #hasMail indicating true. Control flag changes made in the mailbox thread can
+ * therefore be applied directly, but we must ensure that there is at least one action in the mailbox so that the
+ * change is picked up. For control flag changes by all other threads, which must happen through mailbox actions,
+ * this is automatically the case.
+ *
+ * This class has an open-prepareClose-close lifecycle that is connected with and maps to the lifecycle of the
+ * encapsulated {@link Mailbox} (which is open-quiesce-close).
+ *
+ * The method {@link #switchToLegacySourceCompatibilityMailboxLoop(Object)} exists to run the current sources
+ * (see {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask}) with the mailbox model in a compatibility
+ * mode. Once we drop the old source interface for the new one (FLIP-27), this method can eventually go away.
+ */
+public class MailboxProcessor {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MailboxProcessor.class);
+
+	/** The mailbox data-structure that manages requests for special actions, like timers, checkpoints, ... */
+	private final Mailbox mailbox;
+
+	/** Executor-style facade for client code to submit actions to the mailbox. */
+	private final TaskMailboxExecutorService taskMailboxExecutor;
+
+	/** Action that is repeatedly executed if no action request is in the mailbox. Typically record processing. */
+	private final MailboxDefaultAction mailboxDefaultAction;
+
+	/** Control flag to terminate the mailbox loop. Must only be accessed from the mailbox thread. */
+	private boolean mailboxLoopRunning;
+
+	/**
+	 * Remembers a currently active suspension of the default action. Serves as a flag to indicate a suspended
+	 * default action (suspended if not-null) and to reuse the object as return value in consecutive suspend attempts.
+	 * Must only be accessed from the mailbox thread.
+	 */
+	private MailboxDefaultAction.SuspendedDefaultAction suspendedDefaultAction;
+
+	/** Special action that is used to terminate the mailbox loop. */
+	private final Runnable mailboxPoisonLetter;
+
+	public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction) {
+		this.mailboxDefaultAction = Preconditions.checkNotNull(mailboxDefaultAction);
+		this.mailbox = 
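
To make the hot path described in the javadoc above concrete, here is a minimal, self-contained sketch of such a mailbox loop. It is illustrative only, not the code from this PR: the queue, the flag, and the class name are invented for the example.

{code}
import java.util.concurrent.ConcurrentLinkedQueue;

// Minimal sketch of a mailbox loop: repeatedly run a default action and
// only scan letters/control flags when the single volatile read fires.
public class MiniMailboxLoop {

	private final ConcurrentLinkedQueue<Runnable> mail = new ConcurrentLinkedQueue<>();

	/** The single volatile read on the hot path. */
	private volatile boolean hasMail;

	/** Control flag; only ever mutated from the mailbox thread (via letters). */
	private boolean loopRunning = true;

	/** Called from any thread to enqueue a letter (e.g. a checkpoint trigger). */
	public void putMail(Runnable letter) {
		mail.add(letter);
		hasMail = true;
	}

	/** Runs in the mailbox thread; defaultAction is e.g. record processing. */
	public void runMailboxLoop(Runnable defaultAction) {
		while (loopRunning) {
			if (hasMail) { // hot path: usually false, one volatile read per iteration
				// Clear the flag before draining, so that letters enqueued
				// while we drain set it again and are not lost.
				hasMail = false;
				Runnable letter;
				while ((letter = mail.poll()) != null) {
					letter.run();
				}
			}
			defaultAction.run();
		}
	}

	/** Poison letter: stops the loop from within the mailbox thread. */
	public void stop() {
		putMail(() -> loopRunning = false);
	}
}
{code}

Note how stop() mirrors the poison-letter idea from the quoted code: even a control flag change goes through a letter, so the hot path never needs a second flag check to pick it up.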

[GitHub] [flink] sjwiesman commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough

2019-06-27 Thread GitBox
sjwiesman commented on a change in pull request #8903: [FLINK-12747][docs] 
Getting Started - Table API Example Walkthrough
URL: https://github.com/apache/flink/pull/8903#discussion_r298169905
 
 

 ##
 File path: pom.xml
 ##
 @@ -82,11 +81,13 @@ under the License.
flink-mesos
flink-metrics
flink-yarn
+   flink-shaded-yarn-tests
flink-yarn-tests
flink-fs-tests
flink-docs
flink-python
flink-ml-parent
+   flink-walkthroughs
 
 Review comment:
   Rebased and fixed.




[jira] [Created] (FLINK-13019) IT test for fine-grained recovery

2019-06-27 Thread Andrey Zagrebin (JIRA)
Andrey Zagrebin created FLINK-13019:
---

 Summary: IT test for fine-grained recovery
 Key: FLINK-13019
 URL: https://issues.apache.org/jira/browse/FLINK-13019
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Andrey Zagrebin
Assignee: Andrey Zagrebin
 Fix For: 1.9.0








[jira] [Created] (FLINK-13018) Serving docs locally with jekyll fails with inotify limit

2019-06-27 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-13018:
---

 Summary: Serving docs locally with jekyll fails with inotify limit
 Key: FLINK-13018
 URL: https://issues.apache.org/jira/browse/FLINK-13018
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.9.0
Reporter: Nico Kruber
Assignee: Nico Kruber


Both {{build_docs.sh -i}} and {{build_docs.sh -p}} currently fail (also in the 
dockerized builds in {{docs/docker}}):

{code}
$ ./build_docs.sh -p
Fetching gem metadata from https://rubygems.org/..
...
Bundle complete! 8 Gemfile dependencies, 36 gems now installed.
Bundled gems are installed into `./.rubydeps`
Configuration file: /home/nico/Projects/flink/docs/_config.yml
Source: /home/nico/Projects/flink/docs
   Destination: /home/nico/Projects/flink/docs/content
 Incremental build: disabled. Enable with --incremental
  Generating... 
done in 167.943 seconds.
jekyll 3.7.2 | Error:  Too many open files - Failed to initialize inotify: the 
user limit on the total number of inotify instances has been reached.
{code}

Probably, {{inotify}} is used in a way that monitors single files and not just 
directories, but I don't know that for certain and couldn't find a way to change 
how jekyll uses inotify.

I wouldn't suggest working around this by setting a higher inotify limit, and 
upgrading jekyll did not solve it either. So far there are three options:
# disable watching files via {{--no-watch}}
# use polling instead of {{inotify}} via {{--force_polling}}
# try to reduce the set of files by adding excludes for (expected) static files






[GitHub] [flink] wuchong commented on a change in pull request #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser

2019-06-27 Thread GitBox
wuchong commented on a change in pull request #8850: [FLINK-12954] Supports 
create(drop) view grammar for sql parser
URL: https://github.com/apache/flink/pull/8850#discussion_r298160404
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateView.java
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.error.SqlParseException;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlCreate;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.List;
+
+/**
+ * CREATE VIEW DDL sql call.
+ */
+public class SqlCreateView extends SqlCreate implements ExtendedSqlNode {
+	public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE_VIEW", SqlKind.CREATE_VIEW);
+
+	private final SqlIdentifier viewName;
+	private final SqlNodeList fieldList;
+	private final SqlNode query;
+	private final SqlCharStringLiteral comment;
+
+	public SqlCreateView(
+			SqlParserPos pos,
+			SqlIdentifier viewName,
+			SqlNodeList fieldList,
+			SqlNode query,
+			boolean replace,
+			SqlCharStringLiteral comment) {
+		super(OPERATOR, pos, replace, false);
+		this.viewName = viewName;
+		this.fieldList = fieldList;
+		this.query = query;
+		this.comment = comment;
+	}
+
+	@Override
+	public List<SqlNode> getOperandList() {
+		List<SqlNode> ops = Lists.newArrayList();
+		ops.add(viewName);
+		ops.add(fieldList);
 
 Review comment:
   Do we want to support a field list in the first version? If not, I would 
suggest removing it to stay compatible.
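
   A sketch of what that suggestion could look like (hypothetical, not code from this PR; the generic types are assumed to be `SqlNode`, which the archive stripped):

{code}
// Hypothetical variant: leave fieldList out of the operand list until
// field lists are actually supported, to avoid compatibility issues later.
@Override
public List<SqlNode> getOperandList() {
	List<SqlNode> ops = Lists.newArrayList();
	ops.add(viewName);
	ops.add(query);
	return ops;
}
{code}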




[GitHub] [flink] wuchong commented on a change in pull request #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser

2019-06-27 Thread GitBox
wuchong commented on a change in pull request #8850: [FLINK-12954] Supports 
create(drop) view grammar for sql parser
URL: https://github.com/apache/flink/pull/8850#discussion_r298130713
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 ##
 @@ -583,6 +583,25 @@ public void testInvalidUpsertOverwrite() {
"OVERWRITE expression is only used with INSERT mode");
}
 
+   @Test
+   public void testCreateViewWithProperty() {
 
 Review comment:
   Please rename the method. 
   
   Please also add some tests for CREATE VIEW without a comment, and for DROP VIEW.




[GitHub] [flink] StefanRRichter commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService

2019-06-27 Thread GitBox
StefanRRichter commented on a change in pull request #8692: [FLINK-12804] 
Introduce mailbox-based ExecutorService
URL: https://github.com/apache/flink/pull/8692#discussion_r298162033
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -226,13 +219,18 @@ protected StreamTask(
		this.accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap();
		this.recordWriters = createRecordWriters(configuration, environment);
		this.syncSavepointLatch = new SynchronousSavepointLatch();
-		this.mailbox = new MailboxImpl();
+		this.mailboxProcessor = new MailboxProcessor(this);
 
 Review comment:
   I did this to avoid any changes to all the subclasses - but I guess it's ok 
then, and I will simply have a couple more touched files.




[GitHub] [flink] wuchong commented on a change in pull request #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser

2019-06-27 Thread GitBox
wuchong commented on a change in pull request #8850: [FLINK-12954] Supports 
create(drop) view grammar for sql parser
URL: https://github.com/apache/flink/pull/8850#discussion_r298161597
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
 ##
 @@ -334,6 +317,50 @@ void PartitionSpecCommaList(SqlNodeList list) :
 
 }
 
+/**
+* Parses a CREATE VIEW or CREATE OR REPLACE VIEW statement.
+*   CREATE [OR REPLACE] VIEW view_name [ (field1, field2 ...) ] AS select_statement
+*/
+SqlCreate SqlCreateView(Span s, boolean replace) : {
+    boolean replaceView = false;
+    SqlIdentifier viewName;
+    SqlCharStringLiteral comment = null;
+    SqlNode query;
+    SqlNodeList fieldList = SqlNodeList.EMPTY;
+}
+{
+    [ <OR> <REPLACE> { replaceView = true; } ]
 
 Review comment:
   We can remove this; the parameter `replace` already does this.
   `SqlCreateTable` has the same issue.
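
   For reference, these are the kinds of statements the documented grammar is meant to accept (plain illustrative strings, not tests from this PR):

{code}
// Illustrative only: statement shapes matching the grammar comment above.
public class CreateViewSamples {
	public static final String[] SAMPLES = {
		"CREATE VIEW v AS SELECT a, b FROM t",
		"CREATE VIEW v (col1, col2) AS SELECT a, b FROM t",
		"CREATE OR REPLACE VIEW v AS SELECT a, b FROM t"
	};
}
{code}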




[GitHub] [flink] lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis

2019-06-27 Thread GitBox
lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build 
to Travis
URL: https://github.com/apache/flink/pull/8911#issuecomment-506333942
 
 
   cc @bowenli86 @xuefuz @zjuwangg 




[GitHub] [flink] lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis

2019-06-27 Thread GitBox
lirui-apache commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build 
to Travis
URL: https://github.com/apache/flink/pull/8911#issuecomment-506333597
 
 
   Here's the [build result](https://travis-ci.org/lirui-apache/flink/builds/551257514) from my own 
repo. The connector_hive_1 build failed as expected.




[GitHub] [flink] flinkbot commented on issue #8917: [FLINK-13017][docs] do not mount local $HOME into docker

2019-06-27 Thread GitBox
flinkbot commented on issue #8917: [FLINK-13017][docs] do not mount local $HOME 
into docker
URL: https://github.com/apache/flink/pull/8917#issuecomment-506330683
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

   Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] NicoK opened a new pull request #8917: [FLINK-13017][docs] do not mount local $HOME into docker

2019-06-27 Thread GitBox
NicoK opened a new pull request #8917: [FLINK-13017][docs] do not mount local 
$HOME into docker
URL: https://github.com/apache/flink/pull/8917
 
 
   ## What is the purpose of the change
   
   Remove the (writable!) mount of a user's $HOME into the dockerized 
documentation build container in order to
   - make the builds independent from the host system (making them reproducible)
   - not have the commands in the container affect the host
   
   ## Brief change log
   
   - remove mounting user $HOME
   
   ## Verifying this change
   
   I verified building the docs inside the new environment.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **no**
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **no**
 - If yes, how is the feature documented? **not applicable**
   




[jira] [Updated] (FLINK-13017) Broken and irreproducible dockerized docs build

2019-06-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-13017:
---
Labels: pull-request-available  (was: )

> Broken and irreproducible dockerized docs build
> ---
>
> Key: FLINK-13017
> URL: https://issues.apache.org/jira/browse/FLINK-13017
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.6.4, 1.7.2, 1.8.0, 1.9.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Critical
>  Labels: pull-request-available
>
> The build tools around {{docs/docker}} seem broken and (on my machine) give 
> errors like the following while it is working on a colleague's machine:
> {code}
> bash: /etc/bash_completion.d/git-prompt.sh: No such file or directory
> bash: __git_ps1: command not found
> {code}
> {code}
> /usr/bin/env: 'ruby.ruby2.5': No such file or directory
> bash: __git_ps1: command not found
> {code}
> The reason seems to be that the user's whole $HOME is mounted (writable!) into 
> the docker container. We should just mount the docs directory in order to
> # get builds which are independent from the host system (making them reproducible)
> # not have the commands in the container affect the host(!)





[GitHub] [flink] 1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService

2019-06-27 Thread GitBox
1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce 
mailbox-based ExecutorService
URL: https://github.com/apache/flink/pull/8692#discussion_r298150038
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxExecutorServiceImpl.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation of an executor service built around a mailbox-based execution model.
+ */
+public class TaskMailboxExecutorServiceImpl extends AbstractExecutorService implements TaskMailboxExecutorService {
+
+	/** Reference to the thread that executes the mailbox letters. */
+	@Nonnull
+	private final Thread taskMailboxThread;
+
+	/** The mailbox that manages the submitted runnable objects. */
+	@Nonnull
+	private final Mailbox mailbox;
+
+	public TaskMailboxExecutorServiceImpl(@Nonnull Mailbox mailbox) {
+		this(mailbox, Thread.currentThread());
+	}
+
+	public TaskMailboxExecutorServiceImpl(@Nonnull Mailbox mailbox, @Nonnull Thread taskMailboxThread) {
+		this.mailbox = mailbox;
+		this.taskMailboxThread = taskMailboxThread;
+	}
+
+	@Override
+	public void execute(@Nonnull Runnable command) {
+		checkIsNotMailboxThread();
+		try {
+			mailbox.putMail(command);
+		} catch (InterruptedException irex) {
+			Thread.currentThread().interrupt();
+			throw new RejectedExecutionException("Sender thread was interrupted while blocking on mailbox.", irex);
+		} catch (MailboxStateException mbex) {
+			throw new RejectedExecutionException(mbex);
+		}
+	}
+
+	@Override
+	public boolean tryExecute(Runnable command) {
+		try {
+			return mailbox.tryPutMail(command);
+		} catch (MailboxStateException e) {
+			throw new RejectedExecutionException(e);
+		}
+	}
+
+	@Override
+	public void yield() throws InterruptedException, IllegalStateException {
+		checkIsMailboxThread();
+		try {
+			Runnable runnable = mailbox.takeMail();
+			runnable.run();
+		} catch (MailboxStateException e) {
+			throw new IllegalStateException("Mailbox can no longer supply runnables for yielding.", e);
+		}
+	}
+
+	@Override
+	public boolean tryYield() throws IllegalStateException {
+		checkIsMailboxThread();
+		try {
+			Optional<Runnable> runnableOptional = mailbox.tryTakeMail();
+			if (runnableOptional.isPresent()) {
+				runnableOptional.get().run();
+				return true;
+			} else {
+				return false;
+			}
+		} catch (MailboxStateException e) {
+			throw new IllegalStateException("Mailbox can no longer supply runnables for yielding.", e);
+		}
+	}
+
+	@Override
+	public boolean isMailboxThread() {
+		return Thread.currentThread() == taskMailboxThread;
+	}
+
+	@Override
+	public void shutdown() {
+		mailbox.quiesce();
+	}
+
+	@Nonnull
+	@Override
+	public List<Runnable> shutdownNow() {
+		return mailbox.close();
+	}
+
+	@Override
+	public boolean isShutdown() {
+		return mailbox.getState() != Mailbox.State.OPEN;
+	}
+
+	@Override
+	public boolean isTerminated() {
+		return mailbox.getState() == Mailbox.State.CLOSED;
+ 
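
To illustrate the threading contract of this facade, a hedged usage sketch against the interfaces quoted above (not code from this PR; `triggerCheckpoint` is a placeholder):

{code}
// Construct the facade for the current (mailbox) thread.
TaskMailboxExecutorService executor = new TaskMailboxExecutorServiceImpl(mailbox);

// From any non-mailbox thread: enqueue a letter; this can throw
// RejectedExecutionException once the mailbox is quiesced or closed.
executor.execute(() -> triggerCheckpoint()); // triggerCheckpoint() is a placeholder

// From the mailbox thread only: run one pending letter, if any, without blocking.
if (executor.tryYield()) {
	// a letter (e.g. a timer or checkpoint action) ran between records
}
{code}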

[GitHub] [flink] 1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService

2019-06-27 Thread GitBox
1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce 
mailbox-based ExecutorService
URL: https://github.com/apache/flink/pull/8692#discussion_r298146026
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
 ##
 @@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This class encapsulates the logic of the mailbox-based execution model. At the core of this model is
+ * {@link #runMailboxLoop()}, which continuously executes the provided {@link MailboxDefaultAction} in a loop. On each
+ * iteration, the method also checks if there are pending actions in the mailbox and executes them. This model
+ * ensures single-threaded execution between the default action (e.g. record processing) and mailbox actions (e.g.
+ * checkpoint trigger, timer firing, ...).
+ *
+ * The {@link MailboxDefaultAction} interacts with this class through the {@link MailboxDefaultActionContext} to
+ * communicate control flow changes to the mailbox loop, e.g. that invocations of the default action are temporarily
+ * or permanently exhausted.
+ *
+ * The design of {@link #runMailboxLoop()} is centered around the idea of keeping the expected hot path
+ * (default action, no mail) as fast as possible, with just a single volatile read per iteration in
+ * {@link Mailbox#hasMail}. This means that all checking of mail and other control flags (mailboxLoopRunning,
+ * suspendedDefaultAction) is tied to #hasMail indicating true. Control flag changes made in the mailbox thread can
+ * therefore be applied directly, but we must ensure that there is at least one action in the mailbox so that the
+ * change is picked up. For control flag changes by all other threads, which must happen through mailbox actions,
+ * this is automatically the case.
+ *
+ * This class has an open-prepareClose-close lifecycle that is connected with and maps to the lifecycle of the
+ * encapsulated {@link Mailbox} (which is open-quiesce-close).
+ *
+ * The method {@link #switchToLegacySourceCompatibilityMailboxLoop(Object)} exists to run the current sources
+ * (see {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask}) with the mailbox model in a compatibility
+ * mode. Once we drop the old source interface for the new one (FLIP-27), this method can eventually go away.
+ */
+public class MailboxProcessor {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MailboxProcessor.class);
+
+	/** The mailbox data-structure that manages requests for special actions, like timers, checkpoints, ... */
+	private final Mailbox mailbox;
+
+	/** Executor-style facade for client code to submit actions to the mailbox. */
+	private final TaskMailboxExecutorService taskMailboxExecutor;
+
+	/** Action that is repeatedly executed if no action request is in the mailbox. Typically record processing. */
+	private final MailboxDefaultAction mailboxDefaultAction;
+
+	/** Control flag to terminate the mailbox loop. Must only be accessed from the mailbox thread. */
+	private boolean mailboxLoopRunning;
+
+	/**
+	 * Remembers a currently active suspension of the default action. Serves as a flag to indicate a suspended
+	 * default action (suspended if not-null) and to reuse the object as return value in consecutive suspend attempts.
+	 * Must only be accessed from the mailbox thread.
+	 */
+	private MailboxDefaultAction.SuspendedDefaultAction suspendedDefaultAction;
+
+	/** Special action that is used to terminate the mailbox loop. */
+	private final Runnable mailboxPoisonLetter;
+
+	public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction) {
+		this.mailboxDefaultAction = Preconditions.checkNotNull(mailboxDefaultAction);
+		this.mailbox = new 

[GitHub] [flink] 1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService

2019-06-27 Thread GitBox
1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce 
mailbox-based ExecutorService
URL: https://github.com/apache/flink/pull/8692#discussion_r298135855
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
 ##
 @@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This class encapsulates the logic of the mailbox-based execution model. At the core of this model is
+ * {@link #runMailboxLoop()}, which continuously executes the provided {@link MailboxDefaultAction} in a loop. On each
+ * iteration, the method also checks if there are pending actions in the mailbox and executes them. This model
+ * ensures single-threaded execution between the default action (e.g. record processing) and mailbox actions (e.g.
+ * checkpoint trigger, timer firing, ...).
+ *
+ * The {@link MailboxDefaultAction} interacts with this class through the {@link MailboxDefaultActionContext} to
+ * communicate control flow changes to the mailbox loop, e.g. that invocations of the default action are temporarily
+ * or permanently exhausted.
+ *
+ * The design of {@link #runMailboxLoop()} is centered around the idea of keeping the expected hot path
+ * (default action, no mail) as fast as possible, with just a single volatile read per iteration in
+ * {@link Mailbox#hasMail}. This means that all checking of mail and other control flags (mailboxLoopRunning,
+ * suspendedDefaultAction) is tied to #hasMail indicating true. Control flag changes made in the mailbox thread can
+ * therefore be applied directly, but we must ensure that there is at least one action in the mailbox so that the
+ * change is picked up. For control flag changes by all other threads, which must happen through mailbox actions,
+ * this is automatically the case.
+ *
+ * This class has an open-prepareClose-close lifecycle that is connected with and maps to the lifecycle of the
+ * encapsulated {@link Mailbox} (which is open-quiesce-close).
+ *
+ * The method {@link #switchToLegacySourceCompatibilityMailboxLoop(Object)} exists to run the current sources
+ * (see {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask}) with the mailbox model in a compatibility
+ * mode. Once we drop the old source interface for the new one (FLIP-27), this method can eventually go away.
+ */
+public class MailboxProcessor {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MailboxProcessor.class);
+
+	/** The mailbox data-structure that manages requests for special actions, like timers, checkpoints, ... */
+	private final Mailbox mailbox;
+
+	/** Executor-style facade for client code to submit actions to the mailbox. */
+	private final TaskMailboxExecutorService taskMailboxExecutor;
+
+	/** Action that is repeatedly executed if no action request is in the mailbox. Typically record processing. */
+	private final MailboxDefaultAction mailboxDefaultAction;
+
+	/** Control flag to terminate the mailbox loop. Must only be accessed from the mailbox thread. */
+	private boolean mailboxLoopRunning;
+
+	/**
+	 * Remembers a currently active suspension of the default action. Serves as a flag to indicate a suspended
+	 * default action (suspended if not-null) and to reuse the object as return value in consecutive suspend attempts.
+	 * Must only be accessed from the mailbox thread.
+	 */
+	private MailboxDefaultAction.SuspendedDefaultAction suspendedDefaultAction;
+
+	/** Special action that is used to terminate the mailbox loop. */
+	private final Runnable mailboxPoisonLetter;
+
+	public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction) {
+		this.mailboxDefaultAction = Preconditions.checkNotNull(mailboxDefaultAction);
+		this.mailbox = new 

[GitHub] [flink] 1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService

2019-06-27 Thread GitBox
1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce 
mailbox-based ExecutorService
URL: https://github.com/apache/flink/pull/8692#discussion_r298130215
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
 ##
 @@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This class encapsulates the logic of the mailbox-based execution model. At the core of this model is
+ * {@link #runMailboxLoop()}, which continuously executes the provided {@link MailboxDefaultAction} in a loop. On each
+ * iteration, the method also checks if there are pending actions in the mailbox and executes them. This model
+ * ensures single-threaded execution between the default action (e.g. record processing) and mailbox actions (e.g.
+ * checkpoint trigger, timer firing, ...).
+ *
+ * The {@link MailboxDefaultAction} interacts with this class through the {@link MailboxDefaultActionContext} to
+ * communicate control flow changes to the mailbox loop, e.g. that invocations of the default action are temporarily
+ * or permanently exhausted.
+ *
+ * The design of {@link #runMailboxLoop()} is centered around the idea of keeping the expected hot path
+ * (default action, no mail) as fast as possible, with just a single volatile read per iteration in
+ * {@link Mailbox#hasMail}. This means that all checking of mail and other control flags (mailboxLoopRunning,
+ * suspendedDefaultAction) is tied to #hasMail indicating true. Control flag changes made in the mailbox thread can
+ * therefore be applied directly, but we must ensure that there is at least one action in the mailbox so that the
+ * change is picked up. For control flag changes by all other threads, which must happen through mailbox actions,
+ * this is automatically the case.
+ *
+ * This class has an open-prepareClose-close lifecycle that is connected with and maps to the lifecycle of the
+ * encapsulated {@link Mailbox} (which is open-quiesce-close).
+ *
+ * The method {@link #switchToLegacySourceCompatibilityMailboxLoop(Object)} exists to run the current sources
+ * (see {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask}) with the mailbox model in a compatibility
+ * mode. Once we drop the old source interface for the new one (FLIP-27), this method can eventually go away.
+ */
+public class MailboxProcessor {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MailboxProcessor.class);
+
+	/** The mailbox data-structure that manages requests for special actions, like timers, checkpoints, ... */
+	private final Mailbox mailbox;
+
+	/** Executor-style facade for client code to submit actions to the mailbox. */
+	private final TaskMailboxExecutorService taskMailboxExecutor;
+
+	/** Action that is repeatedly executed if no action request is in the mailbox. Typically record processing. */
+	private final MailboxDefaultAction mailboxDefaultAction;
+
+	/** Control flag to terminate the mailbox loop. Must only be accessed from the mailbox thread. */
+	private boolean mailboxLoopRunning;
+
+	/**
+	 * Remembers a currently active suspension of the default action. Serves as a flag to indicate a suspended
+	 * default action (suspended if not-null) and to reuse the object as return value in consecutive suspend attempts.
+	 * Must only be accessed from the mailbox thread.
+	 */
+	private MailboxDefaultAction.SuspendedDefaultAction suspendedDefaultAction;
+
+	/** Special action that is used to terminate the mailbox loop. */
+	private final Runnable mailboxPoisonLetter;
+
+	public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction) {
+		this.mailboxDefaultAction = Preconditions.checkNotNull(mailboxDefaultAction);
+		this.mailbox = new 

[jira] [Updated] (FLINK-13017) Broken and irreproducible dockerized docs build

2019-06-27 Thread Nico Kruber (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-13017:

Description: 
The build tools around {{docs/docker}} seem broken and (on my machine) give 
errors like the following while it is working on a colleague's machine:
{code}
bash: /etc/bash_completion.d/git-prompt.sh: No such file or directory
bash: __git_ps1: command not found
{code}

{code}
/usr/bin/env: 'ruby.ruby2.5': No such file or directory
bash: __git_ps1: command not found
{code}

The reason seems to be that the user's whole $HOME is mounted (writable!) into the 
docker container. We should just mount the docs directory in order to
# get builds which are independent from the host system (making them reproducible)
# not have the commands in the container affect the host(!)

  was:
The build tools around {{docs/docker}} seem broken and (on my machine) give 
errors like the following while it is working on a colleague's machine:
{code}
bash: /etc/bash_completion.d/git-prompt.sh: No such file or directory
bash: __git_ps1: command not found
{code}

{code}
```/usr/bin/env: 'ruby.ruby2.5': No such file or directory
bash: __git_ps1: command not found```
{code}

Reason seems to be that your whole user's $HOME is mounted (writable!) into the 
docker container. We should just mount the docs directory to get
# builds which are independent from the host system (making them reproducible)
# not have the commands in the container affect the host(!)


> Broken and irreproducible dockerized docs build
> ---
>
> Key: FLINK-13017
> URL: https://issues.apache.org/jira/browse/FLINK-13017
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.6.4, 1.7.2, 1.8.0, 1.9.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Critical
>
> The build tools around {{docs/docker}} seem broken and (on my machine) give 
> errors like the following while it is working on a colleague's machine:
> {code}
> bash: /etc/bash_completion.d/git-prompt.sh: No such file or directory
> bash: __git_ps1: command not found
> {code}
> {code}
> /usr/bin/env: 'ruby.ruby2.5': No such file or directory
> bash: __git_ps1: command not found
> {code}
> The reason seems to be that the user's whole $HOME is mounted (writable!) into 
> the docker container. We should just mount the docs directory in order to
> # get builds which are independent from the host system (making them reproducible)
> # not have the commands in the container affect the host(!)





[GitHub] [flink] 1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService

2019-06-27 Thread GitBox
1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce 
mailbox-based ExecutorService
URL: https://github.com/apache/flink/pull/8692#discussion_r298133778
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
 ##
 @@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This class encapsulates the logic of the mailbox-based execution model. At the core of this model is
+ * {@link #runMailboxLoop()}, which continuously executes the provided {@link MailboxDefaultAction} in a loop. On each
+ * iteration, the method also checks if there are pending actions in the mailbox and executes them. This model
+ * ensures single-threaded execution between the default action (e.g. record processing) and mailbox actions (e.g.
+ * checkpoint trigger, timer firing, ...).
+ *
+ * The {@link MailboxDefaultAction} interacts with this class through the {@link MailboxDefaultActionContext} to
+ * communicate control flow changes to the mailbox loop, e.g. that invocations of the default action are temporarily
+ * or permanently exhausted.
+ *
+ * The design of {@link #runMailboxLoop()} is centered around the idea of keeping the expected hot path
+ * (default action, no mail) as fast as possible, with just a single volatile read per iteration in
+ * {@link Mailbox#hasMail}. This means that all checking of mail and other control flags (mailboxLoopRunning,
+ * suspendedDefaultAction) is tied to #hasMail indicating true. Control flag changes made in the mailbox thread can
+ * therefore be applied directly, but we must ensure that there is at least one action in the mailbox so that the
+ * change is picked up. For control flag changes by all other threads, which must happen through mailbox actions,
+ * this is automatically the case.
+ *
+ * This class has an open-prepareClose-close lifecycle that is connected with and maps to the lifecycle of the
+ * encapsulated {@link Mailbox} (which is open-quiesce-close).
+ *
+ * The method {@link #switchToLegacySourceCompatibilityMailboxLoop(Object)} exists to run the current sources
+ * (see {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask}) with the mailbox model in a compatibility
+ * mode. Once we drop the old source interface for the new one (FLIP-27), this method can eventually go away.
+ */
+public class MailboxProcessor {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MailboxProcessor.class);
+
+	/** The mailbox data-structure that manages requests for special actions, like timers, checkpoints, ... */
+	private final Mailbox mailbox;
+
+	/** Executor-style facade for client code to submit actions to the mailbox. */
+	private final TaskMailboxExecutorService taskMailboxExecutor;
+
+	/** Action that is repeatedly executed if no action request is in the mailbox. Typically record processing. */
+	private final MailboxDefaultAction mailboxDefaultAction;
+
+	/** Control flag to terminate the mailbox loop. Must only be accessed from the mailbox thread. */
+	private boolean mailboxLoopRunning;
+
+	/**
+	 * Remembers a currently active suspension of the default action. Serves as a flag to indicate a suspended
+	 * default action (suspended if not-null) and to reuse the object as return value in consecutive suspend attempts.
+	 * Must only be accessed from the mailbox thread.
+	 */
+	private MailboxDefaultAction.SuspendedDefaultAction suspendedDefaultAction;
+
+	/** Special action that is used to terminate the mailbox loop. */
+	private final Runnable mailboxPoisonLetter;
+
+	public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction) {
+		this.mailboxDefaultAction = Preconditions.checkNotNull(mailboxDefaultAction);
+		this.mailbox = new 

[GitHub] [flink] 1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService

2019-06-27 Thread GitBox
1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce 
mailbox-based ExecutorService
URL: https://github.com/apache/flink/pull/8692#discussion_r298148314
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
 ##
 @@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This class encapsulates the logic of the mailbox-based execution model. At the core of this model is
+ * {@link #runMailboxLoop()}, which continuously executes the provided {@link MailboxDefaultAction} in a loop. On each
+ * iteration, the method also checks if there are pending actions in the mailbox and executes them. This model
+ * ensures single-threaded execution between the default action (e.g. record processing) and mailbox actions (e.g.
+ * checkpoint trigger, timer firing, ...).
+ *
+ * The {@link MailboxDefaultAction} interacts with this class through the {@link MailboxDefaultActionContext} to
+ * communicate control flow changes to the mailbox loop, e.g. that invocations of the default action are temporarily
+ * or permanently exhausted.
+ *
+ * The design of {@link #runMailboxLoop()} is centered around the idea of keeping the expected hot path
+ * (default action, no mail) as fast as possible, with just a single volatile read per iteration in
+ * {@link Mailbox#hasMail}. This means that all checking of mail and other control flags (mailboxLoopRunning,
+ * suspendedDefaultAction) is tied to #hasMail indicating true. Control flag changes made in the mailbox thread can
+ * therefore be applied directly, but we must ensure that there is at least one action in the mailbox so that the
+ * change is picked up. For control flag changes by all other threads, which must happen through mailbox actions,
+ * this is automatically the case.
+ *
+ * This class has an open-prepareClose-close lifecycle that is connected with and maps to the lifecycle of the
+ * encapsulated {@link Mailbox} (which is open-quiesce-close).
+ *
+ * The method {@link #switchToLegacySourceCompatibilityMailboxLoop(Object)} exists to run the current sources
+ * (see {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask}) with the mailbox model in a compatibility
+ * mode. Once we drop the old source interface for the new one (FLIP-27), this method can eventually go away.
+ */
+public class MailboxProcessor {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MailboxProcessor.class);
+
+	/** The mailbox data-structure that manages requests for special actions, like timers, checkpoints, ... */
+	private final Mailbox mailbox;
+
+	/** Executor-style facade for client code to submit actions to the mailbox. */
+	private final TaskMailboxExecutorService taskMailboxExecutor;
+
+	/** Action that is repeatedly executed if no action request is in the mailbox. Typically record processing. */
+	private final MailboxDefaultAction mailboxDefaultAction;
+
+	/** Control flag to terminate the mailbox loop. Must only be accessed from the mailbox thread. */
+	private boolean mailboxLoopRunning;
+
+	/**
+	 * Remembers a currently active suspension of the default action. Serves as a flag to indicate a suspended
+	 * default action (suspended if not-null) and to reuse the object as return value in consecutive suspend attempts.
+	 * Must only be accessed from the mailbox thread.
+	 */
+	private MailboxDefaultAction.SuspendedDefaultAction suspendedDefaultAction;
+
+	/** Special action that is used to terminate the mailbox loop. */
+	private final Runnable mailboxPoisonLetter;
+
+	public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction) {
+		this.mailboxDefaultAction = Preconditions.checkNotNull(mailboxDefaultAction);
+		this.mailbox = new 

[GitHub] [flink] 1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService

2019-06-27 Thread GitBox
1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce 
mailbox-based ExecutorService
URL: https://github.com/apache/flink/pull/8692#discussion_r298153219
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
 ##
 @@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This class encapsulates the logic of the mailbox-based execution model. At the core of this model is
+ * {@link #runMailboxLoop()}, which continuously executes the provided {@link MailboxDefaultAction} in a loop. On each
+ * iteration, the method also checks if there are pending actions in the mailbox and executes such actions. This model
+ * ensures single-threaded execution between the default action (e.g. record processing) and mailbox actions (e.g.
+ * checkpoint trigger, timer firing, ...).
+ *
+ * The {@link MailboxDefaultAction} interacts with this class through the {@link MailboxDefaultActionContext} to
+ * communicate control-flow changes to the mailbox loop, e.g. that invocations of the default action are temporarily
+ * or permanently exhausted.
+ *
+ * The design of {@link #runMailboxLoop()} is centered around the idea of keeping the expected hot path
+ * (default action, no mail) as fast as possible, with just a single volatile read per iteration in
+ * {@link Mailbox#hasMail}. This means that all checking of mail and other control flags (mailboxLoopRunning,
+ * suspendedDefaultAction) is tied to #hasMail indicating true. Control flag changes from within the mailbox thread
+ * can be applied directly, but we must then ensure that there is at least one action in the mailbox so that the
+ * change is picked up. Control flag changes from all other threads must happen through mailbox actions, where this
+ * is automatically the case.
+ *
+ * This class has an open-prepareClose-close lifecycle that is connected with and maps to the lifecycle of the
+ * encapsulated {@link Mailbox} (which is open-quiesce-close).
+ *
+ * The method {@link #switchToLegacySourceCompatibilityMailboxLoop(Object)} exists to run the current sources
+ * (see {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask}) with the mailbox model in a compatibility
+ * mode. Once we drop the old source interface for the new one (FLIP-27), this method can eventually go away.
+ */
+public class MailboxProcessor {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(MailboxProcessor.class);
+
+   /** The mailbox data-structure that manages requests for special actions, like timers, checkpoints, ... */
+   private final Mailbox mailbox;
+
+   /** Executor-style facade for client code to submit actions to the 
mailbox. */
+   private final TaskMailboxExecutorService taskMailboxExecutor;
+
+   /** Action that is repeatedly executed if no action request is in the 
mailbox. Typically record processing. */
+   private final MailboxDefaultAction mailboxDefaultAction;
+
+   /** Control flag to terminate the mailbox loop. Must only be accessed 
from mailbox thread. */
+   private boolean mailboxLoopRunning;
+
+   /**
+* Remembers a currently active suspension of the default action. Serves as a flag to indicate a suspended
+* default action (suspended if not null) and to reuse the object as the return value in consecutive suspend
+* attempts. Must only be accessed from the mailbox thread.
+*/
+   private MailboxDefaultAction.SuspendedDefaultAction 
suspendedDefaultAction;
+
+   /** Special action that is used to terminate the mailbox loop. */
+   private final Runnable mailboxPoisonLetter;
+
+   public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction) {
+   this.mailboxDefaultAction = 
Preconditions.checkNotNull(mailboxDefaultAction);
+   this.mailbox = new 
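
A minimal, illustrative sketch of the loop structure that the javadoc above describes (all names are stand-ins, not the actual Flink implementation): the hot path costs one volatile read of `hasMail`, and control flags are only re-checked after mail has been processed, which is why every flag change must be accompanied by a letter in the mailbox.

```java
// Illustrative sketch only; stand-in names, not the Flink internals.
class MailboxLoopSketch {

	// The single volatile read on the hot path. Any thread that flips a
	// control flag must also put a letter in the mailbox so this turns true.
	private volatile boolean hasMail;

	// Control flag, read and written only by the mailbox thread.
	private boolean mailboxLoopRunning = true;

	// Called by a mail action (the "poison letter") to end the loop.
	void stopLoop() {
		mailboxLoopRunning = false;
	}

	void runMailboxLoop(Runnable defaultAction, Runnable processAllMail) {
		while (true) {
			if (hasMail) {            // cold path: drain mail, re-check flags
				processAllMail.run(); // a poison letter would call stopLoop()
				if (!mailboxLoopRunning) {
					return;
				}
			}
			defaultAction.run();      // hot path: e.g. record processing
		}
	}
}
```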

[GitHub] [flink] 1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService

2019-06-27 Thread GitBox
1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce 
mailbox-based ExecutorService
URL: https://github.com/apache/flink/pull/8692#discussion_r298125132
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##
 @@ -226,13 +219,18 @@ protected StreamTask(
this.accumulatorMap = 
getEnvironment().getAccumulatorRegistry().getUserMap();
this.recordWriters = createRecordWriters(configuration, 
environment);
this.syncSavepointLatch = new SynchronousSavepointLatch();
-   this.mailbox = new MailboxImpl();
+   this.mailboxProcessor = new MailboxProcessor(this);
 
 Review comment:
   Currently the `MailboxDefaultAction` interface has only one abstract method.
  You can write `new MailboxProcessor(this::performDefaultAction)` instead and 
remove `MailboxDefaultAction` from the `implements` clause of `StreamTask`.
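
  A hedged, self-contained sketch of this suggestion (the types below are simplified stand-ins for `MailboxDefaultAction`, `MailboxProcessor` and `StreamTask`, assuming the interface keeps a single abstract method):

```java
// Stand-in for MailboxDefaultAction: one abstract method, so it is a
// functional interface and accepts method references.
@FunctionalInterface
interface DefaultAction {
	void runDefaultAction();
}

// Stand-in for MailboxProcessor.
class Processor {
	private final DefaultAction defaultAction;

	Processor(DefaultAction defaultAction) {
		this.defaultAction = defaultAction;
	}
}

// Stand-in for StreamTask; note there is no "implements DefaultAction".
class Task {
	private final Processor processor =
			new Processor(this::performDefaultAction); // method reference

	private void performDefaultAction() {
		// record processing would go here
	}
}
```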


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] 1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService

2019-06-27 Thread GitBox
1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce 
mailbox-based ExecutorService
URL: https://github.com/apache/flink/pull/8692#discussion_r298128321
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
 ##
 @@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This class encapsulates the logic of the mailbox-based execution model. At the core of this model is
+ * {@link #runMailboxLoop()}, which continuously executes the provided {@link MailboxDefaultAction} in a loop. On each
+ * iteration, the method also checks if there are pending actions in the mailbox and executes such actions. This model
+ * ensures single-threaded execution between the default action (e.g. record processing) and mailbox actions (e.g.
+ * checkpoint trigger, timer firing, ...).
+ *
+ * The {@link MailboxDefaultAction} interacts with this class through the {@link MailboxDefaultActionContext} to
+ * communicate control-flow changes to the mailbox loop, e.g. that invocations of the default action are temporarily
+ * or permanently exhausted.
+ *
+ * The design of {@link #runMailboxLoop()} is centered around the idea of keeping the expected hot path
+ * (default action, no mail) as fast as possible, with just a single volatile read per iteration in
+ * {@link Mailbox#hasMail}. This means that all checking of mail and other control flags (mailboxLoopRunning,
+ * suspendedDefaultAction) is tied to #hasMail indicating true. Control flag changes from within the mailbox thread
+ * can be applied directly, but we must then ensure that there is at least one action in the mailbox so that the
+ * change is picked up. Control flag changes from all other threads must happen through mailbox actions, where this
+ * is automatically the case.
+ *
+ * This class has an open-prepareClose-close lifecycle that is connected with and maps to the lifecycle of the
+ * encapsulated {@link Mailbox} (which is open-quiesce-close).
+ *
+ * The method {@link #switchToLegacySourceCompatibilityMailboxLoop(Object)} exists to run the current sources
+ * (see {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask}) with the mailbox model in a compatibility
+ * mode. Once we drop the old source interface for the new one (FLIP-27), this method can eventually go away.
+ */
+public class MailboxProcessor {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(MailboxProcessor.class);
+
+   /** The mailbox data-structure that manages requests for special actions, like timers, checkpoints, ... */
+   private final Mailbox mailbox;
+
+   /** Executor-style facade for client code to submit actions to the 
mailbox. */
+   private final TaskMailboxExecutorService taskMailboxExecutor;
+
+   /** Action that is repeatedly executed if no action request is in the 
mailbox. Typically record processing. */
+   private final MailboxDefaultAction mailboxDefaultAction;
+
+   /** Control flag to terminate the mailbox loop. Must only be accessed 
from mailbox thread. */
+   private boolean mailboxLoopRunning;
+
+   /**
+* Remembers a currently active suspension of the default action. Serves as a flag to indicate a suspended
+* default action (suspended if not null) and to reuse the object as the return value in consecutive suspend
+* attempts. Must only be accessed from the mailbox thread.
+*/
+   private MailboxDefaultAction.SuspendedDefaultAction 
suspendedDefaultAction;
+
+   /** Special action that is used to terminate the mailbox loop. */
+   private final Runnable mailboxPoisonLetter;
+
+   public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction) {
+   this.mailboxDefaultAction = 
Preconditions.checkNotNull(mailboxDefaultAction);
+   this.mailbox = new 

[GitHub] [flink] WeiZhong94 commented on issue #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config

2019-06-27 Thread GitBox
WeiZhong94 commented on issue #8910: [hotfix][python] Aligns with Java Table 
API by removing methods exec_env and query_config
URL: https://github.com/apache/flink/pull/8910#issuecomment-506327614
 
 
   LGTM, +1


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples

2019-06-27 Thread GitBox
flinkbot commented on issue #8916: [FLINK-12897][python][docs] Improve the 
Python Table API docs by adding more examples
URL: https://github.com/apache/flink/pull/8916#issuecomment-506327371
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12897) Improve the Python Table API docs by adding more examples

2019-06-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12897:
---
Labels: pull-request-available  (was: )

> Improve the Python Table API docs by adding more examples
> -
>
> Key: FLINK-12897
> URL: https://issues.apache.org/jira/browse/FLINK-12897
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Documentation
>Reporter: Dian Fu
>Assignee: Wei Zhong
>Priority: Major
>  Labels: pull-request-available
>
> As discussed in [https://github.com/apache/flink/pull/8774], we need to 
> improve the Python Table API docs by adding more examples. Currently, a few 
> APIs have no examples.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8915: [FLINK-13015][table-api-java] Create validators, strategies and transformations required for porting logical expressions

2019-06-27 Thread GitBox
flinkbot commented on issue #8915:  [FLINK-13015][table-api-java] Create 
validators, strategies and transformations required for porting logical 
expressions
URL: https://github.com/apache/flink/pull/8915#issuecomment-506326938
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] WeiZhong94 opened a new pull request #8916: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples

2019-06-27 Thread GitBox
WeiZhong94 opened a new pull request #8916: [FLINK-12897][python][docs] Improve 
the Python Table API docs by adding more examples
URL: https://github.com/apache/flink/pull/8916
 
 
   ## What is the purpose of the change
   
   *This pull request adds more Python Table API and relevant DataStream/DataSet 
configuration examples to the Flink docs and the Python Sphinx docs.*
   
   
   ## Brief change log
   
 - *Add examples for the less obvious APIs in the Python Sphinx docs.*
 - *Add examples for the supported Python APIs in the Flink docs.*
 - *Fix some typos and style issues in the docs.*
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (docs)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Resolved] (FLINK-9728) Wrong ruby version for building flink docs in docker container

2019-06-27 Thread Nico Kruber (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber resolved FLINK-9728.

Resolution: Duplicate

> Wrong ruby version for building flink docs in docker container
> --
>
> Key: FLINK-9728
> URL: https://issues.apache.org/jira/browse/FLINK-9728
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Florian Schmidt
>Priority: Minor
>
> When trying to use the dockerized jekyll build I get the following error
>  
> {code:java}
> $ ./build_docs.sh -p
> Your Ruby version is 2.0.0, but your Gemfile specified >= 2.1.0{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zentol merged pull request #8902: [hotfix][docs] fix wrong link in streamfile_sink

2019-06-27 Thread GitBox
zentol merged pull request #8902: [hotfix][docs] fix wrong link in 
streamfile_sink
URL: https://github.com/apache/flink/pull/8902
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-13015) Create validators, strategies and transformations required for porting logical expressions

2019-06-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-13015:
---
Labels: pull-request-available  (was: )

> Create validators, strategies and transformations required for porting 
> logical expressions
> --
>
> Key: FLINK-13015
> URL: https://issues.apache.org/jira/browse/FLINK-13015
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Affects Versions: 1.9.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> The goal of this task is to implement:
> InputTypeValidator:
> * by type root
> TypeStrategies:
> * cascade
> * explicit
> TypeTransformations:
> * to_nullable
> This set of classes will enable porting 
> AND/OR/NOT/IS_TRUE/IS_FALSE/IS_NOT_TRUE/IS_NOT_FALSE to the new type inference 
> stack.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] dawidwys opened a new pull request #8915: [FLINK-13015][table-api-java] Create validators, strategies and transformations required for porting logical expressions

2019-06-27 Thread GitBox
dawidwys opened a new pull request #8915:  [FLINK-13015][table-api-java] Create 
validators, strategies and transformations required for porting logical 
expressions
URL: https://github.com/apache/flink/pull/8915
 
 
   ## What is the purpose of the change
   
   Create validators, strategies and transformations required for porting 
logical expressions
   
   
   ## Brief change log
   
   See commit messages
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   Additionally, tests were added for the newly introduced classes.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-13017) Broken and irreproducible dockerized docs build

2019-06-27 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-13017:
---

 Summary: Broken and irreproducible dockerized docs build
 Key: FLINK-13017
 URL: https://issues.apache.org/jira/browse/FLINK-13017
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.8.0, 1.7.2, 1.6.4, 1.9.0
Reporter: Nico Kruber
Assignee: Nico Kruber


The build tools around {{docs/docker}} seem broken and (on my machine) give 
errors like the following, while they work on a colleague's machine:
{code}
bash: /etc/bash_completion.d/git-prompt.sh: No such file or directory
bash: __git_ps1: command not found
{code}

{code}
/usr/bin/env: 'ruby.ruby2.5': No such file or directory
bash: __git_ps1: command not found
{code}

The reason seems to be that the user's whole $HOME is mounted (writable!) into the 
docker container. We should mount only the docs directory in order to
# get builds which are independent of the host system (making them reproducible)
# ensure that commands run in the container cannot affect the host(!)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-12980) Getting Started - Add Top-Level Section to Existing Documentation

2019-06-27 Thread Nico Kruber (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber resolved FLINK-12980.
-
   Resolution: Fixed
Fix Version/s: 1.9.0

Merged in master via e377bf669cb8a12fe9eec241236421698198ea1e (unfortunately 
with the wrong jira ticket tag)

> Getting Started - Add Top-Level Section to Existing Documentation
> -
>
> Key: FLINK-12980
> URL: https://issues.apache.org/jira/browse/FLINK-12980
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Konstantin Knauf
>Assignee: Konstantin Knauf
>Priority: Major
> Fix For: 1.9.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] NicoK merged pull request #8873: [FLINK-12639] [docs] add "getting started" section to documentation

2019-06-27 Thread GitBox
NicoK merged pull request #8873: [FLINK-12639] [docs] add "getting started" 
section to documentation
URL: https://github.com/apache/flink/pull/8873
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sjwiesman commented on a change in pull request #8861: [FLINK-12963][state-processor-api] Add savepoint writer for bootstrapping new savepoints

2019-06-27 Thread GitBox
sjwiesman commented on a change in pull request #8861: 
[FLINK-12963][state-processor-api] Add savepoint writer for bootstrapping new 
savepoints
URL: https://github.com/apache/flink/pull/8861#discussion_r298141523
 
 

 ##
 File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
 ##
 @@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.state.api.output.OperatorStateReducer;
+import org.apache.flink.state.api.output.OperatorSubtaskStateReducer;
+import org.apache.flink.state.api.output.SavepointOutputFormat;
+import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Any savepoint that can be written to from a batch context.
+ * @param <F> The implementation type.
+ */
+@PublicEvolving
+@SuppressWarnings("WeakerAccess")
+public abstract class WritableSavepoint<F extends WritableSavepoint> {
+
+   protected final Map<String, BootstrapTransformation<?>> transformations;
+
+   protected final List<String> droppedOperators;
+
+   WritableSavepoint() {
+   this.transformations  = new HashMap<>();
+   this.droppedOperators = new ArrayList<>();
+   }
+
+   /**
+* Drop an existing operator from the savepoint.
+* @param uid The uid of the operator.
+* @return A modified savepoint.
+*/
+   @SuppressWarnings("unchecked")
+   public F removeOperator(String uid) {
+   droppedOperators.add(uid);
+   transformations.remove(uid);
+   return (F) this;
+   }
+
+   /**
+* Adds a new operator to the savepoint.
+* @param uid The uid of the operator.
+* @param transformation The operator to be included.
+* @return The modified savepoint.
+*/
+   @SuppressWarnings("unchecked")
+   public <T> F withOperator(String uid, BootstrapTransformation<T> transformation) {
+   if (transformations.containsKey(uid)) {
+   throw new IllegalArgumentException("The savepoint already contains uid " + uid + ". All uids must be unique");
+   }
+
+   transformations.put(uid, transformation);
+   return (F) this;
+   }
+
+   /**
+* Write out a new or updated savepoint.
+* @param path The path to where the savepoint should be written.
+*/
+   public abstract void write(String path);
+
+   protected static void write(
+   Path savepointPath,
+   Map<String, BootstrapTransformation<?>> transformations,
+   StateBackend stateBackend,
+   SavepointMetadata metadata,
+   @Nullable DataSet<OperatorState> existingOperators) {
+
+   DataSet<OperatorState> newOperatorStates = transformations
+   .entrySet()
+   .stream()
+   .map(entry -> getOperatorStates(savepointPath, 
entry.getKey(), stateBackend, entry.getValue(), metadata))
+   .reduce(DataSet::union)
+   .orElseThrow(() -> new IllegalStateException("Savepoints must contain at least one operator"));
+
+   DataSet<OperatorState> finalOperatorStates;
+   if (existingOperators == null) {
+   finalOperatorStates = newOperatorStates;
+   } else {
+   finalOperatorStates = 
newOperatorStates.union(existingOperators);
 
 Review comment:
   Currently no, it is just a shallow copy of pointers.
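
  For context, a hedged usage sketch of the builder-style API quoted above. `ExistingSavepoint` is assumed here to be a concrete subtype of `WritableSavepoint`, and the transformation is assumed to be prepared elsewhere; neither is part of the quoted diff:

```java
// Hypothetical usage; relies only on the signatures visible in the diff above.
static void rewriteSavepoint(
		ExistingSavepoint savepoint,
		BootstrapTransformation<Integer> transformation) {
	savepoint
		.removeOperator("uid-to-drop")              // drop one operator's state
		.withOperator("uid-to-add", transformation) // bootstrap a new operator
		.write("file:///tmp/new-savepoint");        // write metadata and state
}
```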


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] NicoK commented on issue #8873: [FLINK-12639] [docs] add "getting started" section to documentation

2019-06-27 Thread GitBox
NicoK commented on issue #8873: [FLINK-12639] [docs] add "getting started" 
section to documentation
URL: https://github.com/apache/flink/pull/8873#issuecomment-506317642
 
 
   @flinkbot approve all


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sjwiesman commented on a change in pull request #8861: [FLINK-12963][state-processor-api] Add savepoint writer for bootstrapping new savepoints

2019-06-27 Thread GitBox
sjwiesman commented on a change in pull request #8861: 
[FLINK-12963][state-processor-api] Add savepoint writer for bootstrapping new 
savepoints
URL: https://github.com/apache/flink/pull/8861#discussion_r298141103
 
 

 ##
 File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
 ##
 @@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.state.api.output.OperatorStateReducer;
+import org.apache.flink.state.api.output.OperatorSubtaskStateReducer;
+import org.apache.flink.state.api.output.SavepointOutputFormat;
+import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Any savepoint that can be written to from a batch context.
+ * @param <F> The implementation type.
+ */
+@PublicEvolving
+@SuppressWarnings("WeakerAccess")
+public abstract class WritableSavepoint<F extends WritableSavepoint> {
+
+   protected final Map<String, BootstrapTransformation<?>> transformations;
+
+   protected final List<String> droppedOperators;
+
+   WritableSavepoint() {
+   this.transformations  = new HashMap<>();
+   this.droppedOperators = new ArrayList<>();
+   }
+
+   /**
+* Drop an existing operator from the savepoint.
+* @param uid The uid of the operator.
+* @return A modified savepoint.
+*/
+   @SuppressWarnings("unchecked")
+   public F removeOperator(String uid) {
+   droppedOperators.add(uid);
 
 Review comment:
   Yes, that is much better than what I have. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin edited a comment on issue #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager

2019-06-27 Thread GitBox
azagrebin edited a comment on issue #8646: [FLINK-12735][network] Make shuffle 
environment implementation independent with IOManager
URL: https://github.com/apache/flink/pull/8646#issuecomment-506313023
 
 
   @StephanEwen Thanks for bringing this up.
   
   As @zhijiangW already described, the idea was indeed to drop the `IOManager` 
dependency in the Shuffle Service API because `NettyShuffleEnvironment` basically 
uses only a narrow slice of `IOManager`, namely switching between temporary files.
   
   The valid concern is that if we decide to use `IOManagerAsync` with its 
threads, then the threads should (or at least could) be shared by all IO 
components which use them. In that case the Shuffle Service API has to accept it 
as a dependency created outside.
   
   As discussed offline, if we decide later that we need `IOManagerAsync` for 
shuffling, then we can revert the change and add the abstract/interface 
`IOManager` back to `ShuffleEnvironmentContext`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin edited a comment on issue #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager

2019-06-27 Thread GitBox
azagrebin edited a comment on issue #8646: [FLINK-12735][network] Make shuffle 
environment implementation independent with IOManager
URL: https://github.com/apache/flink/pull/8646#issuecomment-506313023
 
 
   @StephanEwen Thanks for bringing this up.
   
   As @zhijiangW already described, the idea was indeed to drop the `IOManager` 
dependency in the Shuffle Service API because `NettyShuffleEnvironment` basically 
uses only a narrow slice of `IOManager`, namely switching between temporary files.
   
   The valid concern is that if we decide to use `IOManagerAsync` with its 
threads, then the threads should be shared by all IO components which use them. 
In that case the Shuffle Service API has to accept it as a dependency created 
outside.
   
   As discussed offline, if we decide later that we need `IOManagerAsync` for 
shuffling, then we can revert the change and add the abstract/interface 
`IOManager` back to `ShuffleEnvironmentContext`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on issue #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager

2019-06-27 Thread GitBox
azagrebin commented on issue #8646: [FLINK-12735][network] Make shuffle 
environment implementation independent with IOManager
URL: https://github.com/apache/flink/pull/8646#issuecomment-506313023
 
 
   @StephanEwen Thanks for bringing this up.
   
   True, the idea was to drop the `IOManager` dependency in the Shuffle Service 
API because `NettyShuffleEnvironment` basically uses only a narrow slice of 
`IOManager`, namely switching between temporary files. That narrowed concern of 
switching between temporary files is moved into `FileChannelManager`, which is 
also used by the `IOManager`. `NettyShuffleEnvironment` can create its own 
light-weight `FileChannelManager` with its own directory prefix.
   
   The valid concern is that if we decide to use `IOManagerAsync` with its 
threads, then the threads should be shared by all IO components which use them. 
In that case the Shuffle Service API has to accept it as a dependency created 
outside.
   
   As discussed offline, if we decide later that we need `IOManagerAsync` for 
shuffling, then we can revert the change and add the abstract/interface 
`IOManager` back to `ShuffleEnvironmentContext`.
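
   A rough, illustrative sketch (not the actual Flink classes) of the decomposition described here: the narrow temp-file concern lives in a small manager that the `IOManager` and a shuffle environment can each instantiate with their own directory prefix, without pulling in any I/O threads.

```java
import java.io.File;
import java.util.concurrent.atomic.AtomicLong;

// Stand-in for FileChannelManager: hands out temp files across the
// configured directories and nothing more.
class FileChannelManagerSketch {
	private final File[] tempDirs;
	private final String prefix; // e.g. one prefix for shuffle, one for IOManager
	private final AtomicLong counter = new AtomicLong();

	FileChannelManagerSketch(File[] tempDirs, String prefix) {
		this.tempDirs = tempDirs;
		this.prefix = prefix;
	}

	// Round-robin across the temp directories for each new channel file.
	File createChannelFile() {
		long id = counter.getAndIncrement();
		File dir = tempDirs[(int) (id % tempDirs.length)];
		return new File(dir, prefix + "-" + id + ".channel");
	}
}
```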


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11662) Discarded checkpoint can cause Tasks to fail

2019-06-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11662:
---
Labels: pull-request-available  (was: )

> Discarded checkpoint can cause Tasks to fail
> 
>
> Key: FLINK-11662
> URL: https://issues.apache.org/jira/browse/FLINK-11662
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.7.0, 1.8.0
>Reporter: madong
>Assignee: Yun Tang
>Priority: Critical
>  Labels: pull-request-available
> Attachments: jobmanager.log, taskmanager.log
>
>
> Flink's {{CheckpointCoordinator}} discards an ongoing checkpoint as soon as 
> it receives the first decline message. Part of the discard operation is the 
> deletion of the checkpointing directory. Depending on the underlying 
> {{FileSystem}} implementation, concurrent write and read operations to files 
> in the checkpoint directory can then fail (e.g. this is the case with HDFS). 
> If there is still a local checkpointing operation running for some {{Task}} 
> and belonging to the discarded checkpoint, then it can happen that the 
> checkpointing operation fails (e.g. an {{AsyncCheckpointRunnable}}). 
> Depending on the configuration of the {{CheckpointExceptionHandler}}, this 
> can lead to a task failure and a job recovery which is caused by an already 
> discarded checkpoint.
> {code:java}
> 2019-02-16 11:26:29.378 [Checkpoint Timer] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 1389046 @ 1550287589373 for job 599a6ac3c371874d12ebf024978cadbc.
> 2019-02-16 11:26:29.630 [flink-akka.actor.default-dispatcher-68] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline 
> checkpoint 1389046 by task 7239e5d29203c4c720ed2db6f5db33fc of job 
> 599a6ac3c371874d12ebf024978cadbc.
> 2019-02-16 11:26:29.630 [flink-akka.actor.default-dispatcher-68] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding 
> checkpoint 1389046 of job 599a6ac3c371874d12ebf024978cadbc.
> org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException:
>  Task Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (3/3) was 
> not running
> at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 2019-02-16 11:26:29.697 [flink-akka.actor.default-dispatcher-68] INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource 
> -> mapOperate -> Timestamps/Watermarks (1/3) 
> (a5657b784d235731cd468164e85d0b50) switched from RUNNING to FAILED.
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: 
> java.lang.Exception: Could not materialize checkpoint 1389046 for operator 
> Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3).
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 1389046 for 
> operator Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3).
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> ... 6 common frames omitted
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not flush and close the file system output stream to 
> hdfs://.../flink/checkpoints/599a6ac3c371874d12ebf024978cadbc/chk-1389046/84631771-01e2-41bc-950d-c9e39eac26f9
>  in order to obtain the stream state handle
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
> at 
> 

[jira] [Closed] (FLINK-11662) Discarded checkpoint can cause Tasks to fail

2019-06-27 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-11662.
--
   Resolution: Fixed
Fix Version/s: 1.9.0

Merged in:
master: b760d55

> Discarded checkpoint can cause Tasks to fail
> 
>
> Key: FLINK-11662
> URL: https://issues.apache.org/jira/browse/FLINK-11662
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.7.0, 1.8.0
>Reporter: madong
>Assignee: Yun Tang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: jobmanager.log, taskmanager.log
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Flink's {{CheckpointCoordinator}} discards an ongoing checkpoint as soon as 
> it receives the first decline message. Part of the discard operation is the 
> deletion of the checkpointing directory. Depending on the underlying 
> {{FileSystem}} implementation, concurrent write and read operations to files 
> in the checkpoint directory can then fail (e.g. this is the case with HDFS). 
> If there is still a local checkpointing operation running for some {{Task}} 
> and belonging to the discarded checkpoint, then it can happen that the 
> checkpointing operation fails (e.g. an {{AsyncCheckpointRunnable}}). 
> Depending on the configuration of the {{CheckpointExceptionHandler}}, this 
> can lead to a task failure and a job recovery which is caused by an already 
> discarded checkpoint.
> {code:java}
> 2019-02-16 11:26:29.378 [Checkpoint Timer] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 1389046 @ 1550287589373 for job 599a6ac3c371874d12ebf024978cadbc.
> 2019-02-16 11:26:29.630 [flink-akka.actor.default-dispatcher-68] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline 
> checkpoint 1389046 by task 7239e5d29203c4c720ed2db6f5db33fc of job 
> 599a6ac3c371874d12ebf024978cadbc.
> 2019-02-16 11:26:29.630 [flink-akka.actor.default-dispatcher-68] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding 
> checkpoint 1389046 of job 599a6ac3c371874d12ebf024978cadbc.
> org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException:
>  Task Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (3/3) was 
> not running
> at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 2019-02-16 11:26:29.697 [flink-akka.actor.default-dispatcher-68] INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource 
> -> mapOperate -> Timestamps/Watermarks (1/3) 
> (a5657b784d235731cd468164e85d0b50) switched from RUNNING to FAILED.
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: 
> java.lang.Exception: Could not materialize checkpoint 1389046 for operator 
> Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3).
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 1389046 for 
> operator Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3).
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> ... 6 common frames omitted
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not flush and close the file system output stream to 
> hdfs://.../flink/checkpoints/599a6ac3c371874d12ebf024978cadbc/chk-1389046/84631771-01e2-41bc-950d-c9e39eac26f9
>  in order to obtain the stream state handle
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at 

[GitHub] [flink] StefanRRichter merged pull request #8745: [FLINK-11662] Disable task to fail on checkpoint errors

2019-06-27 Thread GitBox
StefanRRichter merged pull request #8745: [FLINK-11662] Disable task to fail on 
checkpoint errors
URL: https://github.com/apache/flink/pull/8745
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on issue #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager

2019-06-27 Thread GitBox
zhijiangW commented on issue #8646: [FLINK-12735][network] Make shuffle 
environment implementation independent with IOManager
URL: https://github.com/apache/flink/pull/8646#issuecomment-506310994
 
 
   @StephanEwen thanks for sharing the background and bringing up this concern!
   
   The abstract `IOManager` actually has two roles:
   - One is a utility for obtaining file channels, which is currently used by 
some streaming components, like `StateBackend`, `BarrierBuffer`, `ShuffleService`.
   - The other is the set of real abstract methods for handling read/write 
operations, which is mainly used for batch sort-merge.
   
   If we assume that one component should focus on only one single function, it 
is reasonable to break down `IOManager` into two sub-components as now. The 
other components can then rely only on `FileChannelManager` instead of the 
heavy-weight `IOManager`.
   
   Currently the shuffle service only needs the internal `FileChannelManager`. 
I agree that it might rely on the write-behind or pre-fetching threads provided 
by `IOManager` in the future. If so, the other shuffle service implementations 
could implement a new `IOManager` instance or use the current `IOManagerAsync` 
directly if needed in the future.
   
   Another option, as you suggested, is implementing a new `IOManager` instance 
without spawning threads, which also makes sense. If the shuffle service needs 
the read/write functions in the future, it would have to switch to another 
`IOManager` implementation. But I still prefer the current way in the PR, 
because the two individual sub-components are both light-weight and they can be 
used together or separately.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW edited a comment on issue #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager

2019-06-27 Thread GitBox
zhijiangW edited a comment on issue #8646: [FLINK-12735][network] Make shuffle 
environment implementation independent with IOManager
URL: https://github.com/apache/flink/pull/8646#issuecomment-506310994
 
 
   @StephanEwen thanks for sharing the background and bringing up this concern!
   
   The abstract `IOManager` actually has two roles:
   - One is a utility for obtaining file channels, which is currently used by 
some streaming components, like `StateBackend`, `BarrierBuffer`, `ShuffleService`.
   - The other is the set of real abstract methods for handling read/write 
operations, which is mainly used for batch sort-merge.
   
   If we assume that one component should focus on only one single function, it 
is reasonable to break down `IOManager` into two sub-components as now. The 
other components can then rely only on `FileChannelManager` instead of the 
heavy-weight `IOManager`.
   
   Currently the shuffle service only needs the internal `FileChannelManager`. 
I agree that it might rely on the write-behind or pre-fetching threads provided 
by `IOManager` in the future. If so, the other shuffle service implementations 
could implement a new `IOManager` instance or use the current `IOManagerAsync` 
directly if needed in the future.
   
   Another option, as you suggested, is implementing a new `IOManager` instance 
without spawning threads, which also makes sense. If the shuffle service needs 
the read/write functions in the future, it would have to switch to another 
`IOManager` implementation. But I still prefer the current way in the PR, 
because the two individual sub-components are both light-weight and they can be 
used together or separately as needed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-13016) StreamTaskNetworkInput can linger records for longer period of times

2019-06-27 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-13016:
--

 Summary: StreamTaskNetworkInput can linger records for longer 
period of times
 Key: FLINK-13016
 URL: https://issues.apache.org/jira/browse/FLINK-13016
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.9.0
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski
 Fix For: 1.9.0


There is a bug in {{StreamTaskNetworkInput#isAvailable}}: it ignores the case 
where some data/records are currently buffered in the 
{{currentRecordDeserializer}} field.
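
A hedged sketch of the shape of the problem (simplified stand-in code; the actual 
method in Flink differs): availability must also report "available" while records 
are still buffered in the deserializer, not only when new network data arrives.

{code:java}
import java.util.concurrent.CompletableFuture;

// Simplified stand-in for StreamTaskNetworkInput's availability check.
class NetworkInputSketch {
	static final CompletableFuture<?> AVAILABLE =
			CompletableFuture.completedFuture(null);

	private Object currentRecordDeserializer;  // non-null => records buffered
	private CompletableFuture<?> newDataAvailable =
			new CompletableFuture<>();         // completes on new network data

	CompletableFuture<?> isAvailable() {
		if (currentRecordDeserializer != null) {
			// The reported bug: skipping this check can park the task even
			// though records are already buffered locally.
			return AVAILABLE;
		}
		return newDataAvailable;
	}
}
{code}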



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8914: [FLINK-12929] Pass TypeInformation in addSource

2019-06-27 Thread GitBox
flinkbot commented on issue #8914: [FLINK-12929] Pass TypeInformation in 
addSource
URL: https://github.com/apache/flink/pull/8914#issuecomment-506309273
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12929) scala.StreamExecutionEnvironment.addSource does not propagate TypeInformation

2019-06-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12929:
---
Labels: pull-request-available  (was: )

> scala.StreamExecutionEnvironment.addSource does not propagate TypeInformation
> -
>
> Key: FLINK-12929
> URL: https://issues.apache.org/jira/browse/FLINK-12929
> Project: Flink
>  Issue Type: Bug
>  Components: API / Scala, API / Type Serialization System
>Reporter: Fabio Lombardelli
>Priority: Critical
>  Labels: pull-request-available
>
> In {{scala.StreamExecutionEnvironment.addSource}} I would expect that 
> {{typeInfo}} is also passed to {{javaEnv.addSource}} as the second parameter, 
> and not only passed to the {{returns}} method:
> {code:java}
>   def addSource[T: TypeInformation](function: SourceFunction[T]): 
> DataStream[T] = {
> require(function != null, "Function must not be null.")
> 
> val cleanFun = scalaClean(function)
> val typeInfo = implicitly[TypeInformation[T]]
> asScalaStream(javaEnv.addSource(cleanFun, <pass typeInfo>).returns(typeInfo))
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-12784) Support retention policy for InfluxDB metrics reporter

2019-06-27 Thread Nico Kruber (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber resolved FLINK-12784.
-
   Resolution: Fixed
Fix Version/s: 1.9.0

Merged in master via 0004056b494c42f5aa2b2cab5876eed7cfc20875

> Support retention policy for InfluxDB metrics reporter
> --
>
> Key: FLINK-12784
> URL: https://issues.apache.org/jira/browse/FLINK-12784
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.8.0
>Reporter: Mans Singh
>Assignee: Mans Singh
>Priority: Minor
>  Labels: influxdb, metrics, pull-request-available
> Fix For: 1.9.0
>
>   Original Estimate: 12h
>  Time Spent: 20m
>  Remaining Estimate: 11h 40m
>
> The InfluxDB metrics reporter uses the default retention policy for saving metrics 
> to InfluxDB. This enhancement will allow the user to specify a retention policy.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] lombarde opened a new pull request #8914: [FLINK-12929] Pass TypeInformation in addSource

2019-06-27 Thread GitBox
lombarde opened a new pull request #8914: [FLINK-12929] Pass TypeInformation in 
addSource
URL: https://github.com/apache/flink/pull/8914
 
 
   Co-authored-by: Georg Rollinger 
   
   
   
   ## What is the purpose of the change
   
   Fixes https://issues.apache.org/jira/browse/FLINK-12929:
   Changed scala.StreamExecutionEnvironment.addSource to pass the type info for 
a SourceFunction that does not extend ResultTypeQueryable.
   
   ## Brief change log
   
   see above
   
   ## Verifying this change
   
   No test added, as scala/StreamExecutionEnvironment.scala seems not to be 
covered yet.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8913: [FLINK-12968][table-common] Preparation for more casting utilities

2019-06-27 Thread GitBox
flinkbot commented on issue #8913: [FLINK-12968][table-common] Preparation for 
more casting utilities
URL: https://github.com/apache/flink/pull/8913#issuecomment-506308460
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

## Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] NicoK merged pull request #8668: [FLINK-12784][metrics] Support retention policy for InfluxDB metrics …

2019-06-27 Thread GitBox
NicoK merged pull request #8668: [FLINK-12784][metrics] Support retention 
policy for InfluxDB metrics …
URL: https://github.com/apache/flink/pull/8668
 
 
   




[GitHub] [flink] ifndef-SleePy edited a comment on issue #8870: [FLINK-12974][checkstyle] Bump checkstyle to 8.14

2019-06-27 Thread GitBox
ifndef-SleePy edited a comment on issue #8870: [FLINK-12974][checkstyle] Bump 
checkstyle to 8.14
URL: https://github.com/apache/flink/pull/8870#issuecomment-506306235
 
 
   @hmcl You mean downgrade to an earlier version? I don't think using an older 
version is a good choice, because I believe 8.12 was well tested for the Flink 
project by @zentol . An earlier version might not cover the features of 8.12.
   
   The reason I propose upgrading to 8.14 is that the commit history of the 
checkstyle plugin shows no breaking change between 8.12 and 8.14: it mapped 
8.11/8.12/8.13 to 8.14.
   
   Actually, I think it's better to upgrade to the newest stable version; of 
course it should be well tested for the Flink project.
   I think we should separate this into two steps: a quick fix to 8.14 with 
quick testing, and a big upgrade with thorough testing. Many people have read 
the guide "Adopting a Code Style and Quality Guide" written by Stephan, and 
some of them, like me, have put it into practice; they may be confused by the 
checkstyle plugin setting. The big upgrade is not an emergency, so we can do 
it later.




[GitHub] [flink] twalthr opened a new pull request #8913: [FLINK-12968][table-common] Preparation for more casting utilities

2019-06-27 Thread GitBox
twalthr opened a new pull request #8913: [FLINK-12968][table-common] 
Preparation for more casting utilities
URL: https://github.com/apache/flink/pull/8913
 
 
   ## What is the purpose of the change
   
   Adds some utilities in preparation for more casting utilities and fixes a bug.
   
   ## Brief change log
   
   See commit messages.
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? JavaDocs
   




[GitHub] [flink] flinkbot commented on issue #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…

2019-06-27 Thread GitBox
flinkbot commented on issue #8912: [FLINK-12709] [runtime] Implement new 
generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912#issuecomment-506306353
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

## Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] ifndef-SleePy commented on issue #8870: [FLINK-12974][checkstyle] Bump checkstyle to 8.14

2019-06-27 Thread GitBox
ifndef-SleePy commented on issue #8870: [FLINK-12974][checkstyle] Bump 
checkstyle to 8.14
URL: https://github.com/apache/flink/pull/8870#issuecomment-506306235
 
 
   @hmcl You mean downgrade to an earlier version? I don't think using an older 
version is a good choice, because I believe 8.12 was well tested for the Flink 
project by @zentol . An earlier version might not cover the features of 8.12.
   
   The reason I propose upgrading to 8.14 is that the commit history of the 
checkstyle plugin shows no breaking change between 8.12 and 8.14: it mapped 
8.11/8.12/8.13 to 8.14.
   
   Actually, I think it's better to upgrade to the newest stable version; of 
course it should be well tested for the Flink project.
   I think we should separate this into two steps: a quick fix to 8.14 with 
quick testing, and a big upgrade with thorough testing. Many people have read 
the guide "Adopting a Code Style and Quality Guide" written by Stephan, and 
some of them, like me, have put it into practice; they may be confused by the 
checkstyle plugin setting.




[jira] [Updated] (FLINK-12709) Implement RestartBackoffTimeStrategyFactoryLoader

2019-06-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12709:
---
Labels: pull-request-available  (was: )

> Implement RestartBackoffTimeStrategyFactoryLoader
> -
>
> Key: FLINK-12709
> URL: https://issues.apache.org/jira/browse/FLINK-12709
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.9.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> We need to implement a RestartBackoffTimeStrategyFactoryLoader to instantiate 
> RestartBackoffTimeStrategyFactory.
> In order to be backwards compatible, the loader is responsible for converting 
> *RestartStrategy* 
> configurations 
> ([https://ci.apache.org/projects/flink/flink-docs-stable/dev/restart_strategies.html]) 
> and *RestartStrategyConfiguration* to the latest *RestartBackoffTimeStrategy* 
> configurations.
> The converted configurations will be used to create 
> *RestartBackoffTimeStrategy.Factory* via 
> *RestartBackoffTimeStrategy#createFactory(Configuration)*.
>  
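
A minimal sketch of the conversion step (class and method names follow the 
issue text; the option keys, the mapping logic, and the package placement are 
assumptions for illustration only):

{code:java}
import org.apache.flink.configuration.Configuration;

// Import of RestartBackoffTimeStrategy omitted: the class was still being
// introduced at the time. "restart-strategy" is the real legacy key; the
// converted key below is illustrative only.
public final class RestartBackoffTimeStrategyFactoryLoaderSketch {

	public static RestartBackoffTimeStrategy.Factory createFactory(Configuration config) {
		// 1. read the legacy RestartStrategy configuration, if present
		String legacyStrategy = config.getString("restart-strategy", "none");

		// 2. convert it into the new-generation option namespace
		Configuration converted = new Configuration(config);
		if ("fixed-delay".equals(legacyStrategy)) {
			converted.setString("restart-backoff-time-strategy.type", "fixed-delay");
		}

		// 3. named in the issue: RestartBackoffTimeStrategy#createFactory(Configuration)
		return RestartBackoffTimeStrategy.createFactory(converted);
	}
}
{code}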





[GitHub] [flink] zhuzhurk opened a new pull request #8912: [FLINK-12709] [runtime] Implement new generation restart strategy loader which also respects legacy…

2019-06-27 Thread GitBox
zhuzhurk opened a new pull request #8912: [FLINK-12709] [runtime] Implement new 
generation restart strategy loader which also respects legacy…
URL: https://github.com/apache/flink/pull/8912
 
 
   … restart strategy configs
   
   ## What is the purpose of the change
   
   *A loader is needed to load factories of RestartBackoffTimeStrategy. In order 
to be backwards compatible, the loader should respect legacy RestartStrategy 
configurations. The legacy configuration can be in cluster config format 
(https://ci.apache.org/projects/flink/flink-docs-stable/dev/restart_strategies.html)
 or in RestartStrategies.RestartStrategyConfiguration format.*
   
   
   ## Brief change log
   
 - *The loader converts legacy configuration into the new generation format*
 - *The loader creates RestartBackoffTimeStrategy factory from new format 
configs*
   
   ## Verifying this change
   
 - *Added unit tests for RestartBackoffTimeStrategyFactoryLoader*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   




[GitHub] [flink] asfgit closed pull request #8530: [FLINK-12611][table-planner-blink] Make time indicator nullable in blink

2019-06-27 Thread GitBox
asfgit closed pull request #8530: [FLINK-12611][table-planner-blink] Make time 
indicator nullable in blink
URL: https://github.com/apache/flink/pull/8530
 
 
   




[jira] [Closed] (FLINK-12611) Make time indicator nullable in blink

2019-06-27 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-12611.
---
   Resolution: Fixed
Fix Version/s: 1.9.0

Fixed in 1.9.0: dac9f78658efda3339977e05591989cc363f3722

> Make time indicator nullable in blink
> -
>
> Key: FLINK-12611
> URL: https://issues.apache.org/jira/browse/FLINK-12611
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> SQL: select max(rowtime), count(a) from T
> There will be an AssertionError: type mismatch:
> aggCall type:
> TIMESTAMP(3) NOT NULL
> inferred type:
> TIMESTAMP(3)
> Agg type checking is done before the TimeIndicator materializes, so there is 
> an exception.
> And before introducing nullability of LogicalType, we should modify this to 
> avoid more potential type-check problems.





[jira] [Created] (FLINK-13015) Create validators, strategies and transformations required for porting logical expressions

2019-06-27 Thread Dawid Wysakowicz (JIRA)
Dawid Wysakowicz created FLINK-13015:


 Summary: Create validators, strategies and transformations 
required for porting logical expressions
 Key: FLINK-13015
 URL: https://issues.apache.org/jira/browse/FLINK-13015
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Affects Versions: 1.9.0
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz
 Fix For: 1.9.0


The goal of this task is to implement:

InputTypeValidator:
* by type root

TypeStrategies:
* cascade
* explicit

TypeTransformations:
* to_nullable

This set of classes will enable porting 
AND/OR/NOT/IS_TRUE/IS_FALSE/IS_NOT_TRUE/IS_NOT_FALSE to the new type inference 
stack.
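
As a rough illustration, the "explicit" strategy could take the following shape 
(a sketch against the FLINK-12924 interfaces; the exact signatures are assumed, 
not final):

{code:java}
import java.util.Optional;

// Imports of TypeStrategy, CallContext and DataType omitted: these interfaces
// were introduced by FLINK-12924 and were still in flux at this point.
public final class ExplicitTypeStrategy implements TypeStrategy {

	private final DataType fixedType;

	public ExplicitTypeStrategy(DataType fixedType) {
		this.fixedType = fixedType;
	}

	@Override
	public Optional<DataType> inferType(CallContext callContext) {
		// Ignore the call context entirely and always infer the fixed type.
		return Optional.of(fixedType);
	}
}
{code}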





[jira] [Updated] (FLINK-13014) ChainBreakTest failed on Travis

2019-06-27 Thread Haibo Sun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haibo Sun updated FLINK-13014:
--
Description: 
Log: [https://api.travis-ci.org/v3/job/551094138/log.txt]

 

07:12:52.246 [ERROR] Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time 
elapsed: 16.872 s <<< FAILURE! - in 
org.apache.flink.test.state.operator.restore.unkeyed.ChainBreakTest
 07:12:52.247 [ERROR] testMigrationAndRestore[Migrate Savepoint: 
1.3](org.apache.flink.test.state.operator.restore.unkeyed.ChainBreakTest) Time 
elapsed: 1.545 s <<< ERROR!
 java.util.concurrent.ExecutionException: 
java.util.concurrent.CompletionException: 
org.apache.flink.runtime.checkpoint.CheckpointException: Task received 
cancellation from one of its inputs
 Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.checkpoint.CheckpointException: Task received 
cancellation from one of its inputs
 Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task 
received cancellation from one of its inputs
 Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task 
received cancellation from one of its inputs

  was:
07:12:52.246 [ERROR] Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time 
elapsed: 16.872 s <<< FAILURE! - in 
org.apache.flink.test.state.operator.restore.unkeyed.ChainBreakTest
07:12:52.247 [ERROR] testMigrationAndRestore[Migrate Savepoint: 
1.3](org.apache.flink.test.state.operator.restore.unkeyed.ChainBreakTest) Time 
elapsed: 1.545 s <<< ERROR!
java.util.concurrent.ExecutionException: 
java.util.concurrent.CompletionException: 
org.apache.flink.runtime.checkpoint.CheckpointException: Task received 
cancellation from one of its inputs
Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.checkpoint.CheckpointException: Task received 
cancellation from one of its inputs
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task 
received cancellation from one of its inputs
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task 
received cancellation from one of its inputs


> ChainBreakTest failed on Travis
> ---
>
> Key: FLINK-13014
> URL: https://issues.apache.org/jira/browse/FLINK-13014
> Project: Flink
>  Issue Type: Bug
>Reporter: Haibo Sun
>Priority: Major
>
> Log: [https://api.travis-ci.org/v3/job/551094138/log.txt]
>  
> 07:12:52.246 [ERROR] Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time 
> elapsed: 16.872 s <<< FAILURE! - in 
> org.apache.flink.test.state.operator.restore.unkeyed.ChainBreakTest
>  07:12:52.247 [ERROR] testMigrationAndRestore[Migrate Savepoint: 
> 1.3](org.apache.flink.test.state.operator.restore.unkeyed.ChainBreakTest) 
> Time elapsed: 1.545 s <<< ERROR!
>  java.util.concurrent.ExecutionException: 
> java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.checkpoint.CheckpointException: Task received 
> cancellation from one of its inputs
>  Caused by: java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.checkpoint.CheckpointException: Task received 
> cancellation from one of its inputs
>  Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task 
> received cancellation from one of its inputs
>  Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task 
> received cancellation from one of its inputs





[jira] [Created] (FLINK-13014) ChainBreakTest failed on Travis

2019-06-27 Thread Haibo Sun (JIRA)
Haibo Sun created FLINK-13014:
-

 Summary: ChainBreakTest failed on Travis
 Key: FLINK-13014
 URL: https://issues.apache.org/jira/browse/FLINK-13014
 Project: Flink
  Issue Type: Bug
Reporter: Haibo Sun


07:12:52.246 [ERROR] Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time 
elapsed: 16.872 s <<< FAILURE! - in 
org.apache.flink.test.state.operator.restore.unkeyed.ChainBreakTest
07:12:52.247 [ERROR] testMigrationAndRestore[Migrate Savepoint: 
1.3](org.apache.flink.test.state.operator.restore.unkeyed.ChainBreakTest) Time 
elapsed: 1.545 s <<< ERROR!
java.util.concurrent.ExecutionException: 
java.util.concurrent.CompletionException: 
org.apache.flink.runtime.checkpoint.CheckpointException: Task received 
cancellation from one of its inputs
Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.checkpoint.CheckpointException: Task received 
cancellation from one of its inputs
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task 
received cancellation from one of its inputs
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task 
received cancellation from one of its inputs





[GitHub] [flink] StephanEwen commented on issue #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.

2019-06-27 Thread GitBox
StephanEwen commented on issue #8880: [FLINK-12986] [network] Fix instability 
of BoundedBlockingSubpartition under memory pressure.
URL: https://github.com/apache/flink/pull/8880#issuecomment-506298639
 
 
   @zhijiangW Thanks, these are very good comments. Some answers in line.
   
   > Maybe we should not always use the FILE_MMAP type as default. We could 
offer a config option, or the system could auto-select the proper type 
internally based on some factors, e.g. by automatically checking whether a 
64-bit JRE is used; if not, only small files fitting into the virtual address 
space could use MMAP. For a 32-bit JRE, FILE_FILE might be a better choice.
   
   That is a fair point. We could do that.
   Just out of curiosity: Do you know how many people use 32bit JVMs still?
   
   > Java currently has the limitation of not being able to unmap files from 
user code. Due to this bug 
(http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4724038) in Sun's JRE, 
the files are unmapped when GC releases the byte buffers. We could supply the 
workaround mentioned in the bug report (see {@link #setUseUnmap}), which may 
fail on non-Oracle/OpenJDK JVMs. It forcefully unmaps the buffer on close by 
using an undocumented internal cleanup functionality. We could also check 
automatically whether this way is supported, then it might also be another 
factor to decide whether use FILE_MMAP by default.
   
   The code explicitly unmaps files using a utility from Netty, see 
https://github.com/apache/flink/pull/8880/files#diff-94d133764660d5d8372ca24c08257e25R138
   This should work, the JVM actually segfaults if you use the buffer after 
that.
   
   > Currently we use Integer.MAX_VALUE as the size of every mmap region. If we 
do not want to make it configurable, because users might not know how to tune 
it, we might automatically select a different size based on whether we run on a 
32- or 64-bit JRE. Because the address space can be very fragmented on a 32-bit 
platform, a large region might not be mappable. Using a lower region size makes 
the implementation a little bit slower, but the chance is smaller that mmap 
fails. As far as I know, Lucene uses a size of 1 << 30 for 64 bit, and 1 << 28 
for 32 bit.
   
   If we use `FILE` mode for 32 bit JVMs, it seems that this would be solved 
already.
   
   > In the current FILE_MMAP implementation, we always mmap a region after 
writing Integer.MAX_VALUE bytes. This is a simple way because we do not need to 
consider the partial data margin. But if the produced partitions cannot be 
consumed in time because of limited resources, I am not sure whether this 
pre-mmap has other downsides. E.g. one map produces 5000 subpartitions, but 
only one reduce task can be deployed at a time due to limited resources. 
Another option is lazy mmap, meaning the mmap is established only after the 
partition is requested. But it seems complicated to handle the region margin to 
avoid a partial header or data.
   
   Do we know if the eager mapping has downsides? There is no eager 
pre-fetching, so there should be no wasted I/O. It would keep the virtual 
memory page table occupied for longer (is that an issue?) and it might 
incentivize the kernel to keep that data cached in memory longer, and evict 
other non-mapped files from the OS file cache instead (sounds like an 
advantage, could it have a downside?).
   
   Eager mapping simplifies the implementation because it avoids the need to 
synchronize the lazy mapping when creating multiple readers concurrently.
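
   For reference, a minimal sketch of the explicit-unmap approach referenced 
above, using the (shaded) Netty utility that also appears in this PR; the file 
path and access pattern are illustrative only:

{code:java}
import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent;

import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class UnmapSketch {
	public static void main(String[] args) throws Exception {
		try (FileChannel ch = FileChannel.open(
				Paths.get("/tmp/partition.data"), StandardOpenOption.READ)) {
			MappedByteBuffer region = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
			// ... read buffers from 'region' ...

			// Explicit unmap: without this, the mapping lives until the GC
			// collects the buffer. Accessing 'region' after this call crashes
			// the JVM (segfault), as noted above.
			PlatformDependent.freeDirectBuffer(region);
		}
	}
}
{code}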




[jira] [Comment Edited] (FLINK-11774) IllegalArgumentException in HeapPriorityQueueSet

2019-06-27 Thread Xulang Wan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873124#comment-16873124
 ] 

Xulang Wan edited comment on FLINK-11774 at 6/27/19 10:55 AM:
--

I also encountered this issue by using a string as key. The key is randomly 
generated between [0,1000] per element. 

Seems like the error happened because the KeyGroup is out of the KeyGroupRange, 
which doesn't make sense to me, since neither of them should change once the 
application is running.

After some investigation, I found why I hit such an exception in my case:

In my case, the key was set by an RNG function, and under the hood Flink 
associates this function with the stream's KeyExtractor, which is invoked 
whenever a key is queried for an element.

So what actually happened is: when an element is partitioned by key, it used a 
random number, say K1.

Later, when we do some other operation related to the key (in this case we set 
a timer by key), Flink checks whether the key's KeyGroup (basically a hash of 
the key) is in the range of the operator's KeyGroups.

But since the KeyExtractor is invoked again, we may get a different random 
number as the key, and then the exception is thrown, because an element with 
this new key is not supposed to be handled by this operator instance.

For this ticket's case, I strongly suspect that the KeyExtractor also returns 
different values for the same element, which caused the issue.


was (Author: wanren192):
I also encountered this issue by using a string as key. The key is randomly 
generated between [0,1000] per element. 

Seems like the error happened because the KeyGroup is out of the KeyGroupRange, 
which doesn't make sense to me, since neither of them should change once the 
application is running.
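
In code, the anti-pattern described above looks roughly like this (a sketch; 
the element type String and the key space of 1000 are placeholders):

{code:java}
import org.apache.flink.api.java.functions.KeySelector;

import java.util.concurrent.ThreadLocalRandom;

// WRONG: non-deterministic. Flink invokes the selector several times per
// element (partitioning, timer registration, state access); each call returns
// a different key, so the KeyGroup check can fail as analyzed above.
class RandomKeySelector implements KeySelector<String, Integer> {
	@Override
	public Integer getKey(String value) {
		return ThreadLocalRandom.current().nextInt(1000);
	}
}

// CORRECT: a pure, deterministic function of the element.
class HashKeySelector implements KeySelector<String, Integer> {
	@Override
	public Integer getKey(String value) {
		return Math.floorMod(value.hashCode(), 1000);
	}
}
{code}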

> IllegalArgumentException in HeapPriorityQueueSet
> 
>
> Key: FLINK-11774
> URL: https://issues.apache.org/jira/browse/FLINK-11774
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.7.2
> Environment: Can reproduce on the following configurations:
>  
> OS: macOS 10.14.3
> Java: 1.8.0_202
>  
> OS: CentOS 7.2.1511
> Java: 1.8.0_102
>Reporter: Kirill Vainer
>Priority: Major
> Attachments: flink-bug-dist.zip, flink-bug-src.zip
>
>
> Hi,
> I encountered the following exception:
> {code}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
> at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
> at flink.bug.App.main(App.java:21)
> Caused by: java.lang.IllegalArgumentException
> at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
> at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
> at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerProcessingTimeTimer(InternalTimerServiceImpl.java:197)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerProcessingTimeTimer(WindowOperator.java:876)
> at 
> org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:36)
> at 
> org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:28)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:895)
> at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:396)
> at 
> 

[jira] [Updated] (FLINK-12996) Add predefined type validators, strategies, and transformations

2019-06-27 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz updated FLINK-12996:
-
Affects Version/s: 1.9.0

> Add predefined type validators, strategies, and transformations
> ---
>
> Key: FLINK-12996
> URL: https://issues.apache.org/jira/browse/FLINK-12996
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.9.0
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
> Fix For: 1.9.0
>
>
>  FLINK-12924 introduced new interfaces for performing type inference. We need 
> a set of predefined type validators, strategies, and transformations 
> implemented with the new interfaces in order to represent the old type 
> checking logic. Those interfaces can be inspired by Calcite's 
> {{org.apache.calcite.sql.type.ReturnTypes}}, 
> {{org.apache.calcite.sql.type.OperandTypes}}, and 
> {{org.apache.calcite.sql.type.SqlTypeTransforms}}.





[jira] [Updated] (FLINK-12996) Add predefined type validators, strategies, and transformations

2019-06-27 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz updated FLINK-12996:
-
Fix Version/s: 1.9.0

> Add predefined type validators, strategies, and transformations
> ---
>
> Key: FLINK-12996
> URL: https://issues.apache.org/jira/browse/FLINK-12996
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
> Fix For: 1.9.0
>
>
>  FLINK-12924 introduced new interfaces for performing type inference. We need 
> a set of predefined type validators, strategies, and transformations 
> implemented with the new interfaces in order to represent the old type 
> checking logic. Those interfaces can be inspired by Calcite's 
> {{org.apache.calcite.sql.type.ReturnTypes}}, 
> {{org.apache.calcite.sql.type.OperandTypes}}, and 
> {{org.apache.calcite.sql.type.SqlTypeTransforms}}.





[GitHub] [flink] mans2singh commented on a change in pull request #8668: [FLINK-12784][metrics] Support retention policy for InfluxDB metrics …

2019-06-27 Thread GitBox
mans2singh commented on a change in pull request #8668: [FLINK-12784][metrics] 
Support retention policy for InfluxDB metrics …
URL: https://github.com/apache/flink/pull/8668#discussion_r298117923
 
 

 ##
 File path: 
flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/InfluxdbReporterTest.java
 ##
 @@ -111,7 +111,7 @@ public void testMetricReporting() throws Exception {
reporter.report();
 
verify(postRequestedFor(urlPathEqualTo("/write"))
-   .withQueryParam("db", equalTo(TEST_INFLUXDB_DB))
+   .withQueryParam(InfluxdbReporterOptions.DB.key(), equalTo(TEST_INFLUXDB_DB))
 
 Review comment:
   @NicoK 
   
   I've changed the query param name 'db' back to its original value. I 
apologize for my misunderstanding.  
   
   Please let me know if there is any other recommendation.  
   
   Thanks again for your time/feedback. 




[jira] [Comment Edited] (FLINK-12848) Method equals() in RowTypeInfo should consider fieldsNames

2019-06-27 Thread Jark Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873996#comment-16873996
 ] 

Jark Wu edited comment on FLINK-12848 at 6/27/19 10:40 AM:
---

Thanks [~aloyszhang] for the debugging. So let's target this issue to 1.7 and 
1.8. 

I think one way to quickly fix this is to not cache the RowTypeInfo in {{seenTypes}}.

What do you think [~twalthr]?


was (Author: jark):
Thanks [~aloyszhang] for the debugging. So let's target this issue to 1.7 and 
1.8. 

I think one way to quickly fix this is to not cache the RowTypeInfo in 
{{seenTypes}}, i.e. change [this 
line|https://github.com/apache/flink/blob/release-1.7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala#L66]
 to 

{{val relType = if (isSimple(typeInfo) || 
typeInfo.isInstanceOf[RowTypeInfo[_]]) {}}

What do you think [~twalthr]?

> Method equals() in RowTypeInfo should consider fieldsNames
> --
>
> Key: FLINK-12848
> URL: https://issues.apache.org/jira/browse/FLINK-12848
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.7.2
>Reporter: aloyszhang
>Assignee: aloyszhang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Since `RowTypeInfo#equals()` does not consider the fieldNames, an error in 
> the field names may occur when processing data with a RowTypeInfo type.  
> {code:java}
> String [] fields = new String []{"first", "second"};
> TypeInformation[] types = new TypeInformation[]{
> Types.ROW_NAMED(new String[]{"first001"}, Types.INT),
> Types.ROW_NAMED(new String[]{"second002"}, Types.INT) }; 
> StreamExecutionEnvironment execEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment env = 
> StreamTableEnvironment.getTableEnvironment(execEnv);
> SimpleProcessionTimeSource streamTableSource = new 
> SimpleProcessionTimeSource(fields, types);
> env.registerTableSource("testSource", streamTableSource);
> Table sourceTable = env.scan("testSource");
> System.out.println("Source table schema : ");
> sourceTable.printSchema();
> {code}
> The table schema will be 
> {code:java}
> Source table schema : 
> root 
> |-- first: Row(first001: Integer) 
> |-- second: Row(first001: Integer) 
> |-- timestamp: TimeIndicatorTypeInfo(proctime)
> {code}
> The second field has the same name as the first field.
> So, we should consider the fieldNames in RowTypeInfo#equals().
>  
>  
>  
>  
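
The core of the problem can be reproduced directly (a minimal sketch):

{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class RowTypeInfoEqualsDemo {
	public static void main(String[] args) {
		RowTypeInfo a = new RowTypeInfo(
			new TypeInformation<?>[]{Types.INT}, new String[]{"first001"});
		RowTypeInfo b = new RowTypeInfo(
			new TypeInformation<?>[]{Types.INT}, new String[]{"second002"});

		// Prints "true": equals() ignores the field names, which is why the
		// schema above ends up with a duplicated nested field name.
		System.out.println(a.equals(b));
	}
}
{code}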





[GitHub] [flink] StephanEwen commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.

2019-06-27 Thread GitBox
StephanEwen commented on a change in pull request #8880: [FLINK-12986] 
[network] Fix instability of BoundedBlockingSubpartition under memory pressure.
URL: https://github.com/apache/flink/pull/8880#discussion_r298114249
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java
 ##
 @@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An implementation of {@link BoundedData} that writes directly into a File 
Channel.
+ * The readers are simple file channel readers using a simple dedicated buffer 
pool.
+ */
+final class FileChannelBoundedData implements BoundedData {
+
+   private final Path filePath;
+
+   private final FileChannel fileChannel;
+
+   private final ByteBuffer[] headerAndBufferArray;
+
+   private long size;
+
+   private final int memorySegmentSize;
+
+   FileChannelBoundedData(
+   Path filePath,
+   FileChannel fileChannel,
+   int memorySegmentSize) {
+
+   this.filePath = checkNotNull(filePath);
+   this.fileChannel = checkNotNull(fileChannel);
+   this.memorySegmentSize = memorySegmentSize;
+   this.headerAndBufferArray = 
BufferReaderWriterUtil.allocatedWriteBufferArray();
+   }
+
+   @Override
+   public void writeBuffer(Buffer buffer) throws IOException {
+   size += BufferReaderWriterUtil.writeToByteChannel(fileChannel, 
buffer, headerAndBufferArray);
+   }
+
+   @Override
+   public void finishWrite() throws IOException {
+   fileChannel.close();
+   }
+
+   @Override
+   public Reader createReader() throws IOException {
+   checkState(!fileChannel.isOpen());
+
+   final FileChannel fc = FileChannel.open(filePath, 
StandardOpenOption.READ);
+   return new FileBufferReader(fc, memorySegmentSize);
+   }
+
+   @Override
+   public long getSize() {
+   return size;
+   }
+
+   @Override
+   public void close() throws IOException {
+   IOUtils.closeQuietly(fileChannel);
+   Files.delete(filePath);
+   }
+
+   // 

+
+   public static FileChannelBoundedData create(Path filePath, int 
memorySegmentSize) throws IOException {
+   final FileChannel fileChannel = FileChannel.open(
+   filePath, StandardOpenOption.CREATE_NEW, 
StandardOpenOption.WRITE);
+
+   return new FileChannelBoundedData(
+   filePath,
+   fileChannel,
+   memorySegmentSize);
+   }
+
+   // 

+
+   static final class FileBufferReader implements BoundedData.Reader, 
BufferRecycler {
+
+   private static final int NUM_BUFFERS = 2;
+
+   private final FileChannel fileChannel;
+
+   private final ByteBuffer headerBuffer;
+
+   private final ArrayDeque<MemorySegment> buffers;
+
+   FileBufferReader(FileChannel fileChannel, int bufferSize) {
+   this.fileChannel = checkNotNull(fileChannel);
+   this.headerBuffer = 
BufferReaderWriterUtil.allocatedHeaderBuffer();
+  

[jira] [Commented] (FLINK-12848) Method equals() in RowTypeInfo should consider fieldsNames

2019-06-27 Thread Jark Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873996#comment-16873996
 ] 

Jark Wu commented on FLINK-12848:
-

Thanks [~aloyszhang] for the debugging. So let's target this issue to 1.7 and 
1.8. 

I think one way to quickly fix this is to not cache the RowTypeInfo in 
{{seenTypes}}, i.e. change [this 
line|https://github.com/apache/flink/blob/release-1.7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala#L66]
 to 

{{val relType = if (isSimple(typeInfo) || 
typeInfo.isInstanceOf[RowTypeInfo[_]]) {}}

What do you think [~twalthr]?

> Method equals() in RowTypeInfo should consider fieldsNames
> --
>
> Key: FLINK-12848
> URL: https://issues.apache.org/jira/browse/FLINK-12848
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.7.2
>Reporter: aloyszhang
>Assignee: aloyszhang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Since `RowTypeInfo#equals()` does not consider the fieldNames, an error in 
> the field names may occur when processing data with a RowTypeInfo type.  
> {code:java}
> String [] fields = new String []{"first", "second"};
> TypeInformation[] types = new TypeInformation[]{
> Types.ROW_NAMED(new String[]{"first001"}, Types.INT),
> Types.ROW_NAMED(new String[]{"second002"}, Types.INT) }; 
> StreamExecutionEnvironment execEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment env = 
> StreamTableEnvironment.getTableEnvironment(execEnv);
> SimpleProcessionTimeSource streamTableSource = new 
> SimpleProcessionTimeSource(fields, types);
> env.registerTableSource("testSource", streamTableSource);
> Table sourceTable = env.scan("testSource");
> System.out.println("Source table schema : ");
> sourceTable.printSchema();
> {code}
> The table schema will be 
> {code:java}
> Source table schema : 
> root 
> |-- first: Row(first001: Integer) 
> |-- second: Row(first001: Integer) 
> |-- timestamp: TimeIndicatorTypeInfo(proctime)
> {code}
> The second field has the same name as the first field.
> So, we should consider the fieldNames in RowTypeInfo#equals().
>  
>  
>  
>  





[GitHub] [flink] StephanEwen commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.

2019-06-27 Thread GitBox
StephanEwen commented on a change in pull request #8880: [FLINK-12986] 
[network] Fix instability of BoundedBlockingSubpartition under memory pressure.
URL: https://github.com/apache/flink/pull/8880#discussion_r298113422
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
 ##
 @@ -157,8 +159,9 @@ private static void initializeBoundedBlockingPartitions(
int i = 0;
try {
for (; i < subpartitions.length; i++) {
-   subpartitions[i] = new 
BoundedBlockingSubpartition(
-   i, parent, 
ioManager.createChannel().getPathFile().toPath());
+   final File spillFile = 
ioManager.createChannel().getPathFile();
+   subpartitions[i] =
+   
BoundedBlockingSubpartition.createWithFileAndMemoryMappedReader(i, parent, 
spillFile);
 
 Review comment:
   I am not confident enough in any other implementation, yet.
   I only added them for our testing and benchmarking.
   
   I am also always unsure about adding too many configurable parts for users. 
For API / behavior, we need customizability to fit the application and setup 
needs.
   
   For runtime and performance, I tend to favor one opinionated way that works 
well across the board. If there is another option that is 10% faster in 10% of 
the cases, I am not sure users benefit too much from that, but the added 
configuration makes it overall harder to understand and predict.
   




[GitHub] [flink] StephanEwen commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.

2019-06-27 Thread GitBox
StephanEwen commented on a change in pull request #8880: [FLINK-12986] 
[network] Fix instability of BoundedBlockingSubpartition under memory pressure.
URL: https://github.com/apache/flink/pull/8880#discussion_r298112193
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java
 ##
 @@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.IOUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+final class FileChannelMemoryMappedBoundedData implements BoundedData {
+
+   /** The file channel backing the memory mapped file. */
+   private final FileChannel fileChannel;
+
+   /** The reusable array with header buffer and data buffer, to use 
gathering writes on the
+* file channel ({@link 
java.nio.channels.GatheringByteChannel#write(ByteBuffer[])}). */
+   private final ByteBuffer[] headerAndBufferArray;
+
+   /** All memory mapped regions. */
+   private final ArrayList<ByteBuffer> memoryMappedRegions;
+
+   /** The path of the memory mapped file. */
+   private final Path filePath;
+
+   /** The position in the file channel. Cached for efficiency, because an 
actual position
+* lookup in the channel involves various locks and checks. */
+   private long pos;
+
+   /** The position where the current memory mapped region must end. */
+   private long endOfCurrentRegion;
+
+   /** The position where the current memory mapped region started. */
+   private long startOfCurrentRegion;
+
+   /** The maximum size of each mapped region. */
+   private final long maxRegionSize;
+
+   FileChannelMemoryMappedBoundedData(
 
 Review comment:
   True, could be private at the moment.
   My idea was to have it accessible for tests.




[GitHub] [flink] StephanEwen commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.

2019-06-27 Thread GitBox
StephanEwen commented on a change in pull request #8880: [FLINK-12986] 
[network] Fix instability of BoundedBlockingSubpartition under memory pressure.
URL: https://github.com/apache/flink/pull/8880#discussion_r298111597
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+
+/**
+ * Putting and getting of a sequence of buffers to/from a FileChannel or a 
ByteBuffer.
+ * This class handles the headers, length encoding, memory slicing.
+ *
+ * The encoding is the same across FileChannel and ByteBuffer, so this 
class can
+ * write to a file and read from the byte buffer that results from mapping 
this file to memory.
+ */
+final class BufferReaderWriterUtil {
+
+   static final int HEADER_LENGTH = 8;
+
+   static final int HEADER_VALUE_IS_BUFFER = 0;
+
+   static final int HEADER_VALUE_IS_EVENT = 1;
+
+   // 

+   //  ByteBuffer read / write
+   // 

+
+   static boolean writeBuffer(Buffer buffer, ByteBuffer memory) {
+   final int bufferSize = buffer.getSize();
+
+   if (memory.remaining() < bufferSize + HEADER_LENGTH) {
+   return false;
+   }
+
+   memory.putInt(buffer.isBuffer() ? HEADER_VALUE_IS_BUFFER : 
HEADER_VALUE_IS_EVENT);
+   memory.putInt(bufferSize);
+   memory.put(buffer.getNioBufferReadable());
+   return true;
+   }
+
+   @Nullable
+   static Buffer sliceNextBuffer(ByteBuffer memory) {
+   final int remaining = memory.remaining();
+
+   // we only check the correct case where data is exhausted
+   // all other cases can only occur if our write logic is wrong 
and will already throw
+   // buffer underflow exceptions which will cause the read to 
fail.
+   if (remaining == 0) {
+   return null;
+   }
+
+   final int header = memory.getInt();
+   final int size = memory.getInt();
+
+   memory.limit(memory.position() + size);
+   ByteBuffer buf = memory.slice();
+   memory.position(memory.limit());
+   memory.limit(memory.capacity());
+
+   MemorySegment memorySegment = 
MemorySegmentFactory.wrapOffHeapMemory(buf);
+
+   return bufferFromMemorySegment(
+   memorySegment,
+   FreeingBufferRecycler.INSTANCE,
+   size,
+   header == HEADER_VALUE_IS_EVENT);
+   }
+
+   // 

+   //  ByteChannel read / write
+   // 

+
+   static long writeToByteChannel(
+   FileChannel channel,
+   Buffer buffer,
+   ByteBuffer[] arrayWithHeaderBuffer) throws IOException {
+
+   final ByteBuffer headerBuffer = arrayWithHeaderBuffer[0];
+   headerBuffer.clear();
+   headerBuffer.putInt(buffer.isBuffer() ? HEADER_VALUE_IS_BUFFER 
: HEADER_VALUE_IS_EVENT);
+   headerBuffer.putInt(buffer.getSize());
+   

[jira] [Commented] (FLINK-13004) Correct the logic of needToCleanupState in KeyedProcessFunctionWithCleanupState

2019-06-27 Thread Jark Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873994#comment-16873994
 ] 

Jark Wu commented on FLINK-13004:
-

[~yunta] (y)

> Correct the logic of needToCleanupState in 
> KeyedProcessFunctionWithCleanupState
> ---
>
> Key: FLINK-13004
> URL: https://issues.apache.org/jira/browse/FLINK-13004
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The current implementation of needToCleanupState in 
> KeyedProcessFunctionWithCleanupState actually has a potential bug:
> {code:java}
> protected Boolean needToCleanupState(Long timestamp) throws IOException {
>if (stateCleaningEnabled) {
>   Long cleanupTime = cleanupTimeState.value();
>   // check that the triggered timer is the last registered processing 
> time timer.
>   return null != cleanupTime && timestamp == cleanupTime;
>} else {
>   return false;
>}
> }
> {code}
> Please note that it directly uses "==" to judge whether the *Long* values 
> timestamp and cleanupTime are equal. However, if the value is larger than 
> 127L, the comparison would actually return false instead of the expected true.
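
The underlying Java behavior, as a minimal standalone demonstration:

{code:java}
public class BoxedLongEquality {
	public static void main(String[] args) {
		Long small1 = 127L, small2 = 127L;
		Long large1 = 128L, large2 = 128L;

		// true: values in [-128, 127] come from the autoboxing cache, so
		// both variables reference the same object.
		System.out.println(small1 == small2);

		// false: outside the cache range each boxing creates a new object
		// and "==" compares references, which is exactly the bug above.
		System.out.println(large1 == large2);

		// true: value comparison is what needToCleanupState() should use.
		System.out.println(large1.equals(large2));
	}
}
{code}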





[jira] [Commented] (FLINK-12536) Make BufferOrEventSequence#getNext() non-blocking

2019-06-27 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873993#comment-16873993
 ] 

Stephan Ewen commented on FLINK-12536:
--

Can you guys share why you believe this is important to change?

  - As far as I understand, this affects only the non-credit-based model, which 
is supported but meant to be used only in rare cases and to be removed at some 
point.

  - This adds quite a bit of extra complexity, like adding dependencies to the 
I/O manager in the alignment stack

  - In any healthy setup, we have extremely few buffers to read after 
alignment, which should not be so performance critical that it would warrant a 
bigger addition in code complexity.

  - The new implementation means that I/O is now queued behind other 
potentially large I/O ops (like from hash table reading or sort spill reading). 
That means we can introduce delays in those I/O and stall the streaming 
pipeline.

  - While the mailbox thread should generally not block, I think that these few 
occasional I/O ops of a legacy mode are fine. They happen only once in a while 
(after checkpoint) in a legacy mode. Do we ever expect these ops to block for 
significant enough time? If yes, would we expect that everything else is still 
low-latency in that case when the I/O ops block long, or could we actually 
tolerate this?

tl;dr - this change seems a lot like premature optimization to me. We introduce 
complexity for an assumed problem, and I am skeptical that it is a problem in 
practice. I am happy to be convinced of the opposite, but checking whether this 
really is premature optimization would be good before merging this.


> Make BufferOrEventSequence#getNext() non-blocking
> -
>
> Key: FLINK-12536
> URL: https://issues.apache.org/jira/browse/FLINK-12536
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Piotr Nowojski
>Assignee: Congxian Qiu(klion26)
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently it is non-blocking in case of credit-based flow control (default), 
> however for {{SpilledBufferOrEventSequence}} it is blocking on reading from 
> file. We might want to consider reimplementing it to be non-blocking with a 
> {{CompletableFuture<?> isAvailable()}} method.
>  
> Otherwise we will block mailbox processing for the duration of reading from 
> file - for example we will block processing time timers and potentially in 
> the future network flushes.
>  
> This is not a high priority change, since it affects non-default 
> configuration option AND at the moment only processing time timers are 
> planned to be moved to the mailbox for 1.9.
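
A sketch of the non-blocking contract being discussed (interface and method 
names assumed for illustration, not the actual API):

{code:java}
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Illustrative shape only: the future completes once getNext() can return
// without blocking, so the mailbox thread never parks on file I/O while
// reading a spilled buffer-or-event sequence.
interface NonBlockingBufferOrEventSequence {

	CompletableFuture<?> isAvailable();

	Optional<BufferOrEvent> getNext() throws IOException;
}
{code}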





[GitHub] [flink] wuchong commented on a change in pull request #8909: [FLINK-13004][table-runtime-blink] Correct the logic of needToCleanupState in KeyedProcessFunctionWithCleanupState

2019-06-27 Thread GitBox
wuchong commented on a change in pull request #8909: 
[FLINK-13004][table-runtime-blink] Correct the logic of needToCleanupState in 
KeyedProcessFunctionWithCleanupState
URL: https://github.com/apache/flink/pull/8909#discussion_r298108636
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/AbstractRowTimeUnboundedPrecedingOver.java
 ##
 @@ -157,7 +157,7 @@ public void onTimer(
if (needToCleanupState(timestamp)) {
 
// we check whether there are still records 
which have not been processed yet
-   boolean noRecordsToProcess = 
!inputState.contains(timestamp);
+   boolean noRecordsToProcess = 
!inputState.keys().iterator().hasNext();
 
 Review comment:
   good catch!




[GitHub] [flink] zjuwangg commented on issue #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog

2019-06-27 Thread GitBox
zjuwangg commented on issue #8636: [FLINK-12237][hive]Support Hive table stats 
related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8636#issuecomment-506284860
 
 
   @bowenli86 Sorry for being a little late to update this PR, please review 
again when you have time.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zjuwangg commented on issue #8636: [FLINK-12237][hive]Support Hive table stats related operations in HiveCatalog

2019-06-27 Thread GitBox
zjuwangg commented on issue #8636: [FLINK-12237][hive]Support Hive table stats 
related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8636#issuecomment-506284600
 
 
   rebased and force-pushed 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StephanEwen commented on a change in pull request #8886: [FLINK-12987][metrics] fix DescriptiveStatisticsHistogram#getCount() not returning the number of elements seen

2019-06-27 Thread GitBox
StephanEwen commented on a change in pull request #8886: [FLINK-12987][metrics] 
fix DescriptiveStatisticsHistogram#getCount() not returning the number of 
elements seen
URL: https://github.com/apache/flink/pull/8886#discussion_r298101578
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java
 ##
 @@ -22,25 +22,30 @@
 
 import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
 * The {@link DescriptiveStatisticsHistogram} uses a 
{@link DescriptiveStatistics} as a Flink {@link Histogram}.
  */
 public class DescriptiveStatisticsHistogram implements 
org.apache.flink.metrics.Histogram {
 
private final DescriptiveStatistics descriptiveStatistics;
 
+   private final AtomicLong elementsSeen = new AtomicLong();
 
 Review comment:
   Is this class assumed to be used in a multi-threaded context?
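
For reference, a hedged sketch of the pattern under discussion (a hypothetical 
wrapper, not the actual Flink class); the AtomicLong only matters if update() 
can be called from more than one thread, which is the question raised above:

```java
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative histogram wrapper that counts all elements ever seen. */
class CountingHistogram {

	/** Thread-safe counter; a plain long would suffice for single-threaded use. */
	private final AtomicLong elementsSeen = new AtomicLong();

	void update(long value) {
		elementsSeen.incrementAndGet();
		// ... record the value in a bounded sample window here ...
	}

	/** Total number of updates, independent of the sample window size. */
	long getCount() {
		return elementsSeen.get();
	}
}
```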


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-13013) Make sure that SingleInputGate can always request partitions

2019-06-27 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-13013:
---
Description: Currently {{SingleInputGate}} requests partitions only on the 
first attempt to fetch the data. Before requesting partitions, no data 
availability notifications can get through. This doesn't work well with the new 
non-blocking {{InputGate}} interface, since on newly created 
{{SingleInputGates}}, {{InputGate#isAvailable()}} might return not available, 
and it will only be able to switch to available after the first call to 
{{SingleInputGate#pollNext()}}. However, this might never happen, since the 
caller could be waiting indefinitely on {{SingleInputGate#isAvailable()}}.  (was: 
Currently {{SingleInputGate}} requests partitions only on the first attempt to 
fetch the data. Before requesting partitions, no data availability 
notifications can get through. This doesn't work well with the new non-blocking 
{{InputGate}} interface, since on newly created {{SingleInputGates}}, 
{{InputGate#isAvailable()}} might return not available, and it will only be 
able to switch to available after the first call to 
{{SingleInputGate#pollNext()}}.)

> Make sure that SingleInputGate can always request partitions
> 
>
> Key: FLINK-13013
> URL: https://issues.apache.org/jira/browse/FLINK-13013
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>
> Currently {{SingleInputGate}} requests partitions only on the first attempt 
> to fetch the data. Before requesting partitions, no data availability 
> notifications can get through. This doesn't work well with the new 
> non-blocking {{InputGate}} interface, since on newly created 
> {{SingleInputGates}}, {{InputGate#isAvailable()}} might return not available, 
> and it will only be able to switch to available after the first call to 
> {{SingleInputGate#pollNext()}}. However, this might never happen, since the 
> caller could be waiting indefinitely on {{SingleInputGate#isAvailable()}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] aljoscha commented on issue #8371: [FLINK-9679] - Add AvroSerializationSchema

2019-06-27 Thread GitBox
aljoscha commented on issue #8371: [FLINK-9679] - Add AvroSerializationSchema
URL: https://github.com/apache/flink/pull/8371#issuecomment-506282454
 
 
   @Wosin could you please rebase and squash the various commits into some 
commits that make sense, i.e. separate pre-requisites like version upgrade into 
a separate commit and then have the actual implementation commit? I'd like to 
review this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StephanEwen commented on issue #8877: [FLINK-12984][metrics] only call Histogram#getStatistics() once where possible

2019-06-27 Thread GitBox
StephanEwen commented on issue #8877: [FLINK-12984][metrics] only call 
Histogram#getStatistics() once where possible
URL: https://github.com/apache/flink/pull/8877#issuecomment-506281800
 
 
   Looks good in general.
   
   What is a bit strange is that the dropwizard metrics now have dependencies 
on flink-runtime.
   Is that because `DescriptiveStatisticsHistogram` is in the wrong place 
(`flink-runtime`)? Should that implementation be in the metrics projects 
instead?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-13013) Make sure that SingleInputGate can always request partitions

2019-06-27 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-13013:
--

 Summary: Make sure that SingleInputGate can always request 
partitions
 Key: FLINK-13013
 URL: https://issues.apache.org/jira/browse/FLINK-13013
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski


Currently {{SingleInputGate}} requests partitions only on the first attempt to 
fetch the data. Before requesting partitions, no data availability 
notifications can get through. This doesn't work well with the new non-blocking 
{{InputGate}} interface, since on newly created {{SingleInputGates}}, 
{{InputGate#isAvailable()}} might return not available, and it will only be 
able to switch to available after the first call to {{SingleInputGate#pollNext()}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-13013) Make sure that SingleInputGate can always request partitions

2019-06-27 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873978#comment-16873978
 ] 

Piotr Nowojski commented on FLINK-13013:


As a hotfix we are attempting to always wake up 
{{SingleInputGate#isAvailable()}} upon creation/registration of an 
{{InputChannel}}, so that {{requestPartitions()}} can happen.

For a long-term solution, the lifecycle of the partitions/input channels/input 
gate might need to be changed.
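
A rough sketch of the hotfix idea, with made-up names (the real classes 
differ):

{code}
import java.util.concurrent.CompletableFuture;

/**
 * Completing the availability future on channel registration guarantees that
 * a caller parked on isAvailable() wakes up; its next pollNext() can then
 * trigger requestPartitions() even though no data has arrived yet.
 */
class GateAvailability {

	private final CompletableFuture<Void> available = new CompletableFuture<>();

	CompletableFuture<?> isAvailable() {
		return available;
	}

	void onChannelRegistered() {
		available.complete(null); // wake up anyone waiting on isAvailable()
	}
}
{code}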

> Make sure that SingleInputGate can always request partitions
> 
>
> Key: FLINK-13013
> URL: https://issues.apache.org/jira/browse/FLINK-13013
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>
> Currently {{SingleInputGate}} requests partitions only on the first attempt 
> to fetch the data. Before requesting partitions, no data availability 
> notifications can get through. This doesn't work well with the new 
> non-blocking {{InputGate}} interface, since on newly created 
> {{SingleInputGates}}, {{InputGate#isAvailable()}} might return not available, 
> and it will only be able to switch to available after the first call to 
> {{SingleInputGate#pollNext()}}. However, this might never happen, since the 
> caller could be waiting indefinitely on {{SingleInputGate#isAvailable()}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] StephanEwen commented on issue #8646: [FLINK-12735][network] Make shuffle environment implementation independent with IOManager

2019-06-27 Thread GitBox
StephanEwen commented on issue #8646: [FLINK-12735][network] Make shuffle 
environment implementation independent with IOManager
URL: https://github.com/apache/flink/pull/8646#issuecomment-506278154
 
 
   Sorry for jumping in late.
   
   Initially, we added this dependency to make it possible for spilling 
partitions to use write-behind and separate pre-fetching threads. Do we want to 
drop this completely?
   
   Also, when it comes to reducing dependencies in the network shuffle 
environment, what is the gain by replacing IOManager with FileChannelManager?
   If this is mainly about the threads that the IOManagerAsync spawns, another 
option could be to have an IOManagerSync without threads.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] knaufk commented on a change in pull request #8741: [FLINK-12752] Add Option to Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint

2019-06-27 Thread GitBox
knaufk commented on a change in pull request #8741: [FLINK-12752] Add Option to 
Pass Seed for JobID Hash for StandaloneJobClusterEntrypoint
URL: https://github.com/apache/flink/pull/8741#discussion_r298093759
 
 

 ##
 File path: 
flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
 ##
 @@ -92,9 +105,23 @@ public StandaloneJobClusterConfiguration 
createResult(@Nonnull CommandLine comma
restPort,
savepointRestoreSettings,
jobId,
+   jobIdSeed,
jobClassName);
}
 
+   private String getJobIdSeed(@Nonnull final CommandLine commandLine) 
throws FlinkParseException {
+
+   boolean isJobIdSeedConfigured = 
commandLine.hasOption(JOB_ID_SEED_OPTION.getOpt());
+   boolean isJobIdConfigured = 
commandLine.hasOption(JOB_ID_OPTION.getOpt());
+
+   if (isJobIdSeedConfigured && isJobIdConfigured) {
 
 Review comment:
   I would rather change the README so that when you specify the `--job-id` 
explicitly, you have to remove the `--job-id-seed` parameter. 
   
   The templates are an opinionated starting point to run Job/Application 
Clusters on Kubernetes, as such they should contain the `--job-id-seed`.
   
   What do you think?
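
For readers following along: the point of a seed is that it deterministically 
fixes the JobID. A purely illustrative sketch of such a derivation (the PR's 
actual hashing may differ; the seed string here is made up):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class JobIdFromSeed {
	public static void main(String[] args) throws Exception {
		String seed = "my-k8s-application-v3"; // e.g. passed via --job-id-seed

		// Hash the seed and keep the first 16 bytes (a JobID is 16 bytes).
		byte[] hash = MessageDigest.getInstance("SHA-256")
				.digest(seed.getBytes(StandardCharsets.UTF_8));

		StringBuilder jobId = new StringBuilder();
		for (int i = 0; i < 16; i++) {
			jobId.append(String.format("%02x", hash[i]));
		}

		// Deterministic: restarting the entrypoint reproduces the same id.
		System.out.println(jobId);
	}
}
```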


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] NicoK commented on a change in pull request #8668: [FLINK-12784][metrics] Support retention policy for InfluxDB metrics …

2019-06-27 Thread GitBox
NicoK commented on a change in pull request #8668: [FLINK-12784][metrics] 
Support retention policy for InfluxDB metrics …
URL: https://github.com/apache/flink/pull/8668#discussion_r298092960
 
 

 ##
 File path: 
flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/InfluxdbReporterTest.java
 ##
 @@ -111,7 +111,7 @@ public void testMetricReporting() throws Exception {
reporter.report();
 
verify(postRequestedFor(urlPathEqualTo("/write"))
-   .withQueryParam("db", equalTo(TEST_INFLUXDB_DB))
+   
.withQueryParam(InfluxdbReporterOptions.DB.key(), equalTo(TEST_INFLUXDB_DB))
 
 Review comment:
   Please revert this change here, since the query param is actually unrelated 
to how we name our options. In my last review, I was only referring to the 
`createMetricRegistry` method below.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.

2019-06-27 Thread GitBox
zhijiangW commented on a change in pull request #8880: [FLINK-12986] [network] 
Fix instability of BoundedBlockingSubpartition under memory pressure.
URL: https://github.com/apache/flink/pull/8880#discussion_r298093109
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java
 ##
 @@ -220,28 +225,29 @@ private static int alignSize(int maxRegionSize) {
 * The "reader" for the memory region. It slices a sequence of buffers 
from the
 * sequence of mapped ByteBuffers.
 */
-   static final class BufferSlicer {
+   static final class BufferSlicer implements BoundedData.Reader {
 
-   /** The reader/decoder to the memory mapped region with the 
data we currently read from.
+   /** The memory mapped region we currently read from.
 * Max 2GB large. Further regions may be in the {@link 
#furtherData} field. */
-   private BufferToByteBuffer.Reader data;
+   private ByteBuffer data;
 
 Review comment:
   it might be better to name it `currentData`, to contrast with `furtherData`
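
For illustration, a minimal sketch of the slicing idea with the suggested 
naming, using plain NIO (assumed names, not the actual Flink classes):

```java
import java.nio.ByteBuffer;

/** Serves read-only slices out of one mapped region (max 2 GB per mapping). */
class RegionSlicer {

	private final ByteBuffer currentData; // region we currently read from

	RegionSlicer(ByteBuffer region) {
		this.currentData = region.asReadOnlyBuffer();
	}

	/** Next slice of up to maxBytes, or null once the region is drained. */
	ByteBuffer nextSlice(int maxBytes) {
		if (!currentData.hasRemaining()) {
			return null; // a follow-up region ("furtherData") would come next
		}
		int length = Math.min(maxBytes, currentData.remaining());
		ByteBuffer slice = currentData.duplicate();
		slice.limit(slice.position() + length);
		currentData.position(currentData.position() + length);
		return slice.slice();
	}
}
```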


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8779: [FLINK-12882][network] Remove ExecutionAttemptID argument from ResultPartitionFactory#create

2019-06-27 Thread GitBox
flinkbot edited a comment on issue #8779: [FLINK-12882][network] Remove 
ExecutionAttemptID argument from ResultPartitionFactory#create
URL: https://github.com/apache/flink/pull/8779#issuecomment-503009589
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @azagrebin
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @azagrebin
   * ❗ 3. Needs [attention] from.
   - Needs attention by @azagrebin
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @azagrebin
   * ✅ 5. Overall code [quality] is good.
   - Approved by @azagrebin
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on issue #8779: [FLINK-12882][network] Remove ExecutionAttemptID argument from ResultPartitionFactory#create

2019-06-27 Thread GitBox
azagrebin commented on issue #8779: [FLINK-12882][network] Remove 
ExecutionAttemptID argument from ResultPartitionFactory#create
URL: https://github.com/apache/flink/pull/8779#issuecomment-506270426
 
 
   @flinkbot approve all


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on issue #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config

2019-06-27 Thread GitBox
dianfu commented on issue #8910: [hotfix][python] Aligns with Java Table API by 
removing methods exec_env and query_config
URL: https://github.com/apache/flink/pull/8910#issuecomment-506268938
 
 
   @WeiZhong94 Thanks a lot for the review. Updated.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] WeiZhong94 commented on a change in pull request #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config

2019-06-27 Thread GitBox
WeiZhong94 commented on a change in pull request #8910: [hotfix][python] Aligns 
with Java Table API by removing methods exec_env and query_config
URL: https://github.com/apache/flink/pull/8910#discussion_r298081292
 
 

 ##
 File path: flink-python/pyflink/shell.py
 ##
 @@ -84,7 +84,7 @@
 * os.remove(sink_path)
 * else:
 * shutil.rmtree(sink_path)
-* bt_env.exec_env().set_parallelism(1)
+* b_env.set_parallelism(1)
 
 Review comment:
   I think we can add "b_env" at the beginning of the example to tell the users 
which variables are predefined. "Use the 'bt_env' variable" -> "Use the 'b_env', 
'bt_env' variables"


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Wosin commented on issue #8371: [FLINK-9679] - Add AvroSerializationSchema

2019-06-27 Thread GitBox
Wosin commented on issue #8371: [FLINK-9679] - Add AvroSerializationSchema
URL: https://github.com/apache/flink/pull/8371#issuecomment-506265879
 
 
   Hey, 
   Any update on this ??


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis

2019-06-27 Thread GitBox
flinkbot commented on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to 
Travis
URL: https://github.com/apache/flink/pull/8911#issuecomment-506262493
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12995) Add Hive-1.2.1 build to Travis

2019-06-27 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12995:
---
Labels: pull-request-available  (was: )

> Add Hive-1.2.1 build to Travis
> --
>
> Key: FLINK-12995
> URL: https://issues.apache.org/jira/browse/FLINK-12995
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: Rui Li
>Assignee: Rui Li
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] lirui-apache opened a new pull request #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis

2019-06-27 Thread GitBox
lirui-apache opened a new pull request #8911: [FLINK-12995][hive] Add 
Hive-1.2.1 build to Travis
URL: https://github.com/apache/flink/pull/8911
 
 
   
   
   ## What is the purpose of the change
   
   To add Hive-1.2.1 build to Travis.
   
   
   ## Brief change log
   
 - Added a Travis stage to build and test against Hive-1.2.1
   
   
   ## Verifying this change
   
   Existing tests
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? NA
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12751) Create file based HA support

2019-06-27 Thread Boris Lublinsky (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873929#comment-16873929
 ] 

Boris Lublinsky commented on FLINK-12751:
-

Although the kubelet can die, if the node is alive it will be immediately 
restarted, as it is a service. The race condition is possible, but not 
probable.

If you want to ensure uniqueness of the job manager, we can add a lock file 
with the job manager IP. See 
[https://events.linuxfoundation.org/wp-content/uploads/2017/11/Leader-Election-Machanism-in-Distributed-System-Using-a-Globally-Accessible-Filesystem-OSS-Dharmendra-Kushwaha.pdf]
 for an example.

> Create file based HA support
> 
>
> Key: FLINK-12751
> URL: https://issues.apache.org/jira/browse/FLINK-12751
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystems
>Affects Versions: 1.8.0, 1.9.0, 2.0.0
> Environment: Flink on k8 and Mini cluster
>Reporter: Boris Lublinsky
>Priority: Major
>  Labels: features, pull-request-available
>   Original Estimate: 168h
>  Time Spent: 10m
>  Remaining Estimate: 167h 50m
>
> In the current Flink implementation, HA support can be implemented either 
> using ZooKeeper or a custom factory class.
> Add an HA implementation based on PVC. The idea behind this implementation
> is as follows:
> * Because the implementation assumes a single instance of the Job manager (job 
> manager selection and restarts are done by a K8s Deployment of 1), URL 
> management is done using the StandaloneHaServices implementation (in the case 
> of a cluster) and the EmbeddedHaServices implementation (in the case of a 
> mini cluster)
> * For management of the submitted Job Graphs, the checkpoint counter, and 
> completed checkpoints, the implementation leverages the following file 
> system layout
> {code}
> ha                    -> root of the HA data
>   checkpointcounter   -> checkpoint counter folder
>     <job id>          -> job id folder
>       counter         -> counter file
>     <another job id>  -> another job id folder
>     ...
>   completedCheckpoint -> completed checkpoint folder
>     <job id>          -> job id folder
>       <checkpoint file>
>       <checkpoint file>
>       ...
>     <another job id>  -> another job id folder
>     ...
>   submittedJobGraph   -> submitted graph folder
>     <job id>          -> job id folder
>       <graph file>
>     <another job id>  -> another job id folder
>     ...
> {code}
> The implementation modifies 2 of the existing Flink classes:
> * HighAvailabilityServicesUtils - added `FILESYSTEM` option for picking the HA 
> service
> * HighAvailabilityMode - added `FILESYSTEM` to the available HA options.
> The actual implementation adds the following classes:
> * `FileSystemHAServices` - an implementation of a `HighAvailabilityServices` 
> for file system
> * `FileSystemUtils` - support class for creation of runtime components.
> * `FileSystemStorageHelper` - file system operations implementation for 
> filesystem based HA
> * `FileSystemCheckpointRecoveryFactory` - an implementation of a 
> `CheckpointRecoveryFactory` for file system
> * `FileSystemCheckpointIDCounter` - an implementation of a 
> `CheckpointIDCounter` for file system
> * `FileSystemCompletedCheckpointStore` - an implementation of a 
> `CompletedCheckpointStore` for file system
> * `FileSystemSubmittedJobGraphStore` - an implementation of a 
> `SubmittedJobGraphStore` for file system
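
To make the layout above concrete, a minimal single-writer sketch using only 
java.nio (assuming, as the ticket does, exactly one job manager instance; this 
is not one of the proposed Flink classes):

{code}
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Checkpoint counter stored at <haRoot>/checkpointcounter/<jobId>/counter. */
public class FileCheckpointCounter {

	private final Path counterFile;

	public FileCheckpointCounter(Path haRoot, String jobId) throws IOException {
		Path dir = haRoot.resolve("checkpointcounter").resolve(jobId);
		Files.createDirectories(dir);
		this.counterFile = dir.resolve("counter");
	}

	/** Reads the current counter (1 if absent) and persists the increment. */
	public long getAndIncrement() throws IOException {
		long current = Files.exists(counterFile)
			? Long.parseLong(new String(
				Files.readAllBytes(counterFile), StandardCharsets.UTF_8).trim())
			: 1L;
		Files.write(counterFile, Long.toString(current + 1)
			.getBytes(StandardCharsets.UTF_8));
		return current;
	}
}
{code}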



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

