[jira] [Created] (FLINK-30072) Cannot assign instance of SerializedLambda to field KeyGroupStreamPartitioner.keySelector
Nico Kruber created FLINK-30072: --- Summary: Cannot assign instance of SerializedLambda to field KeyGroupStreamPartitioner.keySelector Key: FLINK-30072 URL: https://issues.apache.org/jira/browse/FLINK-30072 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.16.0 Reporter: Nico Kruber In application mode, if the {{usrlib}} directories of the JM and TM differ, e.g. contain the same jars but with different file names, the job fails with this cryptic exception on the JM:
{code}
2022-11-17 09:55:12,968 INFO org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Restarting job.
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.
	at org.apache.flink.streaming.api.graph.StreamConfig.getVertexNonChainedOutputs(StreamConfig.java:537) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1600) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriterDelegate(StreamTask.java:1584) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:408) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:362) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:335) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:327) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:317) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.<init>(SourceOperatorStreamTask.java:84) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	at jdk.internal.reflect.GeneratedConstructorAccessor38.newInstance(Unknown Source) ~[?:?]
	at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
	at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
	at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1589) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:714) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.keySelector of type org.apache.flink.api.java.functions.KeySelector in instance of org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(Unknown Source) ~[?:?]
	at java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(Unknown Source) ~[?:?]
	at java.io.ObjectStreamClass.checkObjFieldValueTypes(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.defaultCheckFieldValues(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
	at java.util.ArrayList.readObject(Unknown Source) ~[?:?]
	at jdk.internal.reflect.GeneratedMethodAccessor16.invoke(Unknown Source) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
	at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
	at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
	at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
	...
{code}
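For illustration (hypothetical user code, not taken from the affected job), a lambda-based {{KeySelector}} as sketched below is serialized as a {{java.lang.invoke.SerializedLambda}}; on deserialization, the capturing class must be loadable so the lambda can be resolved back into a {{KeySelector}} instance, and if the deserializing side resolves a different user-code class path (e.g. a differently-named jar in {{usrlib}}), the unresolved {{SerializedLambda}} is assigned to the {{keySelector}} field, producing the ClassCastException above:
{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;

// Hypothetical user code: the lambda below travels through the job graph as
// a SerializedLambda and can only be resolved back into a KeySelector if the
// class that defined it is found by the deserializing class loader.
public class LambdaKeySelectorExample {
    public static KeyedStream<String, String> keyByFirstField(DataStream<String> events) {
        return events.keyBy(line -> line.split(",")[0]);
    }
}
{code}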
[jira] [Created] (FLINK-30045) FromClasspathEntryClassInformationProvider too eager to verify MainClass
Nico Kruber created FLINK-30045: --- Summary: FromClasspathEntryClassInformationProvider too eager to verify MainClass Key: FLINK-30045 URL: https://issues.apache.org/jira/browse/FLINK-30045 Project: Flink Issue Type: Bug Components: Client / Job Submission Affects Versions: 1.16.0, 1.17.0 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.17.0, 1.16.1 {{FromClasspathEntryClassInformationProvider}} eagerly attempts to verify whether the given MainClass is on the user classpath. However, it doesn't handle cases where the main class is inside a nested jar. This is something you would see when using such a nested jar file with the {{StandaloneApplicationClusterEntryPoint}}, e.g. from {{standalone-job.sh}}. We actually don't need this check at all since {{PackagedProgram}} already performs it while attempting to load the main class. Having this check once should be enough. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29884) Flaky test failure in finegrained_resource_management/SortMergeResultPartitionTest.testRelease
Nico Kruber created FLINK-29884: --- Summary: Flaky test failure in finegrained_resource_management/SortMergeResultPartitionTest.testRelease Key: FLINK-29884 URL: https://issues.apache.org/jira/browse/FLINK-29884 Project: Flink Issue Type: Bug Components: Runtime / Coordination, Runtime / Network, Tests Affects Versions: 1.17.0 Reporter: Nico Kruber Fix For: 1.17.0 {{SortMergeResultPartitionTest.testRelease}} failed with a timeout in the finegrained_resource_management tests:
{code:java}
Nov 03 17:28:07 [ERROR] Tests run: 20, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 64.649 s <<< FAILURE! - in org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionTest
Nov 03 17:28:07 [ERROR] SortMergeResultPartitionTest.testRelease Time elapsed: 60.009 s <<< ERROR!
Nov 03 17:28:07 org.junit.runners.model.TestTimedOutException: test timed out after 60 seconds
Nov 03 17:28:07 	at org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionTest.testRelease(SortMergeResultPartitionTest.java:374)
Nov 03 17:28:07 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
Nov 03 17:28:07 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
Nov 03 17:28:07 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Nov 03 17:28:07 	at java.lang.reflect.Method.invoke(Method.java:498)
Nov 03 17:28:07 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
Nov 03 17:28:07 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
Nov 03 17:28:07 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
Nov 03 17:28:07 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
Nov 03 17:28:07 	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
Nov 03 17:28:07 	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
Nov 03 17:28:07 	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
Nov 03 17:28:07 	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
Nov 03 17:28:07 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
Nov 03 17:28:07 	at java.lang.Thread.run(Thread.java:748)
{code}
[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42806=logs=a57e0635-3fad-5b08-57c7-a4142d7d6fa9=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29868) Dependency convergence error for org.osgi:org.osgi.core:jar
Nico Kruber created FLINK-29868: --- Summary: Dependency convergence error for org.osgi:org.osgi.core:jar Key: FLINK-29868 URL: https://issues.apache.org/jira/browse/FLINK-29868 Project: Flink Issue Type: Bug Components: Build System, Table SQL / Runtime Affects Versions: 1.17.0 Reporter: Nico Kruber Fix For: 1.17.0 While working on FLINK-29867, the following new error pops up while running
{code}
./mvnw clean install -pl flink-dist -am -DskipTests -Dflink.convergence.phase=install -Pcheck-convergence
{code}
(CI runs the same check and therefore fails as well)
{code}
[WARNING] Dependency convergence error for org.osgi:org.osgi.core:jar:4.3.0:runtime paths to dependency are:
+-org.apache.flink:flink-table-planner-loader-bundle:jar:1.17-SNAPSHOT
  +-org.apache.flink:flink-table-planner_2.12:jar:1.17-SNAPSHOT:runtime
    +-org.apache.flink:flink-table-api-java-bridge:jar:1.17-SNAPSHOT:runtime
      +-org.apache.flink:flink-streaming-java:jar:1.17-SNAPSHOT:runtime
        +-org.apache.flink:flink-runtime:jar:1.17-SNAPSHOT:runtime
          +-org.xerial.snappy:snappy-java:jar:1.1.8.3:runtime
            +-org.osgi:org.osgi.core:jar:4.3.0:runtime
and
+-org.apache.flink:flink-table-planner-loader-bundle:jar:1.17-SNAPSHOT
  +-org.apache.flink:flink-table-planner_2.12:jar:1.17-SNAPSHOT:runtime
    +-org.apache.flink:flink-scala_2.12:jar:1.17-SNAPSHOT:runtime
      +-org.apache.flink:flink-core:jar:1.17-SNAPSHOT:runtime
        +-org.apache.commons:commons-compress:jar:1.21:runtime
          +-org.osgi:org.osgi.core:jar:6.0.0:runtime
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29867) Update maven-enforcer-plugin to 3.1.0
Nico Kruber created FLINK-29867: --- Summary: Update maven-enforcer-plugin to 3.1.0 Key: FLINK-29867 URL: https://issues.apache.org/jira/browse/FLINK-29867 Project: Flink Issue Type: Improvement Components: Build System Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.17.0 We currently rely on 3.0.0-M1 but will have to skip 3.0.0 (final) due to MENFORCER-394 which hits Flink's current code base as well -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29865) Allow configuring the JDK in build-nightly-dist.yml
Nico Kruber created FLINK-29865: --- Summary: Allow configuring the JDK in build-nightly-dist.yml Key: FLINK-29865 URL: https://issues.apache.org/jira/browse/FLINK-29865 Project: Flink Issue Type: Improvement Components: Build System / Azure Pipelines Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.17.0 {{build-nightly-dist.yml}} currently uses the default JDK from https://github.com/flink-ci/flink-ci-docker, which happens to be the Java 1.8 we use for releases. We should
# not rely on this default being set to 1.8, and
# be able to configure this in the workflows themselves
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29643) Possible NPE in ApplicationDispatcherBootstrap with failedJob submission and no HA
Nico Kruber created FLINK-29643: --- Summary: Possible NPE in ApplicationDispatcherBootstrap with failedJob submission and no HA Key: FLINK-29643 URL: https://issues.apache.org/jira/browse/FLINK-29643 Project: Flink Issue Type: Bug Components: Client / Job Submission Affects Versions: 1.15.2, 1.16.0, 1.17.0 Reporter: Nico Kruber If
- {{PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID}} is not set, and
- high availability is not activated, and
- {{DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR}} is set,
then a failure during job submission may surface as an NPE because the code in {{ApplicationDispatcherBootstrap#runApplicationEntryPoint()}} tries to read the {{failedJobId}} from the configuration, where it will not be present in these cases. Please refer to the conditions that set the {{jobId}} in {{ApplicationDispatcherBootstrap.fixJobIdAndRunApplicationAsync()}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
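A minimal, hypothetical sketch of the suspected failure mode (simplified; this is not the actual Flink code):
{code:java}
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptionsInternal;

// Simplified sketch: without HA and without a fixed job id,
// PIPELINE_FIXED_JOB_ID was never written into the configuration, so the
// lookup below returns null and parsing it throws the NPE.
public class FailedJobIdLookupSketch {
    static JobID failedJobId(Configuration configuration) {
        String fixedJobId = configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
        return JobID.fromHexString(fixedJobId); // NPE when fixedJobId == null
    }
}
{code}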
[jira] [Created] (FLINK-29208) Logs and stdout endpoints not mentioned on OpenAPI spec
Nico Kruber created FLINK-29208: --- Summary: Logs and stdout endpoints not mentioned on OpenAPI spec Key: FLINK-29208 URL: https://issues.apache.org/jira/browse/FLINK-29208 Project: Flink Issue Type: Bug Components: Runtime / REST Affects Versions: 1.15.2 Reporter: Nico Kruber Using Flink's web UI and clicking on "Stdout" or "Logs" in a JM or TM accesses endpoints {{/jobmanager/logs}} and {{/jobmanager/stdout}} (and similar for TMs) but these don't seem to exist in the [REST API docs|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/] or the [REST API OpenAPI spec|https://nightlies.apache.org/flink/flink-docs-master/generated/rest_v1_dispatcher.yml]. Either these should become some webui-internal APIs (for which no concept exists at the moment), or these endpoints should be added to the docs and spec. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-27836) RocksDBMapState iteration may stop too early for var-length prefixes
Nico Kruber created FLINK-27836: --- Summary: RocksDBMapState iteration may stop too early for var-length prefixes Key: FLINK-27836 URL: https://issues.apache.org/jira/browse/FLINK-27836 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.14.4, 1.13.6, 1.15.0 Reporter: Nico Kruber A similar, yet orthogonal, issue to https://issues.apache.org/jira/browse/FLINK-11141 is that the iterators used in RocksDBMapState iterate over everything with a matching prefix of flink-key and namespace. With var-length serializers for either of them, however, they may return data for unrelated keys and/or namespaces. It looks like the built-in serializers of Flink are not affected since they use a var-length encoding that is prefixed with the object's length, so different lengths will not have the same prefix. More exotic serializers, e.g. ones relying on a terminating NUL character, may expose the above-mentioned behaviour, though. -- This message was sent by Atlassian Jira (v8.20.7#820007)
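For illustration, a contrived sketch of the underlying byte-prefix collision (no actual Flink serializer behaves like this):
{code:java}
import java.util.Arrays;

// If key and namespace serializers emit raw var-length bytes without a
// length prefix, two different (key, namespace) pairs can serialize to the
// same byte sequence, so a RocksDB prefix scan for one pair also returns
// entries belonging to the other.
public class PrefixCollisionSketch {
    public static void main(String[] args) {
        byte[] keyA = {1};
        byte[] nsA = {2, 3};
        byte[] keyB = {1, 2};
        byte[] nsB = {3};

        byte[] prefixA = concat(keyA, nsA); // [1, 2, 3]
        byte[] prefixB = concat(keyB, nsB); // [1, 2, 3] -- identical!
        System.out.println(Arrays.equals(prefixA, prefixB)); // prints: true
    }

    static byte[] concat(byte[] x, byte[] y) {
        byte[] r = Arrays.copyOf(x, x.length + y.length);
        System.arraycopy(y, 0, r, x.length, y.length);
        return r;
    }
}
{code}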
Re: [DISCUSS] FLIP-220: Temporal State
While working a bit more on this, David A and I noticed a couple of things that were not matching each other in the proposed APIs:
a) the proposed BinarySortedMultiMapState class didn't actually have getters that return multiple items per key, and
b) while having a single multi-map-like implementation in the backend with the adapted API, we'd like to put it up for discussion again whether we maybe want to have a user-facing BinarySortedMapState API as well, which can be simpler but doesn't add any additional constraints to the state backends.

Let me go into details a bit more: in a multi-map, a single key can be backed by a set of items and as such, the atomic unit that should be retrievable is not a single item but rather something like a Collection, an Iterable, a List, or so. Since we are already using Iterable in the main API, how about the following?

```
public class BinarySortedMultiMapState<UK, UV> extends State {
    void put(UK key, Iterable<UV> values) throws Exception;
    void add(UK key, UV value) throws Exception;
    Iterable<UV> valueAt(UK key) throws Exception;
    Map.Entry<UK, Iterable<UV>> firstEntry() throws Exception;
    Map.Entry<UK, Iterable<UV>> lastEntry() throws Exception;
    Iterable<Map.Entry<UK, Iterable<UV>>> readRange(UK fromKey, UK toKey) throws Exception;
    Iterable<Map.Entry<UK, Iterable<UV>>> readRangeUntil(UK endUserKey) throws Exception;
    Iterable<Map.Entry<UK, Iterable<UV>>> readRangeFrom(UK startUserKey) throws Exception;
    void clearRange(UK fromKey, UK toKey) throws Exception;
    void clearRangeUntil(UK endUserKey) throws Exception;
    void clearRangeFrom(UK startUserKey) throws Exception;
}
```

We also considered using Iterable<Map.Entry<UK, UV>> instead of Map.Entry<UK, Iterable<UV>>, but that wouldn't match well with firstEntry() and lastEntry() because for a single key, there is not a single first/last value. We also looked at common MultiMap interfaces, and their getters were also always retrieving the whole list/collection for a key. Since we don't want to promise too many details to the user, we believe Iterable is our best choice for now, but that can also be "upgraded" to, e.g., List in the future without breaking client code.

An appropriate map-like version of that would be the following:

```
public class BinarySortedMapState<UK, UV> extends State {
    void put(UK key, UV value) throws Exception;
    UV valueAt(UK key) throws Exception;
    Map.Entry<UK, UV> firstEntry() throws Exception;
    Map.Entry<UK, UV> lastEntry() throws Exception;
    Iterable<Map.Entry<UK, UV>> readRange(UK fromKey, UK toKey) throws Exception;
    Iterable<Map.Entry<UK, UV>> readRangeUntil(UK endUserKey) throws Exception;
    Iterable<Map.Entry<UK, UV>> readRangeFrom(UK startUserKey) throws Exception;
    void clearRange(UK fromKey, UK toKey) throws Exception;
    void clearRangeUntil(UK endUserKey) throws Exception;
    void clearRangeFrom(UK startUserKey) throws Exception;
}
```

We believe we were also missing details regarding the state descriptor, and I'm still a bit fuzzy on what to provide as type T in StateDescriptor.
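For illustration, here is a hypothetical usage sketch of the proposed multi-map API (all names as proposed above; `Event` and `process()` are stand-ins, nothing of this exists yet): buffering events per timestamp and draining everything up to the current watermark.

```
// assuming state obtained via a descriptor as discussed below
BinarySortedMultiMapState<Long, Event> eventsByTime;

void onElement(Event e) throws Exception {
    eventsByTime.add(e.getTimestamp(), e);
}

void onWatermark(long watermark) throws Exception {
    for (Map.Entry<Long, Iterable<Event>> entry : eventsByTime.readRangeUntil(watermark)) {
        for (Event buffered : entry.getValue()) {
            process(buffered); // stand-in for the actual per-event logic
        }
    }
    eventsByTime.clearRangeUntil(watermark); // drop everything we just processed
}
```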
For the constructors, however, since we'd require a LexicographicalTypeSerializer implementation, we would propose the following three overloads similar to the MapStateDescriptor:

```
public class BinarySortedMultiMapStateDescriptor<UK, UV>
        extends StateDescriptor<BinarySortedMultiMapState<UK, UV>, Map<UK, Iterable<UV>>/*?*/> {

    public BinarySortedMultiMapStateDescriptor(
        String name,
        LexicographicalTypeSerializer<UK> keySerializer,
        TypeSerializer<UV> valueSerializer) {}

    public BinarySortedMultiMapStateDescriptor(
        String name,
        LexicographicalTypeSerializer<UK> keySerializer,
        TypeInformation<UV> valueTypeInfo) {}

    public BinarySortedMultiMapStateDescriptor(
        String name,
        LexicographicalTypeSerializer<UK> keySerializer,
        Class<UV> valueClass) {}
}
```

Technically, we could have a LexicographicalTypeInformation as well (for the 2nd overload) but we don't currently see the need for that wrapper since these serializers are just needed for State - but maybe someone with more insights into this topic can advise.

A few further points with respect to the implementation:
- we'll have to find a suitable heap-based state backend implementation that integrates well with all current efforts (needs to be discussed)
- the StateProcessor API will have to receive appropriate extensions to read and write this new form of state

Nico

On Friday, 29 April 2022 14:25:59 CEST Nico Kruber wrote:
> Hi all,
> Yun, David M, David A, and I had an offline discussion and talked through a
> couple of details that emerged from the discussion here. We believe we have
> found a consensus on these points and would like to share our points for
> further feedback:
>
> Let me try to get through the points that were opened in arbitrary order:
>
> 1. We want to offer a generic interface for sorted state, not just temporal
> state as proposed initially. We would like to...
> a) ...offer a single new state type similar to what TemporalListState was
> offering (so not offering something like TemporalValueState to keep the API
> slim).
> b) ...name it BinarySortedMultiMapState
Re: [DISCUSS] FLIP-220: Temporal State
9) Why don't we want to provide a BinarySortedMap with value-like semantics similar to TemporalValueState?
-> We'd like to keep the code overhead in Flink small and not provide two more state primitives but instead only a single one. For use cases where you don't want to handle lists, you can use the BinarySortedMultiMap with its put() method and a list with a single entry that would overwrite the old one. While retrieving the value(s), you can then assume the list is either empty or has a single entry, similar to what you are currently doing in a WindowProcessFunction. You can also always add a thin wrapper to provide that under a more convenient API if you need to.

A10) effects on the CEPOperator?
-> We don't have an overview yet. The buffering of events inside its `MapState<Long, List<IN>> elementQueueState`, however, is a pattern that would benefit from our MultiMap since a single add() operation wouldn't require you to read the whole list again.

Sorry for the long email - we'd be happy to get more feedback and will incorporate this into the FLIP description soon.

Nico
--
Dr. Nico Kruber | Solutions Architect

Follow us @VervericaData Ververica
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton Wehner
Re: [DISCUSS] FLIP-220: Temporal State
> ...boils down to the discussion of how to handle duplicates.
> From the commonly accepted contracts, list implies that there could be
> duplicates and map implies otherwise. One concern about `Map` is that it
> also implies that you should be able to do a point query.
>
> Best,
> D.
>
> On Fri, Apr 22, 2022 at 9:21 AM Yun Tang wrote:
> > Hi Nico,
> >
> > I did not mean that we need to support all APIs in NavigableMap, and it
> > is indeed too heavy to implement them all.
> > Moreover, I also prefer an iterator-like API instead of the original
> > #tailMap-like API. I just used NavigableMap's APIs to show examples.
> >
> > I think we can introduce new APIs like:
> >
> > SortedMapState<UK, UV> extends State
> >
> > Map.Entry<UK, UV> firstEntry() throws Exception;
> > Map.Entry<UK, UV> lastEntry() throws Exception;
> > Iterator<Map.Entry<UK, UV>> headIterator(UK endUserKey) throws Exception;
> > Iterator<Map.Entry<UK, UV>> tailIterator(UK startUserKey) throws Exception;
> > Iterator<Map.Entry<UK, UV>> subIterator(UK startUserKey, UK endUserKey) throws Exception;
> >
> > Since SortedMapState has several new APIs, I prefer to introduce a new
> > state descriptor to distinguish it from the original map state.
> >
> > For the API of SortedMapOfListsState, I would not be strongly against it;
> > I just want to know the actual benefits if we really want to introduce
> > the API.
> >
> > When talking about the part of the changelog state backend, my concern is
> > about how to group keys together in the changelog logger.
> > I can share a problem, and before this I need to spend some time on how
> > to implement a serializer that keeps the order of the serialized bytes
> > the same as that of the original Java objects.
> > For a fixed-length serializer, such as LongSerializer, we just need to
> > ensure all numbers are positive or invert the sign bit.
> > However, a non-fixed-length serializer, such as StringSerializer, will
> > write the length of the bytes first, which will break the natural order
> > when comparing the bytes. Thus, we might need to avoid writing the
> > length into the serialized bytes.
> > On the other hand, the changelog logger would record operations per key
> > one by one in the logs. We need to consider how to distinguish each key
> > in the combined serialized byte arrays.
> >
> > Best
> > Yun Tang
> >
> > From: Nico Kruber
> > Sent: Thursday, April 21, 2022 23:50
> > To: dev
> > Cc: David Morávek ; Yun Tang
> > Subject: Re: [DISCUSS] FLIP-220: Temporal State
> >
> > Thanks Yun Tang for your clarifications.
> > Let me keep my original structure and reply in these points...
> >
> > 3. Should we generalise the Temporal***State to offer arbitrary key
> > types and not just Long timestamps?
> >
> > The use cases you detailed do indeed look similar to the ones we were
> > optimising in our TemporalState PoC...
> >
> > I don't think I'd like to offer a full implementation of NavigableMap
> > though because that seems quite some overhead to implement while we can
> > cover the mentioned examples with the proposed APIs already when using
> > iterators as well as single-value retrievals.
> > So far, when we were iterating from the smallest key, we could just use
> > Long.MIN_VALUE and start from there. That would be difficult to
> > generalise for arbitrary data types because you may not always know the
> > smallest possible value for a certain serialized type (unless we put
> > this into the appropriate serializer interface).
> >
> > I see two options here:
> > a) a slim API but using NULL as an indicator for smallest/largest
> > depending on the context, e.g.
> >
> > - `readRange(null, key)` means from beginning to key
> > - `readRange(key, null)` means from key to end
> > - `readRange(null, null)` means from beginning to end
> > - `value[AtOr]Before(null)` means largest available key
> > - `value[AtOr]After(null)` means smallest available key
> >
> > b) a larger API with special methods for each of these use cases similar
> > to what NavigableMap has but based on iterators and single-value
> > functions only
> >
> > > BTW, I prefer to introduce another state descriptor instead of current
> > > map state descriptor.
> >
> > Can you elaborate on this? We currently don't need extra functionality,
> > so this would be a plain copy of the MapStateDescriptor...
[jira] [Created] (FLINK-27353) Update training exercises to use Flink 1.15
Nico Kruber created FLINK-27353: --- Summary: Update training exercises to use Flink 1.15 Key: FLINK-27353 URL: https://issues.apache.org/jira/browse/FLINK-27353 Project: Flink Issue Type: New Feature Components: Documentation / Training / Exercises Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.15.0 -- This message was sent by Atlassian Jira (v8.20.7#820007)
Re: [DISCUSS] FLIP-220: Temporal State
Thanks Yun Tang for your clarifications. Let me keep my original structure and reply in these points...

3. Should we generalise the Temporal***State to offer arbitrary key types and not just Long timestamps?

The use cases you detailed do indeed look similar to the ones we were optimising in our TemporalState PoC... I don't think I'd like to offer a full implementation of NavigableMap though, because that seems quite some overhead to implement while we can cover the mentioned examples with the proposed APIs already when using iterators as well as single-value retrievals. So far, when we were iterating from the smallest key, we could just use Long.MIN_VALUE and start from there. That would be difficult to generalise for arbitrary data types because you may not always know the smallest possible value for a certain serialized type (unless we put this into the appropriate serializer interface).

I see two options here:
a) a slim API but using NULL as an indicator for smallest/largest depending on the context, e.g.
- `readRange(null, key)` means from beginning to key
- `readRange(key, null)` means from key to end
- `readRange(null, null)` means from beginning to end
- `value[AtOr]Before(null)` means largest available key
- `value[AtOr]After(null)` means smallest available key
b) a larger API with special methods for each of these use cases similar to what NavigableMap has but based on iterators and single-value functions only

> BTW, I prefer to introduce another state descriptor instead of current map
> state descriptor.

Can you elaborate on this? We currently don't need extra functionality, so this would be a plain copy of the MapStateDescriptor...

> For the API of SortedMapOfListsState, I think this is a bit bounded to
> current implementation of RocksDB state-backend.

Actually, I don't think this is special to RocksDB but generic to all state backends that do not hold values in memory and allow fast append-like operations. Additionally, since this is a very common use case and RocksDB is also widely used, I wouldn't want to continue without this specialization. For a similar reason, we offer ListState and not just ValueState...

4. ChangelogStateBackend

> For the discussion of ChangelogStateBackend, you can think of changelog
> state-backend as a write-ahead-log service. And we need to record the
> changes to any state, thus this should be included in the design doc as we
> need to introduce another kind of state, especially you might need to
> consider how to store key bytes serialized by the new serializer (as we
> might not be able to write the length in the beginning of serialized bytes
> to make the order of bytes same as natural order).

Since the ChangelogStateBackend "holds the working state in the underlying delegatedStateBackend, and forwards state changes to State Changelog", I honestly still don't see how this needs special handling. As long as the delegated state backend supports sorted state, ChangelogStateBackend doesn't have to do anything special except for recording changes to state. Our PoC simply uses the namespace for these keys and that's the same thing the Window API is already using - so there's nothing special here. The order in the log doesn't have to follow the natural order because this is only required inside the delegatedStateBackend, isn't it?

Nico

On Wednesday, 20 April 2022 17:03:11 CEST Yun Tang wrote:
> Hi Nico,
>
> Thanks for your clarification.
> For the discussion about generalizing Temporal state to sorted map state, I
> could give some examples of how to use sorted map state in min/max with
> retract functions.
> As you know, NavigableMap in Java has several APIs like:
>
> Map.Entry<K, V> firstEntry();
> Map.Entry<K, V> lastEntry();
> NavigableMap<K, V> tailMap(K fromKey, boolean inclusive)
>
> The #firstEntry API could be used in MinWithRetractAggFunction#updateMin,
> #lastEntry could be used in MaxWithRetractAggFunction#updateMax, and
> #tailMap could be used in FirstValueWithRetractAggFunction#retract. If we
> can introduce SortedMap-like state, these functions could benefit.
> BTW, I prefer to introduce another state descriptor instead of the current
> map state descriptor.
> For the API of SortedMapOfListsState, I think this is a bit bound to the
> current implementation of the RocksDB state-backend.
> For the discussion of the ChangelogStateBackend, you can think of the
> changelog state-backend as a write-ahead-log service. And we need to record
> the changes to any state, thus this should be included in the design doc as
> we need to introduce another kind of state; especially you might need to
> consider how to store key bytes serialized by the new serializer (as we
> might not be able to write the length at the beginning of the serialized
> bytes to make the order of bytes the same as the natural order).
>
[jira] [Created] (FLINK-27322) Add license headers and spotless checks for them
Nico Kruber created FLINK-27322: --- Summary: Add license headers and spotless checks for them Key: FLINK-27322 URL: https://issues.apache.org/jira/browse/FLINK-27322 Project: Flink Issue Type: Bug Components: Documentation / Training / Exercises Affects Versions: 1.14.4 Reporter: Nico Kruber Assignee: Nico Kruber It looks as if there are a couple of files that are missing their appropriate license headers, e.g. https://github.com/apache/flink-training/blob/0b1c83b16065484200564402bef2ca10ef19cb30/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/RideAndFare.java We should fix that by:
# adding the missing license headers
# adding spotless checks to ensure this doesn't happen again
Potential downside: if a user doing the training exercises creates files on their own, these would need the license header as well. On the other hand, a simple `./gradlew spotlessApply` can fix that easily. -- This message was sent by Atlassian Jira (v8.20.7#820007)
Re: [DISCUSS] FLIP-220: Temporal State
> Does that answer your question?
>
> D.
>
> On Wed, Apr 13, 2022 at 12:21 PM David Anderson wrote:
>
>> Yun Tang and Jingsong,
>>
>> Some flavor of OrderedMapState is certainly feasible, and I do see some
>> appeal in supporting Binary**State.
>>
>> However, I haven't seen a motivating use case for this generalization, and
>> would rather keep this as simple as possible. By handling Longs we can
>> already optimize a wide range of use cases.
>>
>> David
>>
>> On Tue, Apr 12, 2022 at 9:21 AM Yun Tang wrote:
>>
>>> Hi David,
>>>
>>> Could you share some explanations in detail why SortedMapState cannot
>>> work? I just cannot catch what the statement below means:
>>>
>>> This was rejected as being overly difficult to implement in a way that
>>> would cleanly leverage RocksDB's iterators.
>>>
>>> Best
>>> Yun Tang
>>>
>>> From: Aitozi
>>> Sent: Tuesday, April 12, 2022 15:00
>>> To: dev@flink.apache.org
>>> Subject: Re: [DISCUSS] FLIP-220: Temporal State
>>>
>>> Hi David
>>>
>>> I have looked through the doc; I think it will be a good improvement to
>>> this pattern usage, and I'm interested in it. Do you have some POC work
>>> to share for a closer look?
>>> Besides, I have one question: can we support exposing the namespace for
>>> the different state types, not limited to `TemporalState`? This way,
>>> users can specify the namespace, and TemporalState is one special case
>>> that uses the timestamp as the namespace. I think it will be more
>>> extendable.
>>>
>>> What do you think about this?
>>>
>>> Best,
>>> Aitozi.
>>>
>>> David Anderson wrote on Mon, Apr 11, 2022 at 20:54:
>>>
>>>> Greetings, Flink developers.
>>>>
>>>> I would like to open up a discussion of a proposal [1] to add a new
>>>> kind of state to Flink.
>>>>
>>>> The goal here is to optimize a fairly common pattern, which is using
>>>>
>>>> MapState<Long, List<Event>>
>>>>
>>>> to store lists of events associated with timestamps. This pattern is
>>>> used internally in quite a few operators that implement sorting and
>>>> joins, and it also shows up in user code, for example, when implementing
>>>> custom windowing in a KeyedProcessFunction.
>>>>
>>>> Nico Kruber, Seth Wiesman, and I have implemented a POC that achieves a
>>>> more than 2x improvement in throughput when performing these operations
>>>> on RocksDB by better leveraging the capabilities of the RocksDB state
>>>> backend.
>>>>
>>>> See FLIP-220 [1] for details.
>>>>
>>>> Best,
>>>> David
>>>>
>>>> [1] https://cwiki.apache.org/confluence/x/Xo_FD

--
Dr. Nico Kruber | Solutions Architect

Follow us @VervericaData Ververica
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton Wehner
[jira] [Created] (FLINK-26852) RocksDBMapState#clear not forwarding exceptions
Nico Kruber created FLINK-26852: --- Summary: RocksDBMapState#clear not forwarding exceptions Key: FLINK-26852 URL: https://issues.apache.org/jira/browse/FLINK-26852 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.14.4, 1.15.0 Reporter: Nico Kruber I accidentally found an inconsistent behaviour in the RocksDB state backend implementation: If there's an exception in {{AbstractRocksDBState#clear()}}, it will forward that inside a {{FlinkRuntimeException}}. However, if there's an exception in {{RocksDBMapState#clear}}, it will merely print the exception stacktrace and continue as is. I assume forwarding the exception as a {{FlinkRuntimeException}} should be the desired behaviour for both use cases... -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25362) Incorrect dependencies in Table Confluent/Avro docs
Nico Kruber created FLINK-25362: --- Summary: Incorrect dependencies in Table Confluent/Avro docs Key: FLINK-25362 URL: https://issues.apache.org/jira/browse/FLINK-25362 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.14.2, 1.13.5, 1.12.7 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.14.3 "Confluent Avro Format" is missing an explanation to also
* add the dependency to flink-avro
* have the Confluent repository defined
"Avro Format" should not show the Maven dependency to {{flink-sql-avro}} but instead {{flink-avro}} -- This message was sent by Atlassian Jira (v8.20.1#820001)
Re: [DISCUSS] Deprecate Java 8 support
Tue, Nov 23, 2021 at 8:46 AM David Morávek > > > > wrote: > > > > >>>> Thank you Chesnay for starting the discussion! This will generate > > > > bit > > > > > > of > > > > > > > > >>> a > > > > >>> > > > > >>>> work for some users, but it's a good thing to keep moving the > > > > project > > > > > > >>>> forward. Big +1 for this. > > > > >>>> > > > > >>>> Jingsong: > > > > >>>> > > > > >>>> Receiving this signal, the user may be unhappy because his > > > > application > > > > > > >>>>> may be all on Java 8. Upgrading is a big job, after all, many > > > > systems > > > > > > >>>>> have not been upgraded yet. (Like you said, HBase and Hive) > > > > >>>> > > > > >>>> The whole point of deprecation is to raise awareness, that this > > > > will > > > > > > be > > > > > > > > >>>> happening eventually and users should take some steps to address > > > > this > > > > > > in > > > > > > > > >>>> medium-term. If I understand Chesnay correctly, we'd still keep > > > > Java 8 > > > > > > >>>> around for quite some time to give users enough time to upgrade, > > > > but > > > > > > >>>> without raising awareness we'd fight the very same argument later > > > > in > > > > > > >>> time. > > > > >>> > > > > >>>> All of the prerequisites from 3rd party projects for both HBase > > > > [1] > > > > > > and > > > > > > > > >>>> Hive [2] to fully support Java 11 have been completed, so the > > > > ball is > > > > > > on > > > > > > > > >>>> their side and there doesn't seem to be much activity. Generating > > > > bit > > > > > > >>> more > > > > >>> > > > > >>>> pressure on these efforts might be a good thing. > > > > >>>> > > > > >>>> It would be great to identify some of these users and learn bit > > > > more > > > > > > >>> about > > > > >>> > > > > >>>> their situation. Are they keeping up with latest Flink > > > > developments or > > > > > > >>> are > > > > >>> > > > > >>>> they lagging behind (this would also give them way more time for > > > > >>>> eventual > > > > >>>> upgrade)? > > > > >>>> > > > > >>>> [1] https://issues.apache.org/jira/browse/HBASE-22972 > > > > >>>> [2] https://issues.apache.org/jira/browse/HIVE-22415 > > > > >>>> > > > > >>>> Best, > > > > >>>> D. > > > > >>>> > > > > >>>> On Tue, Nov 23, 2021 at 3:08 AM Jingsong Li < > > > > jingsongl...@gmail.com> > > > > > > >>>> wrote: > > > > >>>>> Hi Chesnay, > > > > >>>>> > > > > >>>>> Thanks for bringing this for discussion. > > > > >>>>> > > > > >>>>> We should dig deeper into the current Java version of Flink > > > > users. At > > > > > > >>>>> least make sure Java 8 is not a mainstream version. > > > > >>>>> > > > > >>>>> Receiving this signal, the user may be unhappy because his > > > > > > > > application > > > > > > > > >>>>> may be all on Java 8. Upgrading is a big job, after all, many > > > > systems > > > > > > >>>>> have not been upgraded yet. (Like you said, HBase and Hive) > > > > >>>>> > > > > >>>>> In my opinion, it is too early to deprecate support for Java 8. > > > > We > > > > > > >>>>> should wait for a safer point in time. > > > > >>>>> > > > > >>>>> On Mon, Nov 22, 2021 at 11:45 PM Ingo Bürk > > > > > > > > wrote: > > > > >>>>>> Hi, > > > > >>>>>> > > > > >>>>>> also a +1 from me because of everything Chesnay already said. > > > > >>>>>> > > > > >>>>>> > > > > >>>>>> Ingo > > > > >>>>>> > > > > >>>>>> On Mon, Nov 22, 2021 at 4:41 PM Martijn Visser < > > > > >>> > > > > >>> mart...@ververica.com> -- Dr. 
Nico Kruber | Solutions Architect Follow us @VervericaData Ververica -- Join Flink Forward - The Apache Flink Conference Stream Processing | Event Driven | Real Time -- Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany -- Ververica GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton Wehner
[jira] [Created] (FLINK-25027) Allow GC of a finished job's JobMaster before the slot timeout is reached
Nico Kruber created FLINK-25027: --- Summary: Allow GC of a finished job's JobMaster before the slot timeout is reached Key: FLINK-25027 URL: https://issues.apache.org/jira/browse/FLINK-25027 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Affects Versions: 1.13.3, 1.12.5, 1.14.0 Reporter: Nico Kruber Attachments: image-2021-11-23-20-32-20-479.png In a session cluster, after a (batch) job is finished, the JobMaster seems to stick around for another couple of minutes before becoming eligible for garbage collection. Looking into a heap dump, this seems to be tied to a {{PhysicalSlotRequestBulkCheckerImpl}} which is enqueued in the underlying Akka executor (and keeps the JM from being GC'd). Per default, the action is scheduled after {{slot.request.timeout}}, which defaults to 5 min (thanks [~trohrmann] for helping out here) !image-2021-11-23-20-32-20-479.png! With this setting, you have to account for enough metaspace to cover 5 minutes of time, which may span a couple of jobs, needlessly! The problem seems to be that Flink uses the main thread executor for this scheduling, which relies on the {{ActorSystem}}'s scheduler, and a future task scheduled with Akka can (probably) not be easily cancelled. One idea could be to use a dedicated thread pool per JM that we shut down when the JM terminates. That way we would not keep the JM from being GC'd. (The concrete example we investigated was a DataSet job.) -- This message was sent by Atlassian Jira (v8.20.1#820001)
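A minimal sketch of that idea (hypothetical; not the actual JobMaster code):
{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Each JobMaster gets its own scheduler for delayed checks; shutting it down
// on termination drops the pending tasks (and their references to the
// JobMaster) immediately instead of waiting for slot.request.timeout.
public class PerJobMasterScheduler implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    void schedulePendingBulkCheck(Runnable check, long slotRequestTimeoutMs) {
        scheduler.schedule(check, slotRequestTimeoutMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow(); // makes the JobMaster eligible for GC
    }
}
{code}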
[jira] [Created] (FLINK-25023) ClassLoader leak on JM/TM through indirectly-started Hadoop threads out of user code
Nico Kruber created FLINK-25023: --- Summary: ClassLoader leak on JM/TM through indirectly-started Hadoop threads out of user code Key: FLINK-25023 URL: https://issues.apache.org/jira/browse/FLINK-25023 Project: Flink Issue Type: Bug Components: Connectors / FileSystem, Connectors / Hadoop Compatibility, FileSystems Affects Versions: 1.13.3, 1.12.5, 1.14.0 Reporter: Nico Kruber If a Flink job is using HDFS through Flink's filesystem abstraction (either on the JM or TM), that code may actually spawn a few threads, e.g. from static class members:
* {{org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner}}
* {{IPC Parameter Sending Thread#*}}
These threads are started as soon as the classes are loaded, which may happen in the context of the user code. In this specific scenario, the created threads may contain references to the context class loader (I did not see that though) or, as happened here, they may inherit thread contexts such as the {{ProtectionDomain}} (from an {{AccessController}}). Hence user contexts and user class loaders are leaked into long-running threads that run in Flink's (parent) classloader. Fortunately, it seems to only *leak a single* {{ChildFirstClassLoader}} in this concrete example, but that may depend on which code paths each client execution is walking. A *proper solution* doesn't seem so simple:
* We could try to proactively initialize available file systems in the hope of starting all threads in the parent classloader with the parent context.
* We could create a default {{ProtectionDomain}} for spawned threads as discussed at [https://dzone.com/articles/javalangoutofmemory-permgen]; however, the {{StatisticsDataReferenceCleaner}} isn't actually actively spawned from any callback but as a static variable, and thus with the class loading itself (but maybe this is still possible somehow).
-- This message was sent by Atlassian Jira (v8.20.1#820001)
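A minimal sketch of the first idea above (hypothetical; the class and method names are made up):
{code:java}
// Force-load the Hadoop FileSystem class from the parent class loader at
// process startup so that the static cleaner/IPC threads it spawns are
// started with the parent context instead of later inheriting a user-code
// class loader / ProtectionDomain.
public class EagerHadoopInit {
    public static void initialize(ClassLoader parentClassLoader) {
        try {
            // 'true' triggers static initialization, which is what spawns
            // threads like the StatisticsDataReferenceCleaner
            Class.forName("org.apache.hadoop.fs.FileSystem", true, parentClassLoader);
        } catch (ClassNotFoundException e) {
            // Hadoop not on the classpath -> nothing to initialize
        }
    }
}
{code}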
[jira] [Created] (FLINK-25022) ClassLoader leak with ThreadLocals on the JM when submitting a job through the REST API
Nico Kruber created FLINK-25022: --- Summary: ClassLoader leak with ThreadLocals on the JM when submitting a job through the REST API Key: FLINK-25022 URL: https://issues.apache.org/jira/browse/FLINK-25022 Project: Flink Issue Type: Bug Components: Runtime / REST Affects Versions: 1.13.3, 1.12.5, 1.14.0 Reporter: Nico Kruber If a job is submitted using the REST API's {{/jars/:jarid/run}} endpoint, user code has to be executed on the JobManager, and this happens in a couple of (pooled) dispatcher threads like {{Flink-DispatcherRestEndpoint-thread-*}}. If the user code is using thread locals (and not cleaning them up), they may remain in the thread with references to the {{ChildFirstClassloader}} of the job and thus leak it. We saw this for the {{jsoniter}} Scala library at the JM, which [creates ThreadLocal instances|https://github.com/plokhotnyuk/jsoniter-scala/blob/95c7053cfaa558877911f3448382f10d53c4fcbf/jsoniter-scala-core/jvm/src/main/scala/com/github/plokhotnyuk/jsoniter_scala/core/package.scala] but doesn't remove them; it can actually happen with any user code or (worse) any library used in user code. There are a few *workarounds* a user can apply, e.g. putting the library into Flink's lib/ folder or submitting via the Flink CLI, but these may actually not be possible to use, depending on the circumstances. A *proper fix* should happen in Flink by guarding against any of these things in the dispatcher threads. We could, for example, spawn a separate thread for executing the user's {{main()}} method and, once the job is submitted, exit that thread and destroy all thread locals along with it. -- This message was sent by Atlassian Jira (v8.20.1#820001)
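A minimal sketch of that proposal (hypothetical, simplified):
{code:java}
// Run the user's main() in a short-lived thread so that any ThreadLocals it
// populates are destroyed together with the thread, instead of surviving in
// a pooled Flink-DispatcherRestEndpoint thread.
public class IsolatedMainRunner {
    public static void runUserMain(Runnable userMain) throws InterruptedException {
        Thread thread = new Thread(userMain, "user-main-isolated");
        thread.start();
        thread.join(); // once the thread dies, its ThreadLocal map is GC-eligible
    }
}
{code}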
[jira] [Created] (FLINK-24769) FlameGraphs in 1.14 do not aggregate subtasks' stack traces anymore
Nico Kruber created FLINK-24769: --- Summary: FlameGraphs in 1.14 do not aggregate subtasks' stack traces anymore Key: FLINK-24769 URL: https://issues.apache.org/jira/browse/FLINK-24769 Project: Flink Issue Type: Bug Components: Runtime / Web Frontend Affects Versions: 1.14.0 Reporter: Nico Kruber Attachments: image-2021-11-04-12-59-24-308.png Since Flink 1.14.0, after enabling FlameGraphs and gathering statistics for a task, it doesn't aggregate the results from the parallel instances anymore but instead shows each individual one - something that easily gets too messy for higher parallelism. It seems the last shared method on the stack is {{Task.runWithSystemExitMonitoring}} and then it spawns off into individual lambda functions: !image-2021-11-04-12-59-24-308.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24411) Add docs navigation for release notes
Nico Kruber created FLINK-24411: --- Summary: Add docs navigation for release notes Key: FLINK-24411 URL: https://issues.apache.org/jira/browse/FLINK-24411 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.13.2, 1.12.5, 1.14.0, 1.11.4 Reporter: Nico Kruber I propose to add a "Release Notes" section into the documentation's navigation bar for things like https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/ At the moment, I feel a bit lost in the navigation when viewing that page (which is only linked from the docs home page which I barely ever look at). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24144) Improve DataGenerator to prevent excessive creation of new Random objects
Nico Kruber created FLINK-24144: --- Summary: Improve DataGenerator to prevent excessive creation of new Random objects Key: FLINK-24144 URL: https://issues.apache.org/jira/browse/FLINK-24144 Project: Flink Issue Type: Improvement Components: Documentation / Training / Exercises Affects Versions: 1.13.2, 1.14.0 Reporter: Nico Kruber Assignee: Nico Kruber For a couple of methods in {{DataGenerator}}, new {{Random}} objects are created with a specific seed. Instead, we could create a single {{Random}} object and reset the seed when needed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
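A minimal sketch of the idea (hypothetical; not the actual {{DataGenerator}} code):
{code:java}
import java.util.Random;

// Reuse one Random instance and re-seed it instead of allocating a new
// object for every call; results stay deterministic per seed.
public class DataGeneratorSketch {
    private final Random rnd = new Random();

    long valueFor(long seed) {
        rnd.setSeed(seed); // same seed -> same sequence, no new allocation
        return rnd.nextLong();
    }
}
{code}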
[jira] [Created] (FLINK-24115) Outdated SQL Temporal Join Example
Nico Kruber created FLINK-24115: --- Summary: Outdated SQL Temporal Join Example Key: FLINK-24115 URL: https://issues.apache.org/jira/browse/FLINK-24115 Project: Flink Issue Type: Bug Components: Documentation, Table SQL / API Reporter: Nico Kruber [https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#event-time-temporal-join] is missing a primary key in the Table DDL. Also, the following note does not match the current example anymore: {quote} Note: The event-time temporal join requires the primary key contained in the equivalence condition of the temporal join condition, e.g., The primary key P.product_id of table product_changelog to be constrained in the condition O.product_id = P.product_id. {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24022) Re-Enable Scala checks in flink-training CI
Nico Kruber created FLINK-24022: --- Summary: Re-Enable Scala checks in flink-training CI Key: FLINK-24022 URL: https://issues.apache.org/jira/browse/FLINK-24022 Project: Flink Issue Type: Bug Components: Documentation / Training / Exercises Affects Versions: 1.14.0, 1.13.3 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.14.0, 1.13.3 FLINK-23339 disabled Scala by default but thereby also disabled CI checks for newly checked-in changes to the Scala code. We should run CI with Scala enabled. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24016) Restore 1.12 SQL docs page on state retention
Nico Kruber created FLINK-24016: --- Summary: Restore 1.12 SQL docs page on state retention Key: FLINK-24016 URL: https://issues.apache.org/jira/browse/FLINK-24016 Project: Flink Issue Type: Improvement Components: Documentation, Table SQL / API Affects Versions: 1.13.2, 1.14.0 Reporter: Nico Kruber It seems that the whole [section about state retention from the docs|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/query_configuration.html#idle-state-retention-time] in Flink 1.12 vanished with Flink 1.13. It was outdated with these min/max settings, but instead of updating it, it was just removed, and state retention/TTL is now safely hidden in [https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/#table-exec-state-ttl]

The discussion in the 1.12 docs is, however, superior since it explains a bit more why we need it and the types of queries that need it. I therefore propose to restore it somewhere in the docs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24008) Support state cleanup based on unique keys
Nico Kruber created FLINK-24008: --- Summary: Support state cleanup based on unique keys Key: FLINK-24008 URL: https://issues.apache.org/jira/browse/FLINK-24008 Project: Flink Issue Type: New Feature Components: Table SQL / Runtime Affects Versions: 1.14.0 Reporter: Nico Kruber In a join of two tables where we join on unique columns, e.g. from primary keys, we could clean up join state more pro-actively since we know whether future joins with this row are still possible (assuming uniqueness of that key). While this may not solve all issues of growing state in non-time-based joins, it may still considerably reduce state size, depending on the involved columns. This would add one more way of expiring state that the operator stores; currently there are only these:
* time-based joins, e.g. interval join
* idle state retention via {{TableConfig#setIdleStateRetention()}}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
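For contrast, a minimal sketch of the existing idle-state-retention knob mentioned above (the proposed unique-key-based cleanup has no API yet):
{code:java}
import java.time.Duration;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Existing mechanism: expire any state untouched for 24h, regardless of
// whether future joins with that row would still be possible.
public class RetentionExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.getConfig().setIdleStateRetention(Duration.ofHours(24));
    }
}
{code}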
[jira] [Created] (FLINK-23993) Describe eventually-consistency of materializing upserts
Nico Kruber created FLINK-23993: --- Summary: Describe eventually-consistency of materializing upserts Key: FLINK-23993 URL: https://issues.apache.org/jira/browse/FLINK-23993 Project: Flink Issue Type: Sub-task Components: Documentation, Table SQL / Ecosystem Affects Versions: 1.14.0 Reporter: Nico Kruber FLINK-20374 added an upsert materialization operator which fixes the order of shuffled streams. The results of this operator are actually _eventually consistent_ (it collects the latest value it has seen and retracts older versions when these are not valid anymore). You could see a result stream like this, based on the order in which the materialization receives events: +I10, -I10, +I5, -I5, +I10, -I10, +I3, -I3, +I10 Each time, the value stored in Kafka would change until the "final" result is in. It may be acceptable for upsert sinks, but should be documented (or changed/fixed) nonetheless. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23839) Unclear severity of Kafka transaction recommit warning in logs
Nico Kruber created FLINK-23839: --- Summary: Unclear severity of Kafka transaction recommit warning in logs Key: FLINK-23839 URL: https://issues.apache.org/jira/browse/FLINK-23839 Project: Flink Issue Type: Sub-task Components: Connectors / Kafka Affects Versions: 1.13.2, 1.12.5, 1.11.4 Reporter: Nico Kruber In a transactional Kafka sink, after recovery, all transactions from the recovered checkpoint are recommitted even though they may have already been committed before, because this is not part of the checkpoint. This second commit can lead to a number of WARN entries in the logs coming from [KafkaCommitter|https://github.com/apache/flink/blob/6c9818323b41a84137c52822d2993df788dbc9bb/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java#L66] or [FlinkKafkaProducer|https://github.com/apache/flink/blob/6c9818323b41a84137c52822d2993df788dbc9bb/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L1057]. Examples:
{code}
WARN [org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer] ... Encountered error org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state. while recovering transaction KafkaTransactionState [transactionalId=..., producerId=12345, epoch=123]. Presumably this transaction has been already committed before.

WARN [org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer] ... Encountered error org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker. while recovering transaction KafkaTransactionState [transactionalId=..., producerId=12345, epoch=12345]. Presumably this transaction has been already committed before
{code}
It sounds to me like the second exception is useful and indicates that the transaction timeout is too short. The first exception, however, seems superfluous and rather alerts the user more than it helps. Or what would you do with it? Can we instead filter out superfluous exceptions and at least log these at DEBUG level instead? -- This message was sent by Atlassian Jira (v8.3.4#803005)
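A minimal sketch of the suggested change (hypothetical; not the actual Flink code):
{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Treat InvalidTxnStateException during transaction recovery as the expected
// "already committed" case and log it at DEBUG, while keeping
// ProducerFencedException at WARN since it hints at a too-short
// transaction timeout.
public class RecommitSketch {
    private static final Logger LOG = LoggerFactory.getLogger(RecommitSketch.class);

    static void recommit(KafkaProducer<?, ?> producer, String transactionalId) {
        try {
            producer.commitTransaction();
        } catch (InvalidTxnStateException e) {
            LOG.debug("Transaction {} was presumably already committed.", transactionalId, e);
        } catch (ProducerFencedException e) {
            LOG.warn("Producer fenced while committing transaction {};"
                    + " consider increasing transaction.timeout.ms.", transactionalId, e);
        }
    }
}
{code}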
[jira] [Created] (FLINK-23812) Support configuration of the RocksDB logging via configuration
Nico Kruber created FLINK-23812: --- Summary: Support configuration of the RocksDB logging via configuration Key: FLINK-23812 URL: https://issues.apache.org/jira/browse/FLINK-23812 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Affects Versions: 1.13.2 Reporter: Nico Kruber Assignee: Nico Kruber Since FLINK-14482 has been merged now, we should also allow users to configure more than just the log level (FLINK-20911), i.e. also the following parameters, so that they can safely enable RocksDB logging again by using a rolling logger, for example:
- max log file size via {{state.backend.rocksdb.log.max-file-size}}
- number of log files to keep via {{state.backend.rocksdb.log.file-num}}
- log directory via {{state.backend.rocksdb.log.dir}}, e.g. to put these logs onto a (separate) volume that may not be local and is retained after container shutdown for debugging purposes
-- This message was sent by Atlassian Jira (v8.3.4#803005)
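A minimal sketch of setting the proposed options programmatically (keys as proposed above; the values are examples only):
{code:java}
import org.apache.flink.configuration.Configuration;

// The option keys below are proposals from this ticket and do not exist yet.
public class RocksDbLogConfigSketch {
    public static Configuration rocksDbLoggingConfig() {
        Configuration conf = new Configuration();
        conf.setString("state.backend.rocksdb.log.max-file-size", "25mb");
        conf.setString("state.backend.rocksdb.log.file-num", "4");
        conf.setString("state.backend.rocksdb.log.dir", "/var/log/flink/rocksdb");
        return conf;
    }
}
{code}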
[jira] [Created] (FLINK-23670) Add Scala formatting checks to training exercises
Nico Kruber created FLINK-23670: --- Summary: Add Scala formatting checks to training exercises Key: FLINK-23670 URL: https://issues.apache.org/jira/browse/FLINK-23670 Project: Flink Issue Type: Bug Components: Documentation / Training / Exercises Affects Versions: 1.13.2 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.13.3 Currently, there are no formatting checks for Scala code in the training exercises. We should employ the same checks that the main Flink project is using. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23669) Avoid using Scala >= 2.12.8 in Flink Training exercises
Nico Kruber created FLINK-23669: --- Summary: Avoid using Scala >= 2.12.8 in Flink Training exercises Key: FLINK-23669 URL: https://issues.apache.org/jira/browse/FLINK-23669 Project: Flink Issue Type: Bug Components: Documentation / Training / Exercises Affects Versions: 1.13.2 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.13.3 The current IDE setup instructions of the Flink training exercises do not mention a specific Scala SDK to set up. For the compatibility reasons described in https://ci.apache.org/projects/flink/flink-docs-stable/docs/dev/datastream/project-configuration/#scala-versions, we should avoid Scala 2.12.8 and above. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23667) Fix training exercises IDE setup description for Scala
Nico Kruber created FLINK-23667: --- Summary: Fix training exercises IDE setup description for Scala Key: FLINK-23667 URL: https://issues.apache.org/jira/browse/FLINK-23667 Project: Flink Issue Type: Bug Components: Documentation / Training / Exercises Affects Versions: 1.13.2 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.13.3 If you follow the training exercises' instructions to set up your IDE with code formatting and the Save Actions plugin while Scala is enabled, it will completely reformat your Scala code files instead of keeping them as they are. The instructions should be updated to match the ones used for the main Flink project. -- This message was sent by Atlassian Jira (v8.3.4#803005)
Re: [ANNOUNCE] RocksDB Version Upgrade and Performance
That's actually also what I'm seeing most of the time and what I'd expect to improve with the newer RocksDB version. Hence, I'd also favour the upgrade even if there is a slight catch with respect to performance - we should, however, continue to investigate this together with the RocksDB community. Nico On Wednesday, 4 August 2021 14:26:32 CEST David Anderson wrote: > I am hearing quite often from users who are struggling to manage memory > usage, and these are all users using RocksDB. While I don't know for > certain that RocksDB is the cause in every case, from my perspective, > getting the better memory stability of version 6.20 in place is critical. > > Regards, > David > > On Wed, Aug 4, 2021 at 8:08 AM Stephan Ewen wrote: > > Hi all! > > > > *!!! If you are a big user of the Embedded RocksDB State Backend and have > > performance sensitive workloads, please read this !!!* > > > > I want to quickly raise some awareness for a RocksDB version upgrade we > > plan to do, and some possible impact on application performance. > > > > *We plan to upgrade RocksDB to version 6.20.* That version of RocksDB > > unfortunately introduces some non-trivial performance regression. In our > > Nexmark Benchmark, at least one query is up to 13% slower. > > With some fixes, this can be improved, but even then there is an overall > > *regression up to 6% in some queries*. (See attached table for results > > from relevant Nexmark Benchmark queries). > > > > We would do this update nonetheless, because we need to get new features > > and bugfixes from RocksDB in. > > > > Please respond to this mail thread if you have major concerns about this. > > > > > > *### Fallback Plan* > > > > Optionally, we could fall back to Plan B, which is to upgrade RocksDB only > > to version 5.18.4. > > Which has no performance regression (after applying a custom patch). > > > > While this spares us the performance degradation of RocksDB 6.20.x, this > > > > has multiple disadvantages: > > - Does not include the better memory stability (strict cache control) > > - Misses out on some new features which some users asked about > > - Does not have the latest RocksDB bugfixes > > > > The latest point is especially bad in my opinion. While we can cherry-pick > > some bugfixes back (and have done this in the past), users typically run > > into an issue first and need to trace it back to RocksDB, then one of the > > committers can find the relevant patch from RocksDB master and backport > > it. > > That isn't the greatest user experience. > > > > Because of those disadvantages, we would prefer to do the upgrade to the > > newer RocksDB version despite the unfortunate performance regression. > > > > Best, > > Stephan -- Dr. Nico Kruber | Solutions Architect Follow us @VervericaData Ververica -- Join Flink Forward - The Apache Flink Conference Stream Processing | Event Driven | Real Time -- Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany -- Ververica GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton Wehner
[jira] [Created] (FLINK-23340) Improve development instructions for flink-training exercises
Nico Kruber created FLINK-23340: --- Summary: Improve development instructions for flink-training exercises Key: FLINK-23340 URL: https://issues.apache.org/jira/browse/FLINK-23340 Project: Flink Issue Type: Improvement Components: Documentation / Training / Exercises Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.14.0, 1.13.3 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23339) Disable flink-training exercises in Scala by default
Nico Kruber created FLINK-23339: --- Summary: Disable flink-training exercises in Scala by default Key: FLINK-23339 URL: https://issues.apache.org/jira/browse/FLINK-23339 Project: Flink Issue Type: Sub-task Components: Documentation / Training / Exercises Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.14.0, 1.13.3 On various occasions during trainings held by us, people who were not developing in Scala have reported issues with the current setup. If we make the Scala exercises optional, that should help reduce friction for everyone else. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23338) Use Spotless for flink-training exercises as well
Nico Kruber created FLINK-23338: --- Summary: Use Spotless for flink-training exercises as well Key: FLINK-23338 URL: https://issues.apache.org/jira/browse/FLINK-23338 Project: Flink Issue Type: Improvement Components: Documentation / Training / Exercises Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.14.0, 1.13.3 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23337) Properly use the 'shadow' plugin and remove flinkShadowJar
Nico Kruber created FLINK-23337: --- Summary: Properly use the 'shadow' plugin and remove flinkShadowJar Key: FLINK-23337 URL: https://issues.apache.org/jira/browse/FLINK-23337 Project: Flink Issue Type: Sub-task Components: Documentation / Training / Exercises Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.14.0, 1.13.3 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23336) Use the same log4j version as in Flink 1.13
Nico Kruber created FLINK-23336: --- Summary: Use the same log4j version as in Flink 1.13 Key: FLINK-23336 URL: https://issues.apache.org/jira/browse/FLINK-23336 Project: Flink Issue Type: Improvement Components: Documentation / Training / Exercises Reporter: Nico Kruber Assignee: Nico Kruber -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23335) Add a separate 'runSolution' gradle task
Nico Kruber created FLINK-23335: --- Summary: Add a separate 'runSolution' gradle task Key: FLINK-23335 URL: https://issues.apache.org/jira/browse/FLINK-23335 Project: Flink Issue Type: Sub-task Components: Documentation / Training / Exercises Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.14.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23334) Move 'application' implementation decision to subprojects
Nico Kruber created FLINK-23334: --- Summary: Move 'application' implementation decision to subprojects Key: FLINK-23334 URL: https://issues.apache.org/jira/browse/FLINK-23334 Project: Flink Issue Type: Sub-task Components: Documentation / Training / Exercises Reporter: Nico Kruber Assignee: Nico Kruber -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23333) Improve gradle setup for flink-training exercises
Nico Kruber created FLINK-23333: --- Summary: Improve gradle setup for flink-training exercises Key: FLINK-23333 URL: https://issues.apache.org/jira/browse/FLINK-23333 Project: Flink Issue Type: Improvement Components: Documentation / Training / Exercises Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.14.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23332) Update flink-training exercises gradle version
Nico Kruber created FLINK-23332: --- Summary: Update flink-training exercises gradle version Key: FLINK-23332 URL: https://issues.apache.org/jira/browse/FLINK-23332 Project: Flink Issue Type: Improvement Components: Documentation / Training / Exercises Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.14.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23331) FileReadingWatermarkITCase.testWatermarkEmissionWithChaining fails on Azure
Nico Kruber created FLINK-23331: --- Summary: FileReadingWatermarkITCase.testWatermarkEmissionWithChaining fails on Azure Key: FLINK-23331 URL: https://issues.apache.org/jira/browse/FLINK-23331 Project: Flink Issue Type: Bug Affects Versions: 1.14.0 Reporter: Nico Kruber https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=20223&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=f508e270-48d6-5f1e-3138-42a17e0714f0&l=4767 {code} Jul 09 09:24:32 [ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 3.266 s <<< FAILURE! - in org.apache.flink.test.streaming.api.FileReadingWatermarkITCase Jul 09 09:24:32 [ERROR] testWatermarkEmissionWithChaining(org.apache.flink.test.streaming.api.FileReadingWatermarkITCase) Time elapsed: 3.191 s <<< FAILURE! Jul 09 09:24:32 java.lang.AssertionError: too few watermarks emitted: 4 Jul 09 09:24:32 at org.junit.Assert.fail(Assert.java:89) Jul 09 09:24:32 at org.junit.Assert.assertTrue(Assert.java:42) Jul 09 09:24:32 at org.apache.flink.test.streaming.api.FileReadingWatermarkITCase.testWatermarkEmissionWithChaining(FileReadingWatermarkITCase.java:66) Jul 09 09:24:32 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) Jul 09 09:24:32 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) Jul 09 09:24:32 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) Jul 09 09:24:32 at java.lang.reflect.Method.invoke(Method.java:498) Jul 09 09:24:32 at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) Jul 09 09:24:32 at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) Jul 09 09:24:32 at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) Jul 09 09:24:32 at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) Jul 09 09:24:32 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) Jul 09 09:24:32 at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) Jul 09 09:24:32 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) Jul 09 09:24:32 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) Jul 09 09:24:32 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) Jul 09 09:24:32 at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) Jul 09 09:24:32 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) Jul 09 09:24:32 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) Jul 09 09:24:32 at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) Jul 09 09:24:32 at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) Jul 09 09:24:32 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) Jul 09 09:24:32 at org.junit.runners.ParentRunner.run(ParentRunner.java:413) Jul 09 09:24:32 at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365) Jul 09 09:24:32 at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273) Jul 09 09:24:32 at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238) Jul 09 09:24:32 at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159) Jul 09 09:24:32 at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384) Jul 09 09:24:32 at
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345) Jul 09 09:24:32 at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126) Jul 09 09:24:32 at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418) Jul 09 09:24:32 {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23312) Use -Dfast for building e2e tests on AZP
Nico Kruber created FLINK-23312: --- Summary: Use -Dfast for building e2e tests on AZP Key: FLINK-23312 URL: https://issues.apache.org/jira/browse/FLINK-23312 Project: Flink Issue Type: Improvement Components: Test Infrastructure Affects Versions: 1.13.1 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.14.0 The "e2e" builder in Azure pipelines builds Flink again on top of what the "compile" builder is already doing. This unnecessarily duplicates a couple of checks that only need to be executed once and can be skipped by providing {{-Dfast}}. On my local machine with 32GB RAM, 8 physical cores and a fast NVMe SSD, the difference is pretty big: {code} time mvn clean install -Dscala-2.12 -DskipTests -pl flink-dist -am # -> 6:40 min time mvn clean install -Dscala-2.12 -DskipTests -Dfast -pl flink-dist -am # -> 5:40 min {code} Therefore, I'm proposing to add this parameter to the "e2e" builder's compile step. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23311) Improve PojoSerializer test
Nico Kruber created FLINK-23311: --- Summary: Improve PojoSerializer test Key: FLINK-23311 URL: https://issues.apache.org/jira/browse/FLINK-23311 Project: Flink Issue Type: Improvement Components: Tests Affects Versions: 1.13.1 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.14.0 While working with the PojoSerializer a bit more, I noticed a couple of minor things that are off in the current tests: - the test Pojo does not take {{dumm5}} into account for {{hashCode}} and {{equals}} - error messages are not so nice (and mix up the order of expected and actual values) I'll create a PR for fixing these things in one go under this ticket. -- This message was sent by Atlassian Jira (v8.3.4#803005)
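For the first point, the fix boils down to something like the sketch below; the surrounding class is hypothetical (the real test Pojo has more fields), only the {{dumm5}} field name is taken from the ticket.

{code:java}
import java.util.Objects;

// Hypothetical excerpt of the test Pojo: dumm5 must participate in both methods.
class TestPojo {
    int dumm1;
    String dumm5;

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TestPojo)) {
            return false;
        }
        TestPojo that = (TestPojo) o;
        return dumm1 == that.dumm1
                && Objects.equals(dumm5, that.dumm5); // dumm5 was previously missing
    }

    @Override
    public int hashCode() {
        return Objects.hash(dumm1, dumm5); // dumm5 was previously missing
    }
}
{code}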
[jira] [Created] (FLINK-23102) Accessing FlameGraphs while not being enabled returns an exception
Nico Kruber created FLINK-23102: --- Summary: Accessing FlameGraphs while not being enabled returns an exception Key: FLINK-23102 URL: https://issues.apache.org/jira/browse/FLINK-23102 Project: Flink Issue Type: Bug Components: Runtime / Web Frontend Affects Versions: 1.13.1 Reporter: Nico Kruber Attachments: image-2021-06-22-17-36-47-730.png Trying to retrieve the FlameGraph in a job that doesn't have it enabled returns this ugly exception: !image-2021-06-22-17-36-47-730.png! Instead, it could mention that this feature is not enabled and describe how to enable it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23101) Flame Graphs initial view says it is 18800 days in the past
Nico Kruber created FLINK-23101: --- Summary: Flame Graphs initial view says it is 18800 days in the past Key: FLINK-23101 URL: https://issues.apache.org/jira/browse/FLINK-23101 Project: Flink Issue Type: Bug Components: Runtime / Web Frontend Reporter: Nico Kruber Attachments: image.png When you look at the Flame Graphs for a task for the first time, it will show an empty space and say that the measurement was ~18800 days in the past (see the attached image). This should rather show something more useful like "no measurement yet". -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22699) Make ConstantArgumentCount public API
Nico Kruber created FLINK-22699: --- Summary: Make ConstantArgumentCount public API Key: FLINK-22699 URL: https://issues.apache.org/jira/browse/FLINK-22699 Project: Flink Issue Type: Improvement Components: Table SQL / API Affects Versions: 1.13.0 Reporter: Nico Kruber {{ConstantArgumentCount}} is quite useful when implementing custom type inference. While the user can, of course, implement an {{ArgumentCount}} themselves with just a few methods, this feels like the most commonly used implementation and could be provided as public API (currently, it is marked {{@Internal}}). -- This message was sent by Atlassian Jira (v8.3.4#803005)
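To illustrate the duplication this currently forces on users, a sketch of a hand-rolled fixed argument count next to the (currently internal) convenience class; assuming the {{ArgumentCount}} interface as of Flink 1.13:

{code:java}
import java.util.Optional;
import org.apache.flink.table.types.inference.ArgumentCount;

// What users have to write today for a fixed two-argument function...
class TwoArguments implements ArgumentCount {
    @Override
    public boolean isValidCount(int count) {
        return count == 2;
    }

    @Override
    public Optional<Integer> getMinCount() {
        return Optional.of(2);
    }

    @Override
    public Optional<Integer> getMaxCount() {
        return Optional.of(2);
    }
}

// ...versus the one-liner if ConstantArgumentCount were public API:
// ArgumentCount twoArguments = ConstantArgumentCount.of(2);
{code}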
[jira] [Created] (FLINK-22405) Support fixed-length chars in the LeadLag built-in function
Nico Kruber created FLINK-22405: --- Summary: Support fixed-length chars in the LeadLag built-in function Key: FLINK-22405 URL: https://issues.apache.org/jira/browse/FLINK-22405 Project: Flink Issue Type: Improvement Components: Table SQL / API Affects Versions: 1.12.2 Reporter: Nico Kruber The LeadLag aggregate function does not support the type 'CHAR', as in the following example (a CAST to VARCHAR works around this). Technically, though, there should be no reason to support STRING/VARCHAR but not CHAR: {code:sql} CREATE TEMPORARY VIEW test_cardinality AS SELECT * FROM ( VALUES ('Alice', 'al...@test.com', ARRAY [ 'al...@test.com' ], 'Test Ltd'), ('Alice', 'al...@test.com', ARRAY [ 'al...@test.com' ], 'Test Ltd'), ('Alice', 'al...@test2.com', ARRAY [ 'al...@test.com', 'al...@test2.com' ], 'Test Ltd')) AS t ( name, email, aliases, company ); {code} {code:sql} SELECT name, LEAD(company, 0) AS company FROM test_cardinality WHERE CARDINALITY(aliases) >= 2 GROUP BY name; {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21584) Support UNNEST in LEFT JOINs
Nico Kruber created FLINK-21584: --- Summary: Support UNNEST in LEFT JOINs Key: FLINK-21584 URL: https://issues.apache.org/jira/browse/FLINK-21584 Project: Flink Issue Type: New Feature Components: Table SQL / API Affects Versions: 1.12.1 Reporter: Nico Kruber Currently, UNNEST (for arrays and maps) is only supported in CROSS JOIN operations, but you may actually also want this in a LEFT JOIN fashion in which case you would get {{NULL}} values for the expanded fields. h1. Example {code:sql} CREATE TEMPORARY VIEW input ( f1, f2 ) AS VALUES ('A', STR_TO_MAP('')), ('B', STR_TO_MAP('1, 2')); SELECT * FROM input LEFT JOIN UNNEST(f2); {code} h1. Current workaround {code:sql} CREATE TEMPORARY VIEW input ( f1, f2 ) AS VALUES ('A', STR_TO_MAP('')), ('B', STR_TO_MAP('1, 2')); SELECT * FROM input CROSS JOIN UNNEST(f2) UNION ALL SELECT *, NULLIF('1', '1') AS `KEY`, NULLIF('1', '1') as `VALUE` FROM input WHERE f2 IS NULL OR CARDINALITY(f2) = 0; {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21583) Allow comments in CSV format without having to ignore parse errors
Nico Kruber created FLINK-21583: --- Summary: Allow comments in CSV format without having to ignore parse errors Key: FLINK-21583 URL: https://issues.apache.org/jira/browse/FLINK-21583 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Ecosystem Affects Versions: 1.12.1 Reporter: Nico Kruber Currently, when you pass {{'csv.allow-comments' = 'true'}} to a table definition, you also have to set {{'csv.ignore-parse-errors' = 'true'}} to actually skip the commented-out line (and the docs mention this prominently as well). This, however, may mask actual parsing errors that you want to be notified of. I would like to propose that {{allow-comments}} actually also skips the commented-out lines automatically because these shouldn't be used anyway. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21569) Flink SQL with CSV file input job hangs
Nico Kruber created FLINK-21569: --- Summary: Flink SQL with CSV file input job hangs Key: FLINK-21569 URL: https://issues.apache.org/jira/browse/FLINK-21569 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Runtime Affects Versions: 1.12.1 Reporter: Nico Kruber Attachments: airports.csv, flights-small2.csv In extension to FLINK-21567, I actually also got the job to be stuck on cancellation by doing the following in the SQL client: * configure SQL client defaults to run with parallelism 2 * execute the following statement {code} CREATE TABLE `airports` ( `IATA_CODE` CHAR(3), `AIRPORT` STRING, `CITY` STRING, `STATE` CHAR(2), `COUNTRY` CHAR(3), `LATITUDE` DOUBLE NULL, `LONGITUDE` DOUBLE NULL, PRIMARY KEY (`IATA_CODE`) NOT ENFORCED ) WITH ( 'connector' = 'filesystem', 'path' = 'file:///tmp/kaggle-flight-delay/airports.csv', 'format' = 'csv', 'csv.allow-comments' = 'true', 'csv.ignore-parse-errors' = 'true', 'csv.null-literal' = '' ); CREATE TABLE `flights` ( `_YEAR` CHAR(4), `_MONTH` CHAR(2), `_DAY` CHAR(2), `_DAY_OF_WEEK` TINYINT, `AIRLINE` CHAR(2), `FLIGHT_NUMBER` SMALLINT, `TAIL_NUMBER` CHAR(6), `ORIGIN_AIRPORT` CHAR(3), `DESTINATION_AIRPORT` CHAR(3), `_SCHEDULED_DEPARTURE` CHAR(4), `SCHEDULED_DEPARTURE` AS TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00'), `_DEPARTURE_TIME` CHAR(4), `DEPARTURE_DELAY` SMALLINT, `DEPARTURE_TIME` AS TIMESTAMPADD(MINUTE, CAST(`DEPARTURE_DELAY` AS INT), TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00')), `TAXI_OUT` SMALLINT, `WHEELS_OFF` CHAR(4), `SCHEDULED_TIME` SMALLINT, `ELAPSED_TIME` SMALLINT, `AIR_TIME` SMALLINT, `DISTANCE` SMALLINT, `WHEELS_ON` CHAR(4), `TAXI_IN` SMALLINT, `SCHEDULED_ARRIVAL` CHAR(4), `ARRIVAL_TIME` CHAR(4), `ARRIVAL_DELAY` SMALLINT, `DIVERTED` BOOLEAN, `CANCELLED` BOOLEAN, `CANCELLATION_REASON` CHAR(1), `AIR_SYSTEM_DELAY` SMALLINT, `SECURITY_DELAY` SMALLINT, `AIRLINE_DELAY` SMALLINT, `LATE_AIRCRAFT_DELAY` SMALLINT, `WEATHER_DELAY` SMALLINT ) WITH ( 'connector' = 'filesystem', 'path' = 'file:///tmp/kaggle-flight-delay/flights-small2.csv', 'format' = 'csv', 'csv.null-literal' = '' ); SELECT `ORIGIN_AIRPORT`, `AIRPORT`, `STATE`, `NUM_DELAYS` FROM ( SELECT `ORIGIN_AIRPORT`, `AIRPORT`, `STATE`, COUNT(*) AS `NUM_DELAYS`, ROW_NUMBER() OVER (ORDER BY COUNT(*) DESC) AS rownum FROM flights, airports WHERE `ORIGIN_AIRPORT` = `IATA_CODE` AND `DEPARTURE_DELAY` > 0 GROUP BY `ORIGIN_AIRPORT`, `AIRPORT`, `STATE`) WHERE rownum <= 10; {code} Results are shown in the CLI but after quitting the result view, the job seems stuck in CANCELLING until (at least) one of the TMs shuts itself down because a task wouldn't react to the cancelling signal. 
This appears in its TM logs: {code} 2021-03-02 18:39:19,451 WARN org.apache.flink.runtime.taskmanager.Task [] - Task 'Source: TableSourceScan(table=[[default_catalog, default_database, airports, project=[IATA_CODE, AIRPORT, STATE]]], fields=[IATA_CODE, AIRPORT, STATE]) (2/2)#0' did not react to cancelling signal for 30 seconds, but is stuck in method: sun.misc.Unsafe.park(Native Method) java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947) org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:653) org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585) org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) java.lang.Thread.run(Thread.java:748) ... 2021-03-02 18:39:49,447 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Task did not exit gracefully within 180 + seconds. org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds. at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1685) [flink-dist_2.12-1.12.1.jar:1.12.1] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282] 2021-03-02 18:39:49,448 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down... org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds. {code}
[jira] [Created] (FLINK-21568) Navigating in SQL client can lead to SqlExecutionException
Nico Kruber created FLINK-21568: --- Summary: Navigating in SQL client can lead to SqlExecutionException Key: FLINK-21568 URL: https://issues.apache.org/jira/browse/FLINK-21568 Project: Flink Issue Type: Bug Components: Table SQL / Client Affects Versions: 1.12.1 Reporter: Nico Kruber Pressing 'p' in the SQL CLI's result browser before any result is available will result in the following exception being thrown: {code} 2021-03-02 18:27:05,153 WARN org.apache.flink.table.client.cli.CliClient [] - Could not execute SQL statement. org.apache.flink.table.client.gateway.SqlExecutionException: Invalid page '1'. at org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult.retrievePage(MaterializedCollectStreamResult.java:177) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.client.gateway.local.LocalExecutor.retrieveResultPage(LocalExecutor.java:415) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.client.cli.CliTableResultView.updatePage(CliTableResultView.java:293) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.client.cli.CliTableResultView.gotoPreviousPage(CliTableResultView.java:381) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.client.cli.CliTableResultView.evaluate(CliTableResultView.java:183) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.client.cli.CliTableResultView.evaluate(CliTableResultView.java:50) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.client.cli.CliView.open(CliView.java:125) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:675) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:323) ~[flink-sql-client_2.12-1.12.1.jar:1.12.1] at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_282] at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:214) [flink-sql-client_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:144) [flink-sql-client_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.client.SqlClient.start(SqlClient.java:115) [flink-sql-client_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) [flink-sql-client_2.12-1.12.1.jar:1.12.1] {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21567) CSV Format exception while parsing: ArrayIndexOutOfBoundsException: 4000
Nico Kruber created FLINK-21567: --- Summary: CSV Format exception while parsing: ArrayIndexOutOfBoundsException: 4000 Key: FLINK-21567 URL: https://issues.apache.org/jira/browse/FLINK-21567 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.12.1, 1.11.3 Reporter: Nico Kruber Attachments: flights-small.csv I've been trying to play a bit with the data available at https://www.kaggle.com/usdot/flight-delays and got the following exception: {code} 2021-02-16 18:57:37,913 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, flights, filter=[], project=[ORIGIN_AIRPORT, DEPARTURE_DELAY]]], fields=[ORIGIN_AIRPORT, DEPARTURE_DELAY]) -> Calc(select=[ORIGIN_AIRPORT], where=[(DEPARTURE_DELAY > 0)]) -> LocalHashAggregate(groupBy=[ORIGIN_AIRPORT], select=[ORIGIN_AIRPORT, Partial_COUNT(*) AS count1$0]) (1/1)#0 (ebbf1204d875a5a4ace529df0d5ba719) switched from RUNNING to FAILED. java.io.IOException: Failed to deserialize CSV row. at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:257) ~[flink-csv-1.12.1.jar:1.12.1] at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:162) ~[flink-csv-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241) ~[flink-dist_2.12-1.12.1.jar:1.12.1] Caused by: java.lang.ArrayIndexOutOfBoundsException: 4000 at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder.skipLinesWhenNeeded(CsvDecoder.java:527) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder.startNewLine(CsvDecoder.java:499) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser._handleObjectRowEnd(CsvParser.java:1067) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser._handleNextEntry(CsvParser.java:858) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser.nextFieldName(CsvParser.java:665) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:250) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:68) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:15) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator.nextValue(MappingIterator.java:280) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at 
org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:250) ~[flink-csv-1.12.1.jar:1.12.1] ... 5 more {code} h1. Fully working example: Using the attached file (derived from the data on flight delays, linked above) and the SQL CLI: {code} CREATE TABLE `flights` ( `_YEAR` CHAR(4), `_MONTH` CHAR(2), `_DAY` CHAR(2), `_DAY_OF_WEEK` TINYINT, `AIRLINE` CHAR(2), `FLIGHT_NUMBER` SMALLINT, `TAIL_NUMBER` CHAR(6), `ORIGIN_AIRPORT` CHAR(3), `DESTINATION_AIRPORT` CHAR(3), `_SCHEDULED_DEPARTURE` CHAR(4), `SCHEDULED_DEPARTURE` AS TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00'), `_DEPARTURE_TIME` CHAR(4), `DEPARTURE_DELAY` SMALLINT, `DEPARTURE_TIME` AS TIMESTAMPADD(MINUTE, CAST(`DEPARTURE_DELAY` AS INT), TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00')), `TAXI_OUT` SMALLINT, `WHEELS_OFF` CHAR(4), `SCHEDULED_TIME` SMALLINT, `ELA
[jira] [Created] (FLINK-21566) Improve error message for "Unsupported casting"
Nico Kruber created FLINK-21566: --- Summary: Improve error message for "Unsupported casting" Key: FLINK-21566 URL: https://issues.apache.org/jira/browse/FLINK-21566 Project: Flink Issue Type: New Feature Components: Table SQL / API Affects Versions: 1.12.1 Reporter: Nico Kruber In a situation like from FLINK-21565, neither the error message {{Unsupported casting from TINYINT to INTERVAL SECOND(3)}}, nor the exception trace (see FLINK-21565) gives you a good hint on where the error is, especially if you have many statements with TINYINTs or operations on these. The query parser could highlight the location of the error inside the SQL statement that the user provided. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21565) Support more integer types in TIMESTAMPADD
Nico Kruber created FLINK-21565: --- Summary: Support more integer types in TIMESTAMPADD Key: FLINK-21565 URL: https://issues.apache.org/jira/browse/FLINK-21565 Project: Flink Issue Type: Improvement Components: Table SQL / API Affects Versions: 1.12.1 Reporter: Nico Kruber At the moment, {{TIMESTAMPADD}} does not seem to support {{SMALLINT}} or {{TINYINT}} types which should be perfectly suitable for auto-conversion (in contrast to BIGINT or floating numbers where I would expect the user to cast it appropriately). It currently fails with the following exception: {code} org.apache.flink.table.planner.codegen.CodeGenException: Unsupported casting from TINYINT to INTERVAL SECOND(3). at org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.numericCasting(ScalarOperatorGens.scala:2352) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.generateBinaryArithmeticOperator(ScalarOperatorGens.scala:93) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:590) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:529) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) ~[flink-table_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at scala.collection.TraversableLike.map(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at scala.collection.TraversableLike.map$(TraversableLike.scala:226) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) ~[flink-table_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at scala.collection.TraversableLike.map(TraversableLike.scala:233) 
~[flink-dist_2.12-1.12.1.jar:1.12.1] at scala.collection.TraversableLike.map$(TraversableLike.scala:226) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) ~[flink-table_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:155) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:143) ~[flink-table-blink_2.12-1.12.1.jar:1.12.1] at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) ~[flink
[jira] [Created] (FLINK-21563) Support using computed columns when defining (new) computed columns
Nico Kruber created FLINK-21563: --- Summary: Support using computed columns when defining (new) computed columns Key: FLINK-21563 URL: https://issues.apache.org/jira/browse/FLINK-21563 Project: Flink Issue Type: Improvement Components: Table SQL / API Affects Versions: 1.11.3 Reporter: Nico Kruber To avoid code duplication, it would be nice to be able to use computed columns in later computations of new computed columns, e.g. {code} CREATE TABLE `flights` ( `_YEAR` CHAR(4), `_MONTH` CHAR(2), `_DAY` CHAR(2), `_SCHEDULED_DEPARTURE` CHAR(4), `SCHEDULED_DEPARTURE` AS TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00'), `_DEPARTURE_TIME` CHAR(4), `DEPARTURE_DELAY` SMALLINT, `DEPARTURE_TIME` AS SCHEDULED_DEPARTURE + DEPARTURE_DELAY )... {code} Otherwise, a user would have to repeat these calculations over and over again, which is hard to maintain. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21562) Add more informative message on CSV parsing errors
Nico Kruber created FLINK-21562: --- Summary: Add more informative message on CSV parsing errors Key: FLINK-21562 URL: https://issues.apache.org/jira/browse/FLINK-21562 Project: Flink Issue Type: Improvement Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / API Affects Versions: 1.11.3 Reporter: Nico Kruber I was parsing a CSV file with comments in it and used {{'csv.allow-comments' = 'true'}} without also passing {{'csv.ignore-parse-errors' = 'true'}} to the table DDL to not hide any actual format errors. Since I didn't just have strings in my table, this did of course stumble on the commented-out line with the following error: {code} 2021-02-16 17:45:53,055 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, airports]], fields=[IATA_CODE, AIRPORT, CITY, STATE, COUNTRY, LATITUDE, LONGITUDE]) -> SinkConversionToTuple2 -> Sink: SQL Client Stream Collect Sink (1/1)#0 (9f3a3965f18ed99ee42580bdb559ba66) switched from RUNNING to FAILED. java.io.IOException: Failed to deserialize CSV row. at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:257) ~[flink-csv-1.12.1.jar:1.12.1] at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:162) ~[flink-csv-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.12-1.12.1.jar:1.12.1] at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241) ~[flink-dist_2.12-1.12.1.jar:1.12.1] Caused by: java.lang.NumberFormatException: empty String at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1842) ~[?:1.8.0_275] at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110) ~[?:1.8.0_275] at java.lang.Double.parseDouble(Double.java:538) ~[?:1.8.0_275] at org.apache.flink.formats.csv.CsvToRowDataConverters.convertToDouble(CsvToRowDataConverters.java:203) ~[flink-csv-1.12.1.jar:1.12.1] at org.apache.flink.formats.csv.CsvToRowDataConverters.lambda$createNullableConverter$ac6e531e$1(CsvToRowDataConverters.java:113) ~[flink-csv-1.12.1.jar:1.12.1] at org.apache.flink.formats.csv.CsvToRowDataConverters.lambda$createRowConverter$18bb1dd$1(CsvToRowDataConverters.java:98) ~[flink-csv-1.12.1.jar:1.12.1] at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:251) ~[flink-csv-1.12.1.jar:1.12.1] ... 5 more {code} Two things should be improved here: # commented-out lines should be ignored by default (potentially, FLINK-17133 addresses this or at least gives the user the power to do so) # the error message itself is not very informative: "empty String". This ticket is about the latter. I would suggest to have at least a few more pointers to direct the user to finding the source in the CSV file/item/... - here, the data type could just be wrong or the CSV file itself may be wrong/corrupted and the user would need to investigate. What exactly may help here, probably depends on the actual input connector this format is currently working with, e.g. 
the line number in a CSV file would be best; where that is not possible, we could show the whole line or at least a few surrounding fields. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20886) Add the option to get a threaddump on checkpoint timeouts
Nico Kruber created FLINK-20886: --- Summary: Add the option to get a threaddump on checkpoint timeouts Key: FLINK-20886 URL: https://issues.apache.org/jira/browse/FLINK-20886 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Affects Versions: 1.12.0 Reporter: Nico Kruber For debugging checkpoint timeouts, I was thinking about the following addition to Flink: When a checkpoint times out and the async thread is still running, create a threaddump [1] and either add this to the checkpoint stats, log it, or write it out. This may help identifying where the checkpoint is stuck (maybe a lock, could also be in a third party lib like the FS connectors,...). It would give us some insights into what the thread is currently doing. Limiting the scope of the threads would be nice but may not be possible in the general case since additional threads (spawned by the FS connector lib, or otherwise connected) may interact with the async thread(s) by e.g. going through the same locks. [1] https://crunchify.com/how-to-generate-java-thread-dump-programmatically/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
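Programmatically, such a dump is cheap to produce via the standard JMX bean; a minimal sketch (independent of any specific Flink integration point) could look like this:

{code:java}
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public final class ThreadDumps {
    /** Returns a formatted dump of all live JVM threads, including lock information. */
    public static String create() {
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
        StringBuilder dump = new StringBuilder();
        for (ThreadInfo info : threadBean.dumpAllThreads(true, true)) {
            // Note: ThreadInfo#toString() truncates stack traces after a few frames;
            // a production version would format the full StackTraceElement[] itself.
            dump.append(info);
        }
        return dump.toString();
    }
}
{code}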
[jira] [Created] (FLINK-20674) Wrong send/received stats with UNION ALL
Nico Kruber created FLINK-20674: --- Summary: Wrong send/received stats with UNION ALL Key: FLINK-20674 URL: https://issues.apache.org/jira/browse/FLINK-20674 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.11.3, 1.12.0 Reporter: Nico Kruber When using {{UNION ALL}} to union the same table twice, the number of records and bytes sent is just half of what the next task receives: Reproducible with this: {code} CREATE TEMPORARY TABLE test ( `number` SMALLINT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); SELECT * FROM ( (SELECT * FROM test) UNION ALL (SELECT * FROM test) ) {code} Arguably, the use case is not too useful but other combinations may be affected, too. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20217) More fine-grained timer processing
Nico Kruber created FLINK-20217: --- Summary: More fine-grained timer processing Key: FLINK-20217 URL: https://issues.apache.org/jira/browse/FLINK-20217 Project: Flink Issue Type: Improvement Components: API / DataStream Affects Versions: 1.11.2, 1.10.2, 1.12.0 Reporter: Nico Kruber Timers are currently processed in one big block under the checkpoint lock (under {{InternalTimerServiceImpl#advanceWatermark}}). This can be problematic in a number of scenarios during checkpointing and may lead to checkpoints timing out (even unaligned checkpoints would not help). If you have a huge number of timers to process when advancing the watermark and the task is also back-pressured, the situation may actually be worse since you would block on the checkpoint lock and also wait for buffers/credits from the receiver. I propose to make this loop more fine-grained so that it is interruptible by checkpoints, but maybe there is also some other way to improve here. -- This message was sent by Atlassian Jira (v8.3.4#803005)
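As a rough sketch of the idea only (this is not Flink's actual timer service; all names are made up): fire due timers in bounded batches and yield control in between, so a pending checkpoint is not blocked for the entire set of due timers.

{code:java}
import java.util.PriorityQueue;

// Hypothetical illustration of batched timer firing with yield points.
class BatchedTimerFiring {
    private static final int BATCH_SIZE = 100;

    private final PriorityQueue<Long> timers = new PriorityQueue<>();

    void advanceWatermark(long time, Runnable yieldToCheckpoint) {
        int firedInBatch = 0;
        Long timer;
        while ((timer = timers.peek()) != null && timer <= time) {
            timers.poll();
            fire(timer);
            if (++firedInBatch >= BATCH_SIZE) {
                // Release control so a pending checkpoint (or other mailbox
                // action) can run between batches instead of waiting for all
                // due timers to finish.
                yieldToCheckpoint.run();
                firedInBatch = 0;
            }
        }
    }

    private void fire(long timestamp) {
        // The user's timer callback would run here.
    }
}
{code}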
[jira] [Created] (FLINK-20104) Web UI checkpoint stats "refresh" button has to be clicked twice
Nico Kruber created FLINK-20104: --- Summary: Web UI checkpoint stats "refresh" button has to be clicked twice Key: FLINK-20104 URL: https://issues.apache.org/jira/browse/FLINK-20104 Project: Flink Issue Type: Bug Components: Runtime / Web Frontend Affects Versions: 1.11.2, 1.12.0 Environment: Firefox on Linux Reporter: Nico Kruber In order to get the UI's checkpoint stats updated, I always have to click the refresh button twice - a single click doesn't change anything. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20099) HeapStateBackend checkpoint error hidden under cryptic message
Nico Kruber created FLINK-20099: --- Summary: HeapStateBackend checkpoint error hidden under cryptic message Key: FLINK-20099 URL: https://issues.apache.org/jira/browse/FLINK-20099 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing, Runtime / State Backends Affects Versions: 1.11.2 Reporter: Nico Kruber Attachments: Screenshot_20201112_001331.png When the memory state back-end hits a certain size, it fails to permit checkpoints. Even though a very detailed exception is thrown at its source, this is neither logged nor shown in the UI: * Logs just contain: {code:java} 00:06:41.462 [jobmanager-future-thread-14] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 2 by task 8eb303cd3196310cb2671212f4ed013c of job c9b7a410bd3143864ca23ba89595d878 at 6a73bcf2-46b6-4735-a616-fdf09ff1471c @ localhost (dataPort=-1). {code} * UI: (also see the attached Screenshot_20201112_001331.png) {code:java} Failure Message: The job has failed. {code} -> this isn't even true: the job is still running fine! Debugging into {{PendingCheckpoint#abort()}} reveals that the causing exception is actually still in there but the detailed information from it is just never used. For reference, this is what is available there and should be logged or shown: {code:java} java.lang.Exception: Could not materialize checkpoint 2 for operator aggregates -> (Sink: sink-agg-365, Sink: sink-agg-180, Sink: sink-agg-45, Sink: sink-agg-30) (4/4). at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:191) at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:138) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=6122737 , maxSize=5242880 . Consider using a different state backend, like the File System State backend. at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:479) at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:50) at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:102) ... 3 more Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=6122737 , maxSize=5242880 . Consider using a different state backend, like the File System State backend. 
at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.checkSize(MemCheckpointStreamFactory.java:64) at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetBytes(MemCheckpointStreamFactory.java:145) at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetHandle(MemCheckpointStreamFactory.java:126) at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:77) at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:199) at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158) at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:476) ... 5 more {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20061) Row constructor unsupported in aggregation function
Nico Kruber created FLINK-20061: --- Summary: Row constructor unsupported in aggregation function Key: FLINK-20061 URL: https://issues.apache.org/jira/browse/FLINK-20061 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.11.2 Reporter: Nico Kruber I was trying to use {{ROW}} in a user-defined aggregate function in a query like this: {code} SELECT `id`, HOP_END(`timestamp`, INTERVAL '10' MINUTE, INTERVAL '1' MINUTE) AS `window_end`, RowMaxv0(`amount`, ROW(`timestamp`, `amount`, `payload`)) AS `max_amount` FROM `input` GROUP BY HOP(`timestamp`, INTERVAL '10' MINUTE, INTERVAL '1' MINUTE), `id`; {code} Eventually this resulted in an "unsupported" exception from Calcite: {code} Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. null at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) at com.ververica.platform.sql.functions.RowMaxv0.main(RowMaxv0.java:93) Caused by: java.lang.UnsupportedOperationException at org.apache.calcite.sql.validate.SqlValidatorImpl.validateColumnListParams(SqlValidatorImpl.java:5689) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:268) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785) at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785) at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:481) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4255) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3523) at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059) at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141) ... 5 more {code} A workaround for this is to go via a subquery like the following but ultimately, this should result in the same thing (a simple projection). {code} SELECT `id`, HOP_END(`timestamp`, INTERVAL '10' MINUTE, INTERVAL '1' MINUTE) AS `window_end`, RowMaxv0(`amount`, `row`) AS `max_amount` FROM (SELECT `id`, `timestamp`, `amount`, ROW(`timestamp`, `amount`, `payload`) AS `row` FROM `input`) GROUP BY HOP(`timestamp`, INTERVAL '10' MINUTE, INTERVAL '1' MINUTE), `id` {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20059) Outdated SQL docs on aggregate functions' merge
Nico Kruber created FLINK-20059: --- Summary: Outdated SQL docs on aggregate functions' merge Key: FLINK-20059 URL: https://issues.apache.org/jira/browse/FLINK-20059 Project: Flink Issue Type: Bug Components: Documentation, Table SQL / API Affects Versions: 1.11.2, 1.12.0 Reporter: Nico Kruber In the java docs as well as the user docs, the {{merge}} method of an aggregation UDF is described as optional, e.g. {quote}Merges a group of accumulator instances into one accumulator instance. This function must be implemented for data stream session window grouping aggregates and data set grouping aggregates.{quote} However, it seems that nowadays this method is required in more cases (I stumbled on this for a HOP window in streaming): {code} StreamExecGlobalGroupAggregate.scala .needMerge(mergedAccOffset, mergedAccOnHeap, mergedAccExternalTypes) StreamExecGroupWindowAggregateBase.scala generator.needMerge(mergedAccOffset = 0, mergedAccOnHeap = false) StreamExecIncrementalGroupAggregate.scala .needMerge(mergedAccOffset, mergedAccOnHeap = true, mergedAccExternalTypes) StreamExecLocalGroupAggregate.scala .needMerge(mergedAccOffset = 0, mergedAccOnHeap = true) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
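For reference, the method in question on a table {{AggregateFunction}}; a minimal counting aggregate as a sketch, where {{merge}} turns out to be required for, e.g., the HOP window mentioned above:

{code:java}
import org.apache.flink.table.functions.AggregateFunction;

public class CountAgg extends AggregateFunction<Long, CountAgg.Acc> {
    public static class Acc {
        public long count;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    @Override
    public Long getValue(Acc acc) {
        return acc.count;
    }

    public void accumulate(Acc acc, String value) {
        acc.count++;
    }

    // Documented as optional, but required by the planner in more cases
    // than the docs suggest (e.g. hopping windows in streaming).
    public void merge(Acc acc, Iterable<Acc> others) {
        for (Acc other : others) {
            acc.count += other.count;
        }
    }
}
{code}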
[jira] [Created] (FLINK-19999) State Processor API classes leaking into savepoint
Nico Kruber created FLINK-19999: --- Summary: State Processor API classes leaking into savepoint Key: FLINK-19999 URL: https://issues.apache.org/jira/browse/FLINK-19999 Project: Flink Issue Type: Bug Components: API / State Processor Affects Versions: 1.11.2 Reporter: Nico Kruber Currently, any configuration for serializers that you are using when writing a State Processor API job will be shared with the serializers that are used for writing a savepoint. However, your normal job shouldn't necessarily depend on (helper) classes that you only use in the State Processor API job. By default, for example, {{ExecutionConfig#autoTypeRegistrationEnabled = true}} and thus classes like {{org.apache.flink.runtime.checkpoint.OperatorSubtaskState}} will be registered with Kryo and will thus also be needed when reading the created savepoint if you have Kryo serialization in your job. This particular instance can be worked around by calling {{ExecutionConfig#disableAutoTypeRegistration()}} but the problem is probably bigger and extends to other type registrations, e.g. POJOs, as well. -- This message was sent by Atlassian Jira (v8.3.4#803005)
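The mentioned workaround, sketched for the State Processor API job's environment (DataSet-based at the time of this ticket); the class name is hypothetical:

{code:java}
import org.apache.flink.api.java.ExecutionEnvironment;

public class StateProcJobSetup {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Keep Kryo from auto-registering helper classes of this job, so they
        // do not become a requirement for whoever reads the written savepoint.
        env.getConfig().disableAutoTypeRegistration();
        // ... build the State Processor API job against `env` here ...
    }
}
{code}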
[jira] [Created] (FLINK-19972) Provide more details when type serializers are not compatible
Nico Kruber created FLINK-19972: --- Summary: Provide more details when type serializers are not compatible Key: FLINK-19972 URL: https://issues.apache.org/jira/browse/FLINK-19972 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Affects Versions: 1.11.2 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.12.0 Currently, when the type serializer is incompatible, you get exceptions like these:
{code:java}
StateMigrationException("For heap backends, the new namespace serializer must be compatible.");
StateMigrationException("The new namespace serializer must be compatible.");
StateMigrationException("For heap backends, the new state serializer must not be incompatible.");
StateMigrationException("The new state serializer cannot be incompatible.");
StateMigrationException("The new key serializer must be compatible.");
{code}
which are not really helpful to the user in debugging serializers. Since we already have the old serializer (snapshot) and the new one available, we should add this detail to the exceptions for improved usability. -- This message was sent by Atlassian Jira (v8.3.4#803005)
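A sketch of what an improved message could look like ({{newStateSerializer}} and {{oldSerializerSnapshot}} are placeholders for the objects already at hand in the backend code):
{code:java}
throw new StateMigrationException(
    "The new state serializer (" + newStateSerializer + ") must not be incompatible "
        + "with the old state serializer (" + oldSerializerSnapshot + ").");
{code}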
[jira] [Created] (FLINK-19462) Checkpoint statistics for unfinished task snapshots
Nico Kruber created FLINK-19462: --- Summary: Checkpoint statistics for unfinished task snapshots Key: FLINK-19462 URL: https://issues.apache.org/jira/browse/FLINK-19462 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing, Runtime / Metrics Reporter: Nico Kruber If a checkpoint times out, there are currently no stats on the not-yet-finished tasks in the Web UI, so you have to crawl into (debug?) logs. It would be nice to have these incomplete stats in there instead so that you can quickly see what was going on. I could think of these ways to accomplish this:
* the checkpoint coordinator could ask the TMs for it after failing the checkpoint, or
* the TMs could send the stats when they notice that the checkpoint is aborted

Maybe there are more options, but I think this improvement in general would benefit debugging checkpoints. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19381) Fix docs about relocatable savepoints
Nico Kruber created FLINK-19381: --- Summary: Fix docs about relocatable savepoints Key: FLINK-19381 URL: https://issues.apache.org/jira/browse/FLINK-19381 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.11.2, 1.12.0 Reporter: Nico Kruber Although savepoints are relocatable since Flink 1.11, the docs still state otherwise, for example in [https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#triggering-savepoints] The warning there, as well as the other changes from FLINK-15863, should be removed again and potentially replaced with new constraints. One known constraint is that if task-owned state is used ({{GenericWriteAheadLog}} sink), savepoints are currently not relocatable yet. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19112) No access to metric group in ScalarFunction when optimizing
Nico Kruber created FLINK-19112: --- Summary: No access to metric group in ScalarFunction when optimizing Key: FLINK-19112 URL: https://issues.apache.org/jira/browse/FLINK-19112 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.11.1 Reporter: Nico Kruber Attachments: MetricsGroupBug.java Under some circumstances, I cannot access {{context.getMetricGroup()}} in a {{ScalarFunction}} like this (full job attached): {code:java} public static class MyUDF extends ScalarFunction { @Override public void open(FunctionContext context) throws Exception { super.open(context); context.getMetricGroup(); } public Integer eval(Integer id) { return id; } } {code} which leads to this exception: {code:java} Exception in thread "main" java.lang.UnsupportedOperationException: getMetricGroup is not supported when optimizing at org.apache.flink.table.planner.codegen.ConstantFunctionContext.getMetricGroup(ExpressionReducer.scala:249) at com.ververica.MetricsGroupBug$MyUDF.open(MetricsGroupBug.java:57) at ExpressionReducer$2.open(Unknown Source) at org.apache.flink.table.planner.codegen.ExpressionReducer.reduce(ExpressionReducer.scala:118) at org.apache.calcite.rel.rules.ReduceExpressionsRule.reduceExpressionsInternal(ReduceExpressionsRule.java:696) at org.apache.calcite.rel.rules.ReduceExpressionsRule.reduceExpressions(ReduceExpressionsRule.java:618) at org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:303) at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328) at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:562) at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:427) at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:264) at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127) at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:223) at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:210) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164) at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:84) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700) at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565) at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549) at com.ververica.MetricsGroupBug.main(MetricsGroupBug.java:50) {code} I also tried to work around this with a try-catch, assuming that this method is called once during optimisation and another time when the job actually runs.
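For illustration, the attempted try-catch workaround looks roughly like this (a sketch based on the attached job, not verified to work):
{code:java}
public static class MyUDF extends ScalarFunction {
    private transient MetricGroup metrics;

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
        try {
            metrics = context.getMetricGroup();
        } catch (UnsupportedOperationException e) {
            // open() is also called from the ExpressionReducer while optimizing
            // (see the stack trace above) where no metric group is available
        }
    }

    public Integer eval(Integer id) {
        return id;
    }
}
{code}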
[jira] [Created] (FLINK-18962) Improve error message if checkpoint directory is not writable
Nico Kruber created FLINK-18962: --- Summary: Improve error message if checkpoint directory is not writable Key: FLINK-18962 URL: https://issues.apache.org/jira/browse/FLINK-18962 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Affects Versions: 1.11.1 Reporter: Nico Kruber If the checkpoint directory from {{state.checkpoints.dir}} is not writable by the user that Flink is running with, checkpoints will be declined, but the real cause is not mentioned anywhere:
* the Web UI says: "Cause: The job has failed" (the Flink job is running though)
* the JM log says:
{code}
2020-08-14 12:13:18,820 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering checkpoint 2 (type=CHECKPOINT) @ 159738819 for job 2c567b14e8d0833404931ef47dfec266.
2020-08-14 12:13:18,921 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Decline checkpoint 2 by task 0d4fd75374ad16c8d963679e3c2171ec of job 2c567b14e8d0833404931ef47dfec266 at a184deea621e3923fbfcb1d899348448 @ Nico-PC.lan (dataPort=35531).
{code}
* the TM log says:
{code}
2020-08-14 12:13:14,102 INFO org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl [] - Checkpoint 1 has been notified as aborted, would not trigger any checkpoint.
{code}

And that's it. There should be a real error message indicating that the checkpoint (sub-)directory could not be created. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18955) Add snapshot path to job startup message
Nico Kruber created FLINK-18955: --- Summary: Add snapshot path to job startup message Key: FLINK-18955 URL: https://issues.apache.org/jira/browse/FLINK-18955 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Affects Versions: 1.11.1 Reporter: Nico Kruber When a job is started from a checkpoint or savepoint (I'm using snapshot as the umbrella term below), the {{CheckpointCoordinator}} prints a log line like this: {code} 2020-08-13 13:50:51,418 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring job 220d8a4953cd40198b6eb3b1ec0cece0 from latest valid checkpoint: Checkpoint 357 @ 1597326576925 for 220d8a4953cd40198b6eb3b1ec0cece0. {code} I propose to add the path to the snapshot to this message because which snapshot is taken for restore may actually not be that obvious for the user: even if a savepoint was specified in the job start command, e.g. in a Kubernetes pod spec, an HA store could overrule the decision and take a more recent snapshot instead. If that snapshot is a savepoint, it is not that easy to map this to checkpoint IDs and find out which savepoint the job actually started from. -- This message was sent by Atlassian Jira (v8.3.4#803005)
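For illustration, the extended log message could look like this (the location at the end is a made-up example):
{code}
2020-08-13 13:50:51,418 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring job 220d8a4953cd40198b6eb3b1ec0cece0 from latest valid checkpoint: Checkpoint 357 @ 1597326576925 for 220d8a4953cd40198b6eb3b1ec0cece0 (location: s3://my-bucket/savepoints/savepoint-220d8a-0123456789ab).
{code}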
[jira] [Created] (FLINK-18806) Taskmanager doesn't start up with error in config
Nico Kruber created FLINK-18806: --- Summary: Taskmanager doesn't start up with error in config Key: FLINK-18806 URL: https://issues.apache.org/jira/browse/FLINK-18806 Project: Flink Issue Type: Bug Components: Deployment / Scripts Affects Versions: 1.11.1 Reporter: Nico Kruber With the following (wrong) configuration setting in {{flink-conf.yaml}}, a taskmanager will not start up, basically print nothing on the command line, and have no log file to look at: {code} taskmanager.memory.managed.fraction: '0.4' {code} Console output: {code} > ./bin/start-cluster.sh Starting cluster. Starting standalonesession daemon on host Nico-PC.lan. [ERROR] The execution result is empty. [ERROR] Could not get JVM parameters and dynamic configurations properly. {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
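For comparison, the unquoted float form of the same option starts up fine:
{code}
taskmanager.memory.managed.fraction: 0.4
{code}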
[jira] [Created] (FLINK-18769) Streaming Table job stuck when enabling minibatching
Nico Kruber created FLINK-18769: --- Summary: Streaming Table job stuck when enabling minibatching Key: FLINK-18769 URL: https://issues.apache.org/jira/browse/FLINK-18769 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.11.1 Reporter: Nico Kruber The following Table API streaming job is stuck when enabling mini-batching:
{code}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

// disable mini-batching completely to get a result
Configuration tableConf = tableEnv.getConfig().getConfiguration();
tableConf.setString("table.exec.mini-batch.enabled", "true");
tableConf.setString("table.exec.mini-batch.allow-latency", "5 s");
tableConf.setString("table.exec.mini-batch.size", "5000");
tableConf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

tableEnv.executeSql(
    "CREATE TABLE input_table (" +
        "location STRING, " +
        "population INT" +
    ") WITH (" +
        "'connector' = 'kafka', " +
        "'topic' = 'kafka_batching_input', " +
        "'properties.bootstrap.servers' = 'localhost:9092', " +
        "'format' = 'csv', " +
        "'scan.startup.mode' = 'earliest-offset'" +
    ")");
tableEnv.executeSql(
    "CREATE TABLE result_table WITH ('connector' = 'print') LIKE input_table (EXCLUDING OPTIONS)");

tableEnv
    .from("input_table")
    .groupBy($("location"))
    .select(
        $("location").cast(DataTypes.CHAR(2)).as("location"),
        $("population").sum().as("population"))
    .executeInsert("result_table");
{code}
I am using a pre-populated Kafka topic called {{kafka_batching_input}} with these elements:
{code}
"Berlin",1
"Berlin",2
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18767) Streaming job stuck when disabling operator chaining
Nico Kruber created FLINK-18767: --- Summary: Streaming job stuck when disabling operator chaining Key: FLINK-18767 URL: https://issues.apache.org/jira/browse/FLINK-18767 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.11.1, 1.10.1, 1.9.3, 1.8.3 Reporter: Nico Kruber The following code is stuck sending data from the source to the map operator. Two settings seem to have an influence here: {{env.setBufferTimeout(-1);}} and {{env.disableOperatorChaining();}} - if I remove either of these, the job works as expected. (I pre-populated my Kafka topic with one element to reproduce easily)
{code}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// comment out either of these two and the job works
env.setBufferTimeout(-1);
env.disableOperatorChaining();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer<String> consumer =
    new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
consumer.setStartFromEarliest();

DataStreamSource<String> input = env.addSource(consumer);
input
    .map((x) -> x)
    .print();

env.execute();
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18276) NullPointerException when closing KafkaConsumer
Nico Kruber created FLINK-18276: --- Summary: NullPointerException when closing KafkaConsumer Key: FLINK-18276 URL: https://issues.apache.org/jira/browse/FLINK-18276 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.10.1, 1.9.3, 1.8.3, 1.11.0 Reporter: Nico Kruber
{code}
WARN org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher - Error while closing Kafka consumer
java.lang.NullPointerException
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:282)
{code}
{{KafkaConsumerThread#reassignPartitions}} temporarily sets {{consumer}} to {{null}}, and if there is an exception (in this case, it was a timeout), the {{finally}} block in {{KafkaConsumerThread.run}} fails with an NPE. Even worse, {{KafkaConsumerThread#reassignPartitions}} puts the original consumer into {{consumerTmp}}, which is then not closed and may leak underlying (Kafka) resources. -- This message was sent by Atlassian Jira (v8.3.4#803005)
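A sketch of the kind of null guard the {{finally}} block needs (not the actual patch; the non-closed {{consumerTmp}} additionally needs to be handled):
{code:java}
import org.apache.kafka.clients.consumer.KafkaConsumer;

final class ConsumerShutdownSketch {
    // reassignPartitions() temporarily sets the consumer field to null,
    // so closing it on the error path must be null-safe
    static void closeQuietly(KafkaConsumer<byte[], byte[]> consumer) {
        if (consumer != null) {
            consumer.close();
        }
    }
}
{code}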
[jira] [Created] (FLINK-18255) Add API annotations to RocksDB user-facing classes
Nico Kruber created FLINK-18255: --- Summary: Add API annotations to RocksDB user-facing classes Key: FLINK-18255 URL: https://issues.apache.org/jira/browse/FLINK-18255 Project: Flink Issue Type: Improvement Components: Runtime / State Backends Affects Versions: 1.11.0 Reporter: Nico Kruber Several user-facing classes in {{flink-statebackend-rocksdb}} don't have any API annotations, not even {{@PublicEvolving}}. These should be added to clarify their usage. -- This message was sent by Atlassian Jira (v8.3.4#803005)
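For illustration (with a made-up class name), the annotation would look like this:
{code:java}
import org.apache.flink.annotation.PublicEvolving;

/** A user-facing class of the RocksDB state backend. */
@PublicEvolving
public class SomeRocksDbUserFacingClass {
    // marking the class as @PublicEvolving clarifies that users may rely on it
    // while the API may still change between releases
}
{code}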
[jira] [Created] (FLINK-18242) Custom OptionsFactory in user code not working when configured via code
Nico Kruber created FLINK-18242: --- Summary: Custom OptionsFactory in user code not working when configured via code Key: FLINK-18242 URL: https://issues.apache.org/jira/browse/FLINK-18242 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.10.1, 1.10.0 Reporter: Nico Kruber Attachments: DefaultConfigurableOptionsFactoryWithLog.java When I configure a custom {{OptionsFactory}} for RocksDB like this:
{code:java}
Configuration globalConfig = GlobalConfiguration.loadConfiguration();
String checkpointDataUri = globalConfig.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
RocksDBStateBackend stateBackend = new RocksDBStateBackend(checkpointDataUri);
stateBackend.setOptions(new DefaultConfigurableOptionsFactoryWithLog());
env.setStateBackend((StateBackend) stateBackend);
{code}
it seems to be loaded:
{code:java}
2020-06-10 12:54:20,720 INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using predefined options: DEFAULT.
2020-06-10 12:54:20,721 INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using application-defined options factory: DefaultConfigurableOptionsFactoryWithLog{DefaultConfigurableOptionsFactory{configuredOptions={}}}.
{code}
but it seems like none of the options defined in there is actually used. Just as an example, my factory does set the info log level to {{INFO_LEVEL}} but this is what you will see in the created RocksDB instance:
{code:java}
> cat /tmp/flink-io-c95e8f48-0daa-4fb9-a9a7-0e4fb42e9135/*/db/OPTIONS* | grep info_log_level
info_log_level=HEADER_LEVEL
info_log_level=HEADER_LEVEL
{code}
Together with the bug from FLINK-18241, it seems I cannot re-activate the RocksDB log that we disabled in FLINK-15068. FLINK-15747 was aiming at changing that particular configuration, but the problem seems broader since {{setDbLogDir()}} was actually also ignored. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18241) Custom OptionsFactory in user code not working when configured via flink-conf.yaml
Nico Kruber created FLINK-18241: --- Summary: Custom OptionsFactory in user code not working when configured via flink-conf.yaml Key: FLINK-18241 URL: https://issues.apache.org/jira/browse/FLINK-18241 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.10.1, 1.10.0 Reporter: Nico Kruber Attachments: DefaultConfigurableOptionsFactoryWithLog.java It seems like Flink 1.10 broke custom {{OptionsFactory}} definitions via the {{state.backend.rocksdb.options-factory}} configuration if the implementation resides in the user-code jar file. This is particularly bad for debugging RocksDB issues since we disabled its (ever-growing) LOG file in FLINK-15068. If you look at the stack trace from the error below, you will notice that {{StreamExecutionEnvironment}} is not provided with a user-code classloader and will use the one of its own class, which is the parent loader that does not know about our {{OptionsFactory}}. This exact same code was working with Flink 1.9.3. (I believe putting the custom {{OptionsFactory}} into a separate jar file inside Flink's lib folder may be a workaround, but that should ideally not be needed.) {code:java} 2020-06-09 16:18:59,409 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Could not start cluster entrypoint StandaloneJobClusterEntryPoint. org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint StandaloneJobClusterEntryPoint. at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:192) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:525) [flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.main(StandaloneJobClusterEntryPoint.java:116) [flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] Caused by: org.apache.flink.util.FlinkException: Could not create the DispatcherResourceManagerComponent. at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:261) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:220) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:174) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:173) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] ... 2 more Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve the JobGraph.
at org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory.createFactory(JobDispatcherLeaderProcessFactoryFactory.java:57) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory.createDispatcherRunner(DefaultDispatcherRunnerFactory.java:51) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:196) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:220) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:174) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:173) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] ... 2 more Caused by: org.apache.flink.util.FlinkException: Could not create the JobGraph from the provided user code jar. at org.apache.flink.container.entrypoint.ClassPathJobGraphRetriever.retrieveJobGraph(ClassPathJobGraphRetriever.java:114) ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1] at org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory.createFactory(JobDispatcherLeaderProcessFactoryFactory.java:55
[jira] [Created] (FLINK-17706) Clarify licensing situation
Nico Kruber created FLINK-17706: --- Summary: Clarify licensing situation Key: FLINK-17706 URL: https://issues.apache.org/jira/browse/FLINK-17706 Project: Flink Issue Type: Sub-task Components: Benchmarks Affects Versions: 1.11.0 Reporter: Nico Kruber Fix For: 1.11.0 After enabling the rat plugin, it finds the following files with missing or invalid license headers:
{code:java}
src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java
src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java
src/main/java/org/apache/flink/benchmark/functions/IntLongApplications.java
src/main/java/org/apache/flink/benchmark/functions/IntegerLongSource.java
src/main/java/org/apache/flink/benchmark/functions/LongSource.java
src/main/java/org/apache/flink/benchmark/functions/MultiplyByTwo.java
src/main/java/org/apache/flink/benchmark/functions/MultiplyIntLongByTwo.java
src/main/java/org/apache/flink/benchmark/functions/SuccessException.java
src/main/java/org/apache/flink/benchmark/functions/SumReduce.java
src/main/java/org/apache/flink/benchmark/functions/SumReduceIntLong.java
src/main/java/org/apache/flink/benchmark/functions/ValidatingCounter.java
src/main/java/org/apache/flink/benchmark/functions/QueuingLongSource.java
src/main/java/org/apache/flink/benchmark/CollectSink.java
src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
src/main/java/org/apache/flink/benchmark/operators/MultiplyByTwoCoStreamMap.java
src/main/resources/avro/mypojo.avsc
src/main/resources/protobuf/MyPojo.proto
src/main/resources/thrift/mypojo.thrift
save_jmh_result.py
{code}
The license should be clarified with the author and all contributors of these files. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17705) Add rat license checks
Nico Kruber created FLINK-17705: --- Summary: Add rat license checks Key: FLINK-17705 URL: https://issues.apache.org/jira/browse/FLINK-17705 Project: Flink Issue Type: Sub-task Components: Benchmarks Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.11.0 Before the code from [https://github.com/dataArtisans/flink-benchmarks/] is contributed, the licenses should be cleaned up and, as a first step, we should set up the {{apache-rat-plugin}} similarly to how the Flink main repo uses it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
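Once the plugin is configured in the pom, the check could presumably also be run on its own via the plugin's standard goal:
{code}
mvn apache-rat:check
{code}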
[jira] [Created] (FLINK-17704) Allow running specific benchmarks from maven directly
Nico Kruber created FLINK-17704: --- Summary: Allow running specific benchmarks from maven directly Key: FLINK-17704 URL: https://issues.apache.org/jira/browse/FLINK-17704 Project: Flink Issue Type: Improvement Components: Benchmarks Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.11.0 Sometimes it would be nice to run a specific benchmark from maven directly. Currently this can be done via: {code:java} mvn -Dflink.version=1.11-SNAPSHOT clean package exec:exec -Dexec.executable=java -Dexec.args="-jar target/flink-hackathon-benchmarks-0.1.jar -rf csv org.apache.flink.benchmark.StreamNetworkThroughputBenchmarkExecutor"{code} but this is quite cumbersome and error-prone. Instead, I propose to simply define a property which by default runs all benchmarks but can be overridden on the command line to run a specific pattern (that is interpreted by JMH) like this: {code:java} mvn -Dflink.version=1.11-SNAPSHOT exec:exec -Dbenchmarks="org.apache.flink.benchmark.StreamNetworkThroughputBenchmarkExecutor" {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17703) Default execution command fails due to 'benchmark' profile being inactive
Nico Kruber created FLINK-17703: --- Summary: Default execution command fails due to 'benchmark' profile being inactive Key: FLINK-17703 URL: https://issues.apache.org/jira/browse/FLINK-17703 Project: Flink Issue Type: Improvement Components: Benchmarks Affects Versions: 1.11.0 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.11.0 FLINK-17057 had some unfortunate side effects: by having the {{include-netty-tcnative-dynamic}} profile active by default, the {{benchmark}} profile was not active any more. Thus, the following command that was typically used for running the benchmarks failed unless the {{benchmark}} profile was activated manually like this: {code:java} mvn -Dflink.version=1.11-SNAPSHOT clean package exec:exec -P benchmark{code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17293) Port training exercises data sets to a generator
Nico Kruber created FLINK-17293: --- Summary: Port training exercises data sets to a generator Key: FLINK-17293 URL: https://issues.apache.org/jira/browse/FLINK-17293 Project: Flink Issue Type: Improvement Components: Documentation / Training / Exercises Affects Versions: 1.10.1 Reporter: Nico Kruber Assignee: Nico Kruber Currently, the training exercises still rely on training data hosted at Ververica: - http://training.ververica.com/trainingData/nycTaxiRides.gz and - http://training.ververica.com/trainingData/nycTaxiFares.gz Since this has always been a problem for users (and one additional step), I propose to rewrite the training sources to use a data generator instead. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17279) Use gradle build scans
Nico Kruber created FLINK-17279: --- Summary: Use gradle build scans Key: FLINK-17279 URL: https://issues.apache.org/jira/browse/FLINK-17279 Project: Flink Issue Type: Improvement Components: Training Excercises Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.10.1, 1.11.0 Gradle build scans [1] add quick analysis into what happened if a CI build failed. It would upload a report with detailed info to [1]. See this for an example: https://gradle.com/s/g3tdhu47lntoc [1] https://scans.gradle.com/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17278) Add Travis to the training exercises
Nico Kruber created FLINK-17278: --- Summary: Add Travis to the training exercises Key: FLINK-17278 URL: https://issues.apache.org/jira/browse/FLINK-17278 Project: Flink Issue Type: Improvement Components: Training Excercises Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.10.1, 1.11.0 This will run all the tests and verify code quality. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17277) Apply IntelliJ recommendations to training exercises
Nico Kruber created FLINK-17277: --- Summary: Apply IntelliJ recommendations to training exercises Key: FLINK-17277 URL: https://issues.apache.org/jira/browse/FLINK-17277 Project: Flink Issue Type: Improvement Components: Training Excercises Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.10.1, 1.11.0 IntelliJ has a few recommendations on the original code of the training exercises. These should be addressed to serve as good reference code. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17276) Add checkstyle to training exercises
Nico Kruber created FLINK-17276: --- Summary: Add checkstyle to training exercises Key: FLINK-17276 URL: https://issues.apache.org/jira/browse/FLINK-17276 Project: Flink Issue Type: Improvement Components: Training Excercises Affects Versions: 1.10.0, 1.11.0 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.10.1, 1.11.0 Port Flink's checkstyle to the training exercises and adapt the code accordingly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17275) Add core training exercises
Nico Kruber created FLINK-17275: --- Summary: Add core training exercises Key: FLINK-17275 URL: https://issues.apache.org/jira/browse/FLINK-17275 Project: Flink Issue Type: New Feature Components: Training Excercises Affects Versions: 1.11.0 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.10.1, 1.11.0 Port the core training exercises, their descriptions, solutions, and tests from https://github.com/ververica/flink-training-exercises to Apache Flink. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17171) Blink planner fails to compile Table program with POJO source
Nico Kruber created FLINK-17171: --- Summary: Blink planner fails to compile Table program with POJO source Key: FLINK-17171 URL: https://issues.apache.org/jira/browse/FLINK-17171 Project: Flink Issue Type: Bug Components: Table SQL / Planner, Table SQL / Runtime Affects Versions: 1.10.0 Reporter: Nico Kruber Attachments: error.log It seems as if FLINK-13993 made the Table API (Blink planner) unusable for POJO sources where the POJO class is in user code. For https://github.com/ververica/lab-sql-vs-datastream/blob/master/src/main/java/com/ververica/LateralTableJoin.java I get the following Exception when I run it on a Flink 1.10.0 cluster (full version attached): {code} 2020-04-15 17:19:15,561 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler- Unhandled exception. org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. ... Caused by: org.codehaus.commons.compiler.CompileException: Line 28, Column 175: Cannot determine simple type name "com" ... at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78) {code} I enabled debug logs and this is what it is trying to compile: {code} @Override public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception { org.apache.flink.table.dataformat.BaseRow in1 = (org.apache.flink.table.dataformat.BaseRow) (org.apache.flink.table.dataformat.BaseRow) converter$15.toInternal((com.ververica.tables.FactTable.Fact) element.getValue()); ... {code} I use a standalone cluster and submit via web UI and also verified that my jar file does not contain anything else but its compiled classes. This code is working fine inside the IDE and was also working with Flink 1.10 and VVP 2.0 which did not use a dedicated class loader for user code. My guess is that the (generated) code does not have access to {{FactTable.Fact}} and the Janino compiler does not produce the right error message seeing "com" as a primitive type instead. FLINK-7490 and FLINK-9220 seem related but too old (legacy planner). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17143) Blog feed.xml should only contain excerpts, not full contents
Nico Kruber created FLINK-17143: --- Summary: Blog feed.xml should only contain excerpts, not full contents Key: FLINK-17143 URL: https://issues.apache.org/jira/browse/FLINK-17143 Project: Flink Issue Type: Improvement Components: Project Website Affects Versions: 1.10.0 Reporter: Nico Kruber The blog's atom 2.0 feed at https://flink.apache.org/blog/feed.xml contains the whole content of all blog posts while it should probably only contain the excerpts (and links to the full versions) as usual. This may save some unnecessary web site traffic (bytes) from users using the feed to get updates. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17057) Add OpenSSL micro-benchmarks
Nico Kruber created FLINK-17057: --- Summary: Add OpenSSL micro-benchmarks Key: FLINK-17057 URL: https://issues.apache.org/jira/browse/FLINK-17057 Project: Flink Issue Type: New Feature Components: Benchmarks Affects Versions: 1.11.0 Reporter: Nico Kruber Assignee: Nico Kruber Our JMH micro-benchmarks currently only run with Java's SSL implementation but it would also be nice to have them evaluated with OpenSSL. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17056) JMH main() methods call unrelated benchmarks
Nico Kruber created FLINK-17056: --- Summary: JMH main() methods call unrelated benchmarks Key: FLINK-17056 URL: https://issues.apache.org/jira/browse/FLINK-17056 Project: Flink Issue Type: Bug Components: Benchmarks Affects Versions: 1.10.0 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.11.0 Each benchmark class is accompanied by a {{public static void main(String[] args)}} method which should run all benchmarks in that class. However, it just wraps the class' simple name into a regexp of the form {{".*" + simpleName + ".*"}} and may thus also match further classes that were not intended to run. An example of this is the {{StreamNetworkThroughputBenchmarkExecutor}} which also runs benchmarks from {{DataSkewStreamNetworkThroughputBenchmarkExecutor}}. Using the canonical name instead fixes that behaviour; see the sketch below. -- This message was sent by Atlassian Jira (v8.3.4#803005)
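A sketch of the fixed {{main()}} using the canonical name (standard JMH runner API; the method is meant to live in the benchmark class itself):
{code:java}
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public static void main(String[] args) throws RunnerException {
    Options options = new OptionsBuilder()
            // the package-qualified name no longer matches
            // DataSkewStreamNetworkThroughputBenchmarkExecutor
            .include(".*" + StreamNetworkThroughputBenchmarkExecutor.class.getCanonicalName() + ".*")
            .build();
    new Runner(options).run();
}
{code}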
[jira] [Created] (FLINK-16890) Add AvroGeneric benchmark
Nico Kruber created FLINK-16890: --- Summary: Add AvroGeneric benchmark Key: FLINK-16890 URL: https://issues.apache.org/jira/browse/FLINK-16890 Project: Flink Issue Type: New Feature Components: Benchmarks Reporter: Nico Kruber Assignee: Nico Kruber Currently, serialization benchmarks for Avro cover specific records and Avro reflect. What is missing is GenericRecord which I propose to add to {{SerializationFrameworkAllBenchmarks}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16729) Offer an out-of-the-box Set serializer
Nico Kruber created FLINK-16729: --- Summary: Offer an out-of-the-box Set serializer Key: FLINK-16729 URL: https://issues.apache.org/jira/browse/FLINK-16729 Project: Flink Issue Type: New Feature Components: API / Type Serialization System Affects Versions: 1.10.0 Reporter: Nico Kruber Currently, Set types are serialized by Kryo by default, since Flink does not come with its own SetSerializer (only one for maps). While the MapSerializer can easily be adapted to cover sets instead (see the sketch below), I think this should be available by default to get the maximum performance out of Flink (Kryo is slow!). When this is added, however, we need to provide a migration path for old state (or not use the new SetSerializer by default but offer it as an opt-in). This may need further investigation as to whether it is possible to migrate from Kryo automatically and whether we can check potential changes to the encapsulated entry class. -- This message was sent by Atlassian Jira (v8.3.4#803005)
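A sketch of the core (de)serialization logic, analogous to the existing {{MapSerializer}} (class and field names made up; a real implementation must extend {{TypeSerializer<Set<T>>}} and also provide {{duplicate()}}, {{copy()}}, {{snapshotConfiguration()}}, etc.):
{code:java}
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

public class SetSerializerSketch<T> {
    private final TypeSerializer<T> elementSerializer;

    public SetSerializerSketch(TypeSerializer<T> elementSerializer) {
        this.elementSerializer = elementSerializer;
    }

    public void serialize(Set<T> set, DataOutputView target) throws IOException {
        // length-prefixed encoding, like the MapSerializer
        target.writeInt(set.size());
        for (T element : set) {
            elementSerializer.serialize(element, target);
        }
    }

    public Set<T> deserialize(DataInputView source) throws IOException {
        final int size = source.readInt();
        final Set<T> set = new HashSet<>(size);
        for (int i = 0; i < size; i++) {
            set.add(elementSerializer.deserialize(source));
        }
        return set;
    }
}
{code}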
[jira] [Created] (FLINK-16664) Unable to set DataStreamSource parallelism to default (-1)
Nico Kruber created FLINK-16664: --- Summary: Unable to set DataStreamSource parallelism to default (-1) Key: FLINK-16664 URL: https://issues.apache.org/jira/browse/FLINK-16664 Project: Flink Issue Type: Bug Components: API / DataStream Affects Versions: 1.10.0 Reporter: Nico Kruber Assignee: Nico Kruber Fix For: 1.10.1, 1.11.0 A hotfix that was part of FLINK-14405 breaks setting the parallelism of data stream sources to its default value, i.e. using the value {{-1}}. This is because of a small typo: instead of
{code:java}
OperatorValidationUtils.validateParallelism(parallelism, isParallel);
{code}
org.apache.flink.streaming.api.datastream.DataStreamSource#setParallelism calls:
{code:java}
OperatorValidationUtils.validateMaxParallelism(parallelism, isParallel);
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16576) State inconsistency on restore with memory state backends
Nico Kruber created FLINK-16576: --- Summary: State inconsistency on restore with memory state backends Key: FLINK-16576 URL: https://issues.apache.org/jira/browse/FLINK-16576 Project: Flink Issue Type: Bug Components: Runtime / State Backends Affects Versions: 1.10.0, 1.9.2 Reporter: Nico Kruber Fix For: 1.9.3, 1.10.1, 1.11.0 I occasionally see a few state inconsistencies with the {{TopSpeedWindowing}} example in Flink. Restore would fail with any of these causes, but only for the memory state backends and only with some combinations of the parallelism I took the savepoint with and the parallelism I restore the job with: {code:java} java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=64, endKeyGroup=95} does not contain key group 97 {code} or {code:java} java.lang.NullPointerException at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:280) {code} or {code:java} java.io.IOException: Corrupt stream, found tag: 8 at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:217) {code} I managed to make it reproducible in a test that I quickly hacked together in [https://github.com/NicoK/flink/blob/state.corruption.debug/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingSavepointRestoreITCase.java] (please check out the whole repository since I had to change some dependencies). In a bit more detail, this is what I discovered before with a manual savepoint on S3: a savepoint taken with parallelism 2 (p=2) shows the restore failure in three different ways (all running on Flink 1.10.0, but I also see it in Flink 1.9):
* first of all, if I try to restore with p=2, everything is fine
* if I restore with p=4, I get an exception like the one mentioned above: {code:java} 2020-03-11 15:53:35,149 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction) -> Sink: Print to Std. Out (3/4) (2ecdb03905cc8a376d43b086925452a6) switched from RUNNING to FAILED. java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for EvictingWindowOperator_90bea66de1c231edf33913ecd54406c1_(3/4) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131) ... 9 more Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116) at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ... 11 more Caused by: java.lang.IllegalArgumentException: KeyGroupRange{startKeyGr