[jira] [Created] (FLINK-24852) Cleanup of Orphaned Incremental State Artifacts

2021-11-09 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-24852:


 Summary: Cleanup of Orphaned Incremental State Artifacts
 Key: FLINK-24852
 URL: https://issues.apache.org/jira/browse/FLINK-24852
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.14.0
Reporter: Stephan Ewen


Shared State Artifacts (state files in the "shared" folder in the DFS / 
ObjectStore) can become orphaned in various situations:

* When a TaskManager fails right after it created a state file but before the checkpoint was ack-ed to the JobManager, that state file will be orphaned.
* When the JobManager fails, all state newly added for the currently pending checkpoint will be orphaned.

These state artifacts are currently impossible to clean up manually, because there is no easy way to tell whether they are still in use (referenced by any checkpoint).

We should introduce a "garbage collector" that identifies and deletes such 
orphaned state artifacts.

h2. Idea for a cleanup mechanism

A background thread would periodically execute a cleanup procedure that searches for and deletes the orphaned artifacts.
To identify those artifacts, the cleanup procedure needs the following inputs:

* The oldest retained checkpoint ID
* A snapshot of the shared state registry
* A way to identify for each state artifact from which checkpoint it was 
created.

The cleanup procedure would (see the sketch after this list):
* Enumerate all state artifacts (for example, files in the "shared" directory).
* For each one, check whether it was created earlier than the oldest retained checkpoint. If not, the artifact is skipped, because it might come from a later pending or canceled checkpoint.
* Finally, check whether the state artifact is known to the shared state registry. If yes, the artifact is kept; if not, it is orphaned and will be deleted.
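
A minimal sketch of that procedure, assuming hypothetical helpers (the {{StateArtifact}} abstraction and a registry snapshot of known paths) that do not exist in Flink today:

{code:java}
import java.util.List;
import java.util.Set;

// Hypothetical garbage collector for orphaned shared state artifacts.
// All types and helpers here are assumptions for illustration only.
class OrphanedStateCleaner {

    void cleanup(
            long oldestRetainedCheckpointId,
            Set<String> registrySnapshot,     // paths known to the shared state registry
            List<StateArtifact> artifacts) {  // e.g., files in the "shared" directory

        for (StateArtifact artifact : artifacts) {
            // skip artifacts from the oldest retained checkpoint or later: they
            // might belong to a pending or recently canceled checkpoint
            if (artifact.creatingCheckpointId() >= oldestRetainedCheckpointId) {
                continue;
            }
            // older than the oldest retained checkpoint and unknown to the
            // registry => orphaned, safe to delete
            if (!registrySnapshot.contains(artifact.path())) {
                artifact.delete();
            }
        }
    }

    interface StateArtifact {
        long creatingCheckpointId(); // e.g., parsed from the file name
        String path();
        void delete();
    }
}
{code}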

Because the cleanup procedure is specific to the checkpoint storage, it should 
probably be instantiated from the checkpoint storage.

To make it possible to identify the checkpoint for which a state artifact was created, we can put that checkpoint ID into the state file name, for example formatting the state name as {{"_"}}.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24366) Unnecessary/misleading error message about failing restores when tasks are already canceled.

2021-09-23 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-24366:


 Summary: Unnecessary/misleading error message about failing 
restores when tasks are already canceled.
 Key: FLINK-24366
 URL: https://issues.apache.org/jira/browse/FLINK-24366
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.13.2, 1.14.0
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.13.3, 1.15.0, 1.14.1


The following line is logged whenever the restore operation fails; the check whether the task is canceled happens only after that line.

The fix is to move the log line to after the check.

{code}
Exception while restoring my-stateful-task from alternative (1/1), will retry 
while more alternatives are available.
{code}
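
A minimal sketch of the proposed reordering, with illustrative names rather than the actual Flink restore code:

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative sketch only: log the retry warning after the cancellation
// check, so canceled tasks do not emit a misleading restore failure.
class RestoreLogging {
    private static final Logger LOG = LoggerFactory.getLogger(RestoreLogging.class);

    void onRestoreFailure(String taskName, boolean taskCanceled, Exception failure) {
        if (taskCanceled) {
            return; // the task is shutting down anyway; no retry, no warning
        }
        LOG.warn(
                "Exception while restoring {} from alternative, will retry while more alternatives are available.",
                taskName,
                failure);
    }
}
{code}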




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24343) Revisit Scheduler and Coordinator Startup Procedure

2021-09-21 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-24343:


 Summary: Revisit Scheduler and Coordinator Startup Procedure
 Key: FLINK-24343
 URL: https://issues.apache.org/jira/browse/FLINK-24343
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.13.2, 1.14.0
Reporter: Stephan Ewen
 Fix For: 1.15.0


We need to re-examine the startup procedure of the scheduler, and how it 
interacts with the startup of the operator coordinators.

We need to make sure the following conditions are met:
  - The Operator Coordinators are started before the first action happens that they need to be informed of. That includes a task being ready, a checkpoint happening, etc.

  - The scheduler must be started to the point that it can handle 
"failGlobal()" calls, because the coordinators might trigger that during their 
startup when an exception in "start()" occurs.

/cc [~chesnay]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24255) Test Environment / Mini Cluster do not forward configuration.

2021-09-10 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-24255:


 Summary: Test Environment / Mini Cluster do not forward 
configuration.
 Key: FLINK-24255
 URL: https://issues.apache.org/jira/browse/FLINK-24255
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.13.2
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.14.0


When using {{StreamExecutionEnvironment.getExecutionEnvironment(Configuration)}}, the config should determine the characteristics of the execution.

The config is for example passed to the local environment in the local 
execution case, and used during the instantiation of the MiniCluster.

But when using the {{TestStreamEnvironment}} and the 
{{MiniClusterWithClientRule}}, the config is ignored.

The issue is that the {{StreamExecutionEnvironmentFactory}} in 
{{TestStreamEnvironment}} ignores the config that is passed to it.
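
For reference, a minimal usage example of the intended behavior (the chosen config key is just an example):

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// The config passed here should determine the execution characteristics,
// also when the environment is a TestStreamEnvironment / MiniCluster.
public class ConfiguredEnvExample {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.setString("state.backend", "hashmap"); // example setting

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);

        env.fromElements(1, 2, 3).print();
        env.execute("configured-env-example");
    }
}
{code}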



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23843) Exceptions during "SplitEnumeratorContext.runInCoordinatorThread()" should cause Global Failure instead of Process Kill

2021-08-17 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-23843:


 Summary: Exceptions during 
"SplitEnumeratorContext.runInCoordinatorThread()" should cause Global Failure 
instead of Process Kill
 Key: FLINK-23843
 URL: https://issues.apache.org/jira/browse/FLINK-23843
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.13.2
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.14.0


Currently, when the method "SplitEnumeratorContext.runInCoordinatorThread()" throws an exception, the effect is that the JobManager process is killed.

The chain of events leading to the process kill is:
* An exception bubbles up in the executor, killing the executor thread.
* The executor starts a replacement thread, which is forbidden by the thread factory (as a safety net) and causes a process kill.

We should prevent such exceptions from bubbling up in the coordinator executor.
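
A hedged sketch of such a guard, assuming a generic failure handler (the actual wiring to Flink's global failover is not shown):

{code:java}
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Illustrative sketch: wrap runnables handed to the coordinator executor so
// that exceptions trigger a global failure instead of killing the thread.
final class GuardedCoordinatorExecutor implements Executor {

    private final Executor delegate;
    private final Consumer<Throwable> globalFailureHandler; // assumption, not a Flink interface

    GuardedCoordinatorExecutor(Executor delegate, Consumer<Throwable> globalFailureHandler) {
        this.delegate = delegate;
        this.globalFailureHandler = globalFailureHandler;
    }

    @Override
    public void execute(Runnable command) {
        delegate.execute(() -> {
            try {
                command.run();
            } catch (Throwable t) {
                // surface as a global failure rather than letting the exception
                // bubble up and kill the executor thread
                globalFailureHandler.accept(t);
            }
        });
    }
}
{code}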




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23842) Add log messages for reader registrations and split requests.

2021-08-17 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-23842:


 Summary: Add log messages for reader registrations and split 
requests.
 Key: FLINK-23842
 URL: https://issues.apache.org/jira/browse/FLINK-23842
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.14.0


Currently, nothing is logged when source enumerators receive reader registration events or split requests.

While some specific source implementations log this themselves, in the general case this information is missing, even though it is very valuable for debugging and understanding the work assignment behavior.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23649) Add RocksDB packages to parent-first classloading patterns.

2021-08-05 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-23649:


 Summary: Add RocksDB packages to parent-first classloading 
patterns.
 Key: FLINK-23649
 URL: https://issues.apache.org/jira/browse/FLINK-23649
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.13.2
Reporter: Stephan Ewen
 Fix For: 1.14.0


RocksDB classes are currently loaded child-first.

Because of that, the native RocksDB library may be loaded multiple times (by different classloaders).

JNI forbids that and raises an error, as reported for example in this mail:
https://lists.apache.org/x/thread.html/rbc3ca24efe13b25e802af9739a6877276503363ffbdc5914ffdad7be@%3Cuser.flink.apache.org%3E

We should prevent accidental repeated loading of RocksDB, because we rely on 
the fact that only one DB is created per task.
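
Until the default patterns include RocksDB, the same effect can presumably be achieved with the existing parent-first patterns option (shown here as a workaround assumption, not the final fix):

{code}
classloader.parent-first-patterns.additional: org.rocksdb.
{code}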



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23630) Make EventTimeWindowCheckpointingITCase and LocalRecoveryITCase run on Windows.

2021-08-04 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-23630:


 Summary: Make EventTimeWindowCheckpointingITCase and 
LocalRecoveryITCase run on Windows.
 Key: FLINK-23630
 URL: https://issues.apache.org/jira/browse/FLINK-23630
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.14.0


This needs a fix in the tests where they create the paths for the checkpoint storage locations.
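
The likely shape of the fix is to derive the checkpoint storage URI from a proper file URI instead of concatenating a scheme onto an OS path; a hedged sketch (the actual test code differs):

{code:java}
import java.io.File;

// Hedged sketch: "file://" + absolutePath breaks on Windows, because the
// backslash path ("file://C:\...") is not a valid URI. Going through
// File.toURI() yields a valid URI on all platforms.
class CheckpointPathFix {
    static String checkpointDirUri(File checkpointFolder) {
        return checkpointFolder.toURI().toString(); // e.g., file:/C:/temp/chk on Windows
    }
}
{code}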



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23629) Remove redundant test cases in EventTimeWindowCheckpointingITCase

2021-08-04 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-23629:


 Summary: Remove redundant test cases in 
EventTimeWindowCheckpointingITCase
 Key: FLINK-23629
 URL: https://issues.apache.org/jira/browse/FLINK-23629
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.14.0


HashMap state backend snapshots are always asynchronous now; synchronous snapshots are no longer supported.

We should adjust {{EventTimeWindowCheckpointingITCase}} to remove the now-redundant {{MEM_ASYNC}} and {{FILE_ASYNC}} parameter runs.

The test is very time-intensive, so this is quite a time saver.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23301) StateFun HTTP Ingress

2021-07-07 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-23301:


 Summary: StateFun HTTP Ingress
 Key: FLINK-23301
 URL: https://issues.apache.org/jira/browse/FLINK-23301
 Project: Flink
  Issue Type: Sub-task
  Components: Stateful Functions
Reporter: Stephan Ewen


The HTTP ingress would start an HTTP Server at a specified port.

The HTTP server would only handle _POST_ requests. The target function is represented by the path to which the request is made; the message contents are the body of the POST request.

The following example would send an empty message to the function with the address {{namespace='example', type='greeter', id='Igal'}}.

{code}
curl -X POST http://statefun-ingress:/in/example/greeter/Igal

POST /in/example/greeter/Igal HTTP/1.1
Content-Length: 0
{code}

The example below would send a message of type 'statefun/string' to the function with the address {{namespace='example', type='greeter', id='Elisa'}}, with the message contents {{"{numTimesToGreet: 5}"}}.

{code}
curl -X POST -H "Content-Type: text/plain; charset=UTF-8" -d "{numTimesToGreet: 5}" http://statefun-ingress:/in/example/greeter/Elisa

POST /in/example/greeter/Elisa HTTP/1.1
Content-Type: text/plain; charset=UTF-8
Content-Length: 20

{numTimesToGreet: 5}
{code}


h3. Data Types

The content type (MIME type) specified in the HTTP request header is mapped directly to the StateFun types.
For example, a {{Content-Type: io.statefun.types/int}} will set the type of the message to {{io.statefun.types/int}}.

As a special case, we map the content type {{text/plain}} to {{io.statefun.types/string}}, to make simple cases and examples work more seamlessly.

The following examples would send a message to a function that expects a ProtoBuf-encoded type {{Greeting}}, registered in StateFun as {{example/greeting}}.

{code}
> curl -X POST -H "Content-Type: text/plain; charset=UTF-8" -d 
> "{numTimesToGreet: 5}"

> CONTENTS=`echo 'numTimesToGreet: 5' | ./protoc --encode Greeting 
> example_types.proto`
> echo $CONTENTS | curl -X POST -H "Content-Type: example/greeting" 
> --data-binary @- http://statefun-ingress:/in/example/greeter/Bobby
{code}


h3. Sender and Responses

We want to support round-trip-style interactions, meaning posting a request and receiving a response.
Given the async messaging nature of StateFun, this will not necessarily be one HTTP request that immediately returns the corresponding response. Instead, it can mean issuing (POST) a request to the HTTP ingress and polling (GET) a response from an associated HTTP egress.

To support this kind of pattern, the HTTP ingress will assign a random request correlation ID in the HTTP response.
Furthermore, the ingress will optionally set the {{sender()}} field of the created message to reference a configured associated egress.

The ingress config would add an entry referencing the egress (like {{'paired_egress_name: httpout'}}).

{code}
> curl -X POST -i http://statefun-ingress:/in/example/greeter/Igal
POST /in/example/greeter/Igal HTTP/1.1
Content-Length: 0

HTTP/1.1 200 OK
StateFun-Request-Correlation-ID: 8acb377c-fc5e-4bdb-b2cc-eddb5992b7b5
Content-Length: 0
{code}

The created message would have no body, but would have {{sender() = egress/httpout/8acb377c-fc5e-4bdb-b2cc-eddb5992b7b5}}.

_Note: We would need to extend the message address scheme to be able to reference egresses.
The egress itself can grab the correlation ID from the ID part of the address, because the HTTP egress doesn't use that field (in fact, no egress currently interprets the ID)._

h3. Singleton Instantiation

To avoid port conflicts, we need a singleton instantiation per JVM.
This can be achieved by using a statically referenced context to hold the instantiated server and a reference to the
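
A minimal sketch of such a per-JVM singleton holder, using the JDK's built-in HTTP server as a stand-in for the actual ingress server:

{code:java}
import java.io.IOException;
import java.net.InetSocketAddress;
import com.sun.net.httpserver.HttpServer;

// Illustrative sketch of per-JVM singleton instantiation; the real ingress
// would use its own server class instead of the JDK HttpServer.
final class IngressServerHolder {

    private static HttpServer server;

    // Returns the single server instance for this JVM, creating it on the
    // first call; later calls (from other source subtasks) reuse it.
    static synchronized HttpServer getOrCreate(int port) throws IOException {
        if (server == null) {
            server = HttpServer.create(new InetSocketAddress(port), 0);
            server.start();
        }
        return server;
    }

    private IngressServerHolder() {}
}
{code}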

In the future, we can look into extending this to avoid setup/teardown when 
operators are cancelled for recovery.
The server would then live as long as the StateFun application (job) lives (or 
more precisely, as long as the slot lives, which is the duration that the 
TaskManager is associated with the StateFun deployment - typically the entire 
lifetime).

To achieve that, we would tear down the server in a [shutdown hook of the user-code classloader|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L145]. Instead of letting the first source set up the server, the first source would register its output as the stream for the server to push messages to.


h3. Configuration parameters

- Bind host (default 0.0.0.0)
- Bind port (default )
- Path (default "in") (for the path in the URL {{http(s)://:}})
- Egress pair name, for setting the egress that replies should go to.
- To set up SSL for the connection, we add similar settings as for Flink's REST 

[jira] [Created] (FLINK-23281) StateFun - Simplify Getting Started Experience with HTTP Ingresses and Egresses

2021-07-06 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-23281:


 Summary: StateFun - Simplify Getting Started Experience with HTTP 
Ingresses and Egresses
 Key: FLINK-23281
 URL: https://issues.apache.org/jira/browse/FLINK-23281
 Project: Flink
  Issue Type: New Feature
  Components: Stateful Functions
Reporter: Stephan Ewen


To make it easier to get started with StateFun, we want to reduce the 
dependencies on other systems and tools that are currently required to get your 
first program running.
_(For reference, you currently need a docker-compose setup with at least Flink, 
Kafka, ZooKeeper, and then you need to interact with it using Kafka command 
line tools (or other clients) to publish messages to the ingress topic.)_

This issue aims to add simple pre-packaged HTTP ingresses/egresses that can be 
used for examples and exploration, and can be used with standard tools (like 
\{{curl}}). That reduces the barrier to exploration.
_(Citing @ssc here: you have roughly one lunch break of time to get a developer excited. Many devs just play around for about 45 minutes, and when they don't see some preliminary success with simple examples, they drop the exploration.)_

An example interaction could be:
{code}
> curl -X POST -i http://:/in/example/greeter/Igal
HTTP/2 200
request-id: 8acb377c-fc5e-4bdb-b2cc-eddb5992b7b5

> curl -X GET 
> http://:/out/8acb377c-fc5e-4bdb-b2cc-eddb5992b7b5
Hello for the 1337th time...
{code}

*Note:* The HTTP Ingress/Egress here is different from the HTTP state access from FLINK-23261.
Requests against the state access API (FLINK-23261) only interact with state entries and never invoke functions. In contrast, the Ingress/Egress proposed here sends messages to functions like any other ingress.

This is the umbrella issue. Dedicated tasks for ingress/egress and request 
correlation are in the subtasks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23261) StateFun - HTTP State Access Interface

2021-07-05 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-23261:


 Summary: StateFun - HTTP State Access Interface 
 Key: FLINK-23261
 URL: https://issues.apache.org/jira/browse/FLINK-23261
 Project: Flink
  Issue Type: Improvement
  Components: Stateful Functions
Reporter: Stephan Ewen


h2. Functionality

To improve operations of StateFun applications, we should offer an interface to 
query and manipulate state entries.
 This can be used for exploration and debugging purposes, and to repair state 
if the application state needs to be corrected.

To make this simple, I would suggest an HTTP REST interface:
 - GET: read state entry for key
 - PUT: set/overwrite state for key
 - DELETE: drop state entry for key

The URLs could be: {{http(s)://statefun-service/state/<namespace>/<type>/<id>/<statename>}}, where the string {{<namespace>/<type>/<id>}} is the fully-qualified address of the target, and the {{statename}} is the name under which that persistent state is stored.

Keys are always UTF-8 strings in StateFun, so they can be encoded in the URL.
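
For illustration, interactions could look as follows (the state name {{seen_count}} and the host name are made up for this example):

{code}
> curl -X GET http://statefun-service/state/example/greeter/Igal/seen_count
5

> curl -X PUT -H "Content-Type: text/plain; charset=UTF-8" -d "42" http://statefun-service/state/example/greeter/Igal/seen_count

> curl -X DELETE http://statefun-service/state/example/greeter/Igal/seen_count
{code}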

For the responses, we would use the common codes 200 for GET/PUT/DELETE success 
and 404 for GET/DELETE not found.
 The state values, as returned from GET requests, would be generally just the 
bytes, and not interpreted by this request handling.

The integration of the StateFun type system and HTTP content types (MIME types) is up for further discussion.
 One option is to set the content type response header to {{"statefun/<typename>"}}, where all non-simple types map to {{Content-Type: application/octet-stream}}. We may make an exception for strings, which could be returned as {{Content-Type: text/plain; charset=UTF-8}}.
 Later refinement is possible, like auto-stringifying contents when the request indicates that it only accepts {{text/plain}} responses.
h2. Failure Guarantees

The initial failure guarantees for PUT/DELETE would be none - the requests 
would be handled best effort.

We can easily extend this later in one of two ways:
 - Delay responses to the HTTP requests until the next checkpoint is complete. 
That makes synchronous interaction easy and sounds like a good match for a more 
admin-style interface.
 - Return the current checkpoint ID and offer a way to poll until the next checkpoint is completed. This avoids blocking requests, but puts more burden on the caller.

Given that the expected use cases for PUT/DELETE are more of an "admin/fix" nature, I would suggest going with synchronous requests, for simplicity.

h2. Implementation

There are two options to implement this:
 (1) A Source/Sink (Ingress/Egress) pair
 (2) An Operator Coordinator with HTTP Requests

*Option (1) - Source/Sink pair*

We would implement a specific source that is both an ingress and an egress.
 The source would spawn an HTTP server (singleton per TM process).

Requests would be handled as follows:
 - An HTTP request gets a generated correlation ID.
 - The source injects a new message type (a "control message") into the stream. That message holds the correlation ID, the parallel subtask index of the originating source, and the target address and state name.
 - The function dispatcher handles these messages in a special way, retrieving the state and sending an egress message with the correlation ID to the parallel subtask of the egress as indicated by the message's subtask index.
 - The egress (which is the same instance as the ingress source) uses the correlation ID to respond to the request.

Advantages:
 - No changes necessary in Flink
 - Might sustain higher throughput, due to multiple HTTP endpoints

Disadvantages:
 - Additional HTTP servers and ports require more setup (like service 
definitions on K8s).
 - Need to introduce new control message type and extend function dispatcher to 
handle them.
 - Makes a hard assumption that sources run on all slots. Needs "ugly" 
singleton hack to start only one server per TM process.

*Option (2) - Operator Coordinator*

Operator Coordinators are instances that run on the {{JobManager}} and can 
communicate with the Tasks via RPC.

Coordinators can receive calls from HTTP handlers at the JobManager's HTTP 
endpoint.
 An example for this is the Result Fetching through HTTP/OperatorCoordinator 
requests.
 We would need a patch to Flink to allow registering custom URLs and passing 
the path as a parameter to the request.

The RPCs can be processed in the mailbox on the Tasks, making them thread safe.
 This would also completely avoid the round-trip (source-to-sink) problem; the tasks simply need to send a response back to the RPC.

Advantages:
 - Reuses the existing HTTP endpoint and port. No additional HTTP server, port, and service is needed; for these admin-style requests, this approach re-uses Flink's admin HTTP endpoint.
 - No need for singleton HTTP server logic in Tasks.
 - Does not require the assumption that all TMs run an instance of all operators.
 - No need for "control messages" and 

[jira] [Created] (FLINK-23093) Limit number of I/O pool and Future threads in Mini Cluster

2021-06-22 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-23093:


 Summary: Limit number of I/O pool and Future threads in Mini 
Cluster
 Key: FLINK-23093
 URL: https://issues.apache.org/jira/browse/FLINK-23093
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Stephan Ewen
 Fix For: 1.14.0


When running tests on CI via the minicluster, the mini cluster typically spawns 
100s of I/O threads, both in the MiniCluster I/O pool and in the TM I/O pool.

The standard rule for the maximum pool size is 4*num-cores, but the number of cores can be fairly large these days. Various Java versions also mess up core counting when running in containers (a containerized JVM might have been given 2 cores as its resource limit, but the JVM counts the system as a whole, like 64/128 cores).

This is both a nuisance for debugging, and a big waste of memory (each thread 
takes by default around 1MB when spawned, so the test JVM wastes 100s of MBs 
for nothing).

I would suggest setting a default of 8 I/O threads for the Mini Cluster. The scaling-with-cores is important for proper TM/JM deployments, but not for the Mini Cluster.
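
Until the default changes, the pool size can presumably be pinned in the test configuration; assuming the {{cluster.io-pool.size}} option, that would look like:

{code}
cluster.io-pool.size: 8
{code}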



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22753) Activate log-based Checkpoints for StateFun

2021-05-21 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22753:


 Summary: Activate log-based Checkpoints for StateFun
 Key: FLINK-22753
 URL: https://issues.apache.org/jira/browse/FLINK-22753
 Project: Flink
  Issue Type: Sub-task
  Components: Stateful Functions
Reporter: Stephan Ewen


Once available, we should use log-based checkpointing in StateFun, to have 
predictable checkpointing times and predictably low end-to-end exactly-once 
latencies.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22752) Add robust default state configuration to StateFun

2021-05-21 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22752:


 Summary: Add robust default state configuration to StateFun
 Key: FLINK-22752
 URL: https://issues.apache.org/jira/browse/FLINK-22752
 Project: Flink
  Issue Type: Sub-task
Reporter: Stephan Ewen


We aim to reduce the state configuration complexity by applying a default 
configuration with robust settings, based on lessons learned in Flink.

*(1) Always use RocksDB.*

_That is already the case._

We keep this for now, as long as the only other alternatives are backends with objects on the heap, which are tricky in terms of predictable JVM performance. RocksDB has a significant performance cost, but more robust behavior.

*(2) Activate local recovery by default.*

That makes recovery cheap for soft task failures and gracefully cancelled tasks.
We need to set these options:
  - {{state.backend.local-recovery: true}}
  - {{taskmanager.state.local.root-dirs: }} - some local directory that will not be periodically wiped by the OS, so typically a local directory other than {{/tmp}}, for example {{/local/state/recovery}}.
  - {{state.backend.rocksdb.localdir: }} - a directory on the same FS / device as above, so that one can create hard links between them (required for RocksDB local checkpoints), for example {{/local/state/rocksdb}}.

Flink will most likely adopt this as a default setting as well in the future.
It still makes sense to pre-configure a different RocksDB working directory than {{/tmp}}.

*(3) Activate partitioned indexes by default.*

This may cost minimal performance in some cases, but can avoid massive 
performance regression in cases where the index blocks no longer fit into the 
memory cache (may happen more frequently when there are too many ColumnFamilies 
= states).

Set {{state.backend.rocksdb.memory.partitioned-index-filters: true}}.

See FLINK-20496 for details.

*(4) Increase number of transfer threads by default.*

This speeds up state recovery in many cases. The default value in Flink is a bit conservative, to avoid spamming the DFS (like HDFS) by default. The more cloud-centric StateFun setups should be safe with a higher default value.

Set {{state.backend.rocksdb.checkpoint.transfer.thread.num: 8}}.

*(5) Increase RocksDB compaction threads by default.*

The number of RocksDB compaction threads is frequently a bottleneck.
Increasing it costs virtually nothing and mitigates that bottleneck in most 
cases.

{{state.backend.rocksdb.thread.num: 4}} (this value is chosen under the 
assumption that there is only one slot).
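
Taken together, the proposed defaults (with the example local paths from above) amount to a configuration like:

{code}
state.backend.local-recovery: true
taskmanager.state.local.root-dirs: /local/state/recovery
state.backend.rocksdb.localdir: /local/state/rocksdb
state.backend.rocksdb.memory.partitioned-index-filters: true
state.backend.rocksdb.checkpoint.transfer.thread.num: 8
state.backend.rocksdb.thread.num: 4
{code}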




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22751) Reduce JVM Metaspace memory for StateFun

2021-05-21 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22751:


 Summary: Reduce JVM Metaspace memory for StateFun
 Key: FLINK-22751
 URL: https://issues.apache.org/jira/browse/FLINK-22751
 Project: Flink
  Issue Type: Sub-task
  Components: Stateful Functions
Reporter: Stephan Ewen


Flink by default reserves quite a lot of metaspace memory (256 MB) out of the 
total memory budget, to accommodate applications that load (and reload) a lot 
of classes. That is necessary because user code is executed directly in the JVM.

StateFun by default doesn't execute user code in the JVM, so it needs much less Metaspace memory. I would suggest reducing the Metaspace size to something like 64MB or 96MB by default.

{{taskmanager.memory.jvm-metaspace.size: 64m}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22749) Apply a robust default State Backend Configuration

2021-05-21 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22749:


 Summary: Apply a robust default State Backend Configuration
 Key: FLINK-22749
 URL: https://issues.apache.org/jira/browse/FLINK-22749
 Project: Flink
  Issue Type: Improvement
  Components: Stateful Functions
Reporter: Stephan Ewen


We should update the default state backend configuration with default settings 
that reflect lessons-learned about robust setups.

(1) Always use the RocksDB State Backend. That is already the case.

(2) Activate partitioned index filters by default. This may cost some overhead in specific cases, but helps avoid massive performance regressions when we have too many ColumnFamilies (too many states) such that the cache can no longer hold all index files.

We need to add {{state.backend.rocksdb.memory.partitioned-index-filters: true}} 
to the config.

See FLINK-20496 for details.
There is a chance that Flink makes this the default in the future as well, then 
we could remove it again from the StateFun setup.

(3) Activate local recovery by default.

That should speed up the recovery of all non-hard-crashed TMs by a lot.
We need to configure
  - {{state.backend.local-recovery: true}}
  - {{taskmanager.state.local.root-dirs}} to some non-temp directory

For this to work reliably, we need a local directory that is not periodically wiped by the OS, so we should not rely on the default ({{/tmp}}) directory, but set up a dedicated non-temp state directory.

Flink will probably make this the default in the future, but having a non-{{/tmp}} directory for the RocksDB working directory and local snapshots still makes a lot of sense.

(4) Increase state transfer threads by default, to speed up state restores.

Add to the config: {{state.backend.rocksdb.checkpoint.transfer.thread.num: 8}}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22741) Hide Flink complexity from Stateful Functions

2021-05-21 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22741:


 Summary: Hide Flink complexity from Stateful Functions
 Key: FLINK-22741
 URL: https://issues.apache.org/jira/browse/FLINK-22741
 Project: Flink
  Issue Type: New Feature
  Components: Stateful Functions
Affects Versions: statefun-3.0.0
Reporter: Stephan Ewen


This is an umbrella issue for various issues to hide and reduce the complexity 
and surface area (and configuration space) of Apache Flink when using Stateful 
Functions.

The goal of this is to create a setup and configuration that works robustly in 
the vast majority of settings. Users should not be required to configure 
anything Flink-specific, other than 

If this happens at the cost of some minor regression in peak stream throughput, 
we can most likely stomach that in StateFun, because the performance cost is 
commonly dominated by the interaction between StateFun cluster and remote 
function deployments.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22729) Truncated Messages in Python workers

2021-05-20 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22729:


 Summary: Truncated Messages in Python workers
 Key: FLINK-22729
 URL: https://issues.apache.org/jira/browse/FLINK-22729
 Project: Flink
  Issue Type: Bug
  Components: Stateful Functions
Affects Versions: statefun-2.2.2
 Environment: The Stateful Function version is 2.2.2, java8. The Java 
App as well as
the external Python workers are deployed in the same kubernetes cluster.
Reporter: Stephan Ewen
 Fix For: statefun-3.1.0


Recently we started seeing the following faulty behavior in the Flink
Stateful Functions HTTP communication towards external Python workers.
This is only occurring when the system is under heavy load.

The Java Application will send HTTP Messages to an external Python
Function but the external Function fails to parse the message with a
"Truncated Message Error". Printouts show that the truncated message
looks as follows:

{code}
my.protobuf.MyClass: 

my.protobuf.MyClass: 

my.protobuf.MyClass: 

my.protobuf.MyClass: 
{code}

Original report on the user mailing list:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Statefun-Truncated-Messages-in-Python-workers-td43831.html

--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22433) CoordinatorEventsExactlyOnceITCase stalls on Adaptive Scheduler

2021-04-23 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22433:


 Summary: CoordinatorEventsExactlyOnceITCase stalls on Adaptive 
Scheduler
 Key: FLINK-22433
 URL: https://issues.apache.org/jira/browse/FLINK-22433
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.13.0
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.14.0, 1.13.1


Logs of the test failure:

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17077=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=7030a106-e977-5851-a05e-535de648c9c9=3

Steps to reproduce: Adjust {{CoordinatorEventsExactlyOnceITCase}} and extend the MiniCluster configuration:
{code}
@BeforeClass
public static void startMiniCluster() throws Exception {
    final Configuration config = new Configuration();
    config.setString(RestOptions.BIND_PORT, "0");
    config.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
    config.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
    // ... then start the MiniCluster with this configuration, as the test already does
}
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22406) ReactiveModeITCase.testScaleDownOnTaskManagerLoss()

2021-04-22 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22406:


 Summary: ReactiveModeITCase.testScaleDownOnTaskManagerLoss()
 Key: FLINK-22406
 URL: https://issues.apache.org/jira/browse/FLINK-22406
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.13.0
Reporter: Stephan Ewen


The test is stalling on Azure CI.

https://dev.azure.com/sewen0794/Flink/_build/results?buildId=292=logs=0a15d512-44ac-5ba5-97ab-13a5d066c22c=634cd701-c189-5dff-24cb-606ed884db87=4865



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22358) Add missing stability annotation to Split Reader API classes

2021-04-19 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22358:


 Summary: Add missing stability annotation to Split Reader API 
classes
 Key: FLINK-22358
 URL: https://issues.apache.org/jira/browse/FLINK-22358
 Project: Flink
  Issue Type: Task
  Components: Connectors / Common
Reporter: Stephan Ewen
 Fix For: 1.14.0, 1.13.1


The Split Reader API currently has no stability annotations, so it is unclear which classes are public API, which are internal, which are stable, and which are evolving.
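
For illustration, the annotations in question come from {{flink-annotations}}; which level each class should get ({{@Public}}, {{@PublicEvolving}}, or {{@Internal}}) is exactly the open question here:

{code:java}
import org.apache.flink.annotation.PublicEvolving;

// Illustrative only: a user-facing interface of the Split Reader API would
// carry an explicit stability annotation like this.
@PublicEvolving
public interface ExampleSplitReaderApi {
    void wakeUp();
}
{code}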



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22357) Mark FLIP-27 Source API as stable

2021-04-19 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22357:


 Summary: Mark FLIP-27 Source API as stable
 Key: FLINK-22357
 URL: https://issues.apache.org/jira/browse/FLINK-22357
 Project: Flink
  Issue Type: Task
  Components: API / Core
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.14.0


The FLIP-27 source API was properly introduced in 1.11 and has undergone some major improvements in 1.12.

During the stabilization in 1.13, we needed only one very minor change to those interfaces.

I think it is time to declare the core source API interfaces stable, to allow users to safely rely on them. I would suggest doing that for 1.14, and possibly even backporting the annotation change to 1.13.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22324) Backport FLINK-18071 for 1.12.x

2021-04-16 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22324:


 Summary: Backport FLINK-18071 for 1.12.x
 Key: FLINK-22324
 URL: https://issues.apache.org/jira/browse/FLINK-22324
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.12.2
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.3


See FLINK-18071 - this issue only tracks the backport, to allow closing the blocker issue for 1.13.0.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22093) Unstable test "ThreadInfoSampleServiceTest.testShouldThrowExceptionIfTaskIsNotRunningBeforeSampling"

2021-04-01 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22093:


 Summary: Unstable test 
"ThreadInfoSampleServiceTest.testShouldThrowExceptionIfTaskIsNotRunningBeforeSampling"
 Key: FLINK-22093
 URL: https://issues.apache.org/jira/browse/FLINK-22093
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Stephan Ewen
 Fix For: 1.13.0


The test 
{{ThreadInfoSampleServiceTest.testShouldThrowExceptionIfTaskIsNotRunningBeforeSampling()}}
 failed in all profiles in my latest CI build (even though it passes locally).

  - 
https://dev.azure.com/sewen0794/Flink/_build/results?buildId=250=logs=9dc1b5dc-bcfa-5f83-eaa7-0cb181ddc267=ab910030-93db-52a7-74a3-34a0addb481b=8102

  - 
https://dev.azure.com/sewen0794/Flink/_build/results?buildId=250=logs=6e55a443-5252-5db5-c632-109baf464772=9df6efca-61d0-513a-97ad-edb76d85786a=6698

  - 
https://dev.azure.com/sewen0794/Flink/_build/results?buildId=250=logs=cc649950-03e9-5fae-8326-2f1ad744b536=51cab6ca-669f-5dc0-221d-1e4f7dc4fc85



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22069) Check Log Pollution for 1.13 release

2021-03-31 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-22069:


 Summary: Check Log Pollution for 1.13 release
 Key: FLINK-22069
 URL: https://issues.apache.org/jira/browse/FLINK-22069
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Reporter: Stephan Ewen
 Fix For: 1.13.0


We should check for log pollution and confusing log lines before the release.
Below are some lines I stumbled over while using Flink during testing.

-

These lines show up on any execution of a local job and make me think I forgot to configure something I probably should have, leaving me wondering whether this might cause problems later.

These have been in Flink for a few releases now, might be worth rephrasing, 
though.

{code}
2021-03-30 17:57:22,483 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The 
configuration option taskmanager.cpu.cores required for local execution is not 
set, setting it to the maximal possible value.
2021-03-30 17:57:22,483 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The 
configuration option taskmanager.memory.task.heap.size required for local 
execution is not set, setting it to the maximal possible value.
2021-03-30 17:57:22,483 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The 
configuration option taskmanager.memory.task.off-heap.size required for local 
execution is not set, setting it to the maximal possible value.
2021-03-30 17:57:22,483 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The 
configuration option taskmanager.memory.network.min required for local 
execution is not set, setting it to its default value 64 mb.
2021-03-30 17:57:22,483 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The 
configuration option taskmanager.memory.network.max required for local 
execution is not set, setting it to its default value 64 mb.
2021-03-30 17:57:22,483 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The 
configuration option taskmanager.memory.managed.size required for local 
execution is not set, setting it to its default value 128 mb.
{code}

-

These lines show up on every job start, even if there is no recovery but just a plain job start. They are not particularly problematic, but also not helpful.

{code}
2021-03-30 17:57:27,839 INFO  
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - 
Converting recovered input channels (8 channels)
2021-03-30 17:57:27,839 INFO  
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - 
Converting recovered input channels (8 channels)
2021-03-30 17:57:27,839 INFO  
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - 
Converting recovered input channels (8 channels)
2021-03-30 17:57:27,839 INFO  
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - 
Converting recovered input channels (8 channels)
2021-03-30 17:57:27,839 INFO  
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - 
Converting recovered input channels (8 channels)
2021-03-30 17:57:27,839 INFO  
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - 
Converting recovered input channels (8 channels)
2021-03-30 17:57:27,839 INFO  
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - 
Converting recovered input channels (8 channels)
2021-03-30 17:57:27,855 INFO  
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - 
Converting recovered input channels (8 channels)
{code}



When using {{DataStream.collect()}} we always get an exception in the log for the first fetch attempt, before the JM is ready.
The loop retries and the program succeeds, but the exception in the log raises confusion about whether there is a swallowed but impactful error.

{code}
7199 [main] WARN  
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An 
exception occurs when fetching query results
java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException: 
Unable to get JobMasterGateway for initializing job. The requested operation is 
not available while the JobManager is initializing.
at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) 
~[?:?]
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:155)
 ~[classes/:?]
at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:126)
 [classes/:?]
at 

[jira] [Created] (FLINK-21996) Transient RPC failure without TaskManager failure can lead to split assignment loss

2021-03-26 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-21996:


 Summary: Transient RPC failure without TaskManager failure can 
lead to split assignment loss
 Key: FLINK-21996
 URL: https://issues.apache.org/jira/browse/FLINK-21996
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.12.2
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.13.0


NOTE: This bug has not actually been observed. It is based on reviews of the current implementation.
I would expect it to be a pretty rare case, but at scale, even the rare cases happen often enough.

h2. Problem

Intermediate RPC messages from JM to TM can get dropped, even when the TM is 
not marked as failed.
That can happen when the connection can be recovered before the heartbeat times 
out.

So RPCs generally retry or handle failures: for example, the Deploy-Task RPC retries, and the Trigger-Checkpoint RPC aborts the checkpoint on failure and triggers a new one.

The "Send OperatorEvent" RPC call (from Coordinator to Operator) gives you a 
Future with the acknowledgement. But if that one fails, we are in the situation 
where we do not know whether the event sending was successful or not (only the 
ack failed).

This is especially tricky for split assignments and checkpoints. Consider this 
sequence of actions:
  1. Coordinator assigns a split. Ack not yet received.
  2. Coordinator takes a checkpoint. Split was sent before the checkpoint, so 
is not included on the Coordinator.
  3. Split assignment RPC response is "failed".
  4. Checkpoint completes.

Now we don't know whether the split was in the checkpoint on the Operator 
(TaskManager) or not, and with that we don't know whether we should add it back 
to the coordinator. We need to do something to make sure the split is now 
either on the coordinator or on the Operator. Currently, the split is 
implicitly assumed to be on the Operator; if it isn't, then that split is lost.

Now, it is worth pointing out that this is a pretty rare situation, because it means that the RPC with the split assignment fails while the one for the checkpoint succeeds, even though they are in close proximity. The way the Akka-based RPC transport works (with retries, etc.), this can happen, but it isn't very likely. That is why we haven't seen this bug in practice so far, and haven't gotten a report for it yet.


h2. Proposed solution

The solution has two components:

  1. Fallback to a consistent point: If the system doesn't know whether two parts are still consistent with each other (here, coordinator and Operator), fall back to a consistent point. Here, that is the case when the Ack-Future for the "Send Operator Event" RPC fails or times out. Then we call the scheduler to trigger a failover of the target operator to the latest checkpoint, signaling the same to the coordinator. That restores consistency. We can later optimize this (see below).

  2. We cannot trigger checkpoints while we are "in limbo" concerning our knowledge about splits. Concretely, that means that the Coordinator can only acknowledge the checkpoint once the acks for pending Operator Event RPCs (assign-splits) have arrived. The checkpoint future is conditional on all pending RPC futures (see the sketch below). If the RPC futures fail (or time out), then the checkpoint cannot complete (and the target operator will go through a failover anyway). In the common case, the RPC round-trip time is milliseconds, which would be added to the checkpoint latency if the checkpoint happens to overlap with a split assignment (most won't).
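
A minimal sketch of that gating with {{CompletableFuture}}, using illustrative names rather than the actual coordinator classes:

{code:java}
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative sketch: the coordinator acknowledges a checkpoint only after
// all pending "send operator event" RPC acks have arrived. If any ack fails
// or times out, the returned future fails and the checkpoint cannot complete
// (the target operator goes through a failover anyway).
final class CheckpointAckGate {

    CompletableFuture<byte[]> acknowledgeWhenEventsDelivered(
            List<CompletableFuture<Void>> pendingEventAcks,
            CompletableFuture<byte[]> coordinatorStateFuture) {

        return CompletableFuture
                .allOf(pendingEventAcks.toArray(new CompletableFuture<?>[0]))
                .thenCompose(ignored -> coordinatorStateFuture);
    }
}
{code}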


h2. Possible Future Improvements

Step (1) above can be optimized by going with retries first, using sequence numbers to deduplicate the calls. That can help reduce the number of cases where a failover is needed. However, the number of situations where the RPC would need a retry and has a chance of succeeding (the TM is not down) should be very few to begin with, so whether this optimization is worth it remains to be seen.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21935) Remove "state.backend.async" option.

2021-03-23 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-21935:


 Summary: Remove "state.backend.async" option.
 Key: FLINK-21935
 URL: https://issues.apache.org/jira/browse/FLINK-21935
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Stephan Ewen
 Fix For: 1.13.0


Checkpoints are always asynchronous; there is never a case for a synchronous checkpoint.
The RocksDB state backend doesn't even support synchronous snapshots, and the 
HashMap Heap backend also has no good use case for synchronous snapshots (other 
than a very minor reduction in heap objects).

Most importantly, we should not expose this option in the constructors of the 
new state backend API classes, like {{HashMapStateBackend}}. 

I marked this as a blocker because it is part of the new user-facing State Backend API, and I would like to avoid having this option enter that API and cause confusion when we eventually remove it.

/cc [~sjwiesman] and [~liyu]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21695) Increase default value for number of KeyGroups

2021-03-09 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-21695:


 Summary: Increase default value for number of KeyGroups
 Key: FLINK-21695
 URL: https://issues.apache.org/jira/browse/FLINK-21695
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing, Runtime / State Backends
Reporter: Stephan Ewen
 Fix For: 1.13.0


The current calculation for the number of Key Groups (max parallelism) leads in 
many cases to data skew and to confusion among users.

Specifically, for maxParallelisms above 128 the default value is set to {{roundToPowerOfTwo(1.5 x parallelism)}}, which frequently means that half of the tasks get one keygroup and the other half gets two keygroups - very skewed. For example, with parallelism 170 the default becomes {{roundToPowerOfTwo(255) = 256}}, so 86 subtasks get two key groups while 84 get only one, a 2x difference in state and load.

See section (1) in this "lessons learned" blog post. 
https://engineering.contentsquare.com/2021/ten-flink-gotchas/

We can fix this by
  - either setting the default maxParallelism to something pretty high (2048, for example). The cost is that the default key-group overhead per state entry grows from one byte to two bytes.
  - or staying with similar logic, but using a higher multiplier than {{1.5 x operatorParallelism}}, such as {{4 x operatorParallelism}}. The price is again that we more quickly reach the point where we have two bytes of keygroup encoding overhead instead of one.

Implementation-wise, there is an unfortunate situation: the maxParallelism, if not configured, is not stored anywhere in the job graph, but is re-derived on the JobManager each time it loads a JobGraph vertex (ExecutionJobVertex) that does not have a maxParallelism configured. This relies on the implicit contract that this logic never changes.
Changing this logic will instantly break all jobs that have not explicitly configured the max parallelism. That seems like a pretty heavy design shortcoming, unfortunately :-(

A way to partially work around that is by moving the logic that derives the 
maximum parallelism to the {{StreamGraphGenerator}}, so we never create 
JobGraphs where vertices have no configured Max Parallelism (and we keep the 
re-derivation logic for backwards compatibility for persisted JobGraphs).
The {{StreamExecutionEnvironment}} will need a flag to use the "old mode" to 
give existing un-configured applications a way to keep restoring from old 
savepoints. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21694) Increase default value of "state.backend.rocksdb.checkpoint.transfer.thread.num"

2021-03-09 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-21694:


 Summary: Increase default value of 
"state.backend.rocksdb.checkpoint.transfer.thread.num"
 Key: FLINK-21694
 URL: https://issues.apache.org/jira/browse/FLINK-21694
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Stephan Ewen
 Fix For: 1.13.0


The default value for the number of threads used to download state artifacts 
from checkpoint storage should be increased.

The increase should not pose a risk of regression, but it does in many cases speed up checkpoint recovery significantly.

Something similar was reported in this blog post, item (3).
https://engineering.contentsquare.com/2021/ten-flink-gotchas/

A value of 8 (eight) sounds like a good default. It should not result in excessive thread explosion, and it already speeds up recovery.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20472) Change quickstarts to have a dependency on "flink-dist"

2020-12-03 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-20472:


 Summary: Change quickstarts to have a dependency on "flink-dist"
 Key: FLINK-20472
 URL: https://issues.apache.org/jira/browse/FLINK-20472
 Project: Flink
  Issue Type: Improvement
  Components: Quickstarts
Reporter: Stephan Ewen


I suggest we change the quickstarts to have the following dependencies:
  - {{flink-dist}} (provided)
  - log4j (runtime)

That way, the projects created from quickstarts have exactly the same dependencies as what the distribution provides. That solves all our mismatches between the dependencies needed for compiling/running-in-IDE and those needed when packaging for deployment.

For example, we can add the {{flink-connector-base}} and 
{{flink-connector-files}} back to {{flink-dist}} without having any issues with 
setting up dependencies in the user projects.
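
In the quickstart POM, that would look roughly like the following (the exact artifact ID and Scala suffix are assumptions that depend on the Flink version):

{code}
<!-- sketch of the proposed quickstart dependencies -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-dist_2.12</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
    <version>${log4j.version}</version>
    <scope>runtime</scope>
</dependency>
{code}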



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20413) Sources should add splits back in "resetSubtask()", rather than in "subtaskFailed()".

2020-11-29 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-20413:


 Summary: Sources should add splits back in "resetSubtask()", 
rather than in "subtaskFailed()".
 Key: FLINK-20413
 URL: https://issues.apache.org/jira/browse/FLINK-20413
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


Because "subtaskFailed()" has no strong order guarantees with checkpoint 
completion, we need to return failed splits in "resetSubtask()" instead.

See FLINK-20396 for a detailed explanation of the race condition.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20412) Collect Result Fetching occasionally fails after a JobManager Failover

2020-11-29 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-20412:


 Summary: Collect Result Fetching occasionally fails after a 
JobManager Failover
 Key: FLINK-20412
 URL: https://issues.apache.org/jira/browse/FLINK-20412
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.12.0
Reporter: Stephan Ewen


The encountered exception is shown below.

The issue can be reproduced by running a test with JobManager failover in a 
tight loop, for example the FileTextLinesITCase from this PR: 
[https://github.com/apache/flink/pull/14199]

 
{code:java}
15335 [main] WARN  
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher - An 
exception occurs when fetching query results
java.util.concurrent.ExecutionException: java.lang.NullPointerException
at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:163) ~[classes/:?]
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:134) [classes/:?]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:103) [classes/:?]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:77) [classes/:?]
at org.apache.flink.streaming.api.datastream.DataStreamUtils.collectRecordsFromUnboundedStream(DataStreamUtils.java:142) [classes/:?]
at org.apache.flink.connector.file.src.FileSourceTextLinesITCase.testContinuousTextFileSource(FileSourceTextLinesITCase.java:272) [test-classes/:?]
at org.apache.flink.connector.file.src.FileSourceTextLinesITCase.testContinuousTextFileSourceWithJobManagerFailover(FileSourceTextLinesITCase.java:228) [test-classes/:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) [junit-4.12.jar:4.12]
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) [junit-4.12.jar:4.12]
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) [junit-4.12.jar:4.12]
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) [junit-4.12.jar:4.12]
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) [junit-4.12.jar:4.12]
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) [junit-4.12.jar:4.12]
at org.junit.rules.RunRules.evaluate(RunRules.java:20) [junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) [junit-4.12.jar:4.12]
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) [junit-4.12.jar:4.12]
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) [junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) [junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) [junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) [junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) [junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) [junit-4.12.jar:4.12]
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) [junit-4.12.jar:4.12]
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) [junit-4.12.jar:4.12]
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) [junit-4.12.jar:4.12]
at org.junit.rules.RunRules.evaluate(RunRules.java:20) [junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.run(ParentRunner.java:363) [junit-4.12.jar:4.12]
at org.junit.runner.JUnitCore.run(JUnitCore.java:137) [junit-4.12.jar:4.12]
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) [junit-rt.jar:?]
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:67) [junit-rt.jar:?]
at 

[jira] [Created] (FLINK-20406) Return the Checkpoint ID of the restored Checkpoint in CheckpointCoordinator.restoreLatestCheckpointedStateToSubtasks()

2020-11-27 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-20406:


 Summary: Return the Checkpoint ID of the restored Checkpoint in 
CheckpointCoordinator.restoreLatestCheckpointedStateToSubtasks()
 Key: FLINK-20406
 URL: https://issues.apache.org/jira/browse/FLINK-20406
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


To allow the scheduler to notify Operator Coordinators of subtask restores 
(local failover), we need to know which checkpoint ID was restored. 

This change does not adjust the other restore methods of the Checkpoint 
Coordinator, because the Scheduler only needs to be involved in the subtask 
restore notification at all due to a shortcoming of the Checkpoint Coordinator: 
the CC is not aware of subtask restores; it always restores all subtasks and 
relies on the fact that assigning state to a running execution attempt has no 
effect.
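
As a rough sketch (all names and types below are placeholders, simplified from 
the real scheduler code, not the actual Flink signatures), the change could 
look like this:

{code}
import java.util.OptionalLong;
import java.util.Set;

// Sketch only: the restore method reports which checkpoint it restored,
// or empty if there was no checkpoint, so the caller (the scheduler) can
// notify the Operator Coordinators of the restored checkpoint ID.
interface CheckpointRestoreSketch {
    OptionalLong restoreLatestCheckpointedStateToSubtasks(Set<Integer> subtaskIndices)
            throws Exception;
}
{code}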



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20397) Pass checkpointId to OperatorCoordinator.resetToCheckpoint().

2020-11-27 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-20397:


 Summary: Pass checkpointId to 
OperatorCoordinator.resetToCheckpoint().
 Key: FLINK-20397
 URL: https://issues.apache.org/jira/browse/FLINK-20397
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.11.2
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0, 1.11.3


The OperatorCoordinator.resetToCheckpoint() currently lacks the information 
which checkpoint it recovers to.

That forces implementers to assume strict ordering of method calls between 
restore and failure. While that is currently guaranteed in this case, it is not 
guaranteed in other places (see parent issue).

Because of that, we want implementations to not assume method order at all, but 
rely on explicit information passed to the methods (checkpoint IDs). Otherwise 
we end up with mixed implementations that partially infer context from the 
order of method calls, and partially use explicit information that was passed.
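
A minimal sketch of the proposed signature change (parameter order and types 
are assumptions, not the exact Flink API):

{code}
// Sketch: the coordinator is told explicitly which checkpoint it is being
// reset to, instead of inferring it from the order of restore/failure calls.
interface CoordinatorResetSketch {
    void resetToCheckpoint(long checkpointId, byte[] checkpointData) throws Exception;
}
{code}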



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20396) Replace "OperatorCoordinator.subtaskFailed()" with "subtaskRestored()"

2020-11-27 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-20396:


 Summary: Replace "OperatorCoordinator.subtaskFailed()" with 
"subtaskRestored()"
 Key: FLINK-20396
 URL: https://issues.apache.org/jira/browse/FLINK-20396
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.11.2
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0, 1.11.3


There are no strong order guarantees between 
{{OperatorCoordinator.subtaskFailed()}} and 
{{OperatorCoordinator.notifyCheckpointComplete()}}.

It can happen that a checkpoint completes after the notification for task 
failure is sent:
  - {{OperatorCoordinator.checkpoint()}}
  - {{OperatorCoordinator.subtaskFailed()}}
  - {{OperatorCoordinator.checkpointComplete()}}

The subtask failure notification here does not know whether the previous 
checkpoint completed or not. It cannot decide what state the subtask will be in 
after recovery.
There is no easy fix right now to strictly guarantee the order of the method 
calls, so alternatively we need to provide the necessary information to reason 
about the status of tasks.

We should replace {{OperatorCoordinator.subtaskFailed(int subtask)}} with 
{{OperatorCoordinator.subtaskRestored(int subtask, long checkpoint)}}. That 
way, implementations get the explicit checkpoint ID for the subtask recovery 
and can align it with the IDs of checkpoints that were taken.

It is still (in rare cases) possible that for a specific checkpoint C, 
{{OperatorCoordinator.subtaskRestored(subtaskIndex, C)}} comes before 
{{OperatorCoordinator.checkpointComplete(C)}}.
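
A sketch of the replacement (simplified placeholder interface, not the full 
{{OperatorCoordinator}} contract):

{code}
// Sketch of the proposed change:
interface CoordinatorSubtaskEventsSketch {
    // removed: void subtaskFailed(int subtask);
    // added: the coordinator learns the exact checkpoint the subtask restored to
    void subtaskRestored(int subtask, long checkpointId);
}
{code}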


h3. Background

The Checkpointing Procedure is partially asynchronous on the {{JobManager}} / 
{{CheckpointCoordinator}}: After all subtasks acknowledged the checkpoint, the 
finalization (writing out metadata and registering the checkpoint in ZooKeeper) 
happens in an I/O thread, and the checkpoint completes after that.

This sequence of events can happen:
  - tasks ack the checkpoint
  - checkpoint fully acknowledged, finalization starts
  - task fails
  - task failure notification is dispatched
  - checkpoint completes.

For task failures and checkpoint completion, no order is defined.

However, for task restore and checkpoint completion, the order is well defined: 
When a task is restored, pending checkpoints are either canceled or complete. 
None can be within finalization. That is currently guaranteed with a lock in 
the {{CheckpointCoordinator}}.
(An implication of that being that restores can be blocking operations in the 
scheduler, which is not ideal from the perspective of making the scheduler 
async/non-blocking, but it is currently essential for correctness).




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20379) New Kafka Connector does not support DeserializationSchema

2020-11-26 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-20379:


 Summary: New Kafka Connector does not support DeserializationSchema
 Key: FLINK-20379
 URL: https://issues.apache.org/jira/browse/FLINK-20379
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Reporter: Stephan Ewen
 Fix For: 1.12.0


The new Kafka Connector defines its own deserialization schema and is 
incompatible with the existing library of deserializers.

That means that users cannot use all of Flink's Formats (Avro, JSON, Csv, 
Protobuf, Confluent Schema Registry, ...) with the new Kafka Connector.

I think we should change the new Kafka Connector to use the existing 
Deserialization classes, so all formats can be used, and users can reuse their 
deserializer implementations.

It would also be good to use the existing KafkaDeserializationSchema. Otherwise 
all users need to migrate their sources again.
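
To illustrate the reuse idea, here is a minimal, self-contained adapter sketch. 
Both interfaces below are hypothetical stand-ins (the real types would be 
{{DeserializationSchema}} and the new connector's record deserializer):

{code}
import java.io.IOException;

// Hypothetical placeholder, plays the role of DeserializationSchema<T>.
interface ValueDeserializer<T> {
    T deserialize(byte[] message) throws IOException;
}

// Hypothetical placeholder, plays the role of the new connector's schema.
interface NewKafkaDeserializer<T> {
    T deserialize(byte[] key, byte[] value) throws IOException;
}

// Wraps an existing value-only format so the new connector could reuse it.
final class ValueOnlyAdapter<T> implements NewKafkaDeserializer<T> {
    private final ValueDeserializer<T> format;

    ValueOnlyAdapter(ValueDeserializer<T> format) {
        this.format = format;
    }

    @Override
    public T deserialize(byte[] key, byte[] value) throws IOException {
        return format.deserialize(value); // key bytes ignored, as in value-only formats
    }
}
{code}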




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20276) Transparent DeCompression of streams missing on new File Source

2020-11-22 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-20276:


 Summary: Transparent DeCompression of streams missing on new File 
Source
 Key: FLINK-20276
 URL: https://issues.apache.org/jira/browse/FLINK-20276
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


The existing {{FileInputFormat}} applies decompression (gzip, xz, ...) 
automatically on the file input stream, based on the file extension.

We need to add similar functionality for the {{StreamRecordFormat}} of the new 
FileSource to be on par with this functionality.

This can be easily applied in the {{StreamFormatAdapter}} when opening the file 
stream.
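
A minimal sketch of the extension-based dispatch, using only JDK codecs; the 
real adapter would consult Flink's registered decompressors rather than this 
hard-coded mapping:

{code}
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.InflaterInputStream;

// Sketch: choose a decompressing wrapper based on the file extension,
// or pass the raw stream through unchanged.
final class DecompressionSketch {
    static InputStream maybeDecompress(String path, InputStream raw) throws IOException {
        if (path.endsWith(".gz")) {
            return new GZIPInputStream(raw);
        } else if (path.endsWith(".deflate")) {
            return new InflaterInputStream(raw);
        }
        return raw; // no known compression extension
    }
}
{code}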



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20188) Add Documentation for new File Source

2020-11-17 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-20188:


 Summary: Add Documentation for new File Source
 Key: FLINK-20188
 URL: https://issues.apache.org/jira/browse/FLINK-20188
 Project: Flink
  Issue Type: Task
  Components: Documentation
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20063) File Source requests an additional split on every restore.

2020-11-09 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-20063:


 Summary: File Source requests an additional split on every restore.
 Key: FLINK-20063
 URL: https://issues.apache.org/jira/browse/FLINK-20063
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


Currently, the {{FileSourceReader}} requests a new split when started. That 
includes cases when it was restored from a checkpoint.

So with every restore, the reader increases its split backlog size by one, 
causing problems for balanced split assignments.
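
A sketch of the fix, with all names assumed rather than taken from the actual 
reader code:

{code}
import java.util.List;

// Sketch: request the first split only on a fresh start, so a restore
// does not grow the reader's split backlog.
abstract class FileSourceReaderSketch<SplitT> {
    abstract List<SplitT> restoredSplits(); // splits recovered from the checkpoint
    abstract void sendSplitRequest();       // ask the enumerator for one more split

    void start() {
        if (restoredSplits().isEmpty()) {
            sendSplitRequest(); // fresh start: nothing to work on yet
        }
        // after a restore, the next split is requested only once the
        // recovered splits are finished
    }
}
{code}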



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20049) Simplify handling of "request split".

2020-11-08 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-20049:


 Summary: Simplify handling of "request split".
 Key: FLINK-20049
 URL: https://issues.apache.org/jira/browse/FLINK-20049
 Project: Flink
  Issue Type: Improvement
  Components: API / Core
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


This issue is similar to FLINK-19265

Split assignment events are treated specially by the source API. Users do not 
create them directly but call methods on the contexts to assign splits.

The {{RequestSplitEvent}} is in contrast to that a custom user event and needs 
to be handled like a custom event, when sent by enumerators and received by the 
readers.

That seems a bit confusing and inconsistent, given that {{RequestSplitEvent}} 
is essential for all use cases with pull-based split assignment, which is 
pretty much any batch use case and various streaming use cases. The event 
should be on the same level as the AddSplitEvent.

I suggest that we add a {{SourceReaderContext.requestSplit()}} and 
{{SplitEnumerator.handleSplitRequest()}}, to have split requests and responses 
symmetrical.
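
A sketch of the symmetric API (method placement and parameters are assumptions 
based on the description above):

{code}
// Sketch of the proposed first-class split-request methods:
interface SourceReaderContextSketch {
    void requestSplit(); // reader side: replaces sending a RequestSplitEvent
}

interface SplitEnumeratorSketch {
    // enumerator side: called by the framework, no custom-event handling needed
    void handleSplitRequest(int subtaskId, String requesterHostname);
}
{code}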



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20028) FileCompactionITCase is unstable

2020-11-06 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-20028:


 Summary: FileCompactionITCase is unstable
 Key: FLINK-20028
 URL: https://issues.apache.org/jira/browse/FLINK-20028
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem, Table SQL / Ecosystem
Reporter: Stephan Ewen
 Fix For: 1.12.0


The {{ParquetFileCompactionITCase}} hangs and times out.

Log: 
https://dev.azure.com/sewen0794/Flink/_build/results?buildId=178&view=logs&j=66592496-52df-56bb-d03e-37509e1d9d0f&t=ae0269db-6796-5583-2e5f-d84757d711aa

Exception:
{code}
org.junit.runners.model.TestTimedOutException: test timed out after 60 seconds
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at 
org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:119)
at 
org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:86)
at 
org.apache.flink.table.planner.runtime.stream.sql.FileCompactionITCaseBase.testNonPartition(FileCompactionITCaseBase.java:91)

{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20008) Java Deadlock in ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement()

2020-11-05 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-20008:


 Summary: Java Deadlock in 
ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement()
 Key: FLINK-20008
 URL: https://issues.apache.org/jira/browse/FLINK-20008
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.12.0
Reporter: Stephan Ewen
 Fix For: 1.12.0


The stack trace detects a deadlock between the testing thread and the curator 
event thread.

Full log: 
https://dev.azure.com/sewen0794/Flink/_build/results?buildId=176&view=logs&j=6e58d712-c5cc-52fb-0895-6ff7bd56c46b&t=f30a8e80-b2cf-535c-9952-7f521a4ae374

Relevant Stack Trace:
{code}
Found one Java-level deadlock:
=
"main-EventThread":
  waiting to lock monitor 0x7f74c00045e8 (object 0x8ed14cb0, a 
java.lang.Object),
  which is held by "main"
"main":
  waiting to lock monitor 0x7f74e401a1f8 (object 0x8ed15008, a 
org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch),
  which is held by "main-EventThread"

Java stack information for the threads listed above:
===
"main-EventThread":
at 
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.onGrantLeadership(DefaultLeaderElectionService.java:186)
- waiting to lock <0x8ed14cb0> (a java.lang.Object)
at 
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver.isLeader(ZooKeeperLeaderElectionDriver.java:158)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch$9.apply(LeaderLatch.java:693)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch$9.apply(LeaderLatch.java:689)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:100)
at 
org.apache.flink.shaded.curator4.org.apache.curator.shaded.com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:92)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch.setLeadership(LeaderLatch.java:688)
- locked <0x8ed15008> (a 
org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch.checkLeadership(LeaderLatch.java:567)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch.access$700(LeaderLatch.java:65)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch$7.processResult(LeaderLatch.java:618)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.sendToBackgroundCallback(CuratorFrameworkImpl.java:883)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.processBackgroundOperation(CuratorFrameworkImpl.java:653)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.WatcherRemovalFacade.processBackgroundOperation(WatcherRemovalFacade.java:152)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.GetChildrenBuilderImpl$2.processResult(GetChildrenBuilderImpl.java:187)
at 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:601)
at 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:508)
"main":
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch.close(LeaderLatch.java:203)
- waiting to lock <0x8ed15008> (a 
org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch.close(LeaderLatch.java:190)
at 
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver.close(ZooKeeperLeaderElectionDriver.java:140)
at 
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.stop(DefaultLeaderElectionService.java:103)
- locked <0x8ed14cb0> (a java.lang.Object)
at 
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement(ZooKeeperLeaderElectionTest.java:310)

{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19804) Make FileSource class generic with respect to split types

2020-10-25 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19804:


 Summary: Make FileSource class generic with respect to split types
 Key: FLINK-19804
 URL: https://issues.apache.org/jira/browse/FLINK-19804
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / FileSystem
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


To support sources with different split types, the FileSource class should 
become generic with respect to split types.

To not create too complex generic signatures for users, we should add a new 
class {{AbstractFileSource}} which is generic and has all the shared generic 
methods.

The current {{FileSource}} would be changed to extend this class for the 
specific split type {{FileSourceSplit}}, and thus behave exactly like it 
currently does.
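
A sketch of the class split (type-parameter names are assumptions):

{code}
// Sketch: all shared, split-type-generic plumbing lives in the abstract class.
abstract class AbstractFileSourceSketch<T, SplitT> {
}

// The public class pins the split type, so user-facing signatures stay simple.
final class FileSourceSketch<T> extends AbstractFileSourceSketch<T, DefaultSplitSketch> {
}

// Stands in for the existing FileSourceSplit.
final class DefaultSplitSketch {
}
{code}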





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19803) Make File Source Checkpoint Serializer extensible

2020-10-25 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19803:


 Summary: Make File Source Checkpoint Serializer extensible
 Key: FLINK-19803
 URL: https://issues.apache.org/jira/browse/FLINK-19803
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / FileSystem
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


The checkpoint type serializer should be generic and accept the split 
serializer, rather than being hard-wired to the {{FileSourceSplitSerializer}}.
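
A simplified sketch of the idea; the interface below is a placeholder for 
Flink's versioned serializer abstraction:

{code}
import java.io.IOException;

// Placeholder for a simple versioned serializer.
interface VersionedSerializerSketch<E> {
    byte[] serialize(E obj) throws IOException;
    E deserialize(int version, byte[] serialized) throws IOException;
}

// Sketch: the checkpoint serializer receives the split serializer as a
// constructor argument instead of hard-wiring FileSourceSplitSerializer.
final class CheckpointSerializerSketch<SplitT> {
    private final VersionedSerializerSketch<SplitT> splitSerializer;

    CheckpointSerializerSketch(VersionedSerializerSketch<SplitT> splitSerializer) {
        this.splitSerializer = splitSerializer; // works for any split type
    }
}
{code}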



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19802) Let BulkFormat createReader and restoreReader methods accept Splits directly

2020-10-25 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19802:


 Summary: Let BulkFormat createReader and restoreReader methods 
accept Splits directly
 Key: FLINK-19802
 URL: https://issues.apache.org/jira/browse/FLINK-19802
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / FileSystem
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


To support sources where the splits communicate additional information, the 
BulkFormats should accept a generic split type, instead of taking 
path/offset/length from the splits directly.
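
A sketch with simplified signatures (the real methods take more context, such 
as a configuration):

{code}
import java.io.IOException;

// Sketch: the split object itself is passed in, so extended split types
// can carry extra information to the format.
interface BulkFormatSketch<T, SplitT> {
    ReaderSketch<T> createReader(SplitT split) throws IOException;
    ReaderSketch<T> restoreReader(SplitT split) throws IOException;
}

interface ReaderSketch<T> {
    T next() throws IOException; // null once the split is exhausted
}
{code}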



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19800) Make FileSourceSplit / FileSourceSplitState interaction extensible.

2020-10-25 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19800:


 Summary: Make FileSourceSplit / FileSourceSplitState interaction 
extensible.
 Key: FLINK-19800
 URL: https://issues.apache.org/jira/browse/FLINK-19800
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / FileSystem
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


We should add a method 
{{FileSourceSplit.updateWithCheckpointedPosition(CheckpointedPosition)}} that 
creates a new {{FileSourceSplit}} of the same type with an updated position.

Subclasses can override that method. The {{FileSourceSplitState}} uses that 
method to create the splits that are stored in the checkpoint.
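
A sketch with simplified types (the real hook takes a 
{{CheckpointedPosition}}, not a plain offset):

{code}
// Sketch: subclasses override the copy method so that checkpointing
// preserves the concrete split type.
class SplitWithPositionSketch {
    final String path;
    final long position;

    SplitWithPositionSketch(String path, long position) {
        this.path = path;
        this.position = position;
    }

    // overridable factory method used when snapshotting the split state
    SplitWithPositionSketch updateWithCheckpointedPosition(long newPosition) {
        return new SplitWithPositionSketch(path, newPosition);
    }
}
{code}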




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19799) Make FileSource extensible

2020-10-25 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19799:


 Summary: Make FileSource extensible
 Key: FLINK-19799
 URL: https://issues.apache.org/jira/browse/FLINK-19799
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


The File System Source currently assumes all formats can represent their work 
units as {{FileSourceSplit}}. If that is not the case, the formats cannot be 
implemented using the {{FileSource}}.

We need to support extending the splits to carry additional information in the 
splits, and to use that information when creating bulk readers and handling 
split state.  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19516) PerJobMiniClusterFactoryTest. testJobClient()

2020-10-06 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19516:


 Summary: PerJobMiniClusterFactoryTest. testJobClient()
 Key: FLINK-19516
 URL: https://issues.apache.org/jira/browse/FLINK-19516
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.12.0
Reporter: Stephan Ewen


*Log:*
https://dev.azure.com/sewen0794/19b23adf-d190-4fb4-ae6e-2e92b08923a3/_apis/build/builds/151/logs/137

*Exception:*
{code}
[ERROR] 
testJobClient(org.apache.flink.client.program.PerJobMiniClusterFactoryTest)  
Time elapsed: 0.392 s  <<< FAILURE!
java.lang.AssertionError: 

Expected: is 
 but: was 
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
at 
org.apache.flink.client.program.PerJobMiniClusterFactoryTest.assertThatMiniClusterIsShutdown(PerJobMiniClusterFactoryTest.java:161)
at 
org.apache.flink.client.program.PerJobMiniClusterFactoryTest.testJobClient(PerJobMiniClusterFactoryTest.java:93)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19514) ZooKeeperLeaderElectionITCase.testJobExecutionOnClusterWithLeaderChange times out

2020-10-06 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19514:


 Summary: 
ZooKeeperLeaderElectionITCase.testJobExecutionOnClusterWithLeaderChange times 
out
 Key: FLINK-19514
 URL: https://issues.apache.org/jira/browse/FLINK-19514
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.12.0
Reporter: Stephan Ewen


Full logs:
https://dev.azure.com/sewen0794/19b23adf-d190-4fb4-ae6e-2e92b08923a3/_apis/build/builds/148/logs/115

Exception:
{code}
[ERROR] 
testJobExecutionOnClusterWithLeaderChange(org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase)
  Time elapsed: 301.093 s  <<< ERROR!
java.util.concurrent.TimeoutException: Condition was not met in given timeout.
at 
org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:132)
at 
org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase.getNextLeadingDispatcherGateway(ZooKeeperLeaderElectionITCase.java:140)
at 
org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase.testJobExecutionOnClusterWithLeaderChange(ZooKeeperLeaderElectionITCase.java:122)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)

{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19499) Expose Metric Groups to Split Assigners

2020-10-03 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19499:


 Summary: Expose Metric Groups to Split Assigners
 Key: FLINK-19499
 URL: https://issues.apache.org/jira/browse/FLINK-19499
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


Split Assigners should have access to metric groups, so they can report metrics 
on assignment, like pending splits, local-, and remote assignments.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19498) Port LocatableInputSplitAssigner to new File Source API

2020-10-03 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19498:


 Summary: Port LocatableInputSplitAssigner to new File Source API
 Key: FLINK-19498
 URL: https://issues.apache.org/jira/browse/FLINK-19498
 Project: Flink
  Issue Type: Task
  Components: Connectors / FileSystem
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


The new File Source API needs a locality aware input split assigner.

To preserve the experience, I suggest to port the existing 
{{LocatableInputSplitAssigner}} from the {{InputFormat}} API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19494) Adjust "StreamExecutionEnvironment.generateSequence()" to new API Sources

2020-10-02 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19494:


 Summary: Adjust "StreamExecutionEnvironment.generateSequence()" to 
new API Sources
 Key: FLINK-19494
 URL: https://issues.apache.org/jira/browse/FLINK-19494
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: Stephan Ewen
 Fix For: 1.12.0


The utility method {{StreamExecutionEnvironment.generateSequence()}} should 
instantiate the new {{NumberSequenceSource}} rather than the existing 
{{StatefulSequenceSource}} (see the sketch below).

We should also deprecate the {{StatefulSequenceSource}} as part of this change, 
because
  - it is based on the legacy source API
  - it is not scalable, because it materializes the sequence
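
A hedged usage sketch with the 1.12-era public APIs; the exact wiring inside 
the utility method is an assumption:

{code}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class SequenceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // what generateSequence(0, 1000) could delegate to internally
        env.fromSource(
                new NumberSequenceSource(0L, 1000L),
                WatermarkStrategy.noWatermarks(),
                "Number Sequence")
           .print();

        env.execute("sequence");
    }
}
{code}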




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19492) Consolidate Source Events between Source API and Split Reader API

2020-10-02 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19492:


 Summary: Consolidate Source Events between Source API and Split 
Reader API
 Key: FLINK-19492
 URL: https://issues.apache.org/jira/browse/FLINK-19492
 Project: Flink
  Issue Type: Improvement
  Components: API / Core, Connectors / Common
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


The Source API (flink-core) and the SplitReader API (flink-connector-base) have 
currently separate copies of events to request splits and to signal that no 
splits are available.

We should consolidate those in flink-core.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19457) Port NumberSequenceSource to FLIP-27 source interface

2020-09-29 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19457:


 Summary: Port NumberSequenceSource to FLIP-27 source interface
 Key: FLINK-19457
 URL: https://issues.apache.org/jira/browse/FLINK-19457
 Project: Flink
  Issue Type: Improvement
  Components: API / Core
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


Both {{DataStream}} and {{DataSet}} APIs have a source generating a sequence of 
numbers.
This is useful for debugging and testing.

We should port this source to the FLIP-27 interface, to support testing 
programs with the new source API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19384) Source API exception signatures are inconsistent

2020-09-23 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19384:


 Summary: Source API exception signatures are inconsistent
 Key: FLINK-19384
 URL: https://issues.apache.org/jira/browse/FLINK-19384
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.11.2, 1.11.1, 1.11.0
Reporter: Stephan Ewen
 Fix For: 1.12.0


The methods in {{org.apache.flink.api.connector.source.Source}} have 
inconsistent exception signatures:
  - the methods to create reader and enumerator do not throw a checked exception
  - the method to restore an enumerator throws an {{IOException}}.

Either all methods should allow throwing checked IOExceptions, or none of them 
should allow any checked exceptions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19278) Bump Scala Macros Version to 2.1.1

2020-09-17 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19278:


 Summary: Bump Scala Macros Version to 2.1.1
 Key: FLINK-19278
 URL: https://issues.apache.org/jira/browse/FLINK-19278
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Reporter: Stephan Ewen
 Fix For: 1.12.0


Scala Macros 2.1.0 does not support newer Scala versions, in particular not 
2.12.12.

Scala Macros 2.1.1 seems to support virtually all Scala 2.12 minor versions as 
well as the latest 2.11 versions (like 2.11.12).

See here for version compatibility: 
https://mvnrepository.com/artifact/org.scalamacros/paradise



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19265) Simplify handling of 'NoMoreSplitsEvent'

2020-09-16 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19265:


 Summary: Simplify handling of 'NoMoreSplitsEvent'
 Key: FLINK-19265
 URL: https://issues.apache.org/jira/browse/FLINK-19265
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Common
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


Split assignment events are treated specially by the source API. Users do not 
create them directly but call methods on the contexts to assign splits.

The {{NoMoreSplitsEvent}} is in contrast to that a custom user event and needs 
to be handled like a custom event, when sent by enumerators and received by the 
readers.

That seems a bit confusing and inconsistent, given that NoMoreSplits is 
essential for all bounded stream use cases and is on the same level as the 
{{AddSplitEvent}}.

I suggest that we treat "no more splits" similarly, by having either a custom 
method or a custom "SplitAssignment" on the context and reader.

[~jqin] Curious what would be your take on this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19251) Avoid confusing queue handling in "SplitReader.handleSplitsChanges()"

2020-09-15 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19251:


 Summary: Avoid confusing queue handling in 
"SplitReader.handleSplitsChanges()"
 Key: FLINK-19251
 URL: https://issues.apache.org/jira/browse/FLINK-19251
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Common
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


Currently, the method {{SplitReader.handleSplitsChanges()}} is passed a queue 
of split changes to handle. The method may decide to handle only a subset of 
them and is then passed the remaining changes again later.

In practice, this ends up being confusing and problematic:
  - It is important to remove the elements from the queue rather than just 
iterating over them, or the splits will get handled multiple times.
  - If the queue is not left empty, the task to handle the changes is 
immediately re-enqueued. No other operation can happen before all split changes 
from the queue are handled.

A simpler and more efficient contract would be to simply pass a list of split 
changes directly and once, for the fetcher to handle. For all implementations 
so far, this was sufficient and easier.
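
A sketch of the simpler contract (signature simplified):

{code}
import java.util.List;

// Sketch: each batch of split changes is handed over exactly once,
// as a plain list, and the fetcher must handle all of it in this call.
interface SplitReaderSketch<SplitT> {
    void handleSplitsChanges(List<SplitT> splitsChanges);
}
{code}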



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19250) SplitFetcherManager does not propagate errors correctly

2020-09-15 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19250:


 Summary: SplitFetcherManager does not propagate errors correctly
 Key: FLINK-19250
 URL: https://issues.apache.org/jira/browse/FLINK-19250
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common
Affects Versions: 1.11.2
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0, 1.11.3


The first exception that is reported does not lead to a notification, only 
successive exceptions do. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19245) Set default queue capacity for FLIP-27 source handover queue to 2

2020-09-15 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19245:


 Summary: Set default queue capacity for FLIP-27 source handover 
queue to 2
 Key: FLINK-19245
 URL: https://issues.apache.org/jira/browse/FLINK-19245
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Common
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


Based on initial investigation by [~jqin] a capacity value of two results in 
good queue throughput while still keeping the number of elements small.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19225) Improve code and logging in SourceReaderBase

2020-09-14 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19225:


 Summary: Improve code and logging in SourceReaderBase
 Key: FLINK-19225
 URL: https://issues.apache.org/jira/browse/FLINK-19225
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Common
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


An umbrella issue for minor improvements to the {{SourceReaderBase}}, such as 
logging, thread names, code simplifications.

The concrete change is described in the messages of the commits tagged with 
this issue (separate commits to better track the changes).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19223) Simplify Availability Future Model in Base Connector

2020-09-14 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19223:


 Summary: Simplify Availability Future Model in Base Connector
 Key: FLINK-19223
 URL: https://issues.apache.org/jira/browse/FLINK-19223
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Common
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


The current model implemented by the {{FutureNotifier}} and the 
{{SourceReaderBase}} has a shortcoming:
  - It does not support availability notifications where the notification comes 
before the check. In that case the notification is lost.

  - The added complexity of this model also shows in 
{{SourceReaderBase#isAvailable()}}, where the returned future needs to be 
"post-processed" and eagerly completed if the reader is in fact available. This 
is based on queue size, which makes it hard to support other conditions.

I think we can do something that is both easier and a bit more efficient by 
following a similar model as the 
{{org.apache.flink.runtime.io.AvailabilityProvider.AvailabilityHelper}}.

Furthermore, I believe we can win more efficiency by integrating this better 
with the {{FutureCompletingBlockingQueue}}.

I suggest to do a similar implementation as the {{AvailabilityHelper}} directly 
in the {{FutureCompletingBlockingQueue}}.
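
A minimal sketch of the {{AvailabilityHelper}} idea (synchronization elided 
for brevity): a notification that arrives before anyone asked for the future 
is not lost, because completing the current future is idempotent.

{code}
import java.util.concurrent.CompletableFuture;

final class AvailabilitySketch {
    private volatile CompletableFuture<Void> available = new CompletableFuture<>();

    CompletableFuture<Void> isAvailable() {
        return available;
    }

    void notifyAvailable() {
        available.complete(null); // completing an already-complete future is a no-op
    }

    void resetUnavailable() {
        if (available.isDone()) {
            available = new CompletableFuture<>(); // only reset after consumption
        }
    }
}
{code}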



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19221) Exploit LocatableFileStatus from Hadoop

2020-09-14 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19221:


 Summary: Exploit LocatableFileStatus from Hadoop
 Key: FLINK-19221
 URL: https://issues.apache.org/jira/browse/FLINK-19221
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hadoop Compatibility
Affects Versions: 1.11.1
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


When the HDFS Client returns a {{FileStatus}} (description of a file) it 
sometimes returns a {{LocatedFileStatus}} which already contains all the 
{{BlockLocation}} information.

We should expose this on the Flink side, because it may save us a lot of RPC 
calls to the name node. The file enumerators often request block locations for 
all files, currently doing an RPC call for each file.

When the FileStatus obtained from listing the directory (or getting details for 
a file) already has all the block locations, we can save the extra RPC call per 
file.

The suggested implementation is as follows:

  1. We introduce a {{LocatedInputSplit}} in Flink that we integrate with the 
built-in LocalFileSystem
  2. We integrate this with the HadoopFileSystems by creating a Flink 
{{LocatedInputSplit}} whenever the underlying file system returned a Hadoop 
{{LocatedFileStatus}}
  3. As a safety net, the FS methods to access block information check whether 
the presented file status already contains the block information and return 
that information directly.

Steps one and two are for simplification of FileSystem users (no need to ask 
for extra info if it is available).

Step three is the transparent shortcut that all applications get even if they 
do not explicitly use the {{LocatedInputSplit}} and keep calling 
{{FileSystem.getBlockLocations()}}.
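
A sketch of the step-3 safety net, written against the Hadoop types: if the 
status already carries block locations, the extra name-node RPC is skipped.

{code}
import java.io.IOException;

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;

final class BlockLocationSketch {
    static BlockLocation[] blockLocations(FileSystem fs, FileStatus status)
            throws IOException {
        if (status instanceof LocatedFileStatus) {
            return ((LocatedFileStatus) status).getBlockLocations(); // no RPC needed
        }
        return fs.getFileBlockLocations(status, 0, status.getLen()); // one RPC per file
    }
}
{code}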



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19218) Remove inconsistent host logic for LocalFileSystem

2020-09-14 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19218:


 Summary: Remove inconsistent host logic for LocalFileSystem
 Key: FLINK-19218
 URL: https://issues.apache.org/jira/browse/FLINK-19218
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.11.1
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


The {{LocalFileSystem}} returns file splits with host information as returned 
via
{code}
InetAddress.getLocalHost().getHostName();
{code}

This might be different, though, from the host name that the TaskManager is 
configured to use, which results in incorrect location matching if this 
information is used.

It is also incorrect in cases where the file system is in fact not local, but a 
mounted NAS.

Since this information is anyway not useful (there is no good way to support 
locality-aware file access for the LocalFileSystem), I suggest removing this 
code. That would be better than having code in place that suggests locality 
information that is frequently incorrect.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19205) SourceReaderContext should give access to Configuration and Hostname

2020-09-11 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19205:


 Summary: SourceReaderContext should give access to Configuration 
and Hostname
 Key: FLINK-19205
 URL: https://issues.apache.org/jira/browse/FLINK-19205
 Project: Flink
  Issue Type: Sub-task
  Components: API / Core
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


To implement end-to-end configurability, Source Readers need access to the 
Flink configuration.

Source Readers sometimes need access to the hostname of the machine running the 
reader task, to add locality preferences to requests for partitions from the 
external source system.

The natural way to give access to both is through the {{SourceReaderContext}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19162) Allow Split Reader based sources to reuse record batches

2020-09-07 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19162:


 Summary: Allow Split Reader based sources to reuse record batches 
 Key: FLINK-19162
 URL: https://issues.apache.org/jira/browse/FLINK-19162
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Common
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


The Split Readers hand over a batch of records at a time from the I/O thread 
(fetching and decoding) to the main operator processing thread.

 These structures can be memory-intensive and expensive to create, and 
performance greatly benefits from reusing them. This is especially true for 
high-performance format readers like ORC and Parquet.

While previous sources (where I/O was in the main thread) could reuse objects 
in a trivial manner, the new Split Reader API (with multiple threads) needs an 
explicit {{recycle()}} hook to allow returning/reusing these objects.
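
A sketch of such a hook (names are assumptions, not the actual connector-base 
API):

{code}
// Sketch: once the processing thread has drained a batch, it hands the
// heavy container back to the fetching thread for reuse instead of
// letting it be garbage collected.
interface RecordBatchSketch<T> {
    T nextRecord();  // null when the batch is drained
    void recycle();  // return the batch's buffers to the reader for reuse
}
{code}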



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19161) Port File Sources to FLIP-27 API

2020-09-07 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19161:


 Summary: Port File Sources to FLIP-27 API
 Key: FLINK-19161
 URL: https://issues.apache.org/jira/browse/FLINK-19161
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / FileSystem
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


Porting the File sources to the FLIP-27 API means combining the
  - FileInputFormat from the DataSet Batch API
  - The Monitoring File Source from the DataStream API.

The two already share the same reader code and parts of the enumeration code.

*Structure*

The new File Source will have three components:

  - File enumerators that discover the files.
  - File split assigners that decide which reader gets what split
  - File Reader Formats, which deal with the decoding.


The main difference between the Bounded (Batch) version and the unbounded 
(Streaming) version is that the streaming version repeatedly invokes the file 
enumerator to search for new files.

*Checkpointing Enumerators*

The enumerators need to checkpoint the not-yet-assigned splits, plus, if they 
are in continuous discovery mode (streaming) the paths / timestamps already 
processed.

*Checkpointing Readers*

The new File Source needs to ensure that every reader can be checkpointed.
Some readers may be able to expose the position in the input file that 
corresponds to the latest emitted record, but many will not be able to do that 
due to
  - storing compressed record batches
  - using buffered decoders where exact position information is not accessible

We therefore suggest to expose a mechanism that combines seekable file offsets 
and records to read and skip after that offset. In the extreme cases, files can 
work only with seekable positions or only with records-to-skip. Some sources, 
like Avro, can have periodic seek points (sync markers) and count 
records-to-skip after these markers.
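
A sketch of the combined position described above (it mirrors the idea of a 
{{CheckpointedPosition}}: a seekable offset plus records to skip after it):

{code}
final class CheckpointedPositionSketch {
    final long offset;             // where the reader can seek to, e.g. an Avro sync marker
    final long recordsAfterOffset; // records to read and discard after seeking

    CheckpointedPositionSketch(long offset, long recordsAfterOffset) {
        this.offset = offset;
        this.recordsAfterOffset = recordsAfterOffset;
    }
}
{code}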

*Efficient and Convenient Readers*

To balance efficiency (batch vectorized reading of ORC / Parquet for vectorized 
query processing) and convenience (plugging a third-party CSV decoder over a 
stream), we offer three abstractions for record readers:

  - Bulk Formats that run over a file Path and return an iterable batch at a 
time _(most efficient)_

  - File Record Formats which read files record-by-record. The source framework 
hands over a pre-defined-size batch from Split Reader to Record Emitter.

  - Stream Formats that decode an input stream and rely on the source framework 
to decide how to batch record handover _(most convenient)_



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19087) ResultPartitionWriter should not expose subpartitions but only subpartition-readers

2020-08-28 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19087:


 Summary: ResultPartitionWriter should not expose subpartitions but 
only subpartition-readers
 Key: FLINK-19087
 URL: https://issues.apache.org/jira/browse/FLINK-19087
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


The {{ResultPartitionWriter}} currently gives arbitrary access to the 
sub-partitions.

These subpartitions may not always exist directly, such as in a sort-based 
shuffle.
Only access to a reader over a sub-partition's data (the 
{{ResultSubpartitionView}}) is necessary.

In the spirit of minimal scope of knowledge, the methods should be scoped to 
return readers, not the more general subpartitions.
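
A sketch of the narrower accessor (names below are assumptions):

{code}
// Sketch: callers obtain a reader over one subpartition's data,
// never the subpartition object itself.
interface ResultPartitionWriterSketch {
    SubpartitionViewSketch createSubpartitionView(int subpartitionIndex)
            throws java.io.IOException;
}

interface SubpartitionViewSketch {
    // plays the role of ResultSubpartitionView: read-only access to the data
}
{code}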



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19047) Move unaligned checkpoint methods from ResultPartition to separate interface.

2020-08-25 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19047:


 Summary: Move unaligned checkpoint methods from ResultPartition to 
separate interface.
 Key: FLINK-19047
 URL: https://issues.apache.org/jira/browse/FLINK-19047
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


All ResultPartitions currently have the unaligned checkpointing methods, even 
though some do not support checkpoints and merely throw an 
{{UnsupportedOperationException}}.

I suggest to follow the idea of interface segregation to put the methods 
relating to unaligned checkpoints in a dedicated interface.





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19046) Introduce separate classes for PipelinedResultPartition and BoundedBlockingResultPartition

2020-08-25 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19046:


 Summary: Introduce separate classes for PipelinedResultPartition 
and BoundedBlockingResultPartition
 Key: FLINK-19046
 URL: https://issues.apache.org/jira/browse/FLINK-19046
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


Currently, the SubPartition classes are specific to the partition type 
(pipelined, batched/blocking) but the parent Partition class is shared.

Given that the partitions behave differently regarding checkpoints, releasing, 
etc. the code is cleaner separated by introducing dedicated classes for the 
{{ResultPartitions}} based on the type.

This is also an important preparation to later have more different 
implementations, like sort-based shuffles.

Important: These new classes will not override any performance critical methods 
(like adding a buffer to the result). They merely specialize certain behaviors 
around checkpointing and cleanup.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19045) Remove obsolete option 'taskmanager.network.partition.force-release-on-consumption'

2020-08-25 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19045:


 Summary: Remove obsolete option 
'taskmanager.network.partition.force-release-on-consumption'
 Key: FLINK-19045
 URL: https://issues.apache.org/jira/browse/FLINK-19045
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Stephan Ewen
 Fix For: 1.12.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19024) Remove unused "releaseMemory" from ResultSubpartition

2020-08-21 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19024:


 Summary: Remove unused "releaseMemory" from ResultSubpartition
 Key: FLINK-19024
 URL: https://issues.apache.org/jira/browse/FLINK-19024
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


The {{releaseMemory()}} call in the {{ResultSubpartition}} is currently not 
meaningful for any existing implementation.

Future versions where memory may have to be released will quite possibly not 
implement that on a "subpartition" level. For example, a sort based shuffle has 
the buffers on a partition-level, rather than a subpartition level.

We should thus remove the {{releaseMemory()}} call from the abstract 
subpartition interface. Concrete implementations can still release memory on a 
subpartition level, if needed in the future.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19023) Remove pruning of Record Serializer Buffer

2020-08-21 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19023:


 Summary: Remove pruning of Record Serializer Buffer
 Key: FLINK-19023
 URL: https://issues.apache.org/jira/browse/FLINK-19023
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


Currently, the {{SpanningRecordSerializer}} prunes its internal serialization 
buffer under special circumstances:

  - The buffer becomes larger than a certain threshold (5MB)
  - The full record end lines up exactly with a full buffer length (this 
condition was introduced at some point; its purpose is unclear)

This optimization virtually never kicks in (because of the second condition) 
and also seems unnecessary. There is only a single serializer on the sender 
side, so this will not help to reduce the maximum memory footprint needed in 
any way.

NOTE: A similar optimization on the reader side 
({{SpillingAdaptiveSpanningRecordDeserializer}}) makes sense, because multiple 
parallel deserializers run in order to piece together the records when 
retrieving buffers from the network in arbitrary order. Truncating buffers (or 
spilling) there helps reduce the maximum required memory footprint.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19021) Cleanups of the ResultPartition components

2020-08-21 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-19021:


 Summary: Cleanups of the ResultPartition components
 Key: FLINK-19021
 URL: https://issues.apache.org/jira/browse/FLINK-19021
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


This is the umbrella issue for a set of simplifications and cleanups in the 
{{ResultPartition}} components.

This cleanup is in preparation for a possible future refactoring.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18447) Add 'flink-connector-base' to 'flink-dist'

2020-06-29 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-18447:


 Summary: Add 'flink-connector-base' to 'flink-dist'
 Key: FLINK-18447
 URL: https://issues.apache.org/jira/browse/FLINK-18447
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Affects Versions: 1.11.0
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.12.0


Source connectors will rely mostly on 'flink-connector-base'. It is like a 
high-level Source API.

Including it in {{flink-dist}} would avoid each connector having to package it 
into a fat jar. It would then be used similarly to other APIs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18430) Upgrade stability to @Public for CheckpointedFunction and CheckpointListener

2020-06-24 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-18430:


 Summary: Upgrade stability to @Public for CheckpointedFunction and 
CheckpointListener
 Key: FLINK-18430
 URL: https://issues.apache.org/jira/browse/FLINK-18430
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.11.0


The two interfaces {{CheckpointedFunction}} and {{CheckpointListener}} are used 
by many users, but are still (for years now) marked as {{@PublicEvolving}}.

I think this is not correct. They are core to the DataStream API, are widely 
used, and should be treated as {{@Public}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18429) Add default method for CheckpointListener.notifyCheckpointAborted(checkpointId)

2020-06-24 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-18429:


 Summary: Add default method for 
CheckpointListener.notifyCheckpointAborted(checkpointId)
 Key: FLINK-18429
 URL: https://issues.apache.org/jira/browse/FLINK-18429
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.11.0


The {{CheckpointListener}} interface is implemented by many users. Adding a new 
method {{notifyCheckpointAborted(long)}} to the interface without a default 
method breaks many user programs.

We should turn this method into a default method:
  - Avoid breaking programs
  - It is in practice less relevant for programs to react to checkpoints being 
aborted than to being completed. The reason is that on completion you often 
want to commit side effects, while on abort you frequently do nothing and let 
the next successful checkpoint commit all changes up to that point. A sketch 
of the default method follows below.
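
A sketch of the fix (interface name altered here to mark it as illustration; 
the shape matches the description above):

{code}
// Sketch: the new notification becomes a no-op default method, so existing
// implementations keep compiling and running unchanged.
public interface CheckpointListenerSketch {
    void notifyCheckpointComplete(long checkpointId) throws Exception;

    default void notifyCheckpointAborted(long checkpointId) throws Exception {
        // most applications commit side effects only on completion, so doing
        // nothing on abort is a safe default
    }
}
{code}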

*Original Confusion*

There was confusion about this originally, going back to a comment by myself 
suggesting this should not be a default method, incorrectly thinking of it as 
an internal interface: 
https://github.com/apache/flink/pull/8693#issuecomment-542834147

See also clarification email on the mailing list:
{noformat}
About the "notifyCheckpointAborted()":

When I wrote that comment, I was (apparently wrongly) assuming we were talking 
about an internal interface here, because the "abort" signal was originally 
only intended to cancel the async part of state backend checkpoints.

I just realized that this is exposed to users - and I am actually with Thomas 
on this one. The "CheckpointListener" is a very public interface that many 
users implement. The fact that it is tagged "@PublicEvolving" is somehow not 
aligned with reality. So adding the method here will in reality break lots and 
lots of user programs.

I think also in practice it is much less relevant for user applications to 
react to aborted checkpoints. Since the notifications there can not be relied 
upon (if there is a task failure concurrently) users always have to follow the 
"newer checkpoint subsumes older checkpoint" contract, so the abort method is 
probably rarely relevant.

This is something we should change, in my opinion.
{noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18307) Replace "slave" file name with "workers"

2020-06-15 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-18307:


 Summary: Replace "slave" file name with "workers"
 Key: FLINK-18307
 URL: https://issues.apache.org/jira/browse/FLINK-18307
 Project: Flink
  Issue Type: Sub-task
  Components: Deployment / Scripts
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.11.0


See parent issue for a discussion of the rationale.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18124) Add documentation for new FLIP-27 source interface

2020-06-04 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-18124:


 Summary: Add documentation for new FLIP-27 source interface
 Key: FLINK-18124
 URL: https://issues.apache.org/jira/browse/FLINK-18124
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.11.0


The documentation should be under {{Application Development / DataStream API / 
Data Sources}}.

We need to add sections about
  - Data Source Concepts
  - Data Source API
  - Connector Base (Split Reader Library)
  - Event Time and Watermarks




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18091) Test Relocatable Savepoints

2020-06-03 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-18091:


 Summary: Test Relocatable Savepoints
 Key: FLINK-18091
 URL: https://issues.apache.org/jira/browse/FLINK-18091
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: Stephan Ewen
 Fix For: 1.11.0


The test should do the following:

  - take a savepoint. needs to make sure the job has enough state that there is 
more than just the "_metadata" file
  - copy it to another directory
  - start the job from that savepoint by addressing the metadata file and by 
addressing the savepoint directory

We should also test that an incremental checkpoint that gets moved fails with a 
reasonable exception.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17950) Broken Scala env.continuousSource method

2020-05-26 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-17950:


 Summary: Broken Scala env.continuousSource method 
 Key: FLINK-17950
 URL: https://issues.apache.org/jira/browse/FLINK-17950
 Project: Flink
  Issue Type: Bug
  Components: API / Scala
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.11.0


The Scala {{StreamExecutionEnvironment.continuousSource(...)}} method has two 
critical problems:
  - Its return type is {{Unit}} instead of {{DataStream}}, so that no one can 
use the created stream
  - It does not forward the TypeInformation identified by the ScalaCompiler but 
relies on the Java TypeExtraction stack, which cannot handle most Scala types. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17906) Fix performance issues in WatermarkOutputMultiplexer

2020-05-24 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-17906:


 Summary: Fix performance issues in WatermarkOutputMultiplexer
 Key: FLINK-17906
 URL: https://issues.apache.org/jira/browse/FLINK-17906
 Project: Flink
  Issue Type: Improvement
  Components: API / Core
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.11.0


The WatermarkOutputMultiplexer has some potential for performance improvements:
  - Not using volatile variables (all accesses happen anyway from the mailbox 
or under the legacy checkpoint lock).
  - Using boolean logic instead of branches (a sketch follows below).
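
For illustration, the kind of change meant by the second bullet - a sketch on 
a hypothetical watermark-update method, not the actual class:

{code:java}
final class WatermarkUpdateSketch {

    private long currentWatermark = Long.MIN_VALUE;
    private boolean changed;

    void onNewWatermark(long newWatermark) {
        // boolean logic instead of an if-branch in the hot path
        changed |= newWatermark > currentWatermark;
        currentWatermark = Math.max(currentWatermark, newWatermark);
    }
}
{code}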



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17904) Add "scheduleWithFixedDelay" to ProcessingTimeService

2020-05-24 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-17904:


 Summary: Add "scheduleWithFixedDelay" to ProcessingTimeService
 Key: FLINK-17904
 URL: https://issues.apache.org/jira/browse/FLINK-17904
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Reporter: Stephan Ewen
Assignee: Stephan Ewen


Adding {{"scheduleWithFixedDelay(...)"}} to {{ProcessingTimeService}} would 
better support cases where fired timers are backed up. Rather than immediately 
firing again, they would wait out their scheduled delay.

The implementation can be added in ProcessingTimeService in the exact same way 
as {{"scheduleAtFixedRate"}}.
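
A sketch of the proposed method next to the existing one (parameter names are 
assumptions):

{code:java}
import java.util.concurrent.ScheduledFuture;

public interface ProcessingTimeService {

    // existing: fires every 'period', and fires again immediately when
    // an execution was delayed past its scheduled time
    ScheduledFuture<?> scheduleAtFixedRate(
            ProcessingTimeCallback callback, long initialDelay, long period);

    // proposed: the next firing is scheduled 'delay' after the previous
    // invocation, so backed-up timers wait out their delay again
    ScheduledFuture<?> scheduleWithFixedDelay(
            ProcessingTimeCallback callback, long initialDelay, long delay);
}
{code}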



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17903) Evolve WatermarkOutputMultiplexer to make it reusable in FLIP-27 Sources

2020-05-24 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-17903:


 Summary: Evolve WatermarkOutputMultiplexer to make it reusable in 
FLIP-27 Sources 
 Key: FLINK-17903
 URL: https://issues.apache.org/jira/browse/FLINK-17903
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.11.0


The {{WatermarkOutputMultiplexer}} merges multiple independently generated 
watermarks.

To make it usable in the FLIP-27 sources, we need to
  - Change its IDs from integers to Strings (to match split IDs)
  - Support de-registration of local outputs (when splits are finished); see 
the sketch below
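
A sketch of the evolved API (method names are assumptions derived from the two 
points above, not the final code):

{code:java}
public interface SplitLocalOutputs {

    // splits are identified by their String split IDs instead of ints
    WatermarkOutput registerNewOutput(String splitId);

    // de-register a local output once its split is finished, so the
    // finished split no longer holds back the combined watermark
    boolean unregisterOutput(String splitId);
}
{code}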



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17899) Integrate FLIP-126 Timestamps and Watermarking with FLIP-27 sources

2020-05-23 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-17899:


 Summary: Integrate FLIP-126 Timestamps and Watermarking with 
FLIP-27 sources
 Key: FLINK-17899
 URL: https://issues.apache.org/jira/browse/FLINK-17899
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Stephan Ewen
Assignee: Stephan Ewen


*Preamble:* This whole discussion is to some extent only necessary because, in 
the {{SourceReader}}, we pass the {{SourceOutput}} as a parameter to the 
{{pollNext(...)}} method. However, this design follows some deeper runtime 
pipeline design decisions and is not easy to change at this stage.

 

There are some principle design choices here:

 

*(1) Do we make Timestamps and Watermarks purely a feature of the library 
(ConnectorBase), or do we integrate it with the core (SourceOperator)?*

Making it purely a responsibility of the ConnectorBase would have the advantage 
of keeping the SourceOperator simple. However, there is value in integrating 
this with the SourceOperator.
 - Implementations that are not using the ConnectorBase (like simple 
collection- or iterator-based sources) would automatically get access to the 
pluggable TimestampExtractors and WatermarkGenerators.

 - When executing batch programs, the SourceOperator can transparently inject a 
"no-op" WatermarkGenerator to make sure no Watermarks are generated during 
batch execution. Given that batch sources are very performance-sensitive, it 
seems useful to not even run the watermark generator logic, rather than 
dropping the watermarks later.

 - In a future version, we may want to implement "global watermark holds" 
generated by the Enumerators: the enumerator tells the readers how far they may 
advance their local watermarks. This can help avoid prematurely advancing the 
watermark based on one split's records while other splits still have data 
overlapping with older ranges. An example where this is commonly the case is 
the streaming file source.

 

*(2) Is the per-partition watermarking purely a feature of the library 
(ConnectorBase), or do we integrate it with the core (SourceOperator)?*

I believe we need to solve this on the same level as the previous question:
 - Once a connector instantiates the per-partition watermark generators, the 
main output (through which the SourceReader emits the records) must not run its 
watermark generator any more. Otherwise we would extract watermarks on the 
merged stream as well, which messes things up. So having the per-partition 
watermark generators simply in the ConnectorBase, emitting transparently 
through an unchanged main output, would not work.

 - So, if we decide to implement watermarks support in the core 
(SourceOperator), we would need to offer the per-partition watermarking 
utilities on that level as well.

 - Along a similar line of thoughts as in the previous point, the batch 
execution can optimize the watermark extraction by supplying no-op extractors 
also for the per-partition extractors (which will most likely bear the bulk of 
the load in the connectors).

 

*(3) What would an integration of WatermarkGenerators with the SourceOperator 
look like?*

Rather straightforward: the SourceOperator instantiates a SourceOutput that 
internally runs the timestamp extractor and watermark generator and emits to 
the DataOutput that the operator emits to. A sketch follows below.
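
A rough sketch of such an output, assuming the FLIP-126 {{TimestampAssigner}} 
and {{WatermarkGenerator}} interfaces; the actual class may look different, 
and the downstream output is stood in by a plain {{Consumer}}:

{code:java}
import java.util.function.Consumer;

// Sketch only: the SourceOperator-side output runs timestamp extraction
// and watermark generation before forwarding to the actual DataOutput.
final class ExtractingSourceOutputSketch<T> {

    private final Consumer<T> downstream;                 // stands in for the DataOutput
    private final TimestampAssigner<T> timestampAssigner; // FLIP-126 extractor
    private final WatermarkGenerator<T> watermarkGenerator;
    private final WatermarkOutput watermarkOutput;        // where generated watermarks go

    ExtractingSourceOutputSketch(
            Consumer<T> downstream,
            TimestampAssigner<T> timestampAssigner,
            WatermarkGenerator<T> watermarkGenerator,
            WatermarkOutput watermarkOutput) {
        this.downstream = downstream;
        this.timestampAssigner = timestampAssigner;
        this.watermarkGenerator = watermarkGenerator;
        this.watermarkOutput = watermarkOutput;
    }

    void collect(T record, long nativeTimestamp) {
        // run the pluggable extractor, give the generator a chance to
        // advance the watermark, then forward the record downstream
        long timestamp = timestampAssigner.extractTimestamp(record, nativeTimestamp);
        watermarkGenerator.onEvent(record, timestamp, watermarkOutput);
        downstream.accept(record);
    }
}
{code}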

 

*(4) What would an integration of the per-split WatermarkGenerators look like?*

I would propose to add a method to the {{SourceReaderContext}}: 
{{SplitAwareOutputs createSourceAwareOutputs()}}

The {{SplitAwareOutputs}} looks the following way:
{code:java}
public interface SplitAwareOutputs {

    // creates (and registers) a dedicated output for the given split,
    // running that split's own timestamp extractor / watermark generator
    SourceOutput createOutputForSplit(String splitId);

    // releases the output once the split is finished, so it no longer
    // contributes to the combined watermark
    void releaseOutputForSplit(String splitId);
}
{code}
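
For illustration, a reader might use this roughly as follows (hypothetical 
usage, building on the proposed names above):

{code:java}
// Hypothetical usage inside a SourceReader (sketch only):
void emitSplit(SourceReaderContext readerContext, String splitId, String record) {
    SplitAwareOutputs outputs = readerContext.createSourceAwareOutputs();

    // one dedicated output per split, running that split's own generator
    SourceOutput splitOutput = outputs.createOutputForSplit(splitId);

    // emit the split's records through its own output, so the split's
    // watermark generator sees exactly this split's records
    splitOutput.collect(record);

    // release the output once the split is exhausted
    outputs.releaseOutputForSplit(splitId);
}
{code}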



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17898) Remove Exceptions from signatures of SourceOutput methods

2020-05-23 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-17898:


 Summary: Remove Exceptions from signatures of SourceOutput methods
 Key: FLINK-17898
 URL: https://issues.apache.org/jira/browse/FLINK-17898
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Stephan Ewen
 Fix For: 1.11.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17897) Resolve stability annotations discussion for FLIP-27 in 1.11

2020-05-23 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-17897:


 Summary: Resolve stability annotations discussion for FLIP-27 in 
1.11 
 Key: FLINK-17897
 URL: https://issues.apache.org/jira/browse/FLINK-17897
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Stephan Ewen
Assignee: Jiangjie Qin
 Fix For: 1.11.0


Currently the interfaces from the FLIP-27 sources are all labeled as 
{{@Public}}.

Given that FLIP-27 is going to be in a "beta" version in the 1.11 release, we 
are discussing downgrading the stability annotation to {{@PublicEvolving}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17854) Use InputStatus directly in user-facing async input APIs (like source readers)

2020-05-20 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-17854:


 Summary: Use InputStatus directly in user-facing async input APIs 
(like source readers)
 Key: FLINK-17854
 URL: https://issues.apache.org/jira/browse/FLINK-17854
 Project: Flink
  Issue Type: Improvement
  Components: API / Core
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.11.0


The flink-runtime uses the {{InputStatus}} enum in its 
{{PushingAsyncDataInput}}.
The flink-core {{SourceReader}} has a separate enum with the same purpose.

In the {{SourceOperator}} we need to bridge between these two, which is clumsy 
and a bit inefficient.

We can simply make {{InputStatus}} part of the {{flink-core}} I/O packages and 
use it in the {{SourceReader}}, to avoid having to bridge between it and the 
runtime part.
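
For reference, a sketch of the runtime enum in question (the actual definition 
lives with {{PushingAsyncDataInput}} in flink-runtime):

{code:java}
public enum InputStatus {

    // more data is available; the input can be polled again right away
    MORE_AVAILABLE,

    // no data for the moment; the caller should wait for availability
    NOTHING_AVAILABLE,

    // the input is finished; no more records will ever come
    END_OF_INPUT
}
{code}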



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17850) PostgresCatalogITCase . testGroupByInsert() fails on CI

2020-05-20 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-17850:


 Summary: PostgresCatalogITCase . testGroupByInsert() fails on CI
 Key: FLINK-17850
 URL: https://issues.apache.org/jira/browse/FLINK-17850
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Ecosystem
Affects Versions: 1.11.0
Reporter: Stephan Ewen
 Fix For: 1.11.0


{{org.apache.flink.connector.jdbc.catalog.PostgresCatalogITCase . 
testGroupByInsert}}


Error:
{code}
2020-05-20T16:36:33.9647037Z org.apache.flink.table.api.ValidationException: 
2020-05-20T16:36:33.9647354Z Field types of query result and registered 
TableSink mypg.postgres.primitive_table2 do not match.

2020-05-20T16:36:33.9648233Z Query schema: [int: INT NOT NULL, EXPR$1: 
VARBINARY(2147483647) NOT NULL, short: SMALLINT NOT NULL, EXPR$3: BIGINT, 
EXPR$4: FLOAT, EXPR$5: DOUBLE, EXPR$6: DECIMAL(10, 5), EXPR$7: BOOLEAN, EXPR$8: 
VARCHAR(2147483647), EXPR$9: CHAR(1) NOT NULL, EXPR$10: CHAR(1) NOT NULL, 
EXPR$11: VARCHAR(20), EXPR$12: TIMESTAMP(5), EXPR$13: DATE, EXPR$14: TIME(0), 
EXPR$15: DECIMAL(38, 18)]

2020-05-20T16:36:33.9650272Z Sink schema: [int: INT, bytea: 
VARBINARY(2147483647), short: SMALLINT, long: BIGINT, real: FLOAT, 
double_precision: DOUBLE, numeric: DECIMAL(10, 5), decimal: DECIMAL(10, 1), 
boolean: BOOLEAN, text: VARCHAR(2147483647), char: CHAR(1), character: CHAR(3), 
character_varying: VARCHAR(20), timestamp: TIMESTAMP(5), date: DATE, time: 
TIME(0), default_numeric: DECIMAL(38, 18)]

2020-05-20T16:36:33.9651218Zat 
org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:100)
2020-05-20T16:36:33.9651689Zat 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:218)
2020-05-20T16:36:33.9652136Zat 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:193)
2020-05-20T16:36:33.9652936Zat scala.Option.map(Option.scala:146)
2020-05-20T16:36:33.9653593Zat 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:193)
2020-05-20T16:36:33.9653993Zat 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:152)
2020-05-20T16:36:33.9654428Zat 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:152)
2020-05-20T16:36:33.9654841Zat 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
2020-05-20T16:36:33.9655221Zat 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
2020-05-20T16:36:33.9655759Zat 
scala.collection.Iterator$class.foreach(Iterator.scala:891)
2020-05-20T16:36:33.9656072Zat 
scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
2020-05-20T16:36:33.9656413Zat 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
2020-05-20T16:36:33.9656890Zat 
scala.collection.AbstractIterable.foreach(Iterable.scala:54)
2020-05-20T16:36:33.9657211Zat 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
2020-05-20T16:36:33.9657525Zat 
scala.collection.AbstractTraversable.map(Traversable.scala:104)
2020-05-20T16:36:33.9657878Zat 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:152)
2020-05-20T16:36:33.9658350Zat 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1217)
2020-05-20T16:36:33.9658784Zat 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:663)
2020-05-20T16:36:33.9659391Zat 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:750)
2020-05-20T16:36:33.9659856Zat 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:653)
2020-05-20T16:36:33.9660507Zat 
org.apache.flink.table.planner.runtime.utils.TableEnvUtil$.execInsertSqlAndWaitResult(TableEnvUtil.scala:27)
2020-05-20T16:36:33.9661115Zat 
org.apache.flink.table.planner.runtime.utils.TableEnvUtil.execInsertSqlAndWaitResult(TableEnvUtil.scala)
2020-05-20T16:36:33.9661583Zat 
org.apache.flink.connector.jdbc.catalog.PostgresCatalogITCase.testGroupByInsert(PostgresCatalogITCase.java:88)
{code}

Full log: 
https://dev.azure.com/sewen0794/19b23adf-d190-4fb4-ae6e-2e92b08923a3/_apis/build/builds/25/logs/93



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17849) YARNHighAvailabilityITCase hangs in Azure Pipelines CI

2020-05-20 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-17849:


 Summary: YARNHighAvailabilityITCase hangs in Azure Pipelines CI
 Key: FLINK-17849
 URL: https://issues.apache.org/jira/browse/FLINK-17849
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.11.0
Reporter: Stephan Ewen
 Fix For: 1.11.0


The test seems to hang for 15 minutes, then gets killed.

Full logs: 
https://dev.azure.com/sewen0794/19b23adf-d190-4fb4-ae6e-2e92b08923a3/_apis/build/builds/25/logs/121



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17816) Change Latency Marker to work with "scheduleAtFixedDelay" instead of "scheduleAtFixedRate"

2020-05-19 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-17816:


 Summary: Change Latency Marker to work with "scheduleAtFixedDelay" 
instead of "scheduleAtFixedRate"
 Key: FLINK-17816
 URL: https://issues.apache.org/jira/browse/FLINK-17816
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Reporter: Stephan Ewen


Latency Markers and other periodic timers are scheduled with 
{{scheduleAtFixedRate}}. That means the callable is invoked every X time 
units. If an invocation blocks (backpressure), it can be called again 
immediately afterwards.

I would suggest switching this to {{scheduleAtFixedDelay}} to avoid scheduling 
a lot of latency marker injections when there is no way to actually execute 
the injection call.
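
The semantic difference is the same as between the two scheduling modes of the 
JDK's {{ScheduledExecutorService}} (shown here only to illustrate the 
behavior; Flink's own timer service is what is actually used):

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SchedulingSemantics {

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

        // fixed rate: aims for one execution per period; executions that
        // were delayed (e.g. by backpressure) fire in rapid succession
        executor.scheduleAtFixedRate(
                () -> System.out.println("rate"), 0, 100, TimeUnit.MILLISECONDS);

        // fixed delay: the next execution starts a full delay after the
        // previous one finished - no catch-up burst after a stall
        executor.scheduleWithFixedDelay(
                () -> System.out.println("delay"), 0, 100, TimeUnit.MILLISECONDS);
    }
}
{code}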



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17781) OperatorCoordinator Context must support calls from thread other than JobMaster Main Thread

2020-05-17 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-17781:


 Summary: OperatorCoordinator Context must support calls from 
thread other than JobMaster Main Thread
 Key: FLINK-17781
 URL: https://issues.apache.org/jira/browse/FLINK-17781
 Project: Flink
  Issue Type: Sub-task
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.11.0


Currently, calls on the Context in the OperatorCoordinator go directly and 
synchronously to the ExecutionGraph.

There are two critical problems:
  - It is common that the code in the OperatorCoordinator runs in a separate 
thread (for example, because it executes blocking operations). Calling the 
scheduler from another thread causes the Scheduler to crash (assertion error: 
violation of the single-threaded property).
  - Calls on the ExecutionGraph are being removed as part of removing the 
legacy scheduler. Certain calls do not work any more.

+Problem Level 1:+

The solution would be to pass in the scheduler and a main thread executor to 
interact with it.

However, to do that the scheduler needs to be created before the 
OperatorCoordinators are created. One could do that by creating the 
Coordinators lazily after the Scheduler.

+Problem Level 2:+

The Scheduler restores the savepoints as part of the scheduler creation, when 
the ExecutionGraph and the CheckpointCoordinator are created early in the 
constructor.
(Side note: That design is tricky in itself, because it means state is restored 
before the scheduler is even properly constructed.)

That means the OperatorCoordinator (or a placeholder component) needs to exist 
to accept the restored state.

That brings us to a cyclic dependency:
  - OperatorCoordinator (context) needs Scheduler and MainThreadExecutor
  - Scheduler and MainThreadExecutor need constructed ExecutionGraph
  - ExecutionGraph needs CheckpointCoordinator
  - CheckpointCoordinator needs OperatorCoordinator

+Breaking the Cycle+

The only way we can do this is with a form of lazy initialization:
  - We eagerly create the OperatorCoordinators so they exist for state restore
  - We provide an uninitialized context to them
  - When the Scheduler is started (after leadership is granted), we initialize 
the context with the (then readily constructed) Scheduler and 
MainThreadExecutor; a sketch follows below
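
A sketch of such a lazily initialized context (all names here are hypothetical 
illustrations, not the actual classes):

{code:java}
import java.util.concurrent.Executor;

final class LazyInitializedCoordinatorContext {

    private volatile SchedulerNG scheduler;
    private volatile Executor mainThreadExecutor;

    // called when the Scheduler is started, after leadership is granted
    void lazyInitialize(SchedulerNG scheduler, Executor mainThreadExecutor) {
        this.scheduler = scheduler;
        this.mainThreadExecutor = mainThreadExecutor;
    }

    // example call: safe to invoke from any coordinator thread, because
    // the actual scheduler interaction is moved to the main thread
    void failJob(Throwable cause) {
        checkInitialized();
        mainThreadExecutor.execute(() -> scheduler.handleGlobalFailure(cause));
    }

    private void checkInitialized() {
        if (scheduler == null) {
            throw new IllegalStateException("Context was not yet initialized");
        }
    }
}
{code}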

+Longer-term Solution+

The longer term solution would require a major change in the Scheduler and 
CheckpointCoordinator setup. Something like this:
  - Scheduler (and ExecutionGraph) are constructed first
  - JobMaster waits for leadership
  - Upon leader grant, Operator Coordinators are constructed and can reference 
the Scheduler and FencedMainThreadExecutor
  - CheckpointCoordinator is constructed and references ExecutionGraph and 
OperatorCoordinators
  - Savepoint or latest checkpoint is restored

The current implementation should try to couple the parts as loosely as 
possible, to make it easy to implement the above approach later.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17702) OperatorCoordinators must be notified of tasks cancelled as part of failover

2020-05-14 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-17702:


 Summary: OperatorCoordinators must be notified of tasks cancelled 
as part of failover
 Key: FLINK-17702
 URL: https://issues.apache.org/jira/browse/FLINK-17702
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.11.0


The OperatorCoordinators are currently only notified of tasks that directly 
fail.

However, tasks that are cancelled (as part of the regional failover) must be 
handled the same way and also send notifications to the OperatorCoordinator.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17701) Exclude jdk:tools dependency from all Hadoop dependencies for Java 9+ compatibility

2020-05-14 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-17701:


 Summary: Exclude jdk:tools dependency from all Hadoop dependencies 
for Java 9+ compatibility
 Key: FLINK-17701
 URL: https://issues.apache.org/jira/browse/FLINK-17701
 Project: Flink
  Issue Type: Bug
  Components: Build System
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.11.0


Hadoop transitively pulls in the system dependency {{jdk:tools}}, which is no 
longer available on Java 9+. This causes errors when importing the code into 
an IDE that runs Java 11.

This dependency is not needed when running the code anyway, because the 
classes are always present. It can be safely excluded from the transitive 
dependencies.
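
A sketch of the exclusion, to be applied to each Hadoop dependency (assuming 
the usual {{jdk.tools:jdk.tools}} Maven coordinates; hadoop-common stands in 
for any of the Hadoop artifacts):

{code:xml}
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
    <exclusions>
        <!-- system dependency on tools.jar; gone on Java 9+, and the
             classes are provided by the JDK at runtime anyway -->
        <exclusion>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
        </exclusion>
    </exclusions>
</dependency>
{code}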



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17699) Reduce scope for SourceOperator arguments and initialize more eagerly

2020-05-14 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-17699:


 Summary: Reduce scope for SourceOperator arguments and initialize 
more eagerly
 Key: FLINK-17699
 URL: https://issues.apache.org/jira/browse/FLINK-17699
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.11.0


Currently, the {{SourceOperator}} only gets a {{Source}} in the constructor.

All actual components that the {{SourceOperator}} relies on when working are 
lazily initialized, in {{open()}} or via setters.

Relying on something as broad as {{Source}} also means that a lot of redundant 
context has to be provided to the {{SourceOperator}} during initialization. The 
{{Source}} is, for example, also responsible for the {{SourceEnumerator}}, 
which is independent of the {{SourceOperator}}. However, it now needs to be 
considered during testing, because the tests need to mock a full {{Source}} in 
order to instantiate a {{SourceOperator}}.

The solution is to pass the collaborators of the {{SourceOperator}} directly 
and eagerly into the constructor. This is not fully possible with the 
{{SourceReader}}, but there we can at least reduce the scope by passing a 
targeted factory function (a sketch follows below).
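
A sketch of the idea - the factory type and class names are assumptions:

{code:java}
import java.util.function.Function;

// Sketch only: the operator receives its collaborators eagerly, plus a
// narrow factory for the reader, instead of the whole Source. Tests then
// only need to provide a reader factory, not a full Source mock.
public class SourceOperatorSketch<OUT> {

    private final Function<SourceReaderContext, SourceReader<OUT, ?>> readerFactory;

    private SourceReader<OUT, ?> reader;

    public SourceOperatorSketch(
            Function<SourceReaderContext, SourceReader<OUT, ?>> readerFactory) {
        this.readerFactory = readerFactory;
    }

    public void open(SourceReaderContext context) {
        // the reader is the only piece still created lazily, via the factory
        this.reader = readerFactory.apply(context);
    }
}
{code}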






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17696) Support eager initialization of operators with OperatorEventDispatcher

2020-05-14 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-17696:


 Summary:  Support eager initialization of operators with 
OperatorEventDispatcher
 Key: FLINK-17696
 URL: https://issues.apache.org/jira/browse/FLINK-17696
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.11.0


The {{StreamOperatorParameters}} are parameters available for eager 
initialization of the {{StreamOperators}}. We should add the 
{{OperatorEventDispatcher}} to make it available for eager initialization.

At the same time, we should split the steps "obtaining the 
{{OperatorEventGateway}}" and "registering the {{OperatorEventHandler}}", to 
support getting an eager reference to the {{OperatorEventGateway}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17695) Simplify SourceOperator by using a utility SimpleVersionedListState

2020-05-14 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-17695:


 Summary: Simplify SourceOperator by using a utility 
SimpleVersionedListState
 Key: FLINK-17695
 URL: https://issues.apache.org/jira/browse/FLINK-17695
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.11.0


The SourceOperator has some boilerplate code taking the bytes out of the 
{{ListState}} and applying the {{SimpleVersionedSerializer}} to turn 
them into the splits.

This repeated code can simply be encapsulated in a utility class 
{{SimpleVersionedListState}} which wraps a {{ListState}} and applies 
the serialization and de-serialization (a sketch follows below).
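
A minimal sketch of such a wrapper (simplified - the real utility would 
implement the full {{ListState}} interface):

{code:java}
import java.util.ArrayList;
import java.util.List;

// Sketch: wraps a ListState<byte[]> and applies a SimpleVersionedSerializer
// on every read and write.
public final class SimpleVersionedListState<T> {

    private final ListState<byte[]> rawState;
    private final SimpleVersionedSerializer<T> serializer;

    public SimpleVersionedListState(
            ListState<byte[]> rawState, SimpleVersionedSerializer<T> serializer) {
        this.rawState = rawState;
        this.serializer = serializer;
    }

    public void add(T value) throws Exception {
        // prepends the serializer version to the serialized payload
        rawState.add(SimpleVersionedSerialization.writeVersionAndSerialize(serializer, value));
    }

    public List<T> get() throws Exception {
        // reads the version back first, then deserializes accordingly
        List<T> result = new ArrayList<>();
        for (byte[] bytes : rawState.get()) {
            result.add(SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes));
        }
        return result;
    }
}
{code}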



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17694) Wrong min-length check in SimpleVersionedSerialization

2020-05-14 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-17694:


 Summary: Wrong min-length check in SimpleVersionedSerialization
 Key: FLINK-17694
 URL: https://issues.apache.org/jira/browse/FLINK-17694
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.10.0
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.11.0


The SimpleVersionedSerialization checks for a minimum of 4 bytes when 
validating its arguments, but needs at least 8 bytes (two integers: the 
serializer version and the payload length).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17674) OperatorCoordinator state in checkpoints should always be a ByteStreamStateHandle

2020-05-13 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-17674:


 Summary: OperatorCoordinator state in checkpoints should always be 
a ByteStreamStateHandle
 Key: FLINK-17674
 URL: https://issues.apache.org/jira/browse/FLINK-17674
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.11.0


State restore to the task vertices and coordinators (even after loading the 
Checkpoint Metadata) happens in the JobManager's main thread and must 
consequently not do any potentially blocking I/O operations.

The OperatorCoordinator state is a generic {{StreamStateHandle}} whose state 
might require I/O to retrieve. This never happens in the current implementation 
(we always use {{ByteStreamStateHandle}}), but the signatures and contracts 
don't guarantee that and leave this open as a potential future bug.

Typing the OperatorCoordinator state as ByteStreamStateHandle makes sure that 
we can always retrieve the data directly without I/O, and clarifies that no 
arbitrary StreamStateHandle is supported at that point.

If state restoring becomes an asynchronous operation we can relax this 
restriction.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

