[jira] [Created] (FLINK-24852) Cleanup of Orphaned Incremental State Artifacts
Stephan Ewen created FLINK-24852: Summary: Cleanup of Orphaned Incremental State Artifacts Key: FLINK-24852 URL: https://issues.apache.org/jira/browse/FLINK-24852 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Affects Versions: 1.14.0 Reporter: Stephan Ewen

Shared State Artifacts (state files in the "shared" folder in the DFS / ObjectStore) can become orphaned in various situations:
* When a TaskManager fails right after it created a state file, but before the checkpoint was ack-ed to the JobManager, that state file is orphaned.
* When the JobManager fails, all state newly added for the currently pending checkpoint is orphaned.

These state artifacts currently cannot be cleaned up manually, because there is no easy way to tell whether they are still in use (referenced by any checkpoint). We should introduce a "garbage collector" that identifies and deletes such orphaned state artifacts.

h2. Idea for a cleanup mechanism

A background thread would periodically execute a cleanup procedure that searches for and deletes the orphaned artifacts. To identify those artifacts, the cleanup procedure needs the following inputs:
* The oldest retained checkpoint ID
* A snapshot of the shared state registry
* A way to identify, for each state artifact, from which checkpoint it was created.

The cleanup procedure would:
* enumerate all state artifacts (for example, files in the "shared" directory)
* for each artifact, check whether it was created earlier than the oldest retained checkpoint. If not, the artifact is skipped, because it might come from a later pending checkpoint, or a later canceled checkpoint.
* finally, check whether the state artifact is known to the shared state registry. If yes, the artifact is kept; if not, it is orphaned and will be deleted.

Because the cleanup procedure is specific to the checkpoint storage, it should probably be instantiated from the checkpoint storage.
To make it possible to identify the checkpoint for which a state artifact was created, we can put that checkpoint ID into the state file name, for example by formatting the state name as {{"_"}}. -- This message was sent by Atlassian Jira (v8.20.1#820001)
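The three-step procedure described above could be sketched as follows in plain Java. This is a minimal sketch, not Flink code: the name format {{"<checkpointId>_<uuid>"}}, the method names, and the registry-as-a-set are all hypothetical stand-ins; a real implementation would live in the checkpoint storage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Sketch of the proposed orphan detection, assuming the creating
 *  checkpoint's ID is encoded at the front of the artifact file name. */
public class OrphanedStateCleanup {

    /** Hypothetical name format "checkpointId_uuid": parse the creating checkpoint's ID. */
    static long checkpointIdOf(String artifactName) {
        return Long.parseLong(artifactName.substring(0, artifactName.indexOf('_')));
    }

    /**
     * @param sharedDirListing all artifacts found in the "shared" directory
     * @param registeredArtifacts snapshot of the shared state registry
     * @param oldestRetainedCheckpointId ID of the oldest checkpoint still retained
     * @return artifacts that are safe to delete
     */
    static List<String> findOrphans(
            List<String> sharedDirListing,
            Set<String> registeredArtifacts,
            long oldestRetainedCheckpointId) {
        List<String> orphans = new ArrayList<>();
        for (String artifact : sharedDirListing) {
            // Skip artifacts from pending / later checkpoints: they may simply
            // not be registered yet, so deleting them would be unsafe.
            if (checkpointIdOf(artifact) >= oldestRetainedCheckpointId) {
                continue;
            }
            // Old enough and unknown to the registry => orphaned.
            if (!registeredArtifacts.contains(artifact)) {
                orphans.add(artifact);
            }
        }
        return orphans;
    }
}
```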
[jira] [Created] (FLINK-24366) Unnecessary/misleading error message about failing restores when tasks are already canceled.
Stephan Ewen created FLINK-24366: Summary: Unnecessary/misleading error message about failing restores when tasks are already canceled. Key: FLINK-24366 URL: https://issues.apache.org/jira/browse/FLINK-24366 Project: Flink Issue Type: Bug Components: Runtime / Task Affects Versions: 1.13.2, 1.14.0 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.13.3, 1.15.0, 1.14.1 The following line is logged in all cases where the restore operation fails. The check whether the task is canceled comes only after that line. The fix would be to move the log line to after the check. {code} Exception while restoring my-stateful-task from alternative (1/1), will retry while more alternatives are available. {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
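The fix described above can be sketched in a few lines. All names here are hypothetical stand-ins for the real restore loop; the point is only the ordering: check for cancellation first, log the retry warning second.

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;

/** Sketch of the fix: check for cancellation before logging the retry warning. */
public class RestoreLoop {

    static int restoredFrom(Iterator<Integer> alternatives, AtomicBoolean canceled) {
        while (alternatives.hasNext()) {
            int alternative = alternatives.next();
            try {
                return tryRestore(alternative);
            } catch (Exception e) {
                // Fixed order: bail out quietly when the task is already canceled...
                if (canceled.get()) {
                    throw new RuntimeException("task canceled");
                }
                // ...and only log the "will retry" line for genuine restore failures.
                System.out.println("Exception while restoring from alternative " + alternative
                        + ", will retry while more alternatives are available.");
            }
        }
        throw new RuntimeException("no alternative worked");
    }

    /** Hypothetical restore that only succeeds for alternative 2. */
    static int tryRestore(int alternative) throws Exception {
        if (alternative != 2) throw new Exception("restore failed");
        return alternative;
    }
}
```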
[jira] [Created] (FLINK-24343) Revisit Scheduler and Coordinator Startup Procedure
Stephan Ewen created FLINK-24343: Summary: Revisit Scheduler and Coordinator Startup Procedure Key: FLINK-24343 URL: https://issues.apache.org/jira/browse/FLINK-24343 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.13.2, 1.14.0 Reporter: Stephan Ewen Fix For: 1.15.0

We need to re-examine the startup procedure of the scheduler and how it interacts with the startup of the operator coordinators. We need to make sure the following conditions are met:
- The Operator Coordinators are started before the first action happens that they need to be informed of. That includes a task being ready, a checkpoint happening, etc.
- The scheduler must be started to the point that it can handle "failGlobal()" calls, because the coordinators might trigger that during their startup when an exception in "start()" occurs.

/cc [~chesnay] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24255) Test Environment / Mini Cluster do not forward configuration.
Stephan Ewen created FLINK-24255: Summary: Test Environment / Mini Cluster do not forward configuration. Key: FLINK-24255 URL: https://issues.apache.org/jira/browse/FLINK-24255 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.13.2 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.14.0 When using {{StreamExecutionEnvironment getExecutionEnvironment(Configuration)}}, the config should determine the characteristics of the execution. The config is for example passed to the local environment in the local execution case, and used during the instantiation of the MiniCluster. But when using the {{TestStreamEnvironment}} and the {{MiniClusterWithClientRule}}, the config is ignored. The issue is that the {{StreamExecutionEnvironmentFactory}} in {{TestStreamEnvironment}} ignores the config that is passed to it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
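The nature of the bug can be sketched abstractly. The types below are simplified stand-ins, not Flink's actual interfaces: a factory receives a configuration but builds the environment from a blank default instead of the config handed to it.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of the bug and fix: a factory must apply the config handed to it. */
public class EnvFactorySketch {

    /** Stand-in for StreamExecutionEnvironmentFactory; the "environment" is just its config map. */
    interface EnvFactory { Map<String, String> create(Map<String, String> config); }

    // Buggy variant (as described for TestStreamEnvironment): the passed config is dropped.
    static final EnvFactory BUGGY = config -> new HashMap<>();

    // Fixed variant: the characteristics of the execution come from the given config.
    static final EnvFactory FIXED = config -> new HashMap<>(config);
}
```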
[jira] [Created] (FLINK-23843) Exceptions during "SplitEnumeratorContext.runInCoordinatorThread()" should cause Global Failure instead of Process Kill
Stephan Ewen created FLINK-23843: Summary: Exceptions during "SplitEnumeratorContext.runInCoordinatorThread()" should cause Global Failure instead of Process Kill Key: FLINK-23843 URL: https://issues.apache.org/jira/browse/FLINK-23843 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Affects Versions: 1.13.2 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.14.0

Currently, when the method "SplitEnumeratorContext.runInCoordinatorThread()" throws an exception, the effect is a kill of the JobManager process. The chain of events leading to the process kill is:
* An exception bubbles up in the executor, killing the executor thread.
* The executor starts a replacement thread, which is forbidden by the thread factory (as a safety net) and causes a process kill.

We should prevent such exceptions from bubbling up in the coordinator executor. -- This message was sent by Atlassian Jira (v8.3.4#803005)
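A sketch of the proposed guard, assuming a single-threaded coordinator executor and a hypothetical global-failure handler (the class and handler names are illustrative, not Flink's API): catching the throwable inside the submitted action keeps the executor thread alive, so the thread-factory safety net never fires.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Sketch: turn throwables in the coordinator executor into a global failure
 *  instead of letting them kill the executor thread (and, via the safety net, the JVM). */
public class GuardedCoordinatorExecutor {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Consumer<Throwable> globalFailureHandler;

    GuardedCoordinatorExecutor(Consumer<Throwable> globalFailureHandler) {
        this.globalFailureHandler = globalFailureHandler;
    }

    void runInCoordinatorThread(Runnable action) {
        executor.execute(() -> {
            try {
                action.run();
            } catch (Throwable t) {
                // The exception no longer bubbles up; report a global failure instead.
                globalFailureHandler.accept(t);
            }
        });
    }

    void shutdown() {
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```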
[jira] [Created] (FLINK-23842) Add log messages for reader registrations and split requests.
Stephan Ewen created FLINK-23842: Summary: Add log messages for reader registrations and split requests. Key: FLINK-23842 URL: https://issues.apache.org/jira/browse/FLINK-23842 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.14.0 Currently, nothing is logged when source enumerators get reader registration events or receive split requests. While some specific source implementations log this themselves, in the general case this information is missing, even though it is very valuable when debugging and understanding the work assignment behavior. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23649) Add RocksDB packages to parent-first classloading patterns.
Stephan Ewen created FLINK-23649: Summary: Add RocksDB packages to parent-first classloading patterns. Key: FLINK-23649 URL: https://issues.apache.org/jira/browse/FLINK-23649 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.13.2 Reporter: Stephan Ewen Fix For: 1.14.0 RocksDB classes are currently loaded child-first. Because of that, the RocksDB native library may be loaded multiple times (by different classloaders). JNI prevents this and raises an error, as reported for example in this mail: https://lists.apache.org/x/thread.html/rbc3ca24efe13b25e802af9739a6877276503363ffbdc5914ffdad7be@%3Cuser.flink.apache.org%3E We should prevent accidental repeated loading of RocksDB, because we rely on the fact that only one DB is created per task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23630) Make EventTimeWindowCheckpointingITCase and LocalRecoveryITCase run on Windows.
Stephan Ewen created FLINK-23630: Summary: Make EventTimeWindowCheckpointingITCase and LocalRecoveryITCase run on Windows. Key: FLINK-23630 URL: https://issues.apache.org/jira/browse/FLINK-23630 Project: Flink Issue Type: Bug Components: Tests Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.14.0 This needs a fix in the test where it creates the paths for the checkpoint storage locations. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23629) Remove redundant test cases in EventTimeWindowCheckpointingITCase
Stephan Ewen created FLINK-23629: Summary: Remove redundant test cases in EventTimeWindowCheckpointingITCase Key: FLINK-23629 URL: https://issues.apache.org/jira/browse/FLINK-23629 Project: Flink Issue Type: Bug Components: Tests Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.14.0 HashMap state store snapshots are always async right now, sync snapshots are no longer supported. We should adjust the {{EventTimeWindowCheckpointingITCase}} to remove the now redundant cases {{MEM_ASYNC}} and {{FILE_ASYNC}} parameter runs. The test is very time-intensive, so this is quite a time saver. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23301) StateFun HTTP Ingress
Stephan Ewen created FLINK-23301: Summary: StateFun HTTP Ingress Key: FLINK-23301 URL: https://issues.apache.org/jira/browse/FLINK-23301 Project: Flink Issue Type: Sub-task Components: Stateful Functions Reporter: Stephan Ewen

The HTTP ingress would start an HTTP server at a specified port. The HTTP server would only handle _POST_ requests. The target function is represented by the path to which the request is made; the message content is the body of the POST request.

The following example would send an empty message to the function with the address {{namespace='example', type='greeter', id='Igal'}}.
{code}
> curl -X POST http://statefun-ingress:/in/example/greeter/Igal
POST /in/example/greeter/Igal HTTP/1.1
Content-Length: 0
{code}

The example below would send a message of type 'statefun/string' to the function with the address {{namespace='example', type='greeter', id='Elisa'}}, with the message contents {{"{numTimesToGreet: 5}"}}.
{code}
> curl -X POST -H "Content-Type: text/plain; charset=UTF-8" -d "{numTimesToGreet: 5}" http://statefun-ingress:/in/example/greeter/Elisa
POST /in/example/greeter/Elisa HTTP/1.1
Content-Type: text/plain; charset=UTF-8
Content-Length: 20

{numTimesToGreet: 5}
{code}

h3. Data Types

The content type (MIME type) specified in the request header of the HTTP request will be directly mapped to the StateFun types. For example, {{Content-Type: io.statefun.types/int}} will set the type of the message to {{io.statefun.types/int}}. As a special case, we map the content type {{text/plain}} to {{io.statefun.types/string}}, to make simple cases and examples work more seamlessly.

The following example would send a message to a function that expects a ProtoBuf-encoded type {{Greeting}}, registered in StateFun as {{example/greeting}}.
{code}
> CONTENTS=`echo 'numTimesToGreet: 5' | ./protoc --encode Greeting example_types.proto`
> echo $CONTENTS | curl -X POST -H "Content-Type: example/greeting" --data-binary @- http://statefun-ingress:/in/example/greeter/Bobby
{code}

h3. Sender and Responses

We want to support round-trip-style interactions, meaning posting a request and receiving a response. Given the async messaging nature of StateFun, this is not necessarily one HTTP request which immediately gives you the corresponding response. Instead, it can be issuing (POST) a request to the HTTP ingress and polling (GET) a response from an associated HTTP egress.

To support this kind of pattern, the HTTP ingress will assign a random request correlation ID in the HTTP response. Furthermore, the ingress will optionally set the {{sender()}} field of the created message to reference a configured associated egress. The ingress config would add an entry referencing the egress (like {{'paired_egress_name: httpout'}}).
{code}
> curl -X POST -i http://statefun-ingress:/in/example/greeter/Igal
POST /in/example/greeter/Igal HTTP/1.1
Content-Length: 0

HTTP/1.1 200 OK
StateFun-Request-Correlation-ID: 8acb377c-fc5e-4bdb-b2cc-eddb5992b7b5
Content-Length: 0
{code}
The created message would have no body, but would have {{sender() = egress/httpout/8acb377c-fc5e-4bdb-b2cc-eddb5992b7b5}}.

_Note: We would need to extend the message address scheme to be able to reference egresses. The egress itself can grab the correlation ID from the ID part of the address, because the HTTP egress doesn't use that field (in fact, no egress currently interprets the ID)._

h3. Singleton Instantiation

To avoid port conflicts, we need to do a singleton instantiation per JVM.
This can be achieved by using a statically referenced context to hold the instantiated server and a reference to the stream to push messages to.

In the future, we can look into extending this to avoid setup/teardown when operators are cancelled for recovery. The server would then live as long as the StateFun application (job) lives (or, more precisely, as long as the slot lives, which is the duration that the TaskManager is associated with the StateFun deployment - typically the entire lifetime). To achieve that, we would tear down the server in a [shutdown hook of the user-code classloader](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L145). Instead of letting the first source set up the server, the first source would register its output as the stream for the server to push messages to.

h3. Configuration parameters

- Bind host (default 0.0.0.0)
- Bind port (default )
- Path (default "in") (for the path in the URL {{http(s)://:}})
- Egress pair name, for setting the egress that replies should go to.
- To set up SSL for the connection, we add similar settings as for Flink's REST endpoint.
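The per-JVM singleton described above could be sketched as a statically referenced, reference-counted holder. This is only a sketch: the class name is hypothetical, and the plain {{Object}} stands in for the real HTTP server, so that parallel source instances share one server and the last released reference tears it down.

```java
/** Sketch of the per-JVM singleton: a statically referenced, reference-counted holder. */
public class SharedServerHolder {
    private static Object server;  // stands in for the real HTTP server instance
    private static int refCount;

    /** First caller starts the server; later callers share it. */
    static synchronized Object acquire() {
        if (refCount++ == 0) {
            server = new Object();  // start the real server here
        }
        return server;
    }

    /** Last caller stops the server, freeing the port. */
    static synchronized void release() {
        if (--refCount == 0) {
            server = null;          // stop the real server here
        }
    }
}
```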
[jira] [Created] (FLINK-23281) StateFun - Simplify Getting Started Experience with HTTP Ingresses and Egresses
Stephan Ewen created FLINK-23281: Summary: StateFun - Simplify Getting Started Experience with HTTP Ingresses and Egresses Key: FLINK-23281 URL: https://issues.apache.org/jira/browse/FLINK-23281 Project: Flink Issue Type: New Feature Components: Stateful Functions Reporter: Stephan Ewen

To make it easier to get started with StateFun, we want to reduce the dependencies on other systems and tools that are currently required to get your first program running. _(For reference, you currently need a docker-compose setup with at least Flink, Kafka, ZooKeeper, and then you need to interact with it using Kafka command line tools (or other clients) to publish messages to the ingress topic.)_

This issue aims to add simple pre-packaged HTTP ingresses/egresses that can be used for examples and exploration, and can be used with standard tools (like {{curl}}). That reduces the barrier to exploration. _(Citing @ssc here: you have roughly one lunch break of time to get a developer excited. Many devs just play around for about 45 minutes, and when they don't see some preliminary success with simple examples, they drop the exploration.)_

An example interaction could be:
{code}
> curl -X POST -i http://:/in/example/greeter/Igal
HTTP/2 200
request-id: 8acb377c-fc5e-4bdb-b2cc-eddb5992b7b5

> curl -X GET http://:/out/8acb377c-fc5e-4bdb-b2cc-eddb5992b7b5
Hello for the 1337th time...
{code}

*Note:* The HTTP Ingress/Egress here are different from the HTTP state access from FLINK-23261. Requests against the state access API (FLINK-23261) only interact with state entries and never invoke functions. In contrast, messages sent through the here-proposed Ingress/Egress are delivered to functions like with any other ingress.

This is the umbrella issue. Dedicated tasks for ingress/egress and request correlation are in the subtasks. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23261) StateFun - HTTP State Access Interface
Stephan Ewen created FLINK-23261: Summary: StateFun - HTTP State Access Interface Key: FLINK-23261 URL: https://issues.apache.org/jira/browse/FLINK-23261 Project: Flink Issue Type: Improvement Components: Stateful Functions Reporter: Stephan Ewen

h2. Functionality

To improve operations of StateFun applications, we should offer an interface to query and manipulate state entries. This can be used for exploration and debugging purposes, and to repair state if the application state needs to be corrected. To keep this simple, I would suggest an HTTP REST interface:
- GET: read state entry for key
- PUT: set/overwrite state for key
- DELETE: drop state entry for key

The URLs could be {{http(s)://statefun-service/state}}, where the string {{//}} is the fully-qualified address of the target and {{statename}} is the name under which that persistent state is stored. Keys are always UTF-8 strings in StateFun, so they can be encoded in the URL.

For the responses, we would use the common codes 200 for GET/PUT/DELETE success and 404 for GET/DELETE not found. The state values, as returned from GET requests, would generally be just the bytes, not interpreted by this request handling. The integration of the StateFun type system and HTTP content types (MIME types) is up for further discussion. One option is to set the content type response header to {{"statefun/"}}, where all non-simple types map to {{Content-Type: application/octet-stream}}. We may make an exception for strings, which could be returned as {{Content-Type: text/plain; charset=UTF-8}}. Later refinement is possible, like auto-stringifying contents when the request indicates to only accept {{text/plain}} responses.

h2. Failure Guarantees

The initial failure guarantees for PUT/DELETE would be none - the requests would be handled best-effort. We can easily extend this later in one of two ways:
- Delay responses to the HTTP requests until the next checkpoint is complete.
That makes synchronous interaction easy and sounds like a good match for a more admin-style interface.
- Return the current checkpoint ID and offer a way to poll until the next checkpoint is completed. This avoids blocking requests, but puts more burden on the caller.

Given that the expected use cases for PUT/DELETE are more of an "admin/fix" nature, I would suggest to go with synchronous requests, for simplicity.

h2. Implementation

There are two options to implement this:
(1) A Source/Sink (Ingress/Egress) pair
(2) An Operator Coordinator with HTTP requests

*Option (1) - Source/Sink pair*

We would implement a specific source that is both an ingress and an egress. The source would spawn an HTTP server (singleton per TM process). Requests would be handled as follows:
- An HTTP request gets a generated correlation ID.
- The source injects a new message type (a "control message") into the stream. That message holds the correlation ID, the parallel subtask index of the originating source, and the target address and state name.
- The function dispatcher handles these messages in a special way, retrieving the state and sending an egress message with the correlation ID to the parallel subtask of the egress as indicated by the message's subtask index.
- The egress (which is the same instance as the ingress source) uses the correlation ID to respond to the request.

Advantages:
- No changes necessary in Flink
- Might sustain higher throughput, due to multiple HTTP endpoints

Disadvantages:
- Additional HTTP servers and ports require more setup (like service definitions on K8s).
- Need to introduce a new control message type and extend the function dispatcher to handle it.
- Makes a hard assumption that sources run on all slots. Needs an "ugly" singleton hack to start only one server per TM process.

*Option (2) - Operator Coordinator*

Operator Coordinators are instances that run on the {{JobManager}} and can communicate with the Tasks via RPC.
Coordinators can receive calls from HTTP handlers at the JobManager's HTTP endpoint. An example of this is the result fetching through HTTP/OperatorCoordinator requests. We would need a patch to Flink to allow registering custom URLs and passing the path as a parameter to the request. The RPCs can be processed in the mailbox on the Tasks, making them thread-safe. This would also completely avoid the round-trip (source-to-sink) problem; the tasks simply need to send a response back to the RPC.

Advantages:
- Reuse of the existing HTTP endpoint and port. No need for an additional HTTP server, port, and service; for these admin-style requests, this approach re-uses Flink's admin HTTP endpoint.
- No need for singleton HTTP server logic in the Tasks.
- Does not require the assumption that all TMs run an instance of all operators.
- No need for "control messages" and
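Independent of which option is chosen, the GET/PUT/DELETE semantics proposed above can be sketched transport-agnostically. This is a minimal sketch: the class and field names are hypothetical, the transport and state backend are replaced by an in-memory map, and only the 200/404 status-code behavior from the proposal is modeled.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of the proposed state-access semantics (status codes only; transport omitted). */
public class StateAccessSketch {
    static final int OK = 200, NOT_FOUND = 404;

    private final Map<String, byte[]> state = new HashMap<>();
    byte[] lastBody;  // body of the last successful GET

    /** Key stands in for the "address + statename" part of the URL path. */
    int handle(String method, String key, byte[] requestBody) {
        switch (method) {
            case "GET":
                byte[] value = state.get(key);
                if (value == null) return NOT_FOUND;
                lastBody = value;  // returned as raw, uninterpreted bytes
                return OK;
            case "PUT":
                state.put(key, requestBody);  // best-effort, no durability guarantee yet
                return OK;
            case "DELETE":
                return state.remove(key) == null ? NOT_FOUND : OK;
            default:
                return 405;  // method not allowed
        }
    }
}
```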
[jira] [Created] (FLINK-23093) Limit number of I/O pool and Future threads in Mini Cluster
Stephan Ewen created FLINK-23093: Summary: Limit number of I/O pool and Future threads in Mini Cluster Key: FLINK-23093 URL: https://issues.apache.org/jira/browse/FLINK-23093 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Affects Versions: 1.13.0 Reporter: Stephan Ewen Fix For: 1.14.0 When running tests on CI via the minicluster, the mini cluster typically spawns 100s of I/O threads, both in the MiniCluster I/O pool and in the TM I/O pool. The standard rule for the maximum pool size is 4*num-cores, but the number of cores can be fairly large these days. Various Java versions also mess up core counting when running in containers (JVM container might have been given 2 cores as resource limits, but the JVM counts the system as a whole, like 64/128 cores). This is both a nuisance for debugging, and a big waste of memory (each thread takes by default around 1MB when spawned, so the test JVM wastes 100s of MBs for nothing). I would suggest to set a default of 8 I/O threads for the Mini Cluster. The scaling-with-cores is important for proper TM/JM deployments, but not for the Mini Cluster. -- This message was sent by Atlassian Jira (v8.3.4#803005)
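A minimal sketch of the suggested sizing rule (the class and method names are illustrative, not Flink's API): keep the 4-threads-per-core scaling for proper TM/JM deployments, but cap the Mini Cluster pools at the suggested fixed default of 8.

```java
/** Sketch: scale I/O pool size with cores for real deployments,
 *  but cap it at a small fixed bound for the Mini Cluster. */
public class MiniClusterPoolSizing {
    static int poolSize(int availableCores, boolean miniCluster) {
        int scaled = 4 * availableCores;          // the standard rule: 4 * num-cores
        return miniCluster ? Math.min(scaled, 8)  // suggested Mini Cluster default
                           : scaled;
    }
}
```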
[jira] [Created] (FLINK-22753) Activate log-based Checkpoints for StateFun
Stephan Ewen created FLINK-22753: Summary: Activate log-based Checkpoints for StateFun Key: FLINK-22753 URL: https://issues.apache.org/jira/browse/FLINK-22753 Project: Flink Issue Type: Sub-task Components: Stateful Functions Reporter: Stephan Ewen Once available, we should use log-based checkpointing in StateFun, to have predictable checkpointing times and predictably low end-to-end exactly-once latencies. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22752) Add robust default state configuration to StateFun
Stephan Ewen created FLINK-22752: Summary: Add robust default state configuration to StateFun Key: FLINK-22752 URL: https://issues.apache.org/jira/browse/FLINK-22752 Project: Flink Issue Type: Sub-task Reporter: Stephan Ewen

We aim to reduce the state configuration complexity by applying a default configuration with robust settings, based on lessons learned in Flink.

*(1) Always use RocksDB.* _That is already the case._ We keep this for now, as long as the only other alternatives are backends with objects on the heap, which are tricky in terms of predictable JVM performance. RocksDB has a significant performance cost, but more robust behavior.

*(2) Activate local recovery by default.* That makes recovery cheap for soft task failures and gracefully cancelled tasks. We need to set these options:
- {{state.backend.local-recovery: true}}
- {{taskmanager.state.local.root-dirs: }} - some local directory that will not be wiped by the OS periodically, so typically a local directory that is not {{/tmp}}, for example {{/local/state/recovery}}.
- {{state.backend.rocksdb.localdir: }} - a directory on the same FS / device as above, so that one can create hard links between them (required for RocksDB local checkpoints), for example {{/local/state/rocksdb}}.

Flink will most likely adopt this as a default setting as well in the future. It still makes sense to pre-configure a different RocksDB working directory than {{/tmp}}.

*(3) Activate partitioned indexes by default.* This may cost minimal performance in some cases, but can avoid massive performance regressions in cases where the index blocks no longer fit into the memory cache (which may happen more frequently when there are too many ColumnFamilies = states). Set {{state.backend.rocksdb.memory.partitioned-index-filters: true}}. See FLINK-20496 for details.

*(4) Increase the number of transfer threads by default.* This speeds up state recovery in many cases.
The default value in Flink is a bit conservative, to avoid spamming DFS (like HDFS) by default. The more cloud-centric StateFun setups should be safe with a higher default value. Set {{state.backend.rocksdb.checkpoint.transfer.thread.num: 8}}.

*(5) Increase RocksDB compaction threads by default.* The number of RocksDB compaction threads is frequently a bottleneck. Increasing it costs virtually nothing and mitigates that bottleneck in most cases. Set {{state.backend.rocksdb.thread.num: 4}} (this value is chosen under the assumption that there is only one slot). -- This message was sent by Atlassian Jira (v8.3.4#803005)
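Taken together, the suggested defaults from points (2)-(5) would look like the following config fragment. The directory paths are the example paths given above, not prescribed values:

```yaml
state.backend.local-recovery: true
taskmanager.state.local.root-dirs: /local/state/recovery   # not /tmp; must survive OS cleanup
state.backend.rocksdb.localdir: /local/state/rocksdb       # same device, allows hard links
state.backend.rocksdb.memory.partitioned-index-filters: true
state.backend.rocksdb.checkpoint.transfer.thread.num: 8
state.backend.rocksdb.thread.num: 4                        # assumes a single slot
```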
[jira] [Created] (FLINK-22751) Reduce JVM Metaspace memory for StateFun
Stephan Ewen created FLINK-22751: Summary: Reduce JVM Metaspace memory for StateFun Key: FLINK-22751 URL: https://issues.apache.org/jira/browse/FLINK-22751 Project: Flink Issue Type: Sub-task Components: Stateful Functions Reporter: Stephan Ewen Flink by default reserves quite a lot of metaspace memory (256 MB) out of the total memory budget, to accommodate applications that load (and reload) a lot of classes. That is necessary because user code is executed directly in the JVM. StateFun by default doesn't execute code in the JVM, so it needs much less Metaspace memory. I would suggest to reduce the Metaspace size to something like 64MB or 96MB by default. {{taskmanager.memory.jvm-metaspace.size: 64m}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22749) Apply a robust default State Backend Configuration
Stephan Ewen created FLINK-22749: Summary: Apply a robust default State Backend Configuration Key: FLINK-22749 URL: https://issues.apache.org/jira/browse/FLINK-22749 Project: Flink Issue Type: Improvement Components: Stateful Functions Reporter: Stephan Ewen

We should update the default state backend configuration with default settings that reflect lessons learned about robust setups.

(1) Always use the RocksDB State Backend. That is already the case.

(2) Activate partitioned index filters by default. This may cost some overhead in specific cases, but avoids massive performance regressions when we have so many ColumnFamilies (too many states) that the cache can no longer hold all index files. We need to add {{state.backend.rocksdb.memory.partitioned-index-filters: true}} to the config. See FLINK-20496 for details. There is a chance that Flink makes this the default in the future as well; then we could remove it again from the StateFun setup.

(3) Activate local recovery by default. That should speed up the recovery of all non-hard-crashed TMs by a lot. We need to configure
- {{state.backend.local-recovery: true}}
- {{taskmanager.state.local.root-dirs}} to some non-temp directory

For this to work reliably, we need a local directory that is not periodically wiped by the OS, so we should not rely on the default {{/tmp}} directory, but set up a dedicated non-temp state directory. Flink will probably make this the default in the future, but having a non-{{/tmp}} directory for the RocksDB and local snapshots still makes a lot of sense.

(4) Increase state transfer threads by default, to speed up state restores. Add to the config: {{state.backend.rocksdb.checkpoint.transfer.thread.num: 8}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22741) Hide Flink complexity from Stateful Functions
Stephan Ewen created FLINK-22741: Summary: Hide Flink complexity from Stateful Functions Key: FLINK-22741 URL: https://issues.apache.org/jira/browse/FLINK-22741 Project: Flink Issue Type: New Feature Components: Stateful Functions Affects Versions: statefun-3.0.0 Reporter: Stephan Ewen This is an umbrella issue for various issues to hide and reduce the complexity and surface area (and configuration space) of Apache Flink when using Stateful Functions. The goal of this is to create a setup and configuration that works robustly in the vast majority of settings. Users should not be required to configure anything Flink-specific. If this happens at the cost of some minor regression in peak stream throughput, we can most likely stomach that in StateFun, because the performance cost is commonly dominated by the interaction between the StateFun cluster and the remote function deployments. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22729) Truncated Messages in Python workers
Stephan Ewen created FLINK-22729: Summary: Truncated Messages in Python workers Key: FLINK-22729 URL: https://issues.apache.org/jira/browse/FLINK-22729 Project: Flink Issue Type: Bug Components: Stateful Functions Affects Versions: statefun-2.2.2 Environment: The Stateful Functions version is 2.2.2, Java 8. The Java app as well as the external Python workers are deployed in the same Kubernetes cluster. Reporter: Stephan Ewen Fix For: statefun-3.1.0

Recently we started seeing the following faulty behavior in the Flink Stateful Functions HTTP communication towards external Python workers. It only occurs when the system is under heavy load. The Java application sends HTTP messages to an external Python function, but the external function fails to parse the message with a "Truncated Message Error". Printouts show that the truncated message looks as follows:
{code}
my.protobuf.MyClass: my.protobuf.MyClass: my.protobuf.MyClass: my.protobuf.MyClass:
{code}
See also the mailing list thread: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Statefun-Truncated-Messages-in-Python-workers-td43831.html -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22433) CoordinatorEventsExactlyOnceITCase stalls on Adaptive Scheduler
Stephan Ewen created FLINK-22433: Summary: CoordinatorEventsExactlyOnceITCase stalls on Adaptive Scheduler Key: FLINK-22433 URL: https://issues.apache.org/jira/browse/FLINK-22433 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.13.0 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.14.0, 1.13.1

Logs of the test failure: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17077=logs=0e7be18f-84f2-53f0-a32d-4a5e4a174679=7030a106-e977-5851-a05e-535de648c9c9=3

Steps to reproduce: Adjust the {{CoordinatorEventsExactlyOnceITCase}} and extend the MiniCluster configuration:
{code}
@BeforeClass
public static void startMiniCluster() throws Exception {
    final Configuration config = new Configuration();
    config.setString(RestOptions.BIND_PORT, "0");
    config.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
    config.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22406) ReactiveModeITCase.testScaleDownOnTaskManagerLoss()
Stephan Ewen created FLINK-22406: Summary: ReactiveModeITCase.testScaleDownOnTaskManagerLoss() Key: FLINK-22406 URL: https://issues.apache.org/jira/browse/FLINK-22406 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.13.0 Reporter: Stephan Ewen The test is stalling on Azure CI. https://dev.azure.com/sewen0794/Flink/_build/results?buildId=292=logs=0a15d512-44ac-5ba5-97ab-13a5d066c22c=634cd701-c189-5dff-24cb-606ed884db87=4865 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22358) Add missing stability annotation to Split Reader API classes
Stephan Ewen created FLINK-22358: Summary: Add missing stability annotation to Split Reader API classes Key: FLINK-22358 URL: https://issues.apache.org/jira/browse/FLINK-22358 Project: Flink Issue Type: Task Components: Connectors / Common Reporter: Stephan Ewen Fix For: 1.14.0, 1.13.1 The Split Reader API currently has no stability annotations; it is unclear which classes are public API, which are internal, which are stable, and which are evolving. -- This message was sent by Atlassian Jira (v8.3.4#803005)
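As a sketch of the desired end state, this is roughly what the annotations could look like. The annotation types below are inlined stand-ins for Flink's {{org.apache.flink.annotation}} package (so the snippet compiles on its own), and the two annotated type names are placeholders, not the actual Split Reader classes:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Stand-ins for Flink's stability annotations (org.apache.flink.annotation.*),
// inlined here so this sketch is self-contained.
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @interface PublicEvolving {}
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @interface Internal {}

// User-facing entry point of the API: usable by applications, still allowed to evolve.
@PublicEvolving
interface SourceReaderSketch<T> {}

// Implementation detail: no compatibility guarantees across releases.
@Internal
class SplitFetcherSketch {}
```

The point of the exercise is exactly the distinction drawn in the ticket: every class in the package gets one explicit marker, so users can tell at a glance what they may depend on.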
[jira] [Created] (FLINK-22357) Mark FLIP-27 Source API as stable
Stephan Ewen created FLINK-22357: Summary: Mark FLIP-27 Source API as stable Key: FLINK-22357 URL: https://issues.apache.org/jira/browse/FLINK-22357 Project: Flink Issue Type: Task Components: API / Core Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.14.0 The FLIP-27 source API was properly introduced in 1.11 and has undergone some major improvements in 1.12. During the stabilization in 1.13 we needed only one very minor change to those interfaces. I think it is time to declare the core source API interfaces as stable, to allow users to safely rely on them. I would suggest doing that for 1.14, possibly even backporting the annotation change to 1.13. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22324) Backport FLINK-18071 for 1.12.x
Stephan Ewen created FLINK-22324: Summary: Backport FLINK-18071 for 1.12.x Key: FLINK-22324 URL: https://issues.apache.org/jira/browse/FLINK-22324 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.12.2 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.3 See FLINK-18071 - this issue only tracks the backport to allow closing the blocker issue for 1.13.0. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22093) Unstable test "ThreadInfoSampleServiceTest.testShouldThrowExceptionIfTaskIsNotRunningBeforeSampling"
Stephan Ewen created FLINK-22093: Summary: Unstable test "ThreadInfoSampleServiceTest.testShouldThrowExceptionIfTaskIsNotRunningBeforeSampling" Key: FLINK-22093 URL: https://issues.apache.org/jira/browse/FLINK-22093 Project: Flink Issue Type: Bug Components: Tests Reporter: Stephan Ewen Fix For: 1.13.0 The test {{ThreadInfoSampleServiceTest.testShouldThrowExceptionIfTaskIsNotRunningBeforeSampling()}} failed in all profiles in my latest CI build (even though it passes locally). - https://dev.azure.com/sewen0794/Flink/_build/results?buildId=250=logs=9dc1b5dc-bcfa-5f83-eaa7-0cb181ddc267=ab910030-93db-52a7-74a3-34a0addb481b=8102 - https://dev.azure.com/sewen0794/Flink/_build/results?buildId=250=logs=6e55a443-5252-5db5-c632-109baf464772=9df6efca-61d0-513a-97ad-edb76d85786a=6698 - https://dev.azure.com/sewen0794/Flink/_build/results?buildId=250=logs=cc649950-03e9-5fae-8326-2f1ad744b536=51cab6ca-669f-5dc0-221d-1e4f7dc4fc85 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22069) Check Log Pollution for 1.13 release
Stephan Ewen created FLINK-22069: Summary: Check Log Pollution for 1.13 release Key: FLINK-22069 URL: https://issues.apache.org/jira/browse/FLINK-22069 Project: Flink Issue Type: Bug Components: Runtime / Coordination Reporter: Stephan Ewen Fix For: 1.13.0 We should check for log pollution and confusing log lines before the release. Below are some lines I stumbled over while using Flink during testing. - These lines show up on any execution of a local job and make me think I forgot to configure something I probably should have, wondering whether this might cause problems later? These have been in Flink for a few releases now, might be worth rephrasing, though. {code} 2021-03-30 17:57:22,483 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value. 2021-03-30 17:57:22,483 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value. 2021-03-30 17:57:22,483 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value. 2021-03-30 17:57:22,483 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb. 2021-03-30 17:57:22,483 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb. 
2021-03-30 17:57:22,483 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb. {code} - These lines show up on every job start, even if there is no recovery but just a plain job start. They are not particularly problematic, but also not helping. {code} 2021-03-30 17:57:27,839 INFO org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - Converting recovered input channels (8 channels) 2021-03-30 17:57:27,839 INFO org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - Converting recovered input channels (8 channels) 2021-03-30 17:57:27,839 INFO org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - Converting recovered input channels (8 channels) 2021-03-30 17:57:27,839 INFO org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - Converting recovered input channels (8 channels) 2021-03-30 17:57:27,839 INFO org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - Converting recovered input channels (8 channels) 2021-03-30 17:57:27,839 INFO org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - Converting recovered input channels (8 channels) 2021-03-30 17:57:27,839 INFO org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - Converting recovered input channels (8 channels) 2021-03-30 17:57:27,855 INFO org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate [] - Converting recovered input channels (8 channels) {code} When using {{DataStream.collect()}} we always have an exception in the log for the first fetch attempt, before the JM is ready. The loop retries and the program succeeds, but the exception in the log raises confusion about whether there is a swallowed but impactful error.
{code} 7199 [main] WARN org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An exception occurs when fetching query results java.util.concurrent.ExecutionException: org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException: Unable to get JobMasterGateway for initializing job. The requested operation is not available while the JobManager is initializing. at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?] at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?] at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:155) ~[classes/:?] at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:126) [classes/:?] at
[jira] [Created] (FLINK-21996) Transient RPC failure without TaskManager failure can lead to split assignment loss
Stephan Ewen created FLINK-21996: Summary: Transient RPC failure without TaskManager failure can lead to split assignment loss Key: FLINK-21996 URL: https://issues.apache.org/jira/browse/FLINK-21996 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.12.2 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.13.0 NOTE: This bug has not actually been observed. It is based on reviews of the current implementation. I would expect it to be a pretty rare case, but at scale, even the rare cases happen often enough. h2. Problem Intermediate RPC messages from JM to TM can get dropped, even when the TM is not marked as failed. That can happen when the connection can be recovered before the heartbeat times out. So RPCs generally retry, or handle failures: For example the Deploy-Task RPC retries, and the Trigger-Checkpoint RPC aborts the checkpoint on failure and triggers a new checkpoint. The "Send OperatorEvent" RPC call (from Coordinator to Operator) gives you a Future with the acknowledgement. But if that one fails, we are in the situation where we do not know whether the event sending was successful or not (only the ack failed). This is especially tricky for split assignments and checkpoints. Consider this sequence of actions: 1. Coordinator assigns a split. Ack not yet received. 2. Coordinator takes a checkpoint. The split was sent before the checkpoint, so it is not included on the Coordinator. 3. Split assignment RPC response is "failed". 4. Checkpoint completes. Now we don't know whether the split was in the checkpoint on the Operator (TaskManager) or not, and with that we don't know whether we should add it back to the coordinator. We need to do something to make sure the split is now either on the coordinator or on the Operator. Currently, the split is implicitly assumed to be on the Operator; if it isn't, then that split is lost. 
Now, it is worth pointing out that this is a pretty rare situation, because it means that the RPC with the split assignment fails and the one for the checkpoint succeeds, even though they are in close proximity. The way the Akka-based RPC transport works (with retries, etc.), this can happen, but isn't very likely. That is why we haven't seen this bug in practice so far, or haven't gotten a report for it yet. h2. Proposed solution The solution has two components: 1. Fallback to a consistent point: If the system doesn't know whether two parts are still consistent with each other (here coordinator and Operator), fall back to a consistent point. Here that is the case when the Ack-Future for the "Send Operator Event" RPC fails or times out. Then we call the scheduler to trigger a failover of the target operator to the latest checkpoint and signal the same to the coordinator. That restores consistency. We can later optimize this (see below). 2. We cannot trigger checkpoints while we are "in limbo" concerning our knowledge about splits. Concretely that means that the Coordinator can only acknowledge the checkpoint once the Acks for pending Operator Event RPCs (Assign-Splits) have arrived. The checkpoint future is conditional on all pending RPC futures. If the RPC futures fail (or time out) then the checkpoint cannot complete (and the target operator will go through a failover anyway). In the common case, RPC round trip time is milliseconds, which would be added to the checkpoint latency if the checkpoint happens to overlap with a split assignment (most won't). h2. Possible Future Improvements Step (1) above can be optimized by going with retries first and sequence numbers to deduplicate the calls. That can help reduce the number of cases where a failover is needed. However, the number of situations where the RPC would need a retry and has a chance of succeeding (the TM is not down) should be very few to begin with, so whether this optimization is worth it remains to be seen. 
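Proposal (2) above can be sketched with plain {{CompletableFuture}} composition. The class and method names here are illustrative, not the actual Flink classes:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch of proposal (2): the coordinator's checkpoint acknowledgement is gated
// on the acks of all in-flight "send operator event" (split assignment) RPCs.
// If any ack fails or times out, the checkpoint future fails with it -- and the
// target operator goes through a failover anyway, per proposal (1).
public class GatedCoordinatorCheckpoint {
    static CompletableFuture<byte[]> acknowledgeCheckpoint(
            List<CompletableFuture<Void>> pendingEventAcks, byte[] coordinatorState) {
        return CompletableFuture
                .allOf(pendingEventAcks.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> coordinatorState);
    }
}
```

With no pending acks, the returned future completes immediately, so only checkpoints that overlap a split assignment pay the extra RPC round-trip latency.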
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21935) Remove "state.backend.async" option.
Stephan Ewen created FLINK-21935: Summary: Remove "state.backend.async" option. Key: FLINK-21935 URL: https://issues.apache.org/jira/browse/FLINK-21935 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Reporter: Stephan Ewen Fix For: 1.13.0 Checkpoints are always asynchronous; there is never a case for a synchronous checkpoint. The RocksDB state backend doesn't even support synchronous snapshots, and the HashMap Heap backend also has no good use case for synchronous snapshots (other than a very minor reduction in heap objects). Most importantly, we should not expose this option in the constructors of the new state backend API classes, like {{HashMapStateBackend}}. I marked this a blocker because it is part of the new user-facing State Backend API and I would like to avoid that this option enters this API and causes confusion when we eventually remove it. /cc [~sjwiesman] and [~liyu] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21695) Increase default value for number of KeyGroups
Stephan Ewen created FLINK-21695: Summary: Increase default value for number of KeyGroups Key: FLINK-21695 URL: https://issues.apache.org/jira/browse/FLINK-21695 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing, Runtime / State Backends Reporter: Stephan Ewen Fix For: 1.13.0 The current calculation for the number of Key Groups (max parallelism) leads in many cases to data skew and to confusion among users. Specifically, the fact that for maxParallelisms above 128, the default value is set to {{roundToPowerOfTwo(1.5 x parallelism)}} means that frequently, half of the tasks get one keygroup and the other half gets two keygroups, which is very skewed. See section (1) in this "lessons learned" blog post. https://engineering.contentsquare.com/2021/ten-flink-gotchas/ We can fix this by - either setting the default maxParallelism to something pretty high (2048 for example). The cost is that the default key-group encoding overhead per state entry grows from one byte to two bytes. - or staying with similar logic, but instead of {{1.5 x operatorParallelism}} going with a higher multiplier, like {{4 x operatorParallelism}}. The price is again that we more quickly reach the point where we have two bytes of keygroup encoding overhead, instead of one. Implementation-wise, there is an unfortunate situation: the maxParallelism, if not configured, is not stored anywhere in the job graph, but re-derived on the JobManager each time it loads a JobGraph vertex (ExecutionJobVertex) which does not have a MaxParallelism configured. This relies on the implicit contract that this logic never changes. Changing this logic will instantly break all jobs which have not explicitly configured the Max Parallelism. 
That seems like a pretty heavy design shortcoming, unfortunately :-( A way to partially work around that is by moving the logic that derives the maximum parallelism to the {{StreamGraphGenerator}}, so we never create JobGraphs where vertices have no configured Max Parallelism (and we keep the re-derivation logic for backwards compatibility for persisted JobGraphs). The {{StreamExecutionEnvironment}} will need a flag to use the "old mode" to give existing un-configured applications a way to keep restoring from old savepoints. -- This message was sent by Atlassian Jira (v8.3.4#803005)
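The skew described above can be reproduced with a small model of the default heuristic. This is a simplified sketch of {{roundToPowerOfTwo(1.5 x parallelism)}} as described in the ticket; the real derivation is assumed to also clamp the result to a fixed range:

```java
// Model of the default max-parallelism heuristic: roundToPowerOfTwo(1.5 x parallelism).
public class KeyGroupSkewDemo {
    // Round up to the next power of two (x itself, if x is already one).
    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    static int defaultMaxParallelism(int parallelism) {
        return roundUpToPowerOfTwo(parallelism + parallelism / 2); // 1.5x, rounded up
    }

    public static void main(String[] args) {
        int parallelism = 170;
        int maxParallelism = defaultMaxParallelism(parallelism); // 256 key groups
        // Key-group ranges are spread as evenly as possible, so maxP % p tasks
        // carry one extra group -- here, half the tasks carry double the state.
        int tasksWithTwoGroups = maxParallelism % parallelism;    // 86 tasks with 2 groups
        int tasksWithOneGroup = parallelism - tasksWithTwoGroups; // 84 tasks with 1 group
        System.out.printf("maxParallelism=%d, two-group tasks=%d, one-group tasks=%d%n",
                maxParallelism, tasksWithTwoGroups, tasksWithOneGroup);
    }
}
```

For parallelism 170 the heuristic yields 256 key groups, so 86 tasks own two key groups while 84 own one: the uneven split the blog post complains about.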
[jira] [Created] (FLINK-21694) Increase default value of "state.backend.rocksdb.checkpoint.transfer.thread.num"
Stephan Ewen created FLINK-21694: Summary: Increase default value of "state.backend.rocksdb.checkpoint.transfer.thread.num" Key: FLINK-21694 URL: https://issues.apache.org/jira/browse/FLINK-21694 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Reporter: Stephan Ewen Fix For: 1.13.0 The default value for the number of threads used to download state artifacts from checkpoint storage should be increased. The increase should not pose a risk of regression, but does in many cases speed up checkpoint recovery significantly. Something similar was reported in this blog post, item (3). https://engineering.contentsquare.com/2021/ten-flink-gotchas/ A value of 8 (eight) sounds like a good default. It should not result in excessive thread explosion, and already speeds up recovery. -- This message was sent by Atlassian Jira (v8.3.4#803005)
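Until the default changes, the proposed value can be set explicitly; a minimal {{flink-conf.yaml}} fragment (the option key is the one named in the ticket title):

```
state.backend.rocksdb.checkpoint.transfer.thread.num: 8
```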
[jira] [Created] (FLINK-20472) Change quickstarts to have a dependency on "flink-dist"
Stephan Ewen created FLINK-20472: Summary: Change quickstarts to have a dependency on "flink-dist" Key: FLINK-20472 URL: https://issues.apache.org/jira/browse/FLINK-20472 Project: Flink Issue Type: Improvement Components: Quickstarts Reporter: Stephan Ewen I suggest we change the quickstarts to have the following dependencies. - {{flink-dist}} (provided) - log4j (runtime) That way the projects created from quickstarts have exactly the same dependencies as what the distribution provides. That solves all our mismatches between needing different dependencies for compiling/running-in-IDE and packaging for deployment. For example, we can add the {{flink-connector-base}} and {{flink-connector-files}} back to {{flink-dist}} without having any issues with setting up dependencies in the user projects. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20413) Sources should add splits back in "resetSubtask()", rather than in "subtaskFailed()".
Stephan Ewen created FLINK-20413: Summary: Sources should add splits back in "resetSubtask()", rather than in "subtaskFailed()". Key: FLINK-20413 URL: https://issues.apache.org/jira/browse/FLINK-20413 Project: Flink Issue Type: Bug Components: Runtime / Coordination Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 Because "subtaskFailed()" has no strong order guarantees with checkpoint completion, we need to return failed splits in "resetSubtask()" instead. See FLINK-20396 for a detailed explanation of the race condition. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20412) Collect Result Fetching occasionally fails after a JobManager Failover
Stephan Ewen created FLINK-20412: Summary: Collect Result Fetching occasionally fails after a JobManager Failover Key: FLINK-20412 URL: https://issues.apache.org/jira/browse/FLINK-20412 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.12.0 Reporter: Stephan Ewen The encountered exception is as below. The issue can be reproduced by running a test with JobManager failover in a tight loop, for example the FileTextLinesITCase from this PR: [https://github.com/apache/flink/pull/14199] {code:java} 15335 [main] WARN org.apache.flink.streaming.api.operators.collect.CollectResultFetcher - An exception occurs when fetching query results java.util.concurrent.ExecutionException: java.lang.NullPointerException at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?] at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?] at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:163) ~[classes/:?] at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:134) [classes/:?] at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:103) [classes/:?] at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:77) [classes/:?] at org.apache.flink.streaming.api.datastream.DataStreamUtils.collectRecordsFromUnboundedStream(DataStreamUtils.java:142) [classes/:?] at org.apache.flink.connector.file.src.FileSourceTextLinesITCase.testContinuousTextFileSource(FileSourceTextLinesITCase.java:272) [test-classes/:?] at org.apache.flink.connector.file.src.FileSourceTextLinesITCase.testContinuousTextFileSourceWithJobManagerFailover(FileSourceTextLinesITCase.java:228) [test-classes/:?] at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?] 
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?] at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?] at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?] at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) [junit-4.12.jar:4.12] at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) [junit-4.12.jar:4.12] at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) [junit-4.12.jar:4.12] at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) [junit-4.12.jar:4.12] at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) [junit-4.12.jar:4.12] at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) [junit-4.12.jar:4.12] at org.junit.rules.RunRules.evaluate(RunRules.java:20) [junit-4.12.jar:4.12] at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) [junit-4.12.jar:4.12] at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) [junit-4.12.jar:4.12] at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) [junit-4.12.jar:4.12] at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) [junit-4.12.jar:4.12] at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) [junit-4.12.jar:4.12] at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) [junit-4.12.jar:4.12] at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) [junit-4.12.jar:4.12] at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) [junit-4.12.jar:4.12] at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) [junit-4.12.jar:4.12] at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) [junit-4.12.jar:4.12] at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) 
[junit-4.12.jar:4.12] at org.junit.rules.RunRules.evaluate(RunRules.java:20) [junit-4.12.jar:4.12] at org.junit.runners.ParentRunner.run(ParentRunner.java:363) [junit-4.12.jar:4.12] at org.junit.runner.JUnitCore.run(JUnitCore.java:137) [junit-4.12.jar:4.12] at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) [junit-rt.jar:?] at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:67) [junit-rt.jar:?] at
[jira] [Created] (FLINK-20406) Return the Checkpoint ID of the restored Checkpoint in CheckpointCoordinator.restoreLatestCheckpointedStateToSubtasks()
Stephan Ewen created FLINK-20406: Summary: Return the Checkpoint ID of the restored Checkpoint in CheckpointCoordinator.restoreLatestCheckpointedStateToSubtasks() Key: FLINK-20406 URL: https://issues.apache.org/jira/browse/FLINK-20406 Project: Flink Issue Type: Sub-task Components: Runtime / Checkpointing Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 To allow the scheduler to notify Operator Coordinators of subtask restores (local failover), we need to know which checkpoint ID was restored. This change does not adjust the other restore methods of the Checkpoint Coordinator, because the fact that the Scheduler needs to be involved in the subtask restore notification at all is only due to a shortcoming of the Checkpoint Coordinator: The CC is not aware of subtask restores, it always restores all subtasks and relies on the fact that assigning state to a running execution attempt has no effect. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20397) Pass checkpointId to OperatorCoordinator.resetToCheckpoint().
Stephan Ewen created FLINK-20397: Summary: Pass checkpointId to OperatorCoordinator.resetToCheckpoint(). Key: FLINK-20397 URL: https://issues.apache.org/jira/browse/FLINK-20397 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.11.2 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0, 1.11.3 The OperatorCoordinator.resetToCheckpoint() currently lacks the information which checkpoint it recovers to. That forces implementers to assume strict ordering of method calls between restore and failure. While that is currently guaranteed in this case, it is not guaranteed in other places (see parent issue). Because of that, we want implementations to not assume method order at all, but rely on explicit information passed to the methods (checkpoint IDs). Otherwise we end up with mixed implementations that partially infer context from the order of method calls, and partially use explicit information that was passed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20396) Replace "OperatorCoordinator.subtaskFailed()" with "subtaskRestored()"
Stephan Ewen created FLINK-20396: Summary: Replace "OperatorCoordinator.subtaskFailed()" with "subtaskRestored()" Key: FLINK-20396 URL: https://issues.apache.org/jira/browse/FLINK-20396 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.11.2 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0, 1.11.3 There are no strong order guarantees between {{OperatorCoordinator.subtaskFailed()}} and {{OperatorCoordinator.notifyCheckpointComplete()}}. It can happen that a checkpoint completes after the notification for task failure is sent: - {{OperatorCoordinator.checkpoint()}} - {{OperatorCoordinator.subtaskFailed()}} - {{OperatorCoordinator.checkpointComplete()}} The subtask failure here does not know whether the previous checkpoint completed or not. It cannot decide what state the subtask will be in after recovery. There is no easy fix right now to strictly guarantee the order of the method calls, so alternatively we need to provide the necessary information to reason about the status of tasks. We should replace {{OperatorCoordinator.subtaskFailed(int subtask)}} with {{OperatorCoordinator.subtaskRestored(int subtask, long checkpoint)}}. That way, implementations get the explicit checkpoint ID for the subtask recovery and can align it with the IDs of checkpoints that were taken. It is still (in rare cases) possible that for a specific checkpoint C, {{OperatorCoordinator.subtaskRestored(subtaskIndex, C)}} comes before {{OperatorCoordinator.checkpointComplete(C)}}. h3. Background The Checkpointing Procedure is partially asynchronous on the {{JobManager}} / {{CheckpointCoordinator}}: After all subtasks acknowledged the checkpoint, the finalization (writing out metadata and registering the checkpoint in ZooKeeper) happens in an I/O thread, and the checkpoint completes after that. 
This sequence of events can happen: - task acks checkpoint - checkpoint fully acknowledged, finalization starts - task fails - task failure notification is dispatched - checkpoint completes. For task failures and checkpoint completion, no order is defined. However, for task restore and checkpoint completion, the order is well defined: When a task is restored, pending checkpoints are either canceled or complete. None can be within finalization. That is currently guaranteed with a lock in the {{CheckpointCoordinator}}. (An implication is that restores can be blocking operations in the scheduler, which is not ideal from the perspective of making the scheduler async/non-blocking, but it is currently essential for correctness). -- This message was sent by Atlassian Jira (v8.3.4#803005)
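The value of the explicit checkpoint ID can be illustrated with a toy coordinator model. All names here are hypothetical, not the actual Flink interface; the point is only that, given the ID, the coordinator can decide which assignments are newer than the restored checkpoint instead of inferring that from call order:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy model: the coordinator remembers which splits were assigned while each
// checkpoint was the latest completed one. On subtaskRestored(subtask, id) it
// can tell exactly which assignments are not covered by the restored state and
// must be handed out again. (The subtask index is ignored in this sketch.)
class CoordinatorModel {
    private final Map<Long, Set<String>> assignmentsByEpoch = new HashMap<>();
    final Set<String> splitsToReassign = new TreeSet<>();

    // Record a split assignment made while `latestCheckpointId` was the newest
    // completed checkpoint (so the next checkpoint is the first to cover it).
    void recordAssignment(long latestCheckpointId, String split) {
        assignmentsByEpoch.computeIfAbsent(latestCheckpointId, k -> new HashSet<>()).add(split);
    }

    void subtaskRestored(int subtask, long restoredCheckpointId) {
        // Assignments made at or after the restored checkpoint's epoch are not
        // in the subtask's restored state: hand them out again.
        assignmentsByEpoch.forEach((epoch, splits) -> {
            if (epoch >= restoredCheckpointId) {
                splitsToReassign.addAll(splits);
            }
        });
    }
}
```

With the old {{subtaskFailed(int subtask)}} signature, the same decision would require knowing whether the in-flight checkpoint completed, which is exactly the ordering that is not guaranteed.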
[jira] [Created] (FLINK-20379) New Kafka Connector does not support DeserializationSchema
Stephan Ewen created FLINK-20379: Summary: New Kafka Connector does not support DeserializationSchema Key: FLINK-20379 URL: https://issues.apache.org/jira/browse/FLINK-20379 Project: Flink Issue Type: Bug Components: Connectors / Kafka Reporter: Stephan Ewen Fix For: 1.12.0 The new Kafka Connector defines its own deserialization schema and is incompatible with the existing library of deserializers. That means that users cannot use all of Flink's Formats (Avro, JSON, Csv, Protobuf, Confluent Schema Registry, ...) with the new Kafka Connector. I think we should change the new Kafka Connector to use the existing Deserialization classes, so all formats can be used, and users can reuse their deserializer implementations. It would also be good to use the existing KafkaDeserializationSchema. Otherwise all users need to migrate their sources again. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20276) Transparent DeCompression of streams missing on new File Source
Stephan Ewen created FLINK-20276: Summary: Transparent DeCompression of streams missing on new File Source Key: FLINK-20276 URL: https://issues.apache.org/jira/browse/FLINK-20276 Project: Flink Issue Type: Bug Components: Connectors / FileSystem Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 The existing {{FileInputFormat}} applies decompression (gzip, xz, ...) automatically on the file input stream, based on the file extension. We need to add similar functionality for the {{StreamRecordFormat}} of the new FileSource to be on par with this functionality. This can be easily applied in the {{StreamFormatAdapter}} when opening the file stream. -- This message was sent by Atlassian Jira (v8.3.4#803005)
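A minimal sketch of such extension-based wrapping, using only JDK classes. The helper name and the single gzip case are illustrative; the real change would hook into the {{StreamFormatAdapter}} as described and cover the full set of supported codecs:

```java
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

// Illustrative helper: pick a decompressing decorator from the file extension
// when opening the stream, the way FileInputFormat does for its input splits.
public class DecompressingStreamOpener {
    public static InputStream open(File file) throws IOException {
        InputStream raw = new BufferedInputStream(new FileInputStream(file));
        if (file.getName().endsWith(".gz")) {
            return new GZIPInputStream(raw); // transparent gzip decompression
        }
        return raw; // no known compression extension: pass the stream through
    }
}
```

Because the wrapping happens at stream-open time, record formats stay unaware of compression, which is the "transparent" property the ticket asks for.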
[jira] [Created] (FLINK-20188) Add Documentation for new File Source
Stephan Ewen created FLINK-20188: Summary: Add Documentation for new File Source Key: FLINK-20188 URL: https://issues.apache.org/jira/browse/FLINK-20188 Project: Flink Issue Type: Task Components: Documentation Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20063) File Source requests an additional split on every restore.
Stephan Ewen created FLINK-20063: Summary: File Source requests an additional split on every restore. Key: FLINK-20063 URL: https://issues.apache.org/jira/browse/FLINK-20063 Project: Flink Issue Type: Bug Components: Connectors / FileSystem Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 Currently, the {{FileSourceReader}} requests a new split when started. That includes cases when it was restored from a checkpoint. So with every restore, the reader increases its split backlog size by one, causing problems for balanced split assignments. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20049) Simplify handling of "request split".
Stephan Ewen created FLINK-20049: Summary: Simplify handling of "request split". Key: FLINK-20049 URL: https://issues.apache.org/jira/browse/FLINK-20049 Project: Flink Issue Type: Improvement Components: API / Core Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 This issue is similar to FLINK-19265. Split assignment events are treated specially by the source API: users do not create them directly but call methods on the contexts to assign splits. The {{RequestSplitEvent}}, in contrast, is a custom user event and needs to be handled like one when sent by the readers and received by the enumerators. That seems a bit confusing and inconsistent, given that {{RequestSplitEvent}} is essential for all use cases with pull-based split assignment, which is pretty much any batch use case and various streaming use cases. The event should be on the same level as the AddSplitEvent. I suggest that we add {{SourceReaderContext.requestSplit()}} and {{SplitEnumerator.handleSplitRequest()}}, so that split requests and responses are symmetrical. -- This message was sent by Atlassian Jira (v8.3.4#803005)
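The proposed symmetry can be sketched as follows. All names are hypothetical stand-ins mirroring the suggestion, not the actual Flink interfaces: a reader-side {{requestSplit()}} would surface on the enumerator as a first-class {{handleSplitRequest()}} callback, the mirror image of how {{AddSplitEvent}} flows the other way:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical enumerator-side counterpart of SourceReaderContext.requestSplit().
interface SplitEnumeratorSketch {
    void handleSplitRequest(int subtaskId);
}

// A minimal pull-based enumerator: every request hands out the next unassigned split.
class QueueingEnumerator implements SplitEnumeratorSketch {
    private final Deque<String> unassigned = new ArrayDeque<>(List.of("split-0", "split-1"));
    final Map<Integer, String> lastAssignment = new HashMap<>();

    @Override
    public void handleSplitRequest(int subtaskId) {
        String next = unassigned.poll();
        if (next != null) {
            lastAssignment.put(subtaskId, next); // would be context.assignSplit(next, subtaskId)
        }
    }
}
```

This is exactly the batch pattern the ticket describes: readers pull, the enumerator answers, and no custom event handling is needed on either side.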
[jira] [Created] (FLINK-20028) FileCompactionITCase is unstable
Stephan Ewen created FLINK-20028: Summary: FileCompactionITCase is unstable Key: FLINK-20028 URL: https://issues.apache.org/jira/browse/FLINK-20028 Project: Flink Issue Type: Bug Components: Connectors / FileSystem, Table SQL / Ecosystem Reporter: Stephan Ewen Fix For: 1.12.0 The {{ParquetFileCompactionITCase}} hangs and times out. Log: https://dev.azure.com/sewen0794/Flink/_build/results?buildId=178=logs=66592496-52df-56bb-d03e-37509e1d9d0f=ae0269db-6796-5583-2e5f-d84757d711aa Exception: {code} org.junit.runners.model.TestTimedOutException: test timed out after 60 seconds at sun.misc.Unsafe.park(Native Method) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:119) at org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:86) at org.apache.flink.table.planner.runtime.stream.sql.FileCompactionITCaseBase.testNonPartition(FileCompactionITCaseBase.java:91) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20008) Java Deadlock in ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement()
Stephan Ewen created FLINK-20008: Summary: Java Deadlock in ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement() Key: FLINK-20008 URL: https://issues.apache.org/jira/browse/FLINK-20008 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.12.0 Reporter: Stephan Ewen Fix For: 1.12.0 The stack trace shows a deadlock between the testing thread and the Curator event thread. Full log: https://dev.azure.com/sewen0794/Flink/_build/results?buildId=176=logs=6e58d712-c5cc-52fb-0895-6ff7bd56c46b=f30a8e80-b2cf-535c-9952-7f521a4ae374 Relevant Stack Trace: {code}
Found one Java-level deadlock:
==============================
"main-EventThread":
  waiting to lock monitor 0x7f74c00045e8 (object 0x8ed14cb0, a java.lang.Object),
  which is held by "main"
"main":
  waiting to lock monitor 0x7f74e401a1f8 (object 0x8ed15008, a org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch),
  which is held by "main-EventThread"

Java stack information for the threads listed above:
===================================================
"main-EventThread":
    at org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.onGrantLeadership(DefaultLeaderElectionService.java:186)
    - waiting to lock <0x8ed14cb0> (a java.lang.Object)
    at org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver.isLeader(ZooKeeperLeaderElectionDriver.java:158)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch$9.apply(LeaderLatch.java:693)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch$9.apply(LeaderLatch.java:689)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:100)
    at org.apache.flink.shaded.curator4.org.apache.curator.shaded.com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:92)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch.setLeadership(LeaderLatch.java:688)
    - locked <0x8ed15008> (a org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch.checkLeadership(LeaderLatch.java:567)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch.access$700(LeaderLatch.java:65)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch$7.processResult(LeaderLatch.java:618)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.sendToBackgroundCallback(CuratorFrameworkImpl.java:883)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.processBackgroundOperation(CuratorFrameworkImpl.java:653)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.WatcherRemovalFacade.processBackgroundOperation(WatcherRemovalFacade.java:152)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.GetChildrenBuilderImpl$2.processResult(GetChildrenBuilderImpl.java:187)
    at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:601)
    at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:508)
"main":
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch.close(LeaderLatch.java:203)
    - waiting to lock <0x8ed15008> (a org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch.close(LeaderLatch.java:190)
    at org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver.close(ZooKeeperLeaderElectionDriver.java:140)
    at org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.stop(DefaultLeaderElectionService.java:103)
    - locked <0x8ed14cb0> (a java.lang.Object)
    at org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement(ZooKeeperLeaderElectionTest.java:310)
{code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19804) Make FileSource class generic with respect to split types
Stephan Ewen created FLINK-19804: Summary: Make FileSource class generic with respect to split types Key: FLINK-19804 URL: https://issues.apache.org/jira/browse/FLINK-19804 Project: Flink Issue Type: Sub-task Components: Connectors / FileSystem Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 To support sources with different split types, the FileSource class should become generic with respect to the split type. To avoid exposing overly complex generic signatures to users, we should add a new class {{AbstractFileSource}}, which is generic and has all the shared generic methods. The current {{FileSource}} would be changed to extend this class for the specific split type {{FileSourceSplit}}, and thus behave exactly as it currently does. -- This message was sent by Atlassian Jira (v8.3.4#803005)
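The proposed layering can be sketched as below; the types are drastically simplified stand-ins for illustration (only the names {{AbstractFileSource}}, {{FileSource}}, and {{FileSourceSplit}} come from the ticket).

```java
// Simplified stand-in for the split type.
class FileSourceSplit {
    final String path;
    FileSourceSplit(String path) { this.path = path; }
}

// The generic base class carries the shared machinery for any split type.
abstract class AbstractFileSource<SplitT extends FileSourceSplit> {
    abstract SplitT createSplit(String path);

    String describe(String path) {
        return "split for " + createSplit(path).path;
    }
}

// The user-facing class stays non-generic in its split type and behaves
// exactly as before.
final class FileSource extends AbstractFileSource<FileSourceSplit> {
    @Override
    FileSourceSplit createSplit(String path) {
        return new FileSourceSplit(path);
    }
}

public class GenericFileSourceSketch {
    public static void main(String[] args) {
        System.out.println(new FileSource().describe("/data/a.csv")); // split for /data/a.csv
    }
}
```

Users who need a custom split type extend the abstract base; everyone else keeps the simple signature.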
[jira] [Created] (FLINK-19803) Make File Source Checkpoint Serializer extensible
Stephan Ewen created FLINK-19803: Summary: Make File Source Checkpoint Serializer extensible Key: FLINK-19803 URL: https://issues.apache.org/jira/browse/FLINK-19803 Project: Flink Issue Type: Sub-task Components: Connectors / FileSystem Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 The checkpoint type serializer should be generic and accept the split serializer, rather than being hard-wired to the {{FileSourceSplitSerializer}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19802) Let BulkFormat createReader and restoreReader methods accept Splits directly
Stephan Ewen created FLINK-19802: Summary: Let BulkFormat createReader and restoreReader methods accept Splits directly Key: FLINK-19802 URL: https://issues.apache.org/jira/browse/FLINK-19802 Project: Flink Issue Type: Sub-task Components: Connectors / FileSystem Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 To support sources where the splits communicate additional information, the BulkFormats should accept a generic split type, instead of receiving path/offset/length extracted from the splits. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19800) Make FileSourceSplit / FileSourceSplitState interaction extensible.
Stephan Ewen created FLINK-19800: Summary: Make FileSourceSplit / FileSourceSplitState interaction extensible. Key: FLINK-19800 URL: https://issues.apache.org/jira/browse/FLINK-19800 Project: Flink Issue Type: Sub-task Components: Connectors / FileSystem Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 We should add a method {{FileSourceSplit.updateWithCheckpointedPosition(CheckpointedPosition)}} that creates a new {{FileSourceSplit}} of the same type with an updated position. Subclasses can override that method. The {{FileSourceSplitState}} uses that method to create the splits that are stored in the checkpoint. -- This message was sent by Atlassian Jira (v8.3.4#803005)
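A sketch of how such a hook lets subclasses preserve their own type and extra fields across a checkpoint; everything except the method and class names mentioned in the ticket is an illustrative assumption.

```java
// Minimal stand-in for the checkpointed position (here just a byte offset).
class CheckpointedPosition {
    final long offset;
    CheckpointedPosition(long offset) { this.offset = offset; }
}

class FileSourceSplit {
    final String path;
    final long offset;
    FileSourceSplit(String path, long offset) {
        this.path = path;
        this.offset = offset;
    }

    // The proposed extension point: create a new split of the same type with
    // an updated position.
    FileSourceSplit updateWithCheckpointedPosition(CheckpointedPosition position) {
        return new FileSourceSplit(path, position.offset);
    }
}

// A hypothetical subclass carrying extra information; its override keeps the
// subtype and the extra field intact.
class CsvSplit extends FileSourceSplit {
    final char delimiter;
    CsvSplit(String path, long offset, char delimiter) {
        super(path, offset);
        this.delimiter = delimiter;
    }

    @Override
    CsvSplit updateWithCheckpointedPosition(CheckpointedPosition position) {
        return new CsvSplit(path, position.offset, delimiter);
    }
}

public class SplitUpdateSketch {
    public static void main(String[] args) {
        FileSourceSplit split = new CsvSplit("/data/a.csv", 0, ';');
        FileSourceSplit updated = split.updateWithCheckpointedPosition(new CheckpointedPosition(1024));
        // A split-state holder would store 'updated' in the checkpoint.
        System.out.println(updated.getClass().getSimpleName() + " @ " + updated.offset); // CsvSplit @ 1024
    }
}
```

Because the override returns the covariant subtype, the state holder can work against the base type while checkpoints still round-trip the concrete split class.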
[jira] [Created] (FLINK-19799) Make FileSource extensible
Stephan Ewen created FLINK-19799: Summary: Make FileSource extensible Key: FLINK-19799 URL: https://issues.apache.org/jira/browse/FLINK-19799 Project: Flink Issue Type: Improvement Components: Connectors / FileSystem Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 The File System Source currently assumes all formats can represent their work units as {{FileSourceSplit}}. If that is not the case, the formats cannot be implemented using the {{FileSource}}. We need to support extending the splits to carry additional information, and to use that information when creating bulk readers and handling split state. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19516) PerJobMiniClusterFactoryTest.testJobClient()
Stephan Ewen created FLINK-19516: Summary: PerJobMiniClusterFactoryTest.testJobClient() Key: FLINK-19516 URL: https://issues.apache.org/jira/browse/FLINK-19516 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.12.0 Reporter: Stephan Ewen *Log:* https://dev.azure.com/sewen0794/19b23adf-d190-4fb4-ae6e-2e92b08923a3/_apis/build/builds/151/logs/137 *Exception:* {code}
[ERROR] testJobClient(org.apache.flink.client.program.PerJobMiniClusterFactoryTest) Time elapsed: 0.392 s <<< FAILURE!
java.lang.AssertionError: Expected: is but: was
    at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
    at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
    at org.apache.flink.client.program.PerJobMiniClusterFactoryTest.assertThatMiniClusterIsShutdown(PerJobMiniClusterFactoryTest.java:161)
    at org.apache.flink.client.program.PerJobMiniClusterFactoryTest.testJobClient(PerJobMiniClusterFactoryTest.java:93)
{code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19514) ZooKeeperLeaderElectionITCase.testJobExecutionOnClusterWithLeaderChange times out
Stephan Ewen created FLINK-19514: Summary: ZooKeeperLeaderElectionITCase.testJobExecutionOnClusterWithLeaderChange times out Key: FLINK-19514 URL: https://issues.apache.org/jira/browse/FLINK-19514 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.12.0 Reporter: Stephan Ewen Full logs: https://dev.azure.com/sewen0794/19b23adf-d190-4fb4-ae6e-2e92b08923a3/_apis/build/builds/148/logs/115 Exception: {code}
[ERROR] testJobExecutionOnClusterWithLeaderChange(org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase) Time elapsed: 301.093 s <<< ERROR!
java.util.concurrent.TimeoutException: Condition was not met in given timeout.
    at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:132)
    at org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase.getNextLeadingDispatcherGateway(ZooKeeperLeaderElectionITCase.java:140)
    at org.apache.flink.test.runtime.leaderelection.ZooKeeperLeaderElectionITCase.testJobExecutionOnClusterWithLeaderChange(ZooKeeperLeaderElectionITCase.java:122)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
    at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
    at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
    at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
{code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19499) Expose Metric Groups to Split Assigners
Stephan Ewen created FLINK-19499: Summary: Expose Metric Groups to Split Assigners Key: FLINK-19499 URL: https://issues.apache.org/jira/browse/FLINK-19499 Project: Flink Issue Type: Improvement Components: Connectors / FileSystem Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 Split assigners should have access to metric groups, so that they can report metrics on assignment, such as pending splits, local assignments, and remote assignments. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19498) Port LocatableInputSplitAssigner to new File Source API
Stephan Ewen created FLINK-19498: Summary: Port LocatableInputSplitAssigner to new File Source API Key: FLINK-19498 URL: https://issues.apache.org/jira/browse/FLINK-19498 Project: Flink Issue Type: Task Components: Connectors / FileSystem Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 The new File Source API needs a locality-aware input split assigner. To preserve the existing behavior, I suggest porting the existing {{LocatableInputSplitAssigner}} from the {{InputFormat}} API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19494) Adjust "StreamExecutionEnvironment.generateSequence()" to new API Sources
Stephan Ewen created FLINK-19494: Summary: Adjust "StreamExecutionEnvironment.generateSequence()" to new API Sources Key: FLINK-19494 URL: https://issues.apache.org/jira/browse/FLINK-19494 Project: Flink Issue Type: Improvement Components: API / DataStream Reporter: Stephan Ewen Fix For: 1.12.0 The utility method {{StreamExecutionEnvironment.generateSequence()}} should instantiate the new {{NumberSequenceSource}} rather than the existing {{StatefulSequenceSource}}. We should also deprecate the {{StatefulSequenceSource}} as part of this change, because
- it is based on the legacy source API
- it is not scalable, because it materializes the sequence
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19492) Consolidate Source Events between Source API and Split Reader API
Stephan Ewen created FLINK-19492: Summary: Consolidate Source Events between Source API and Split Reader API Key: FLINK-19492 URL: https://issues.apache.org/jira/browse/FLINK-19492 Project: Flink Issue Type: Improvement Components: API / Core, Connectors / Common Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 The Source API (flink-core) and the SplitReader API (flink-connector-base) have currently separate copies of events to request splits and to signal that no splits are available. We should consolidate those in flink-core. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19457) Port NumberSequenceSource to FLIP-27 source interface
Stephan Ewen created FLINK-19457: Summary: Port NumberSequenceSource to FLIP-27 source interface Key: FLINK-19457 URL: https://issues.apache.org/jira/browse/FLINK-19457 Project: Flink Issue Type: Improvement Components: API / Core Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 Both {{DataStream}} and {{DataSet}} APIs have a source generating a sequence of numbers. This is useful for debugging and testing. We should port this source to the FLIP-27 interface, to support testing programs with the new source API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19384) Source API exception signatures are inconsistent
Stephan Ewen created FLINK-19384: Summary: Source API exception signatures are inconsistent Key: FLINK-19384 URL: https://issues.apache.org/jira/browse/FLINK-19384 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.11.2, 1.11.1, 1.11.0 Reporter: Stephan Ewen Fix For: 1.12.0 The methods in {{org.apache.flink.api.connector.source.Source}} have inconsistent exception signatures:
- the methods to create the reader and the enumerator do not throw a checked exception
- the method to restore an enumerator throws an {{IOException}}
Either all methods should allow throwing checked {{IOException}}s, or no method should allow any checked exceptions. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19278) Bump Scala Macros Version to 2.1.1
Stephan Ewen created FLINK-19278: Summary: Bump Scala Macros Version to 2.1.1 Key: FLINK-19278 URL: https://issues.apache.org/jira/browse/FLINK-19278 Project: Flink Issue Type: Improvement Components: Build System Reporter: Stephan Ewen Fix For: 1.12.0 Scala Macros 2.1.0 does not support newer Scala versions, in particular not 2.12.12. Scala Macros 2.1.1 seems to support virtually all Scala 2.12 minor versions as well as the latest 2.11 versions (like 2.11.12). See here for version compatibility: https://mvnrepository.com/artifact/org.scalamacros/paradise -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19265) Simplify handling of 'NoMoreSplitsEvent'
Stephan Ewen created FLINK-19265: Summary: Simplify handling of 'NoMoreSplitsEvent' Key: FLINK-19265 URL: https://issues.apache.org/jira/browse/FLINK-19265 Project: Flink Issue Type: Improvement Components: Connectors / Common Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 Split assignment events are treated specially by the source API: users do not create them directly but call methods on the contexts to assign splits. In contrast, the {{NoMoreSplitsEvent}} is a custom user event and needs to be handled like one, both when sent by enumerators and when received by the readers. That seems confusing and inconsistent, given that NoMoreSplits is essential for all bounded stream use cases and is on the same level as the {{AddSplitEvent}}. I suggest that we treat "no more splits" similarly, by having either a custom method or a custom "SplitAssignment" on the context and reader. [~jqin] Curious what your take on this would be. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19251) Avoid confusing queue handling in "SplitReader.handleSplitsChanges()"
Stephan Ewen created FLINK-19251: Summary: Avoid confusing queue handling in "SplitReader.handleSplitsChanges()" Key: FLINK-19251 URL: https://issues.apache.org/jira/browse/FLINK-19251 Project: Flink Issue Type: Improvement Components: Connectors / Common Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 Currently, the method {{SplitReader.handleSplitsChanges()}} is passed a queue of split changes to handle. The method may decide to handle only a subset of them and is passed all remaining changes again later. In practice, this ends up being confusing and problematic:
- It is important to remove the elements from the queue, not merely iterate over them, or the splits will get handled multiple times.
- If the queue is not left empty, the task to handle the changes is immediately re-enqueued; no other operation can happen before all split changes from the queue are handled.
A simpler and more efficient contract would be to pass a list of split changes directly and exactly once, for the fetcher to handle. For all implementations so far, this was sufficient and easier. -- This message was sent by Atlassian Jira (v8.3.4#803005)
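The list-based contract could look like this sketch (names simplified and splits represented as plain strings for illustration): the changes are handed over exactly once, so there is no queue to drain and nothing to re-enqueue.

```java
import java.util.ArrayList;
import java.util.List;

// Proposed contract: the reader receives each batch of split changes once, as
// a list, and is expected to handle all of them.
interface SplitReaderSketch {
    void handleSplitsChanges(List<String> splitsChanges);
}

class CollectingReader implements SplitReaderSketch {
    final List<String> handled = new ArrayList<>();

    @Override
    public void handleSplitsChanges(List<String> splitsChanges) {
        // Handle everything; there is no partially-drained queue that could
        // cause changes to be handled twice or the task to be re-enqueued.
        handled.addAll(splitsChanges);
    }
}

public class SplitChangesSketch {
    public static void main(String[] args) {
        CollectingReader reader = new CollectingReader();
        reader.handleSplitsChanges(List.of("add split-0", "add split-1"));
        System.out.println(reader.handled); // [add split-0, add split-1]
    }
}
```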
[jira] [Created] (FLINK-19250) SplitFetcherManager does not propagate errors correctly
Stephan Ewen created FLINK-19250: Summary: SplitFetcherManager does not propagate errors correctly Key: FLINK-19250 URL: https://issues.apache.org/jira/browse/FLINK-19250 Project: Flink Issue Type: Bug Components: Connectors / Common Affects Versions: 1.11.2 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0, 1.11.3 The first exception that is reported does not lead to a notification; only subsequent exceptions do. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19245) Set default queue capacity for FLIP-27 source handover queue to 2
Stephan Ewen created FLINK-19245: Summary: Set default queue capacity for FLIP-27 source handover queue to 2 Key: FLINK-19245 URL: https://issues.apache.org/jira/browse/FLINK-19245 Project: Flink Issue Type: Improvement Components: Connectors / Common Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 Based on initial investigation by [~jqin] a capacity value of two results in good queue throughput while still keeping the number of elements small. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19225) Improve code and logging in SourceReaderBase
Stephan Ewen created FLINK-19225: Summary: Improve code and logging in SourceReaderBase Key: FLINK-19225 URL: https://issues.apache.org/jira/browse/FLINK-19225 Project: Flink Issue Type: Improvement Components: Connectors / Common Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 An umbrella issue for minor improvements to the {{SourceReaderBase}}, such as logging, thread names, code simplifications. The concrete change is described in the messages of the commits tagged with this issue (separate commits to better track the changes). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19223) Simplify Availability Future Model in Base Connector
Stephan Ewen created FLINK-19223: Summary: Simplify Availability Future Model in Base Connector Key: FLINK-19223 URL: https://issues.apache.org/jira/browse/FLINK-19223 Project: Flink Issue Type: Improvement Components: Connectors / Common Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 The current model implemented by the {{FutureNotifier}} and the {{SourceReaderBase}} has shortcomings:
- It does not support availability notifications where the notification comes before the check; in that case the notification is lost.
- The added complexity created by this model also shows in {{SourceReaderBase#isAvailable()}}, where the returned future needs to be "post-processed" and eagerly completed if the reader is in fact available. This is based on queue size, which makes it hard to support other conditions.
I think we can do something that is both simpler and a bit more efficient by following a model similar to the {{org.apache.flink.runtime.io.AvailabilityProvider.AvailabilityHelper}}. Furthermore, I believe we can gain more efficiency by integrating this better with the {{FutureCompletingBlockingQueue}}. I suggest doing an implementation similar to the {{AvailabilityHelper}} directly in the {{FutureCompletingBlockingQueue}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
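The core of the {{AvailabilityHelper}}-style model can be sketched as follows (a simplified, non-thread-safe illustration, not the actual Flink class): a notification that arrives before anyone checks is not lost, because it completes the stored future.

```java
import java.util.concurrent.CompletableFuture;

public class AvailabilityHelperSketch {
    // A single, reusable "available" future, similar in spirit to
    // AvailabilityProvider.AVAILABLE.
    static final CompletableFuture<Void> AVAILABLE = CompletableFuture.completedFuture(null);

    private CompletableFuture<Void> availableFuture = AVAILABLE;

    CompletableFuture<Void> isAvailable() {
        return availableFuture;
    }

    // Called when data runs out; installs a fresh, incomplete future.
    void resetUnavailable() {
        if (availableFuture == AVAILABLE) {
            availableFuture = new CompletableFuture<>();
        }
    }

    // Called when data arrives; completes the pending future, so both earlier
    // and later isAvailable() callers observe availability.
    void notifyAvailable() {
        CompletableFuture<Void> toComplete = availableFuture;
        availableFuture = AVAILABLE;
        toComplete.complete(null);
    }

    public static void main(String[] args) {
        AvailabilityHelperSketch helper = new AvailabilityHelperSketch();
        helper.resetUnavailable();
        helper.notifyAvailable();                          // notification BEFORE the check...
        System.out.println(helper.isAvailable().isDone()); // ...is not lost: prints true
    }
}
```

Placing this logic inside the handover queue would let the queue complete the future directly from its put path, instead of deriving availability from the queue size after the fact.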
[jira] [Created] (FLINK-19221) Exploit LocatedFileStatus from Hadoop
Stephan Ewen created FLINK-19221: Summary: Exploit LocatedFileStatus from Hadoop Key: FLINK-19221 URL: https://issues.apache.org/jira/browse/FLINK-19221 Project: Flink Issue Type: Improvement Components: Connectors / Hadoop Compatibility Affects Versions: 1.11.1 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 When the HDFS client returns a {{FileStatus}} (description of a file), it sometimes returns a {{LocatedFileStatus}}, which already contains all the {{BlockLocation}} information. We should expose this on the Flink side, because it may save us a lot of RPC calls to the name node. The file enumerators often request block locations for all files, currently doing an RPC call for each file. When the FileStatus obtained from listing the directory (or getting details for a file) already has all the block locations, we can save the extra RPC call per file. The suggested implementation is as follows:
1. We introduce a {{LocatedFileStatus}} in Flink that we integrate with the built-in LocalFileSystem.
2. We integrate this with the Hadoop file systems by creating a Flink {{LocatedFileStatus}} whenever the underlying file system returned a Hadoop {{LocatedFileStatus}}.
3. As a safety net, the FS methods to access block information check whether the presented file status already contains the block information and return that information directly.
Steps one and two simplify things for FileSystem users (no need to ask for extra info if it is available). Step three is the transparent shortcut that all applications get even if they do not explicitly use the {{LocatedFileStatus}} and keep calling {{FileSystem.getBlockLocations()}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19218) Remove inconsistent host logic for LocalFileSystem
Stephan Ewen created FLINK-19218: Summary: Remove inconsistent host logic for LocalFileSystem Key: FLINK-19218 URL: https://issues.apache.org/jira/browse/FLINK-19218 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.11.1 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 The {{LocalFileSystem}} returns file splits with host information as returned via
{code}
InetAddress.getLocalHost().getHostName();
{code}
This might be different, though, from the host name that the TaskManager is configured to use, which results in incorrect location matching if this information is used. It is also incorrect in cases where the file system is in fact not local, but a mounted NAS. Since this information is not useful anyway (there is no good way to support locality-aware file access for the LocalFileSystem), I would suggest removing this code. That would be better than having code in place that suggests locality information that is frequently incorrect. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19205) SourceReaderContext should give access to Configuration and Hostname
Stephan Ewen created FLINK-19205: Summary: SourceReaderContext should give access to Configuration and Hostname Key: FLINK-19205 URL: https://issues.apache.org/jira/browse/FLINK-19205 Project: Flink Issue Type: Sub-task Components: API / Core Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 To implement end-to-end configurability, Source Readers need access to the Flink configuration. Source Readers sometimes also need access to the hostname of the machine running the reader task, to add locality preferences to requests for partitions from the external source system. The natural way to give access to both is through the {{SourceReaderContext}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19162) Allow Split Reader based sources to reuse record batches
Stephan Ewen created FLINK-19162: Summary: Allow Split Reader based sources to reuse record batches Key: FLINK-19162 URL: https://issues.apache.org/jira/browse/FLINK-19162 Project: Flink Issue Type: Sub-task Components: Connectors / Common Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 The Split Readers hand over a batch of records at a time from the I/O thread (fetching and decoding) to the main operator processing thread. These structures can be memory-intensive and expensive to create, and performance greatly benefits from reusing them. This is especially true for high-performance format readers like ORC and Parquet. While previous sources (where I/O was in the main thread) could reuse objects in a trivial manner, the new Split Reader API (with multiple threads) needs an explicit {{recycle()}} hook to allow returning/reusing these objects. -- This message was sent by Atlassian Jira (v8.3.4#803005)
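The recycle hook could work roughly like this sketch (illustrative names; a real implementation would live next to the handover queue): the processing thread returns a consumed batch to a pool that the fetching thread draws from.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// A reusable batch of decoded records; the backing array is the expensive
// part worth recycling.
class RecordBatch {
    final long[] values = new long[1024];
    private final BlockingQueue<RecordBatch> pool;

    RecordBatch(BlockingQueue<RecordBatch> pool) {
        this.pool = pool;
    }

    // Called by the processing thread once the batch is fully consumed; the
    // I/O thread can then refill the same buffer instead of allocating a new one.
    void recycle() {
        pool.offer(this);
    }
}

public class BatchRecycleSketch {
    public static void main(String[] args) {
        BlockingQueue<RecordBatch> pool = new ArrayBlockingQueue<>(2);
        RecordBatch batch = new RecordBatch(pool);
        // ...I/O thread fills 'batch', hands it over, processing thread reads it...
        batch.recycle();
        System.out.println(pool.size()); // 1: the batch is available for reuse
    }
}
```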
[jira] [Created] (FLINK-19161) Port File Sources to FLIP-27 API
Stephan Ewen created FLINK-19161: Summary: Port File Sources to FLIP-27 API Key: FLINK-19161 URL: https://issues.apache.org/jira/browse/FLINK-19161 Project: Flink Issue Type: Sub-task Components: Connectors / FileSystem Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 Porting the file sources to the FLIP-27 API means combining
- the FileInputFormat from the DataSet (batch) API
- the monitoring file source from the DataStream API
The two already share the same reader code and partial enumeration code. *Structure* The new File Source will have three components:
- File enumerators that discover the files.
- File split assigners that decide which reader gets which split.
- File reader formats, which deal with the decoding.
The main difference between the bounded (batch) version and the unbounded (streaming) version is that the streaming version repeatedly invokes the file enumerator to search for new files. *Checkpointing Enumerators* The enumerators need to checkpoint the not-yet-assigned splits, plus, if they are in continuous discovery mode (streaming), the paths / timestamps already processed. *Checkpointing Readers* The new File Source needs to ensure that every reader can be checkpointed. Some readers may be able to expose the position in the input file that corresponds to the latest emitted record, but many will not be able to do that, due to
- storing compressed record batches
- using buffered decoders where exact position information is not accessible
We therefore suggest exposing a mechanism that combines a seekable file offset with a number of records to read and skip after that offset. In the extreme cases, files work with only seekable positions or only records-to-skip. Some formats, like Avro, can have periodic seek points (sync markers) and count records-to-skip after these markers. *Efficient and Convenient Readers* To balance efficiency (batched, vectorized reading of ORC / Parquet for vectorized query processing) and convenience (plugging a 3rd-party CSV decoder over a stream), we offer three abstractions for record readers:
- Bulk formats that run over a file path and return an iterable batch at a time _(most efficient)_
- File record formats, which read files record-by-record; the source framework hands over a pre-defined-size batch from the split reader to the record emitter
- Stream formats that decode an input stream and rely on the source framework to decide how to batch the record handover _(most convenient)_
-- This message was sent by Atlassian Jira (v8.3.4#803005)
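The "seekable offset plus records to skip" position described under *Checkpointing Readers* could be modeled as in this sketch (field and constant names are illustrative): a seekable format would use mostly the offset, a purely sequential decoder only the skip count, and an Avro-style reader a sync-marker offset plus a count.

```java
// Illustrative position model: seek to 'offset' (if the format supports
// seeking), then read and discard 'recordsAfterOffset' records.
class CheckpointedPosition {
    static final long NO_OFFSET = -1L; // format cannot seek; skip from the start

    final long offset;
    final long recordsAfterOffset;

    CheckpointedPosition(long offset, long recordsAfterOffset) {
        this.offset = offset;
        this.recordsAfterOffset = recordsAfterOffset;
    }

    @Override
    public String toString() {
        return "offset=" + offset + ", recordsAfterOffset=" + recordsAfterOffset;
    }
}

public class ReaderPositionSketch {
    public static void main(String[] args) {
        // E.g. an Avro-style reader that is 3 records past the sync marker at byte 4096.
        CheckpointedPosition position = new CheckpointedPosition(4096, 3);
        System.out.println(position); // offset=4096, recordsAfterOffset=3
    }
}
```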
[jira] [Created] (FLINK-19087) ResultPartitionWriter should not expose subpartitions but only subpartition-readers
Stephan Ewen created FLINK-19087: Summary: ResultPartitionWriter should not expose subpartitions but only subpartition-readers Key: FLINK-19087 URL: https://issues.apache.org/jira/browse/FLINK-19087 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 The {{ResultPartitionWriter}} currently gives arbitrary access to the sub-partitions. These subpartitions may not always exist directly, such as in a sort-based shuffle. Only access to a reader over a sub-partition's data (the ResultSubpartitionView) is necessary. In the spirit of minimal scope of knowledge, the methods should be scoped to return readers, not the more general subpartitions. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19047) Move unaligned checkpoint methods from ResultPartition to separate interface.
Stephan Ewen created FLINK-19047: Summary: Move unaligned checkpoint methods from ResultPartition to separate interface. Key: FLINK-19047 URL: https://issues.apache.org/jira/browse/FLINK-19047 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 All ResultPartitions have the unaligned checkpointing methods, even though some do not support checkpoints and simply throw an {{UnsupportedOperationException}}. I suggest following the idea of interface segregation and putting the methods relating to unaligned checkpoints into a dedicated interface. -- This message was sent by Atlassian Jira (v8.3.4#803005)
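The interface segregation could be sketched like this (interface and method names here are assumptions for illustration): only partitions that support unaligned checkpoints implement the checkpointing interface, so the others need no throwing stubs.

```java
// Base behavior every partition has.
interface ResultPartitionSketch {
    void emit(String record);
}

// Segregated interface for unaligned-checkpoint support only.
interface CheckpointedResultPartition {
    void snapshotInFlightData(long checkpointId);
}

class PipelinedPartition implements ResultPartitionSketch, CheckpointedResultPartition {
    long lastCheckpointId = -1L;

    @Override
    public void emit(String record) { /* buffer the record */ }

    @Override
    public void snapshotInFlightData(long checkpointId) {
        lastCheckpointId = checkpointId;
    }
}

// Blocking partitions simply do not implement the checkpointing interface;
// no UnsupportedOperationException stubs are needed.
class BlockingPartition implements ResultPartitionSketch {
    @Override
    public void emit(String record) { /* write to the spill file */ }
}

public class SegregationSketch {
    public static void main(String[] args) {
        ResultPartitionSketch partition = new PipelinedPartition();
        if (partition instanceof CheckpointedResultPartition) {
            ((CheckpointedResultPartition) partition).snapshotInFlightData(42L);
        }
        System.out.println(((PipelinedPartition) partition).lastCheckpointId); // 42
    }
}
```

Callers that need checkpointing then request the narrower interface explicitly, instead of calling methods that may throw at runtime.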
[jira] [Created] (FLINK-19046) Introduce separate classes for PipelinedResultPartition and BoundedBlockingResultPartition
Stephan Ewen created FLINK-19046: Summary: Introduce separate classes for PipelinedResultPartition and BoundedBlockingResultPartition Key: FLINK-19046 URL: https://issues.apache.org/jira/browse/FLINK-19046 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 Currently, the SubPartition classes are specific to the partition type (pipelined, batched/blocking) but the parent Partition class is shared. Given that the partitions behave differently regarding checkpoints, releasing, etc., the code is more cleanly separated by introducing dedicated classes for the {{ResultPartitions}} based on the type. This is also an important preparation for later adding more implementations, like sort-based shuffles. Important: These new classes will not override any performance-critical methods (like adding a buffer to the result). They merely specialize certain behaviors around checkpointing and cleanup.
[jira] [Created] (FLINK-19045) Remove obsolete option 'taskmanager.network.partition.force-release-on-consumption'
Stephan Ewen created FLINK-19045: Summary: Remove obsolete option 'taskmanager.network.partition.force-release-on-consumption' Key: FLINK-19045 URL: https://issues.apache.org/jira/browse/FLINK-19045 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Stephan Ewen Fix For: 1.12.0
[jira] [Created] (FLINK-19024) Remove unused "releaseMemory" from ResultSubpartition
Stephan Ewen created FLINK-19024: Summary: Remove unused "releaseMemory" from ResultSubpartition Key: FLINK-19024 URL: https://issues.apache.org/jira/browse/FLINK-19024 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 The {{releaseMemory()}} call in the {{ResultSubpartition}} is currently not meaningful for any existing implementation. Future versions where memory may have to be released will quite possibly not implement that on a "subpartition" level. For example, a sort based shuffle has the buffers on a partition-level, rather than a subpartition level. We should thus remove the {{releaseMemory()}} call from the abstract subpartition interface. Concrete implementations can still release memory on a subpartition level, if needed in the future.
[jira] [Created] (FLINK-19023) Remove pruning of Record Serializer Buffer
Stephan Ewen created FLINK-19023: Summary: Remove pruning of Record Serializer Buffer Key: FLINK-19023 URL: https://issues.apache.org/jira/browse/FLINK-19023 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 Currently, the {{SpanningRecordSerializer}} prunes its internal serialization buffer under special circumstances: - The buffer becomes larger than a certain threshold (5MB) - The full record end lines up exactly with a full buffer length (this condition was introduced at some point; its purpose is unclear) This optimization virtually never kicks in (because of the second condition) and also seems unnecessary. There is only a single serializer on the sender side, so this will not help to reduce the maximum memory footprint needed in any way. NOTE: A similar optimization on the reader side ({{SpillingAdaptiveSpanningRecordDeserializer}}) makes sense, because multiple parallel deserializers run in order to piece together the records when retrieving buffers from the network in arbitrary order. Truncating buffers (or spilling) there helps reduce the maximum required memory footprint.
[jira] [Created] (FLINK-19021) Cleanups of the ResultPartition components
Stephan Ewen created FLINK-19021: Summary: Cleanups of the ResultPartition components Key: FLINK-19021 URL: https://issues.apache.org/jira/browse/FLINK-19021 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 This is the umbrella issue for a set of simplifications and cleanups in the {{ResultPartition}} components. This cleanup is in preparation for a possible future refactoring.
[jira] [Created] (FLINK-18447) Add 'flink-connector-base' to 'flink-dist'
Stephan Ewen created FLINK-18447: Summary: Add 'flink-connector-base' to 'flink-dist' Key: FLINK-18447 URL: https://issues.apache.org/jira/browse/FLINK-18447 Project: Flink Issue Type: Improvement Components: Build System Affects Versions: 1.11.0 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.12.0 Source connectors will rely mostly on 'flink-connector-base'. It is like a high-level Source API. Including it in {{flink-dist}} would spare each connector from packaging it into a fat jar. It would then be used similarly to other APIs.
[jira] [Created] (FLINK-18430) Upgrade stability to @Public for CheckpointedFunction and CheckpointListener
Stephan Ewen created FLINK-18430: Summary: Upgrade stability to @Public for CheckpointedFunction and CheckpointListener Key: FLINK-18430 URL: https://issues.apache.org/jira/browse/FLINK-18430 Project: Flink Issue Type: Improvement Components: API / DataStream Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.11.0 The two interfaces {{CheckpointedFunction}} and {{CheckpointListener}} are used by many users, but are still (for years now) marked as {{@PublicEvolving}}. I think this is not correct. They are very core to the DataStream API, are widely used, and should be treated as {{@Public}}.
[jira] [Created] (FLINK-18429) Add default method for CheckpointListener.notifyCheckpointAborted(checkpointId)
Stephan Ewen created FLINK-18429: Summary: Add default method for CheckpointListener.notifyCheckpointAborted(checkpointId) Key: FLINK-18429 URL: https://issues.apache.org/jira/browse/FLINK-18429 Project: Flink Issue Type: Bug Components: API / DataStream Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.11.0 The {{CheckpointListener}} interface is implemented by many users. Adding a new method {{notifyCheckpointAborted(long)}} to the interface without a default method breaks many user programs. We should turn this method into a default method: - It avoids breaking programs - It is in practice less relevant for programs to react to checkpoints being aborted than to being completed. The reason is that on completion you often want to commit side effects, while on abortion you frequently do not do anything, but let the next successful checkpoint commit all changes up to then. *Original Confusion* There was confusion about this originally, going back to a comment by myself suggesting this should not be a default method, incorrectly thinking of it as an internal interface: https://github.com/apache/flink/pull/8693#issuecomment-542834147 See also the clarification email on the mailing list: {noformat} About the "notifyCheckpointAborted()": When I wrote that comment, I was (apparently wrongly) assuming we were talking about an internal interface here, because the "abort" signal was originally only intended to cancel the async part of state backend checkpoints. I just realized that this is exposed to users - and I am actually with Thomas on this one. The "CheckpointListener" is a very public interface that many users implement. The fact that it is tagged "@PublicEvolving" is somehow not aligned with reality. So adding the method here will in reality break lots and lots of user programs. I think also in practice it is much less relevant for user applications to react to aborted checkpoints. 
Since the notifications there cannot be relied upon (if there is a concurrent task failure), users always have to follow the "newer checkpoint subsumes older checkpoint" contract, so the abort method is probably rarely relevant. This is something we should change, in my opinion. {noformat}
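The compatibility fix described above can be sketched as follows: adding the new callback as a default no-op keeps pre-1.11 implementations, which only override the completion callback, compiling unchanged. This is a simplified stand-in for Flink's {{CheckpointListener}}, not the actual interface.

```java
/** Simplified stand-in for CheckpointListener. */
interface CheckpointListenerSketch {
    void notifyCheckpointComplete(long checkpointId) throws Exception;

    /** New method added as a default no-op, so existing user code keeps compiling. */
    default void notifyCheckpointAborted(long checkpointId) throws Exception {
        // Most implementations do nothing on abort and let the next
        // successful checkpoint commit all changes up to that point.
    }
}

/** A "pre-1.11 style" implementation that knows nothing about aborts. */
class LegacySink implements CheckpointListenerSketch {
    long completed = -1;

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        completed = checkpointId;
    }
}
```

Without the `default` keyword, `LegacySink` would fail to compile the moment the new method is added to the interface; with it, the abort notification silently falls through to the no-op.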
[jira] [Created] (FLINK-18307) Replace "slave" file name with "workers"
Stephan Ewen created FLINK-18307: Summary: Replace "slave" file name with "workers" Key: FLINK-18307 URL: https://issues.apache.org/jira/browse/FLINK-18307 Project: Flink Issue Type: Sub-task Components: Deployment / Scripts Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.11.0 See parent issue for a discussion of the rationale.
[jira] [Created] (FLINK-18124) Add documentation for new FLIP-27 source interface
Stephan Ewen created FLINK-18124: Summary: Add documentation for new FLIP-27 source interface Key: FLINK-18124 URL: https://issues.apache.org/jira/browse/FLINK-18124 Project: Flink Issue Type: Sub-task Components: Documentation Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.11.0 The documentation should be under {{Application Development / DataStream API / Data Sources}}. We need to add sections about - Data Source Concepts - Data Source API - Connector Base (Split Reader Library) - Event Time and Watermarks
[jira] [Created] (FLINK-18091) Test Relocatable Savepoints
Stephan Ewen created FLINK-18091: Summary: Test Relocatable Savepoints Key: FLINK-18091 URL: https://issues.apache.org/jira/browse/FLINK-18091 Project: Flink Issue Type: Sub-task Components: Tests Reporter: Stephan Ewen Fix For: 1.11.0 The test should do the following: - Take a savepoint, making sure the job has enough state that there is more than just the "_metadata" file - Copy it to another directory - Start the job from that savepoint, both by addressing the metadata file and by addressing the savepoint directory We should also test that an incremental checkpoint that gets moved fails with a reasonable exception.
[jira] [Created] (FLINK-17950) Broken Scala env.countinuousSource method
Stephan Ewen created FLINK-17950: Summary: Broken Scala env.countinuousSource method Key: FLINK-17950 URL: https://issues.apache.org/jira/browse/FLINK-17950 Project: Flink Issue Type: Bug Components: API / Scala Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.11.0 The Scala {{StreamExecutionEnvironment.countinuousSource(...)}} method has two critical problems: - Its return type is {{Unit}} instead of {{DataStream}}, so that no one can use the created stream - It does not forward the TypeInformation identified by the ScalaCompiler but relies on the Java TypeExtraction stack, which cannot handle most Scala types.
[jira] [Created] (FLINK-17906) Fix performance issues in WatermarkOutputMultiplexer
Stephan Ewen created FLINK-17906: Summary: Fix performance issues in WatermarkOutputMultiplexer Key: FLINK-17906 URL: https://issues.apache.org/jira/browse/FLINK-17906 Project: Flink Issue Type: Improvement Components: API / Core Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.11.0 The WatermarkOutputMultiplexer has some potential for performance improvements: - not using volatile variables (all accesses happen anyway from the mailbox or under the legacy checkpoint lock) - Using some boolean logic instead of branches
[jira] [Created] (FLINK-17904) Add "scheduleWithFixedDelay" to ProcessingTimeService
Stephan Ewen created FLINK-17904: Summary: Add "scheduleWithFixedDelay" to ProcessingTimeService Key: FLINK-17904 URL: https://issues.apache.org/jira/browse/FLINK-17904 Project: Flink Issue Type: Improvement Components: Runtime / Task Reporter: Stephan Ewen Assignee: Stephan Ewen Adding {{"scheduleWithFixedDelay(...)"}} to {{ProcessingTimeService}} would better support cases where fired timers are backed up. Rather than immediately firing again, they would wait their scheduled delay. The implementation can be added to ProcessingTimeService in the exact same way as {{"scheduleAtFixedRate"}}.
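The difference between the two scheduling modes can be shown deterministically. With a fixed *rate*, a timer that falls behind (e.g. under backpressure) fires again immediately to catch up with its missed slots; with a fixed *delay*, the next firing always waits the full delay after the previous call finished. The sketch below simulates firing times arithmetically; it is an illustration of the semantics, not a real scheduler.

```java
import java.util.ArrayList;
import java.util.List;

class TimerScheduleSketch {
    /** Fixed rate: each firing happens at max(scheduled slot, previous finish),
     *  so backed-up timers fire back-to-back to catch up. */
    static List<Long> atFixedRate(long period, long[] taskDurations) {
        List<Long> fires = new ArrayList<>();
        long slot = 0, finish = 0;
        for (long duration : taskDurations) {
            long fire = Math.max(slot, finish); // backed-up timer fires immediately
            fires.add(fire);
            finish = fire + duration;
            slot += period;
        }
        return fires;
    }

    /** Fixed delay: the next firing is always `delay` after the previous finish. */
    static List<Long> withFixedDelay(long delay, long[] taskDurations) {
        List<Long> fires = new ArrayList<>();
        long next = 0;
        for (long duration : taskDurations) {
            fires.add(next);
            next = next + duration + delay;
        }
        return fires;
    }
}
```

With a period/delay of 10 and a first task that blocks for 25 time units, fixed rate fires at 0, 25, 26 (the second and third firings pile up), while fixed delay fires at 0, 35, 46 (the delay is always honored), which is exactly the behavior the issue asks for.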
[jira] [Created] (FLINK-17903) Evolve WatermarkOutputMultiplexer to make it reusable in FLIP-27 Sources
Stephan Ewen created FLINK-17903: Summary: Evolve WatermarkOutputMultiplexer to make it reusable in FLIP-27 Sources Key: FLINK-17903 URL: https://issues.apache.org/jira/browse/FLINK-17903 Project: Flink Issue Type: Sub-task Components: API / DataStream Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.11.0 The {{WatermarkOutputMultiplexer}} merges multiple independently generated watermarks. To make it usable in the FLIP-27 sources, we need to - Change its IDs from integers to Strings (match splitIDs) - Support de-registration of local outputs (when splits are finished)
[jira] [Created] (FLINK-17899) Integrate FLIP-126 Timestamps and Watermarking with FLIP-27 sources
Stephan Ewen created FLINK-17899: Summary: Integrate FLIP-126 Timestamps and Watermarking with FLIP-27 sources Key: FLINK-17899 URL: https://issues.apache.org/jira/browse/FLINK-17899 Project: Flink Issue Type: Sub-task Components: API / DataStream Reporter: Stephan Ewen Assignee: Stephan Ewen *Preamble:* This whole discussion is to some extent only necessary because in the {{SourceReader}}, we pass the {{SourceOutput}} as a parameter to the {{pollNext(...)}} method. However, this design follows some deeper runtime pipeline design, and is not easy to change at this stage. There are some principal design choices here: *(1) Do we make Timestamps and Watermarks purely a feature of the library (ConnectorBase), or do we integrate it with the core (SourceOperator)?* Making it purely a responsibility of the ConnectorBase would have the advantage of keeping the SourceOperator simple. However, there is value in integrating this with the SourceOperator. - Implementations that are not using the ConnectorBase (like simple collection- or iterator-based sources) would automatically get access to the pluggable TimestampExtractors and WatermarkGenerators. - When executing batch programs, the SourceOperator can transparently inject a "no-op" WatermarkGenerator to make sure no Watermarks are generated during the batch execution. Given that batch sources are very performance-sensitive, it seems useful to not even run the watermark generator logic, rather than later dropping the watermarks. - In a future version, we may want to implement "global watermark holds" generated by the Enumerators: The enumerator tells the readers how far they may advance their local watermarks. This can help to not prematurely advance the watermark based on a split's records when other splits have data overlapping with older ranges. An example where this is commonly the case is the streaming file source. 
*(2) Is the per-partition watermarking purely a feature of the library (ConnectorBase), or do we integrate it with the core (SourceOperator)?* I believe we need to solve this on the same level as the previous question: - Once a connector instantiates the per-partition watermark generators, the main output (through which the SourceReader emits the records) must not run its watermark generator any more. Otherwise we extract watermarks also on the merged stream, which messes things up. So having the per-partition watermark generators simply in the ConnectorBase and emitting transparently through an unchanged main output would not work. - So, if we decide to implement watermark support in the core (SourceOperator), we would need to offer the per-partition watermarking utilities on that level as well. - Along a similar line of thought as in the previous point, the batch execution can optimize the watermark extraction by supplying no-op extractors also for the per-partition extractors (which will most likely bear the bulk of the load in the connectors). *(3) What would an integration of WatermarkGenerators with the SourceOperator look like?* Rather straightforward: the SourceOperator instantiates a SourceOutput that internally runs the timestamp extractor and watermark generator and emits to the DataOutput that the operator emits to. *(4) What would an integration of the per-split WatermarkGenerators look like?* I would propose adding a method to the {{SourceReaderContext}}: {{SplitAwareOutputs createSourceAwareOutputs()}} The {{SplitAwareOutputs}} looks the following way: {code:java} public interface SplitAwareOutputs { SourceOutput createOutputForSplit(String splitId); void releaseOutputForSplit(String splitId); } {code}
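A sketch of how the proposed per-split outputs could combine their watermarks: each split gets its own output, and the combined watermark is the minimum over the still-open splits, so releasing a finished split (as proposed above) stops it from holding the watermark back. The class name and the min-combination shown here are illustrative assumptions, not Flink's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-split output registry with min-watermark combination. */
class SplitOutputsSketch {
    private final Map<String, Long> perSplitWatermark = new HashMap<>();

    /** Corresponds to createOutputForSplit(splitId) in the proposed interface. */
    void createOutputForSplit(String splitId) {
        perSplitWatermark.put(splitId, Long.MIN_VALUE);
    }

    /** What a per-split SourceOutput would record when a watermark is emitted. */
    void emitWatermark(String splitId, long watermark) {
        perSplitWatermark.put(splitId, watermark);
    }

    /** Corresponds to releaseOutputForSplit(splitId): finished splits no
     *  longer hold the combined watermark back. */
    void releaseOutputForSplit(String splitId) {
        perSplitWatermark.remove(splitId);
    }

    /** Combined watermark: the minimum across all active splits. */
    long combinedWatermark() {
        return perSplitWatermark.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MAX_VALUE);
    }
}
```

This also illustrates why de-registration matters (see FLINK-17903): without `releaseOutputForSplit`, a finished split would pin the combined watermark at its last value forever.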
[jira] [Created] (FLINK-17898) Remove Exceptions from signatures of SourceOutput methods
Stephan Ewen created FLINK-17898: Summary: Remove Exceptions from signatures of SourceOutput methods Key: FLINK-17898 URL: https://issues.apache.org/jira/browse/FLINK-17898 Project: Flink Issue Type: Sub-task Components: API / DataStream Reporter: Stephan Ewen Fix For: 1.11.0
[jira] [Created] (FLINK-17897) Resolve stability annotations discussion for FLIP-27 in 1.11
Stephan Ewen created FLINK-17897: Summary: Resolve stability annotations discussion for FLIP-27 in 1.11 Key: FLINK-17897 URL: https://issues.apache.org/jira/browse/FLINK-17897 Project: Flink Issue Type: Sub-task Components: API / DataStream Reporter: Stephan Ewen Assignee: Jiangjie Qin Fix For: 1.11.0 Currently, the interfaces from the FLIP-27 sources are all labeled as {{@Public}}. Given that FLIP-27 is going to be in a "beta" version in the 1.11 release, we are discussing downgrading the stability to {{@PublicEvolving}}.
[jira] [Created] (FLINK-17854) Use InputStatus directly in user-facing async input APIs (like source readers)
Stephan Ewen created FLINK-17854: Summary: Use InputStatus directly in user-facing async input APIs (like source readers) Key: FLINK-17854 URL: https://issues.apache.org/jira/browse/FLINK-17854 Project: Flink Issue Type: Improvement Components: API / Core Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.11.0 The flink-runtime uses the {{InputStatus}} enum in its {{PushingAsyncDataInput}}. The flink-core {{SourceReader}} has a separate enum with the same purpose. In the {{SourceOperator}} we need to bridge between these two, which is clumsy and a bit inefficient. We can simply make {{InputStatus}} part of {{flink-core}} I/O packages and use it in the {{SourceReader}}, to avoid having to bridge it and the runtime part.
[jira] [Created] (FLINK-17850) PostgresCatalogITCase.testGroupByInsert() fails on CI
Stephan Ewen created FLINK-17850: Summary: PostgresCatalogITCase.testGroupByInsert() fails on CI Key: FLINK-17850 URL: https://issues.apache.org/jira/browse/FLINK-17850 Project: Flink Issue Type: Bug Components: Table SQL / Ecosystem Affects Versions: 1.11.0 Reporter: Stephan Ewen Fix For: 1.11.0 {{org.apache.flink.connector.jdbc.catalog.PostgresCatalogITCase.testGroupByInsert}} Error: {code}
2020-05-20T16:36:33.9647037Z org.apache.flink.table.api.ValidationException:
2020-05-20T16:36:33.9647354Z Field types of query result and registered TableSink mypg.postgres.primitive_table2 do not match.
2020-05-20T16:36:33.9648233Z Query schema: [int: INT NOT NULL, EXPR$1: VARBINARY(2147483647) NOT NULL, short: SMALLINT NOT NULL, EXPR$3: BIGINT, EXPR$4: FLOAT, EXPR$5: DOUBLE, EXPR$6: DECIMAL(10, 5), EXPR$7: BOOLEAN, EXPR$8: VARCHAR(2147483647), EXPR$9: CHAR(1) NOT NULL, EXPR$10: CHAR(1) NOT NULL, EXPR$11: VARCHAR(20), EXPR$12: TIMESTAMP(5), EXPR$13: DATE, EXPR$14: TIME(0), EXPR$15: DECIMAL(38, 18)]
2020-05-20T16:36:33.9650272Z Sink schema: [int: INT, bytea: VARBINARY(2147483647), short: SMALLINT, long: BIGINT, real: FLOAT, double_precision: DOUBLE, numeric: DECIMAL(10, 5), decimal: DECIMAL(10, 1), boolean: BOOLEAN, text: VARCHAR(2147483647), char: CHAR(1), character: CHAR(3), character_varying: VARCHAR(20), timestamp: TIMESTAMP(5), date: DATE, time: TIME(0), default_numeric: DECIMAL(38, 18)]
2020-05-20T16:36:33.9651218Z     at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:100)
2020-05-20T16:36:33.9651689Z     at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:218)
2020-05-20T16:36:33.9652136Z     at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:193)
2020-05-20T16:36:33.9652936Z     at scala.Option.map(Option.scala:146)
2020-05-20T16:36:33.9653593Z     at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:193)
2020-05-20T16:36:33.9653993Z     at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:152)
2020-05-20T16:36:33.9654428Z     at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:152)
2020-05-20T16:36:33.9654841Z     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
2020-05-20T16:36:33.9655221Z     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
2020-05-20T16:36:33.9655759Z     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
2020-05-20T16:36:33.9656072Z     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
2020-05-20T16:36:33.9656413Z     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
2020-05-20T16:36:33.9656890Z     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
2020-05-20T16:36:33.9657211Z     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
2020-05-20T16:36:33.9657525Z     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
2020-05-20T16:36:33.9657878Z     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:152)
2020-05-20T16:36:33.9658350Z     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1217)
2020-05-20T16:36:33.9658784Z     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:663)
2020-05-20T16:36:33.9659391Z     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:750)
2020-05-20T16:36:33.9659856Z     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:653)
2020-05-20T16:36:33.9660507Z     at org.apache.flink.table.planner.runtime.utils.TableEnvUtil$.execInsertSqlAndWaitResult(TableEnvUtil.scala:27)
2020-05-20T16:36:33.9661115Z     at org.apache.flink.table.planner.runtime.utils.TableEnvUtil.execInsertSqlAndWaitResult(TableEnvUtil.scala)
2020-05-20T16:36:33.9661583Z     at org.apache.flink.connector.jdbc.catalog.PostgresCatalogITCase.testGroupByInsert(PostgresCatalogITCase.java:88)
{code} Full log: https://dev.azure.com/sewen0794/19b23adf-d190-4fb4-ae6e-2e92b08923a3/_apis/build/builds/25/logs/93
[jira] [Created] (FLINK-17849) YARNHighAvailabilityITCase hangs in Azure Pipelines CI
Stephan Ewen created FLINK-17849: Summary: YARNHighAvailabilityITCase hangs in Azure Pipelines CI Key: FLINK-17849 URL: https://issues.apache.org/jira/browse/FLINK-17849 Project: Flink Issue Type: Bug Components: Deployment / YARN Affects Versions: 1.11.0 Reporter: Stephan Ewen Fix For: 1.11.0 The test seems to hang for 15 minutes, then gets killed. Full logs: https://dev.azure.com/sewen0794/19b23adf-d190-4fb4-ae6e-2e92b08923a3/_apis/build/builds/25/logs/121
[jira] [Created] (FLINK-17816) Change Latency Marker to work with "scheduleAtFixedDelay" instead of "scheduleAtFixedRate"
Stephan Ewen created FLINK-17816: Summary: Change Latency Marker to work with "scheduleAtFixedDelay" instead of "scheduleAtFixedRate" Key: FLINK-17816 URL: https://issues.apache.org/jira/browse/FLINK-17816 Project: Flink Issue Type: Bug Components: Runtime / Task Reporter: Stephan Ewen Latency Markers and other periodic timers are scheduled with {{scheduleAtFixedRate}}. That means the callable is invoked every X time units; if it blocks (backpressure), it can be called again immediately. I would suggest switching this to {{scheduleAtFixedDelay}} to avoid a backlog of latency marker injections when there is no way to actually execute the injection call.
[jira] [Created] (FLINK-17781) OperatorCoordinator Context must support calls from thread other than JobMaster Main Thread
Stephan Ewen created FLINK-17781: Summary: OperatorCoordinator Context must support calls from thread other than JobMaster Main Thread Key: FLINK-17781 URL: https://issues.apache.org/jira/browse/FLINK-17781 Project: Flink Issue Type: Sub-task Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.11.0 Currently, calls on the Context in the OperatorCoordinator go synchronously to the ExecutionGraph. There are two critical problems: - It is common that the code in the OperatorCoordinator runs in a separate thread (for example, because it executes blocking operations). Calling the scheduler from another thread causes the Scheduler to crash (Assertion Error, violation of the single-threaded property) - Calls on the ExecutionGraph are removed as part of removing the legacy scheduler. Certain calls do not work any more. +Problem Level 1:+ The solution would be to pass in the scheduler and a main thread executor to interact with it. However, to do that the scheduler needs to be created before the OperatorCoordinators are created. One could do that by creating the Coordinators lazily after the Scheduler. +Problem Level 2:+ The Scheduler restores the savepoints as part of the scheduler creation, when the ExecutionGraph and the CheckpointCoordinator are created early in the constructor. (Side note: That design is tricky in itself, because it means state is restored before the scheduler is even properly constructed.) That means the OperatorCoordinator (or a placeholder component) needs to exist to accept the restored state. 
That brings us to a cyclic dependency: - OperatorCoordinator (context) needs Scheduler and MainThreadExecutor - Scheduler and MainThreadExecutor need a constructed ExecutionGraph - ExecutionGraph needs CheckpointCoordinator - CheckpointCoordinator needs OperatorCoordinator +Breaking the Cycle+ The only way we can do this is with a form of lazy initialization: - We eagerly create the OperatorCoordinators so they exist for state restore - We provide an uninitialized context to them - When the Scheduler is started (after leadership is granted) we initialize the context with the (then readily constructed) Scheduler and MainThreadExecutor +Longer-term Solution+ The longer term solution would require a major change in the Scheduler and CheckpointCoordinator setup. Something like this: - Scheduler (and ExecutionGraph) are constructed first - JobMaster waits for leadership - Upon leader grant, Operator Coordinators are constructed and can reference the Scheduler and FencedMainThreadExecutor - CheckpointCoordinator is constructed and references ExecutionGraph and OperatorCoordinators - Savepoint or latest checkpoint is restored The current implementation should couple these parts as loosely as possible to make it easy to implement the above approach later.
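The "uninitialized context" workaround above can be sketched as follows: the coordinator context is created eagerly (so state restore has a target) but only wired to the scheduler's main-thread executor once the scheduler is started. The class and method names are illustrative assumptions, not Flink's actual code.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical lazily-initialized coordinator context. */
class LazyCoordinatorContext {
    // Holds the main-thread executor once the scheduler exists; null before.
    private final AtomicReference<Consumer<Runnable>> mainThreadExecutor =
            new AtomicReference<>();

    /** Called when the scheduler is started and the executor is available. */
    void initialize(Consumer<Runnable> executor) {
        mainThreadExecutor.set(executor);
    }

    /** Before initialization, calls fail fast with a clear error instead of
     *  crashing the scheduler's single-threaded assumptions. */
    void runInMainThread(Runnable action) {
        Consumer<Runnable> executor = mainThreadExecutor.get();
        if (executor == null) {
            throw new IllegalStateException(
                    "Coordinator context used before scheduler initialization");
        }
        executor.accept(action);
    }
}
```

This keeps the eager-creation and lazy-wiring phases cleanly separated, which is also what leaves the door open for the longer-term setup order described above.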
[jira] [Created] (FLINK-17702) OperatorCoordinators must be notified of tasks cancelled as part of failover
Stephan Ewen created FLINK-17702: Summary: OperatorCoordinators must be notified of tasks cancelled as part of failover Key: FLINK-17702 URL: https://issues.apache.org/jira/browse/FLINK-17702 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.11.0 The OperatorCoordinators are currently only notified of tasks that directly fail. However, tasks that are cancelled (as part of the regional failover) must be handled the same way and also send notifications to the OperatorCoordinator.
[jira] [Created] (FLINK-17701) Exclude jdk:tools dependency from all Hadoop dependencies for Java 9+ compatibility
Stephan Ewen created FLINK-17701: Summary: Exclude jdk:tools dependency from all Hadoop dependencies for Java 9+ compatibility Key: FLINK-17701 URL: https://issues.apache.org/jira/browse/FLINK-17701 Project: Flink Issue Type: Bug Components: Build System Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.11.0 Hadoop transitively pulls the system dependency {{jdk:tools}}, which is no longer available on Java 9+. This causes errors when importing the code into an IDE that runs Java 11. This dependency is anyway not needed when running the code, because the classes are always present. It can be safely excluded from the transitive dependencies.
[jira] [Created] (FLINK-17699) Reduce scope for SourceOperator arguments and initialize more eagerly
Stephan Ewen created FLINK-17699: Summary: Reduce scope for SourceOperator arguments and initialize more eagerly Key: FLINK-17699 URL: https://issues.apache.org/jira/browse/FLINK-17699 Project: Flink Issue Type: Sub-task Components: API / DataStream Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.11.0 Currently, the {{SourceOperator}} only gets a {{Source}} in the constructor. All actual components that the {{SourceOperator}} relies on when working are lazily initialized, in {{open()}} or via setters. Relying on something as broad as {{Source}} also means that a lot of redundant context has to be provided to the {{SourceOperator}} during initialization. The {{Source}} is, for example, also responsible for the {{SourceEnumerator}}, which is independent of the {{SourceOperator}}. However, it currently needs to be considered during testing, because the tests need to mock a full {{Source}} in order to instantiate a {{SourceOperator}}. The solution is to pass the collaborators of the {{SourceOperator}} eagerly into the constructor. This is not fully possible for the {{SourceReader}}, but there we can at least reduce the scope by passing a targeted factory function.
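A sketch of the narrowed dependency: instead of a full `Source` (which also owns the enumerator), the operator takes only a factory for the reader it actually uses, so tests never need to mock an enumerator. All names here are illustrative stand-ins, not Flink's actual classes.

```java
import java.util.function.Supplier;

/** Hypothetical minimal reader interface the operator depends on. */
interface ReaderSketch {
    String poll();
}

/** Hypothetical operator: constructed with a targeted factory, not a Source. */
class SourceOperatorSketch {
    private final Supplier<ReaderSketch> readerFactory;
    private ReaderSketch reader;

    SourceOperatorSketch(Supplier<ReaderSketch> readerFactory) {
        this.readerFactory = readerFactory;
    }

    void open() {
        // Only the reader is created; no enumerator is ever in scope here.
        reader = readerFactory.get();
    }

    String pollNext() {
        return reader.poll();
    }
}
```

In a test, the factory is a one-line lambda; nothing broader than the reader has to be constructed or mocked.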
[jira] [Created] (FLINK-17696) Support eager initialization of operators with OperatorEventDispatcher
Stephan Ewen created FLINK-17696: Summary: Support eager initialization of operators with OperatorEventDispatcher Key: FLINK-17696 URL: https://issues.apache.org/jira/browse/FLINK-17696 Project: Flink Issue Type: Improvement Components: Runtime / Task Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.11.0 The {{StreamOperatorParameters}} are parameters available for eager initialization of the {{StreamOperators}}. We should add the {{OperatorEventDispatcher}} to make it available for eager initialization. At the same time, we should split the steps of "obtaining the {{OperatorEventGateway}}" and "registering the {{OperatorEventHandler}}" to support getting an eager reference to the {{OperatorEventGateway}}.
[jira] [Created] (FLINK-17695) Simplify SourceOperator by using a utility SimpleVersionedListState
Stephan Ewen created FLINK-17695: Summary: Simplify SourceOperator by using a utility SimpleVersionedListState Key: FLINK-17695 URL: https://issues.apache.org/jira/browse/FLINK-17695 Project: Flink Issue Type: Sub-task Components: API / DataStream Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.11.0 The SourceOperator has some boilerplate code that takes the bytes out of the {{ListState}} and applies the {{SimpleVersionedSerializer}} to turn them into the splits. This repeated code can simply be encapsulated in a utility class {{SimpleVersionedListState}} which wraps a {{ListState}} and applies the serialization and de-serialization.
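The wrapper idea can be sketched with simplified stand-ins for Flink's interfaces: a versioned-list-state utility adapts a list of raw bytes into a typed list by applying a serializer on every access. This is a hedged sketch; the real `SimpleVersionedListState` wraps an actual `ListState<byte[]>` and carries versions alongside the bytes.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for SimpleVersionedSerializer. */
interface VersionedSerializerSketch<T> {
    byte[] serialize(T value) throws IOException;
    T deserialize(byte[] bytes) throws IOException;
}

/** Sketch of the utility: wraps raw byte state and applies (de)serialization. */
class VersionedListStateSketch<T> {
    private final List<byte[]> rawState = new ArrayList<>(); // stands in for ListState<byte[]>
    private final VersionedSerializerSketch<T> serializer;

    VersionedListStateSketch(VersionedSerializerSketch<T> serializer) {
        this.serializer = serializer;
    }

    void add(T value) throws IOException {
        rawState.add(serializer.serialize(value));
    }

    List<T> get() throws IOException {
        List<T> result = new ArrayList<>();
        for (byte[] bytes : rawState) {
            result.add(serializer.deserialize(bytes));
        }
        return result;
    }
}
```

With this in place, the operator works against a typed list of splits and never touches raw bytes, which is exactly the boilerplate the issue wants to remove.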
[jira] [Created] (FLINK-17694) Wrong min-length check in SimpleVersionedSerialization
Stephan Ewen created FLINK-17694: Summary: Wrong min-length check in SimpleVersionedSerialization Key: FLINK-17694 URL: https://issues.apache.org/jira/browse/FLINK-17694 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.10.0 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.11.0 The SimpleVersionedSerialization checks for a minimum of 4 bytes when checking the arguments, but needs at least 8 bytes (two integers).
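To make the "two integers" concrete: the framing writes two 4-byte ints (serializer version and payload length) before the payload, so any valid blob is at least 8 bytes, and a 4-byte minimum check would accept a truncated header. The sketch below is a simplified illustration of such framing, not Flink's actual SimpleVersionedSerialization code.

```java
import java.nio.ByteBuffer;

/** Illustrative version+length framing: [version int][length int][payload]. */
class VersionedFramingSketch {
    static byte[] frame(int version, byte[] payload) {
        return ByteBuffer.allocate(8 + payload.length)
                .putInt(version)        // 4 bytes: serializer version
                .putInt(payload.length) // 4 bytes: payload length
                .put(payload)
                .array();
    }

    static int readVersion(byte[] framed) {
        // A 4-byte minimum would wrongly accept a blob with a truncated header.
        if (framed.length < 8) {
            throw new IllegalArgumentException(
                    "need at least 8 bytes for version + length header");
        }
        return ByteBuffer.wrap(framed).getInt(0);
    }
}
```

A 3-byte payload thus frames into 11 bytes, and a 4-byte input is rejected because it cannot even contain the full header.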
[jira] [Created] (FLINK-17674) OperatorCoordinator state in checkpoints should always be a ByteStreamStateHandle
Stephan Ewen created FLINK-17674: Summary: OperatorCoordinator state in checkpoints should always be a ByteStreamStateHandle Key: FLINK-17674 URL: https://issues.apache.org/jira/browse/FLINK-17674 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.11.0 State restore to the task vertices and coordinators (even after loading the Checkpoint Metadata) happens in the JobManager's main thread and must consequently not do any potentially blocking I/O operations. The OperatorCoordinator state is a generic {{StreamStateHandle}} whose state might require I/O to retrieve. While this never happens in the current implementation (we always use {{ByteStreamStateHandle}}), the signatures and contracts don't guarantee that and leave this open as a potential future bug. Typing the OperatorCoordinator state to ByteStreamStateHandle makes sure that we can always retrieve the data directly without I/O and clarifies that no arbitrary StreamStateHandle is supported at that point. If state restoring becomes an asynchronous operation we can relax this restriction.