[jira] [Created] (FLINK-30072) Cannot assign instance of SerializedLambda to field KeyGroupStreamPartitioner.keySelector

2022-11-17 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-30072:
---

 Summary: Cannot assign instance of SerializedLambda to field 
KeyGroupStreamPartitioner.keySelector
 Key: FLINK-30072
 URL: https://issues.apache.org/jira/browse/FLINK-30072
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.16.0
Reporter: Nico Kruber


In application mode, if the {{usrlib}} directories of the JM and TM differ, 
e.g. same jars but different names, the job fails and throws this cryptic 
exception on the JM:

{code}
2022-11-17 09:55:12,968 INFO  
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Restarting 
job.
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not 
instantiate outputs in order.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getVertexNonChainedOutputs(StreamConfig.java:537)
 ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1600)
 ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriterDelegate(StreamTask.java:1584)
 ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:408) 
~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:362) 
~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:335) 
~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:327) 
~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:317) 
~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.<init>(SourceOperatorStreamTask.java:84)
 ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
at jdk.internal.reflect.GeneratedConstructorAccessor38.newInstance(Unknown 
Source) ~[?:?]
at 
jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown 
Source) ~[?:?]
at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
at 
org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1589)
 ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:714) 
~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) 
~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.ClassCastException: cannot assign instance of 
java.lang.invoke.SerializedLambda to field 
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.keySelector
 of type org.apache.flink.api.java.functions.KeySelector in instance of 
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(Unknown 
Source) ~[?:?]
at 
java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(Unknown 
Source) ~[?:?]
at java.io.ObjectStreamClass.checkObjFieldValueTypes(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultCheckFieldValues(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at java.util.ArrayList.readObject(Unknown Source) ~[?:?]
at jdk.internal.reflect.GeneratedMethodAccessor16.invoke(Unknown Source) 
~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
~[?:?]
at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
 ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
{code}
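For illustration, a minimal sketch of the kind of mismatch that triggers this 
(hypothetical jar names):

{code}
# JobManager container
usrlib/my-job-v1.jar   # contains the job classes

# TaskManager container (same jar content, different file name)
usrlib/my-job.jar      # contains the same job classes
{code}

Presumably, the differing paths lead to the {{KeySelector}} lambda being 
deserialized against a different classloader setup on the TM than the one that 
produced it, which then surfaces as the {{SerializedLambda}} ClassCastException 
above.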

[jira] [Created] (FLINK-30045) FromClasspathEntryClassInformationProvider too eager to verify MainClass

2022-11-16 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-30045:
---

 Summary: FromClasspathEntryClassInformationProvider too eager to 
verify MainClass
 Key: FLINK-30045
 URL: https://issues.apache.org/jira/browse/FLINK-30045
 Project: Flink
  Issue Type: Bug
  Components: Client / Job Submission
Affects Versions: 1.16.0, 1.17.0
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.17.0, 1.16.1


{{FromClasspathEntryClassInformationProvider}} eagerly attempts to verify 
whether the given main class is on the user classpath. However, it doesn't 
handle cases where the main class is inside a nested jar.
This is something you would see when using such a nested jar file with the 
{{StandaloneApplicationClusterEntryPoint}}, e.g. from {{standalone-job.sh}}.

We actually don't need this check at all since {{PackagedProgram}} already 
performs it while attempting to load the main class. Checking this once should 
be enough.
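For illustration, a hedged sketch of such a nested-jar layout (hypothetical 
names):

{code}
my-application.jar
└── lib/
    └── my-program.jar   # contains the actual main class
{code}

The eager check only looks at the outer classpath entries, so it cannot find 
the main class there, even though {{PackagedProgram}} would later load it just 
fine.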



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29884) Flaky test failure in finegrained_resource_management/SortMergeResultPartitionTest.testRelease

2022-11-04 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-29884:
---

 Summary: Flaky test failure in 
finegrained_resource_management/SortMergeResultPartitionTest.testRelease
 Key: FLINK-29884
 URL: https://issues.apache.org/jira/browse/FLINK-29884
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination, Runtime / Network, Tests
Affects Versions: 1.17.0
Reporter: Nico Kruber
 Fix For: 1.17.0


{{SortMergeResultPartitionTest.testRelease}} failed with a timeout in the 
finegrained_resource_management tests:
{code:java}
Nov 03 17:28:07 [ERROR] Tests run: 20, Failures: 0, Errors: 1, Skipped: 0, Time 
elapsed: 64.649 s <<< FAILURE! - in 
org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionTest
Nov 03 17:28:07 [ERROR] SortMergeResultPartitionTest.testRelease  Time elapsed: 
60.009 s  <<< ERROR!
Nov 03 17:28:07 org.junit.runners.model.TestTimedOutException: test timed out 
after 60 seconds
Nov 03 17:28:07 at 
org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionTest.testRelease(SortMergeResultPartitionTest.java:374)
Nov 03 17:28:07 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
Nov 03 17:28:07 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
Nov 03 17:28:07 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Nov 03 17:28:07 at java.lang.reflect.Method.invoke(Method.java:498)
Nov 03 17:28:07 at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
Nov 03 17:28:07 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
Nov 03 17:28:07 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
Nov 03 17:28:07 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
Nov 03 17:28:07 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
Nov 03 17:28:07 at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
Nov 03 17:28:07 at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
Nov 03 17:28:07 at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
Nov 03 17:28:07 at 
java.util.concurrent.FutureTask.run(FutureTask.java:266)
Nov 03 17:28:07 at java.lang.Thread.run(Thread.java:748) {code}
[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42806&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29868) Dependency convergence error for org.osgi:org.osgi.core:jar

2022-11-03 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-29868:
---

 Summary: Dependency convergence error for 
org.osgi:org.osgi.core:jar
 Key: FLINK-29868
 URL: https://issues.apache.org/jira/browse/FLINK-29868
 Project: Flink
  Issue Type: Bug
  Components: Build System, Table SQL / Runtime
Affects Versions: 1.17.0
Reporter: Nico Kruber
 Fix For: 1.17.0


While working on FLINK-29867, the following new error popped up while running

{code}
./mvnw clean install -pl flink-dist -am -DskipTests 
-Dflink.convergence.phase=install -Pcheck-convergence
{code}

(this check is also run by CI, which therefore fails)

{code}
[WARNING] 
Dependency convergence error for org.osgi:org.osgi.core:jar:4.3.0:runtime paths 
to dependency are:
+-org.apache.flink:flink-table-planner-loader-bundle:jar:1.17-SNAPSHOT
  +-org.apache.flink:flink-table-planner_2.12:jar:1.17-SNAPSHOT:runtime
+-org.apache.flink:flink-table-api-java-bridge:jar:1.17-SNAPSHOT:runtime
  +-org.apache.flink:flink-streaming-java:jar:1.17-SNAPSHOT:runtime
+-org.apache.flink:flink-runtime:jar:1.17-SNAPSHOT:runtime
  +-org.xerial.snappy:snappy-java:jar:1.1.8.3:runtime
+-org.osgi:org.osgi.core:jar:4.3.0:runtime
and
+-org.apache.flink:flink-table-planner-loader-bundle:jar:1.17-SNAPSHOT
  +-org.apache.flink:flink-table-planner_2.12:jar:1.17-SNAPSHOT:runtime
+-org.apache.flink:flink-scala_2.12:jar:1.17-SNAPSHOT:runtime
  +-org.apache.flink:flink-core:jar:1.17-SNAPSHOT:runtime
+-org.apache.commons:commons-compress:jar:1.21:runtime
  +-org.osgi:org.osgi.core:jar:6.0.0:runtime
{code}
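A typical resolution would be to pin a single version; a hedged sketch 
(assuming a dependencyManagement pin is the chosen fix, with the version taken 
from the second path above):

{code:xml}
<dependencyManagement>
  <dependencies>
    <!-- force one org.osgi.core version across both dependency paths -->
    <dependency>
      <groupId>org.osgi</groupId>
      <artifactId>org.osgi.core</artifactId>
      <version>6.0.0</version>
    </dependency>
  </dependencies>
</dependencyManagement>
{code}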



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29867) Update maven-enforcer-plugin to 3.1.0

2022-11-03 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-29867:
---

 Summary: Update maven-enforcer-plugin to 3.1.0
 Key: FLINK-29867
 URL: https://issues.apache.org/jira/browse/FLINK-29867
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.17.0


We currently rely on 3.0.0-M1 but will have to skip 3.0.0 (final) due to 
MENFORCER-394, which hits Flink's current code base as well.
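The change itself should be a one-line version bump; a hedged sketch of the 
plugin declaration:

{code:xml}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-enforcer-plugin</artifactId>
  <!-- skips 3.0.0 (final) because of MENFORCER-394 -->
  <version>3.1.0</version>
</plugin>
{code}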



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29865) Allow configuring the JDK in build-nightly-dist.yml

2022-11-03 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-29865:
---

 Summary: Allow configuring the JDK in build-nightly-dist.yml
 Key: FLINK-29865
 URL: https://issues.apache.org/jira/browse/FLINK-29865
 Project: Flink
  Issue Type: Improvement
  Components: Build System / Azure Pipelines
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.17.0


{{build-nightly-dist.yml}} currently uses the default JDK from 
https://github.com/flink-ci/flink-ci-docker, which happens to be the Java 1.8 
that we use for releases. We should
# not rely on this default being set to 1.8, and
# be able to configure this in the workflows themselves (see the sketch below)
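A hedged sketch of what that could look like (Azure Pipelines template 
parameter; names are hypothetical):

{code:yaml}
parameters:
  - name: jdk
    type: string
    default: "8"  # make the JDK explicit instead of relying on the image default

jobs:
  - job: build_nightly_dist
    variables:
      JAVA_VERSION: ${{ parameters.jdk }}
{code}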



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29643) Possible NPE in ApplicationDispatcherBootstrap with failedJob submission and no HA

2022-10-14 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-29643:
---

 Summary: Possible NPE in ApplicationDispatcherBootstrap with 
failedJob submission and no HA
 Key: FLINK-29643
 URL: https://issues.apache.org/jira/browse/FLINK-29643
 Project: Flink
  Issue Type: Bug
  Components: Client / Job Submission
Affects Versions: 1.15.2, 1.16.0, 1.17.0
Reporter: Nico Kruber


If
- {{PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID}} is not set, and
- high availability is not activated, and
- {{DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR}} is set,
then a failure during job submission may surface as an NPE since the code in 
{{ApplicationDispatcherBootstrap#runApplicationEntryPoint()}} tries to read the 
{{failedJobId}} from the configuration, where it will not be present in these 
cases.

Please refer to the conditions that set the {{jobId}} in 
{{ApplicationDispatcherBootstrap.fixJobIdAndRunApplicationAsync()}}.
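For illustration, a hedged sketch of the problematic pattern (simplified; not 
the actual Flink code):

{code:java}
// In the non-HA case without a fixed job id, this option was never written,
// so reading it yields null ...
final String fixedJobId =
        configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
// ... and using the value unconditionally blows up with an NPE instead of a
// meaningful error:
final JobID failedJobId = JobID.fromHexString(fixedJobId);
{code}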



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29208) Logs and stdout endpoints not mentioned on OpenAPI spec

2022-09-06 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-29208:
---

 Summary: Logs and stdout endpoints not mentioned on OpenAPI spec
 Key: FLINK-29208
 URL: https://issues.apache.org/jira/browse/FLINK-29208
 Project: Flink
  Issue Type: Bug
  Components: Runtime / REST
Affects Versions: 1.15.2
Reporter: Nico Kruber


Using Flink's web UI and clicking on "Stdout" or "Logs" in a JM or TM accesses 
endpoints {{/jobmanager/logs}} and {{/jobmanager/stdout}} (and similar for TMs) 
but these don't seem to exist in the [REST API 
docs|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/] 
or the [REST API OpenAPI 
spec|https://nightlies.apache.org/flink/flink-docs-master/generated/rest_v1_dispatcher.yml].

Either these should become some webui-internal APIs (for which no concept 
exists at the moment), or these endpoints should be added to the docs and spec.
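For reference, the endpoints in question can be exercised directly (assuming a 
default local setup on port 8081):

{code}
curl http://localhost:8081/jobmanager/logs    # works, but absent from the spec
curl http://localhost:8081/jobmanager/stdout  # same
{code}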



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-27836) RocksDBMapState iteration may stop too early for var-length prefixes

2022-05-30 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-27836:
---

 Summary: RocksDBMapState iteration may stop too early for 
var-length prefixes
 Key: FLINK-27836
 URL: https://issues.apache.org/jira/browse/FLINK-27836
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.14.4, 1.13.6, 1.15.0
Reporter: Nico Kruber


A similar, yet orthogonal, issue to 
https://issues.apache.org/jira/browse/FLINK-11141 is that the iterators used in 
RocksDBMapState iterate over everything with a matching prefix of Flink key and 
namespace. With var-length serializers for either of them, however, they may 
return data for unrelated keys and/or namespaces.
It looks like Flink's built-in serializers are not affected, though, since they 
use a var-length encoding that is prefixed with the object's length, and thus 
different lengths will not have the same prefix. More exotic serializers, e.g. 
ones relying on a terminating NUL character, may expose the above-mentioned 
behaviour, though.
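To illustrate why the length prefix matters, a small self-contained sketch (my 
own illustration; not Flink code):

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PrefixCollision {

    // naive var-length encoding: just the raw bytes
    // (think NUL-terminated, with the terminator not part of the prefix scan)
    static byte[] raw(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // length-prefixed var-length encoding (simplified single-byte length)
    static byte[] lengthPrefixed(String s) {
        byte[] b = s.getBytes(StandardCharsets.UTF_8);
        byte[] out = new byte[b.length + 1];
        out[0] = (byte) b.length; // different lengths -> different first byte
        System.arraycopy(b, 0, out, 1, b.length);
        return out;
    }

    static boolean isPrefix(byte[] prefix, byte[] full) {
        return prefix.length <= full.length
                && Arrays.equals(Arrays.copyOfRange(full, 0, prefix.length), prefix);
    }

    public static void main(String[] args) {
        // a prefix scan for key "ab" would also match entries of key "abc"
        System.out.println(isPrefix(raw("ab"), raw("abc")));                       // true
        // with a length prefix, the encodings diverge at the very first byte
        System.out.println(isPrefix(lengthPrefixed("ab"), lengthPrefixed("abc"))); // false
    }
}
{code}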



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


Re: [DISCUSS] FLIP-220: Temporal State

2022-05-06 Thread Nico Kruber
While working a bit more on this, David A and I noticed a couple of things 
that were not matching each other in the proposed APIs:

a) the proposed BinarySortedMultiMapState class didn't actually have getters 
that return multiple items per key, and
b) while having a single multi-map like implementation in the backend, with 
the adapted API, we'd like to put it up for discussion again whether we maybe 
want to have a user-facing BinarySortedMapState API as well which can be 
simpler but doesn't add any additional constraints to the state backends.

Let me go into details a bit more:
in a multi-map, a single key can be backed by a set of items and as such, the 
atomic unit that should be retrievable is not a single item but rather 
something like a Collection, an Iterable, a List, or so. Since we are already 
using Iterable in the main API, how about the following?
```
public class BinarySortedMultiMapState<UK, UV> extends State {
  void put(UK key, Iterable<UV> values) throws Exception;
  void add(UK key, UV value) throws Exception;

  Iterable<UV> valueAt(UK key) throws Exception;

  Map.Entry<UK, Iterable<UV>> firstEntry() throws Exception;
  Map.Entry<UK, Iterable<UV>> lastEntry() throws Exception;

  Iterable<Map.Entry<UK, Iterable<UV>>> readRange(UK fromKey, UK toKey) throws Exception;
  Iterable<Map.Entry<UK, Iterable<UV>>> readRangeUntil(UK endUserKey) throws Exception;
  Iterable<Map.Entry<UK, Iterable<UV>>> readRangeFrom(UK startUserKey) throws Exception;

  void clearRange(UK fromKey, UK toKey) throws Exception;
  void clearRangeUntil(UK endUserKey) throws Exception;
  void clearRangeFrom(UK startUserKey) throws Exception;
}
```
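For illustration, a hedged usage sketch of this proposal (Event, timestamp, 
watermark, and process() are hypothetical):

```
// buffer events per timestamp without reading the existing list back
BinarySortedMultiMapState<Long, Event> eventsByTime = ...; // from a descriptor, see below
eventsByTime.add(timestamp, event);

// on watermark: drain everything up to the watermark, in order
for (Map.Entry<Long, Iterable<Event>> entry : eventsByTime.readRangeUntil(watermark)) {
    for (Event e : entry.getValue()) {
        process(e);
    }
}
eventsByTime.clearRangeUntil(watermark);
```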

We also considered using Iterable<Map.Entry<UK, UV>> instead of 
Map.Entry<UK, Iterable<UV>>, but that wouldn't match well with firstEntry() and 
lastEntry() because for a single key, there is not a single first/last value. 
We also looked at common MultiMap interfaces, and their getters were also 
always retrieving the whole list/collection for a key. Since we don't want to 
promise too many details to the user, we believe Iterable is our best choice 
for now, but that can also be "upgraded" to, e.g., List in the future without 
breaking client code.

An appropriate map-like version of that would be the following:
```
public class BinarySortedMapState<UK, UV> extends State {
  void put(UK key, UV value) throws Exception;

  UV valueAt(UK key) throws Exception;

  Map.Entry<UK, UV> firstEntry() throws Exception;
  Map.Entry<UK, UV> lastEntry() throws Exception;

  Iterable<Map.Entry<UK, UV>> readRange(UK fromKey, UK toKey) throws Exception;
  Iterable<Map.Entry<UK, UV>> readRangeUntil(UK endUserKey) throws Exception;
  Iterable<Map.Entry<UK, UV>> readRangeFrom(UK startUserKey) throws Exception;

  void clearRange(UK fromKey, UK toKey) throws Exception;
  void clearRangeUntil(UK endUserKey) throws Exception;
  void clearRangeFrom(UK startUserKey) throws Exception;
}
```


We believe we were also missing details regarding the state descriptor, and 
I'm still a bit fuzzy on what to provide as type T in StateDescriptor.
For the constructors, however, since we'd require a 
LexicographicalTypeSerializer implementation, we would propose the following 
three overloads similar to the MapStateDescriptor:
```
public class BinarySortedMultiMapStateDescriptor<UK, UV> extends 
StateDescriptor<BinarySortedMultiMapState<UK, UV>, Map<UK, Iterable<UV>>/*?*/> {

public BinarySortedMultiMapStateDescriptor(
String name, LexicographicalTypeSerializer<UK> keySerializer, 
TypeSerializer<UV> valueSerializer) {}

public BinarySortedMultiMapStateDescriptor(
String name, LexicographicalTypeSerializer<UK> keySerializer, 
TypeInformation<UV> valueTypeInfo) {}

public BinarySortedMultiMapStateDescriptor(
String name, LexicographicalTypeSerializer<UK> keySerializer, 
Class<UV> valueClass) {}
}
```
Technically, we could have a LexicographicalTypeInformation as well (for the 
2nd overload) but don't currently see the need for that wrapper since these 
serializers are just needed for State - but maybe someone with more insights 
into this topic can advise.


A few further points to note with respect to the implementation:
- we'll have to find a suitable heap-based state backend implementation that 
integrates well with all current efforts (needs to be discussed)
- the StateProcessor API will have to receive appropriate extensions to read 
and write this new form of state



Nico


On Friday, 29 April 2022 14:25:59 CEST Nico Kruber wrote:
> Hi all,
> Yun, David M, David A, and I had an offline discussion and talked through a
> couple of details that emerged from the discussion here. We believe, we have
> found a consensus on these points and would like to share our points for
> further feedback:
> 
> Let me try to get through the points that were opened in arbitrary order:
> 
> 
> 1. We want to offer a generic interface for sorted state, not just temporal
> state as proposed initially. We would like to...
> a) ...offer a single new state type similar to what TemporalListState was
> offering (so not offering something like TemporalValueState to keep the API
> slim).
> b) ...name it BinarySortedMultiMapState

Re: [DISCUSS] FLIP-220: Temporal State

2022-04-29 Thread Nico Kruber
9) Why don't we want to provide a BinarySortedMap with value-like semantics 
similar to TemporalValueState?
-> We'd like to keep the code overhead in Flink small and not provide two more 
state primitives but instead only a single one. For use cases where you don't 
want to handle lists, you can use the BinarySortedMultiMap with its put() 
method and a list with a single entry that would overwrite the old one. While 
retrieving the value(s), you can then assume the list is either empty or has a 
single entry similar to what you are currently doing in a 
WindowProcessFunction. You can also always add a thin wrapper to provide that 
under a more convenient API if you need to.

A10) effects on the CEPOperator?
-> We don't have an overview yet. The buffering of events inside its 
`MapState<Long, List<IN>> elementQueueState`, however, is a pattern that would 
benefit from our MultiMap since a single add() operation wouldn't require you 
to read the whole list again.


Sorry for the long email - we'd be happy to get more feedback and will 
incorporate this into the FLIP description soon.



Nico


-- 
Dr. Nico Kruber | Solutions Architect

Follow us @VervericaData Ververica
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton 
Wehner





Re: [DISCUSS] FLIP-220: Temporal State

2022-04-22 Thread Nico Kruber
ls down to the discussion of how to handle duplicates.
> From the commonly accepted contracts, list implies that there could be
> duplicates and map implies otherwise. One concern about `Map` is that it
> also implies that you should be able to do a point query.
> 
> Best,
> D.
> 
> .
> 
> On Fri, Apr 22, 2022 at 9:21 AM Yun Tang  wrote:
> > Hi Nico,
> > 
> > I did not mean that we need to support all APIs in NavigableMap, and it is
> > indeed too heavily to implement them all.
> > Moreover, I also prefer iterator-like API instead of the original #tailMap
> > like API. I just use NavigableMap's API to show examples.
> > 
> > I think we can introduce new APIs like:
> > 
> > SortedMapState<UK, UV> extends State
> > 
> >   Map.Entry<UK, UV> firstEntry() throws Exception;
> >   Map.Entry<UK, UV> lastEntry() throws Exception;
> >   Iterator<Map.Entry<UK, UV>> headIterator(UK endUserKey) throws Exception;
> >   Iterator<Map.Entry<UK, UV>> tailIterator(UK startUserKey) throws Exception;
> >   Iterator<Map.Entry<UK, UV>> subIterator(UK startUserKey, UK endUserKey) throws Exception;
> > 
> > Since SortedMapState has several new APIs, I prefer to introduce new state
> > descriptor to distinguish with the original map state.
> > 
> > For the API of SortedMapOfListsState, I would not be strong against, and I
> > just want to know the actual benefits if we really want to introduce API.
> > 
> > When talking about the part of changelog state backend, my concern is
> > about how to group keys together in the changelog logger.
> > I can share a problem, and before this I need to spend some time on how to
> > implement serializer to keep the order of serialized bytes same as
> > original
> > java objects first.
> > For the fixed-length serializer, such as LongSerializer, we just need to
> > ensure all numbers are positive or inverting the sign bit.
> > However, for non-fixed-length serializer, such as StringSerializer, it
> > will write the length of the bytes first, which will break the natural
> > order if comparing the bytes. Thus, we might need to avoid to write the
> > length in the serialized bytes.
> > On the other hand, changelog logger would record operation per key one by
> > one in the logs. We need to consider how to distinguish each key in the
> > combined serialized byte arrays.
> > 
> > Best
> > Yun Tang
> > 
> > --
> > *From:* Nico Kruber 
> > *Sent:* Thursday, April 21, 2022 23:50
> > *To:* dev 
> > *Cc:* David Morávek ; Yun Tang 
> > *Subject:* Re: [DISCUSS] FLIP-220: Temporal State
> > 
> > Thanks Yun Tang for your clarifications.
> > Let me keep my original structure and reply in these points...
> > 
> > 3. Should we generalise the Temporal***State to offer arbitrary key types
> > and
> > not just Long timestamps?
> > 
> > The use cases you detailed do indeed look similar to the ones we were
> > optimising in our TemporalState PoC...
> > 
> > I don't think, I'd like to offer a full implementation of NavigableMap
> > though
> > because that seems quite some overhead to implement while we can cover the
> > mentioned examples with the proposed APIs already when using iterators as
> > well
> > as single-value retrievals.
> > So far, when we were iterating from the smallest key, we could just use
> > Long.MIN_VALUE and start from there. That would be difficult to generalise
> > for
> > arbitrary data types because you may not always know the smallest possible
> > value for a certain serialized type (unless we put this into the
> > appropriate
> > serializer interface).
> > 
> > I see two options here:
> > a) a slim API but using NULL as an indicator for smallest/largest
> > depending on
> > the context, e.g.
> > 
> >   - `readRange(null, key)` means from beginning to key
> >   - `readRange(key, null)` means from key to end
> >   - `readRange(null, null)` means from beginning to end
> >   - `value[AtOr]Before(null)` means largest available key
> >   - `value[AtOr]After(null)` means smallest available key
> > 
> > b) a larger API with special methods for each of these use cases similar
> > to
> > what NavigableMap has but based on iterators and single-value functions
> > only
> > 
> > > BTW, I prefer to introduce another state descriptor instead of current
> > 
> > map
> > 
> > > state descriptor.
> > 
> > Can you elaborate on this? We currently don't need extra functionality, so
this would be a plain copy of the MapStateDescriptor...

[jira] [Created] (FLINK-27353) Update training exercises to use Flink 1.15

2022-04-22 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-27353:
---

 Summary: Update training exercises to use Flink 1.15
 Key: FLINK-27353
 URL: https://issues.apache.org/jira/browse/FLINK-27353
 Project: Flink
  Issue Type: New Feature
  Components: Documentation / Training / Exercises
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.15.0






--
This message was sent by Atlassian Jira
(v8.20.7#820007)


Re: [DISCUSS] FLIP-220: Temporal State

2022-04-21 Thread Nico Kruber
Thanks Yun Tang for your clarifications.
Let me keep my original structure and reply in these points...

3. Should we generalise the Temporal***State to offer arbitrary key types and
not just Long timestamps?

The use cases you detailed do indeed look similar to the ones we were 
optimising in our TemporalState PoC...

I don't think I'd like to offer a full implementation of NavigableMap, though, 
because that seems like quite some overhead to implement while we can cover the 
mentioned examples with the proposed APIs already when using iterators as well 
as single-value retrievals.
So far, when we were iterating from the smallest key, we could just use 
Long.MIN_VALUE and start from there. That would be difficult to generalise for 
arbitrary data types because you may not always know the smallest possible 
value for a certain serialized type (unless we put this into the appropriate 
serializer interface).

I see two options here:
a) a slim API but using NULL as an indicator for smallest/largest depending on 
the context, e.g.
  - `readRange(null, key)` means from beginning to key
  - `readRange(key, null)` means from key to end
  - `readRange(null, null)` means from beginning to end
  - `value[AtOr]Before(null)` means largest available key
  - `value[AtOr]After(null)` means smallest available key
b) a larger API with special methods for each of these use cases similar to 
what NavigableMap has but based on iterators and single-value functions only

> BTW, I prefer to introduce another state descriptor instead of current map
> state descriptor.

Can you elaborate on this? We currently don't need extra functionality, so 
this would be a plain copy of the MapStateDescriptor...

> For the API of SortedMapOfListsState, I think this is a bit bounded to
> current implementation of RocksDB state-backend.

Actually, I don't think this is special to RocksDB but generic to all state 
backends that do not hold values in memory and allow fast append-like 
operations.
Additionally, since this is a very common use case and RocksDB is also widely 
used, I wouldn't want to continue without this specialization. For a similar 
reason, we offer ListState and not just ValueState...


4. ChangelogStateBackend

> For the discussion of ChangelogStateBackend, you can think of changelog
> state-backend as a write-ahead-log service. And we need to record the
> changes to any state, thus this should be included in the design doc as we
> need to introduce another kind of state, especially you might need to
> consider how to store key bytes serialized by the new serializer (as we
> might not be able to write the length in the beginning of serialized bytes
> to make the order of bytes same as natural order).

Since the ChangelogStateBackend "holds the working state in the underlying 
delegatedStateBackend, and forwards state changes to State Changelog", I 
honestly still don't see how this needs special handling. As long as the 
delegated state backend supports sorted state, ChangelogStateBackend doesn't 
have to do anything special except for recording changes to state. Our PoC 
simply uses the namespace for these keys and that's the same thing the Window 
API is already using - so there's nothing special here. The order in the log 
doesn't have to follow the natural order because this is only required inside 
the delegatedStateBackend, isn't it?
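As a side note on the serialized-byte-order concern from the quoted text below: 
a hedged sketch of an order-preserving encoding for longs (sign-bit flip; my 
own illustration, not a proposed interface):

```
// Flip the sign bit: unsigned byte-wise comparison of the big-endian result
// then matches the natural order of the original long values.
static byte[] encodeOrderPreserving(long value) {
    long biased = value ^ Long.MIN_VALUE; // maps MIN_VALUE..MAX_VALUE onto 0..2^64-1
    byte[] out = new byte[8];
    for (int i = 0; i < 8; i++) {
        out[i] = (byte) (biased >>> (56 - 8 * i));
    }
    return out;
}
```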


Nico

On Wednesday, 20 April 2022 17:03:11 CEST Yun Tang wrote:
> Hi Nico,
> 
> Thanks for your clarification.
> For the discussion about generalizing Temporal state to sorted map state. I
> could give some examples of how to use sorted map state in min/max with
> retract functions.
 
> As you know, NavigableMap in java has several APIs like:
> 
> Map.Entry<K, V> firstEntry();
> Map.Entry<K, V> lastEntry();
> NavigableMap<K, V> tailMap(K fromKey, boolean inclusive)
> 
> The #firstEntry API could be used in MinWithRetractAggFunction#updateMin,
> #lastEntry could be used in MaxWithRetractAggFunction#updateMax, and
> #tailMap could be used in FirstValueWithRetractAggFunction#retract. If we
> can introduce SortedMap-like state, these functions could be benefited.
> BTW, I prefer to introduce another state descriptor instead of current map
> state descriptor.
> For the API of SortedMapOfListsState, I think this is a bit bounded to
> current implementation of RocksDB state-backend.
 
> For the discussion of ChangelogStateBackend, you can think of changelog
> state-backend as a write-ahead-log service. And we need to record the
> changes to any state, thus this should be included in the design doc as we
> need to introduce another kind of state, especially you might need to
> consider how to store key bytes serialized by the new serializer (as we
> might not be able to write the length in the beginning of serialized bytes
> to make the order of bytes same as natural order).
 
> 

[jira] [Created] (FLINK-27322) Add license headers and spotless checks for them

2022-04-20 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-27322:
---

 Summary: Add license headers and spotless checks for them
 Key: FLINK-27322
 URL: https://issues.apache.org/jira/browse/FLINK-27322
 Project: Flink
  Issue Type: Bug
  Components: Documentation / Training / Exercises
Affects Versions: 1.14.4
Reporter: Nico Kruber
Assignee: Nico Kruber


It looks as if there are a couple of files that are missing their appropriate 
license headers, e.g. 
https://github.com/apache/flink-training/blob/0b1c83b16065484200564402bef2ca10ef19cb30/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/RideAndFare.java

We should fix that by:
# adding the missing license headers
# adding spotless checks to ensure this doesn't happen again

Potential downside: if a user doing the training exercises creates files on 
their own, these would need the license header as well. On the other hand, a 
simple `./gradlew spotlessApply` can fix that easily.
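A hedged sketch of what the check could look like (Gradle Spotless DSL; the 
header file path is hypothetical):

{code}
spotless {
    java {
        // `./gradlew spotlessCheck` fails when a file lacks the header;
        // `./gradlew spotlessApply` adds it automatically
        licenseHeaderFile rootProject.file('gradle/license-header.txt')
    }
}
{code}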



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


Re: [DISCUSS] FLIP-220: Temporal State

2022-04-19 Thread Nico Kruber
> > Does that answer your question?
> >
> > D.
> >
> > On Wed, Apr 13, 2022 at 12:21 PM David Anderson 
> > wrote:
> >
> >> Yun Tang and Jingsong,
> >>
> >> Some flavor of OrderedMapState is certainly feasible, and I do see some
> >> appeal in supporting Binary**State.
> >>
> >> However, I haven't seen a motivating use case for this generalization, and
> >> would rather keep this as simple as possible. By handling Longs we can
> >> already optimize a wide range of use cases.
> >>
> >> David
> >>
> >> On Tue, Apr 12, 2022 at 9:21 AM Yun Tang  wrote:
> >>
> >> > Hi David,
> >> >
> >> > Could you share some explanations why SortedMapState cannot work in
> >> > details? I just cannot catch up what the statement below means:
> >> >
> >> > This was rejected as being overly difficult to implement in a way that
> >> > would cleanly leverage RocksDB’s iterators.
> >> >
> >> > Best
> >> > Yun Tang
> >> > 
> >> > From: Aitozi 
> >> > Sent: Tuesday, April 12, 2022 15:00
> >> > To: dev@flink.apache.org 
> >> > Subject: Re: [DISCUSS] FLIP-220: Temporal State
> >> >
> >> > Hi David
> >> >
> >> > I have looked through the doc; I think it will be a good improvement to
> >> > this pattern usage, and I'm interested in it. Do you have some POC work
> >> > to share for a closer look?
> >> > Besides, I have one question: can we support exposing the namespace in
> >> > the different state types, not limited to `TemporalState`? By this,
> >> > users can specify the namespace, and the TemporalState is one special
> >> > case that uses the timestamp as the namespace. I think it will be more
> >> > extendable.
> >> >
> >> > What do you think about this?
> >> >
> >> > Best,
> >> > Aitozi.
> >> >
> >> > David Anderson  wrote on Mon, 11 Apr 2022 at 20:54:
> >> >
> >> > > Greetings, Flink developers.
> >> > >
> >> > > I would like to open up a discussion of a proposal [1] to add a new
> >> > > kind of state to Flink.
> >> > >
> >> > > The goal here is to optimize a fairly common pattern, which is using
> >> > >
> >> > > MapState<Long, List<Event>>
> >> > >
> >> > > to store lists of events associated with timestamps. This pattern is
> >> > > used internally in quite a few operators that implement sorting and
> >> > > joins, and it also shows up in user code, for example, when
> >> > > implementing custom windowing in a KeyedProcessFunction.
> >> > >
> >> > > Nico Kruber, Seth Wiesman, and I have implemented a POC that achieves
> >> > > a more than 2x improvement in throughput when performing these
> >> > > operations on RocksDB by better leveraging the capabilities of the
> >> > > RocksDB state backend.
> >> > >
> >> > > See FLIP-220 [1] for details.
> >> > >
> >> > > Best,
> >> > > David
> >> > >
> >> > > [1] https://cwiki.apache.org/confluence/x/Xo_FD

Dr. Nico Kruber | Solutions Architect

Follow us @VervericaData Ververica
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton 
Wehner




[jira] [Created] (FLINK-26852) RocksDBMapState#clear not forwarding exceptions

2022-03-24 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-26852:
---

 Summary: RocksDBMapState#clear not forwarding exceptions
 Key: FLINK-26852
 URL: https://issues.apache.org/jira/browse/FLINK-26852
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.14.4, 1.15.0
Reporter: Nico Kruber


I accidentally found an inconsistent behaviour in the RocksDB state backend 
implementation:

If there's an exception in {{AbstractRocksDBState#clear()}} it will forward 
that inside a {{FlinkRuntimeException}}.

However, if there's an exception in {{RocksDBMapState#clear}} it will merely 
print the exception stacktrace and continue as is.

I assume forwarding the exception as a {{FlinkRuntimeException}} should be the 
desired behaviour for both use cases...
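For illustration, a hedged, simplified sketch of the two behaviours (helper 
names simplified; not the actual Flink code):

{code:java}
// AbstractRocksDBState#clear() wraps and forwards the failure:
public void clear() {
    try {
        backend.db.delete(columnFamily, writeOptions, serializedCurrentKey());
    } catch (RocksDBException e) {
        throw new FlinkRuntimeException("Error while removing entry from RocksDB", e);
    }
}

// RocksDBMapState#clear() merely prints the stack trace and carries on:
public void clear() {
    try {
        // ... iterate over and delete all user-map entries for the current key ...
    } catch (Exception e) {
        e.printStackTrace(); // swallowed instead of being forwarded
    }
}
{code}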



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25362) Incorrect dependencies in Table Confluent/Avro docs

2021-12-17 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-25362:
---

 Summary: Incorrect dependencies in Table Confluent/Avro docs
 Key: FLINK-25362
 URL: https://issues.apache.org/jira/browse/FLINK-25362
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.14.2, 1.13.5, 1.12.7
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.14.3


"Confluent Avro Format" is missing an explanation to also
 * add the dependency to flink-avro
 * have the confluent repository defined

"Avro Format" should not show the maven dependency to {{flink-sql-avro}} but 
instead {{flink-avro}}
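A hedged sketch of the dependency the "Avro Format" page should show (version 
matching the affected docs branch):

{code:xml}
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>1.14.2</version>
</dependency>
{code}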



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] Deprecate Java 8 support

2021-11-26 Thread Nico Kruber
Tue, Nov 23, 2021 at 8:46 AM David Morávek  wrote:

>>>> Thank you Chesnay for starting the discussion! This will generate a bit of
>>>> work for some users, but it's a good thing to keep moving the project
>>>> forward. Big +1 for this.
>>>>
>>>> Jingsong:
>>>>
>>>>> Receiving this signal, the user may be unhappy because his application
>>>>> may be all on Java 8. Upgrading is a big job, after all, many systems
>>>>> have not been upgraded yet. (Like you said, HBase and Hive)
>>>>
>>>> The whole point of deprecation is to raise awareness that this will be
>>>> happening eventually and users should take some steps to address this in
>>>> the medium term. If I understand Chesnay correctly, we'd still keep Java 8
>>>> around for quite some time to give users enough time to upgrade, but
>>>> without raising awareness we'd fight the very same argument later in time.
>>>>
>>>> All of the prerequisites from 3rd-party projects for both HBase [1] and
>>>> Hive [2] to fully support Java 11 have been completed, so the ball is on
>>>> their side and there doesn't seem to be much activity. Generating a bit
>>>> more pressure on these efforts might be a good thing.
>>>>
>>>> It would be great to identify some of these users and learn a bit more
>>>> about their situation. Are they keeping up with the latest Flink
>>>> developments or are they lagging behind (this would also give them way
>>>> more time for an eventual upgrade)?
>>>>
>>>> [1] https://issues.apache.org/jira/browse/HBASE-22972
>>>> [2] https://issues.apache.org/jira/browse/HIVE-22415
>>>>
>>>> Best,
>>>> D.
>>>>
>>>> On Tue, Nov 23, 2021 at 3:08 AM Jingsong Li <jingsongl...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Chesnay,
>>>>>
>>>>> Thanks for bringing this for discussion.
>>>>>
>>>>> We should dig deeper into the current Java version of Flink users. At
>>>>> least make sure Java 8 is not a mainstream version.
>>>>>
>>>>> Receiving this signal, the user may be unhappy because his application
>>>>> may be all on Java 8. Upgrading is a big job, after all, many systems
>>>>> have not been upgraded yet. (Like you said, HBase and Hive)
>>>>>
>>>>> In my opinion, it is too early to deprecate support for Java 8. We
>>>>> should wait for a safer point in time.
>>>>>
>>>>> On Mon, Nov 22, 2021 at 11:45 PM Ingo Bürk  wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> also a +1 from me because of everything Chesnay already said.
>>>>>>
>>>>>>
>>>>>> Ingo
>>>>>>
>>>>>> On Mon, Nov 22, 2021 at 4:41 PM Martijn Visser <mart...@ververica.com>


-- 
Dr. Nico Kruber | Solutions Architect

Follow us @VervericaData Ververica
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton 
Wehner




[jira] [Created] (FLINK-25027) Allow GC of a finished job's JobMaster before the slot timeout is reached

2021-11-23 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-25027:
---

 Summary: Allow GC of a finished job's JobMaster before the slot 
timeout is reached
 Key: FLINK-25027
 URL: https://issues.apache.org/jira/browse/FLINK-25027
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.13.3, 1.12.5, 1.14.0
Reporter: Nico Kruber
 Attachments: image-2021-11-23-20-32-20-479.png

In a session cluster, after a (batch) job is finished, the JobMaster seems to 
stick around for another couple of minutes before being eligible for garbage 
collection.

Looking into a heap dump, it seems to be tied to a 
{{PhysicalSlotRequestBulkCheckerImpl}} which is enqueued in the underlying Akka 
executor (and keeps the JM from being GC’d). Per default, the action is 
scheduled for {{slot.request.timeout}}, which defaults to 5 min (thanks 
[~trohrmann] for helping out here)

!image-2021-11-23-20-32-20-479.png!

With this setting, you will have to account for enough metaspace to cover 5 
minutes of time, which may span a couple of jobs, needlessly!


The problem seems to be that Flink is using the main thread executor for the 
scheduling, which uses the {{ActorSystem}}'s scheduler, and the future task 
scheduled with Akka can (probably) not be easily cancelled.
One idea could be to use a dedicated thread pool per JM that we shut down when 
the JM terminates. That way, we would not keep the JM from being GC’d.
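A hedged sketch of that idea ({{bulkChecker}} and {{slotRequestTimeout}} are 
hypothetical placeholders; not actual Flink code):

{code:java}
// per-JobMaster scheduler whose queued tasks die with the JM
ScheduledExecutorService jmScheduler = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture<?> pendingCheck =
        jmScheduler.schedule(bulkChecker, slotRequestTimeout.toMillis(), TimeUnit.MILLISECONDS);

// on JobMaster termination:
pendingCheck.cancel(false);
jmScheduler.shutdownNow(); // drops queued tasks -> nothing pins the JM in memory
{code}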


(The concrete example we investigated was a DataSet job)







--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25023) ClassLoader leak on JM/TM through indirectly-started Hadoop threads out of user code

2021-11-23 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-25023:
---

 Summary: ClassLoader leak on JM/TM through indirectly-started 
Hadoop threads out of user code
 Key: FLINK-25023
 URL: https://issues.apache.org/jira/browse/FLINK-25023
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem, Connectors / Hadoop 
Compatibility, FileSystems
Affects Versions: 1.13.3, 1.12.5, 1.14.0
Reporter: Nico Kruber


If a Flink job is using HDFS through Flink's filesystem abstraction (either on 
the JM or TM), that code may actually spawn a few threads, e.g. from static 
class members:
 * {{org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner}}
 * {{IPC Parameter Sending Thread#*}}

These threads are started as soon as the classes are loaded, which may happen 
in the context of the user code. In this specific scenario, however, the 
created threads may contain references to the context class loader (I did not 
see that, though) or, as happened here, they may inherit thread contexts such 
as the {{ProtectionDomain}} (from an {{AccessController}}).

Hence user contexts and user class loaders are leaked into long-running threads 
that are run in Flink's (parent) classloader.

Fortunately, it seems to only *leak a single* {{ChildFirstClassLoader}} in this 
concrete example, but that may depend on which code paths each client execution 
is walking.

 

A *proper solution* doesn't seem so simple:
 * We could try to proactively initialize available file systems in the hope of 
starting all threads in the parent classloader with the parent context (see the 
sketch below).
 * We could create a default {{ProtectionDomain}} for spawned threads as 
discussed at [https://dzone.com/articles/javalangoutofmemory-permgen]; however, 
the {{StatisticsDataReferenceCleaner}} isn't actually actively spawned from any 
callback but as a static variable and thus with the class loading itself (but 
maybe this is still possible somehow).
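A hedged sketch of the first idea (my own illustration; not actual Flink code):

{code:java}
// Trigger Hadoop's static initializers from the parent classloader at process
// startup, so the statically-spawned cleaner/IPC threads inherit the parent
// context instead of a user classloader/ProtectionDomain.
Class.forName("org.apache.hadoop.fs.FileSystem", true, ClassLoader.getSystemClassLoader());
{code}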



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25022) ClassLoader leak with ThreadLocals on the JM when submitting a job through the REST API

2021-11-23 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-25022:
---

 Summary: ClassLoader leak with ThreadLocals on the JM when 
submitting a job through the REST API
 Key: FLINK-25022
 URL: https://issues.apache.org/jira/browse/FLINK-25022
 Project: Flink
  Issue Type: Bug
  Components: Runtime / REST
Affects Versions: 1.13.3, 1.12.5, 1.14.0
Reporter: Nico Kruber


If a job is submitted using the REST API's {{/jars/:jarid/run}} endpoint, user 
code has to be executed on the JobManager, and it is doing this in a couple of 
(pooled) dispatcher threads like {{Flink-DispatcherRestEndpoint-thread-*}}.

If the user code is using thread locals (and not cleaning them up), they may 
remain in the thread with references to the {{ChildFirstClassloader}} of the 
job and thus leak it.

We saw this for the {{jsoniter}} scala library at the JM which [creates 
ThreadLocal 
instances|https://github.com/plokhotnyuk/jsoniter-scala/blob/95c7053cfaa558877911f3448382f10d53c4fcbf/jsoniter-scala-core/jvm/src/main/scala/com/github/plokhotnyuk/jsoniter_scala/core/package.scala]
 but doesn't remove them, but it can actually happen with any user code or 
(worse) library used in user code.

 

There are a few *workarounds* a user can use, e.g. putting the library in 
Flink's lib/ folder or submitting via the Flink CLI, but these may actually not 
be possible to use, depending on the circumstances.

 

A *proper fix* should happen in Flink by guarding against any of these things 
in the dispatcher threads. We could, for example, spawn a separate thread for 
executing the user's {{main()}} method and, once the job is submitted, exit 
that thread and destroy all thread locals along with it.
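A hedged sketch of that fix ({{invokeUserMain}} and the classloader variable 
are hypothetical; not actual Flink code):

{code:java}
// run the user's main() in a short-lived thread: its ThreadLocal map dies with it
Thread userMain = new Thread(() -> invokeUserMain(jarFile, args), "user-main-runner");
userMain.setContextClassLoader(childFirstClassLoader);
userMain.start();
userMain.join(); // afterwards, any leftover ThreadLocals become unreachable
{code}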



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24769) FlameGraphs in 1.14 do not aggregate subtasks' stack traces anymore

2021-11-04 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-24769:
---

 Summary: FlameGraphs in 1.14 do not aggregate subtasks' stack 
traces anymore
 Key: FLINK-24769
 URL: https://issues.apache.org/jira/browse/FLINK-24769
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Web Frontend
Affects Versions: 1.14.0
Reporter: Nico Kruber
 Attachments: image-2021-11-04-12-59-24-308.png

Since Flink 1.14.0, after enabling FlameGraphs and gathering statistics for a 
task, it doesn't aggregate the results from the parallel instances anymore but 
instead shows each individual one - something that easily gets too messy for 
higher parallelism.

It seems the last shared method on the stack is 
{{Task.runWithSystemExitMonitoring}} and then it spawns off into individual 
lambda functions:

 !image-2021-11-04-12-59-24-308.png! 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24411) Add docs navigation for release notes

2021-09-30 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-24411:
---

 Summary: Add docs navigation for release notes
 Key: FLINK-24411
 URL: https://issues.apache.org/jira/browse/FLINK-24411
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.13.2, 1.12.5, 1.14.0, 1.11.4
Reporter: Nico Kruber


I propose to add a "Release Notes" section into the documentation's navigation 
bar for things like 
https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/

At the moment, I feel a bit lost in the navigation when viewing that page 
(which is only linked from the docs home page which I barely ever look at).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24144) Improve DataGenerator to prevent excessive creation of new Random objects

2021-09-03 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-24144:
---

 Summary: Improve DataGenerator to prevent excessive creation of 
new Random objects
 Key: FLINK-24144
 URL: https://issues.apache.org/jira/browse/FLINK-24144
 Project: Flink
  Issue Type: Improvement
  Components: Documentation / Training / Exercises
Affects Versions: 1.13.2, 1.14.0
Reporter: Nico Kruber
Assignee: Nico Kruber


For a couple of methods in {{DataGenerator}}, new {{Random}} objects are 
created with a specific seed. Instead, we could create a single {{Random}} 
object and reset the seed when needed.
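A hedged sketch of the proposed change (simplified; not the actual exercises 
code):

{code:java}
// before: a new PRNG per call
// Random rnd = new Random(seed);

// after: one reusable instance that is re-seeded on demand
private final Random rnd = new Random();

private float seededFloat(long seed, float min, float max) {
    rnd.setSeed(seed); // same deterministic values per seed, no extra allocation
    return min + rnd.nextFloat() * (max - min);
}
{code}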



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24115) Outdated SQL Temporal Join Example

2021-09-01 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-24115:
---

 Summary: Outdated SQL Temporal Join Example
 Key: FLINK-24115
 URL: https://issues.apache.org/jira/browse/FLINK-24115
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Table SQL / API
Reporter: Nico Kruber


[https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#event-time-temporal-join]
 is missing a primary key in the Table DDL.

Also, the following note does not match the current example anymore:
{quote}
Note: The event-time temporal join requires the primary key contained in the 
equivalence condition of the temporal join condition, e.g., The primary key 
P.product_id of table product_changelog to be constrained in the condition 
O.product_id = P.product_id.
{quote}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24022) Re-Enable Scala checks in flink-training CI

2021-08-27 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-24022:
---

 Summary: Re-Enable Scala checks in flink-training CI
 Key: FLINK-24022
 URL: https://issues.apache.org/jira/browse/FLINK-24022
 Project: Flink
  Issue Type: Bug
  Components: Documentation / Training / Exercises
Affects Versions: 1.14.0, 1.13.3
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.14.0, 1.13.3


FLINK-23339 disabled Scala by default but thereby also disabled CI for newly 
checked-in changes to the Scala code.
We should run CI with Scala enabled.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24016) Restore 1.12 SQL docs page on state retention

2021-08-27 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-24016:
---

 Summary: Restore 1.12 SQL docs page on state retention
 Key: FLINK-24016
 URL: https://issues.apache.org/jira/browse/FLINK-24016
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Table SQL / API
Affects Versions: 1.13.2, 1.14.0
Reporter: Nico Kruber


It seems that the whole [section about state retention from the 
docs|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/query_configuration.html#idle-state-retention-time]
 in Flink 1.12 vanished with Flink 1.13. It was outdated with these min/max 
settings, but instead of updating it, it was just removed, and state 
retention/TTL is now safely hidden in 
[https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/#table-exec-state-ttl]

The discussion in the 1.12 docs is, however, superior since it explains a bit 
more why we need it and the types of queries that need it. I therefore propose 
to restore it somewhere in the docs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24008) Support state cleanup based on unique keys

2021-08-26 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-24008:
---

 Summary: Support state cleanup based on unique keys
 Key: FLINK-24008
 URL: https://issues.apache.org/jira/browse/FLINK-24008
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Runtime
Affects Versions: 1.14.0
Reporter: Nico Kruber


In a join of two tables where we join on unique columns, e.g. from primary 
keys, we could clean up join state more pro-actively since we know whether 
future joins with this row are still possible (assuming uniqueness of that 
key). While this may not solve all issues of growing state in non-time-based 
joins, it may still considerably reduce state size, depending on the involved 
columns.

This would add one more way of expiring state that the operator stores; 
currently there are only these:
 * time-based joins, e.g. interval join
 * idle state retention via {{TableConfig#setIdleStateRetention()}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23993) Describe eventually-consistency of materializing upserts

2021-08-26 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-23993:
---

 Summary: Describe eventually-consistency of materializing upserts
 Key: FLINK-23993
 URL: https://issues.apache.org/jira/browse/FLINK-23993
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation, Table SQL / Ecosystem
Affects Versions: 1.14.0
Reporter: Nico Kruber


FLINK-20374 added an upsert materialization operator which fixes the order of 
shuffled streams. The results of this operator are actually _eventually 
consistent_ (it collects the latest value it has seen and retracts older 
versions when these are not valid anymore). You could see a result stream like 
this, based on the order in which the materialization receives events:

+I10, -I10, +I5, -I5, +I10, -I10, +I3, -I3, +I10

Each time, the value stored in Kafka would change until the "final" result is 
in.

 

This may be acceptable for upsert sinks, but it should be documented (or 
changed/fixed) nonetheless.
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23839) Unclear severity of Kafka transaction recommit warning in logs

2021-08-17 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-23839:
---

 Summary: Unclear severity of Kafka transaction recommit warning in 
logs
 Key: FLINK-23839
 URL: https://issues.apache.org/jira/browse/FLINK-23839
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Kafka
Affects Versions: 1.13.2, 1.12.5, 1.11.4
Reporter: Nico Kruber


In a transactional Kafka sink, after recovery, all transactions from the 
recovered checkpoint are recommitted even though they may have already been 
committed before because this is not part of the checkpoint.

This second commit can lead to a number of WARN entries in the logs coming from 
[KafkaCommitter|https://github.com/apache/flink/blob/6c9818323b41a84137c52822d2993df788dbc9bb/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java#L66]
 or 
[FlinkKafkaProducer|https://github.com/apache/flink/blob/6c9818323b41a84137c52822d2993df788dbc9bb/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L1057].

Examples:
{code}
WARN [org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer] ... 
Encountered error org.apache.kafka.common.errors.InvalidTxnStateException: The 
producer attempted a transactional operation in an invalid state. while 
recovering transaction KafkaTransactionState [transactionalId=..., 
producerId=12345, epoch=123]. Presumably this transaction has been already 
committed before.

WARN [org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer] ... 
Encountered error org.apache.kafka.common.errors.ProducerFencedException: 
Producer attempted an operation with an old epoch. Either there is a newer 
producer with the same transactionalId, or the producer's transaction has been 
expired by the broker. while recovering transaction KafkaTransactionState 
[transactionalId=..., producerId=12345, epoch=12345]. Presumably this 
transaction has been already committed before
{code}

It sounds to me like the second exception is useful and indicates that the 
transaction timeout is too short. The first exception, however, seems 
superfluous and alarms the user more than it helps. Or what would you do with 
it?


Can we filter out such superfluous exceptions and log them at DEBUG level 
instead (see the sketch below)?
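
For illustration, a minimal sketch of what such filtering could look like (the 
helper is hypothetical and not actual connector code; the classification 
follows the reasoning above):

{code:java}
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Hypothetical helper for logging errors seen while recommitting. */
public final class RecommitLogging {
    private static final Logger LOG =
            LoggerFactory.getLogger(RecommitLogging.class);

    public static void logRecommitError(String transactionalId, Exception e) {
        if (e instanceof InvalidTxnStateException) {
            // Expected after recovery: the transaction was presumably
            // committed before -> do not alarm the user.
            LOG.debug("Transaction {} was presumably already committed.",
                    transactionalId, e);
        } else if (e instanceof ProducerFencedException) {
            // May indicate a too-short transaction timeout -> keep visible.
            LOG.warn("Producer fenced while recommitting transaction {}.",
                    transactionalId, e);
        } else {
            LOG.warn("Unexpected error while recommitting transaction {}.",
                    transactionalId, e);
        }
    }
}
{code}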




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23812) Support configuration of the RocksDB logging via configuration

2021-08-16 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-23812:
---

 Summary: Support configuration of the RocksDB logging via 
configuration
 Key: FLINK-23812
 URL: https://issues.apache.org/jira/browse/FLINK-23812
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.13.2
Reporter: Nico Kruber
Assignee: Nico Kruber


Since FLINK-14482 has been merged now, we should allow users to configure not 
just the log level (FLINK-20911) but also the following parameters, so that 
they can safely enable RocksDB logging again, e.g. with a rolling logger (a 
configuration sketch follows the list):
- max log file size via {{state.backend.rocksdb.log.max-file-size}}
- logging files to keep via {{state.backend.rocksdb.log.file-num}}
- log directory {{state.backend.rocksdb.log.dir}}, e.g. to put these logs onto 
a (separate) volume that may not be local and is retained after container 
shutdown for debugging purposes
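
A minimal sketch of setting these options programmatically; the option keys 
are the ones proposed above, while the concrete values and the log directory 
are purely illustrative:

{code:java}
import org.apache.flink.configuration.Configuration;

public class RocksDbLoggingConfig {
    public static Configuration create() {
        Configuration config = new Configuration();
        // Roll the RocksDB LOG file at 25 MB and keep at most 4 old files.
        config.setString("state.backend.rocksdb.log.max-file-size", "25mb");
        config.setString("state.backend.rocksdb.log.file-num", "4");
        // Write RocksDB logs to a volume that survives container shutdown.
        config.setString("state.backend.rocksdb.log.dir", "/var/log/flink/rocksdb");
        return config;
    }
}
{code}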




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23670) Add Scala formatting checks to training exercises

2021-08-06 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-23670:
---

 Summary: Add Scala formatting checks to training exercises
 Key: FLINK-23670
 URL: https://issues.apache.org/jira/browse/FLINK-23670
 Project: Flink
  Issue Type: Bug
  Components: Documentation / Training / Exercises
Affects Versions: 1.13.2
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.13.3


Currently, there are no formatting checks for Scala code in the training 
exercises. We should employ the same checks that the main Flink project is 
using.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23669) Avoid using Scala >= 2.12.8 in Flink Training exercises

2021-08-06 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-23669:
---

 Summary: Avoid using Scala >= 2.12.8 in Flink Training exercises
 Key: FLINK-23669
 URL: https://issues.apache.org/jira/browse/FLINK-23669
 Project: Flink
  Issue Type: Bug
  Components: Documentation / Training / Exercises
Affects Versions: 1.13.2
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.13.3


The current IDE setup instructions of the Flink training exercises do not 
mention a specific Scala SDK to set up. For the compatibility reasons described 
in 
https://ci.apache.org/projects/flink/flink-docs-stable/docs/dev/datastream/project-configuration/#scala-versions,
 we should also not use Scala 2.12.8 or later.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23667) Fix training exercises IDE setup description for Scala

2021-08-06 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-23667:
---

 Summary: Fix training exercises IDE setup description for Scala
 Key: FLINK-23667
 URL: https://issues.apache.org/jira/browse/FLINK-23667
 Project: Flink
  Issue Type: Bug
  Components: Documentation / Training / Exercises
Affects Versions: 1.13.2
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.13.3


If you follow the training exercises' instructions to set up your IDE with 
code formatting and the Save Actions plugin while having Scala enabled, it 
will completely reformat your Scala code files instead of keeping them as-is.

The instructions should be updated to match the ones used for the Flink main 
project.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [ANNOUNCE] RocksDB Version Upgrade and Performance

2021-08-04 Thread Nico Kruber
That's actually also what I'm seeing most of the time and what I'd expect to 
improve with the newer RocksDB version.
Hence, I'd also favour the upgrade even if there is a slight catch with 
respect to performance - we should, however, continue to investigate this 
together with the RocksDB community.


Nico

On Wednesday, 4 August 2021 14:26:32 CEST David Anderson wrote:
> I am hearing quite often from users who are struggling to manage memory
> usage, and these are all users using RocksDB. While I don't know for
> certain that RocksDB is the cause in every case, from my perspective,
> getting the better memory stability of version 6.20 in place is critical.
> 
> Regards,
> David
> 
> On Wed, Aug 4, 2021 at 8:08 AM Stephan Ewen  wrote:
> > Hi all!
> > 
> > *!!!  If you are a big user of the Embedded RocksDB State Backend and have
> > performance sensitive workloads, please read this !!!*
> > 
> > I want to quickly raise some awareness for a RocksDB version upgrade we
> > plan to do, and some possible impact on application performance.
> > 
> > *We plan to upgrade RocksDB to version 6.20.* That version of RocksDB
> > unfortunately introduces some non-trivial performance regression. In our
> > Nexmark Benchmark, at least one query is up to 13% slower.
> > With some fixes, this can be improved, but even then there is an overall
> > *regression up to 6% in some queries*. (See attached table for results
> > from relevant Nexmark Benchmark queries).
> > 
> > We would do this update nonetheless, because we need to get new features
> > and bugfixes from RocksDB in.
> > 
> > Please respond to this mail thread if you have major concerns about this.
> > 
> > 
> > *### Fallback Plan*
> > 
> > Optionally, we could fall back to Plan B, which is to upgrade RocksDB only
> > to version 5.18.4, which has no performance regression (after applying a
> > custom patch).
> > 
> > While this spares us the performance degradation of RocksDB 6.20.x, this
> > 
> > has multiple disadvantages:
> >   - Does not include the better memory stability (strict cache control)
> >   - Misses out on some new features which some users asked about
> >   - Does not have the latest RocksDB bugfixes
> > 
> > The last point is especially bad in my opinion. While we can cherry-pick
> > some bugfixes back (and have done this in the past), users typically run
> > into an issue first and need to trace it back to RocksDB, then one of the
> > committers can find the relevant patch from RocksDB master and backport
> > it.
> > That isn't the greatest user experience.
> > 
> > Because of those disadvantages, we would prefer to do the upgrade to the
> > newer RocksDB version despite the unfortunate performance regression.
> > 
> > Best,
> > Stephan


-- 
Dr. Nico Kruber | Solutions Architect

Follow us @VervericaData Ververica
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton 
Wehner




[jira] [Created] (FLINK-23340) Improve development instructions for flink-training exercises

2021-07-09 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-23340:
---

 Summary: Improve development instructions for flink-training 
exercises
 Key: FLINK-23340
 URL: https://issues.apache.org/jira/browse/FLINK-23340
 Project: Flink
  Issue Type: Improvement
  Components: Documentation / Training / Exercises
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.14.0, 1.13.3






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23339) Disable flink-training exercises in Scala by default

2021-07-09 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-23339:
---

 Summary: Disable flink-training exercises in Scala by default
 Key: FLINK-23339
 URL: https://issues.apache.org/jira/browse/FLINK-23339
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation / Training / Exercises
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.14.0, 1.13.3


On various occasions during trainings held by us, people who were not 
developing in Scala have reported issues with the current setup. If we make the 
Scala exercises optional, that should help reduce friction for the others.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23338) Use Spotless for flink-training exercises as well

2021-07-09 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-23338:
---

 Summary: Use Spotless for flink-training exercises as well
 Key: FLINK-23338
 URL: https://issues.apache.org/jira/browse/FLINK-23338
 Project: Flink
  Issue Type: Improvement
  Components: Documentation / Training / Exercises
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.14.0, 1.13.3






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23337) Properly use the 'shadow' plugin and remove flinkShadowJar

2021-07-09 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-23337:
---

 Summary: Properly use the 'shadow' plugin and remove flinkShadowJar
 Key: FLINK-23337
 URL: https://issues.apache.org/jira/browse/FLINK-23337
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation / Training / Exercises
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.14.0, 1.13.3






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23336) Use the same log4j version as in Flink 1.13

2021-07-09 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-23336:
---

 Summary: Use the same log4j version as in Flink 1.13
 Key: FLINK-23336
 URL: https://issues.apache.org/jira/browse/FLINK-23336
 Project: Flink
  Issue Type: Improvement
  Components: Documentation / Training / Exercises
Reporter: Nico Kruber
Assignee: Nico Kruber






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23335) Add a separate 'runSolution' gradle task

2021-07-09 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-23335:
---

 Summary: Add a separate 'runSolution' gradle task
 Key: FLINK-23335
 URL: https://issues.apache.org/jira/browse/FLINK-23335
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation / Training / Exercises
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.14.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23334) Move 'application' implementation decision to subprojects

2021-07-09 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-23334:
---

 Summary: Move 'application' implementation decision to subprojects
 Key: FLINK-23334
 URL: https://issues.apache.org/jira/browse/FLINK-23334
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation / Training / Exercises
Reporter: Nico Kruber
Assignee: Nico Kruber






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23333) Improve gradle setup for flink-training exercises

2021-07-09 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-23333:
---

 Summary: Improve gradle setup for flink-training exercises
 Key: FLINK-23333
 URL: https://issues.apache.org/jira/browse/FLINK-23333
 Project: Flink
  Issue Type: Improvement
  Components: Documentation / Training / Exercises
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.14.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23332) Update flink-training exercises gradle version

2021-07-09 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-23332:
---

 Summary: Update flink-training exercises gradle version
 Key: FLINK-23332
 URL: https://issues.apache.org/jira/browse/FLINK-23332
 Project: Flink
  Issue Type: Improvement
  Components: Documentation / Training / Exercises
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.14.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23331) FileReadingWatermarkITCase.testWatermarkEmissionWithChaining fails on Azure

2021-07-09 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-23331:
---

 Summary: 
FileReadingWatermarkITCase.testWatermarkEmissionWithChaining fails on Azure
 Key: FLINK-23331
 URL: https://issues.apache.org/jira/browse/FLINK-23331
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.14.0
Reporter: Nico Kruber


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=20223=logs=5c8e7682-d68f-54d1-16a2-a09310218a49=f508e270-48d6-5f1e-3138-42a17e0714f0=4767

{code}
Jul 09 09:24:32 [ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time 
elapsed: 3.266 s <<< FAILURE! - in 
org.apache.flink.test.streaming.api.FileReadingWatermarkITCase
Jul 09 09:24:32 [ERROR] 
testWatermarkEmissionWithChaining(org.apache.flink.test.streaming.api.FileReadingWatermarkITCase)
  Time elapsed: 3.191 s  <<< FAILURE!
Jul 09 09:24:32 java.lang.AssertionError: too few watermarks emitted: 4
Jul 09 09:24:32 at org.junit.Assert.fail(Assert.java:89)
Jul 09 09:24:32 at org.junit.Assert.assertTrue(Assert.java:42)
Jul 09 09:24:32 at 
org.apache.flink.test.streaming.api.FileReadingWatermarkITCase.testWatermarkEmissionWithChaining(FileReadingWatermarkITCase.java:66)
Jul 09 09:24:32 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
Jul 09 09:24:32 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
Jul 09 09:24:32 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Jul 09 09:24:32 at java.lang.reflect.Method.invoke(Method.java:498)
Jul 09 09:24:32 at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
Jul 09 09:24:32 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
Jul 09 09:24:32 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
Jul 09 09:24:32 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
Jul 09 09:24:32 at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
Jul 09 09:24:32 at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
Jul 09 09:24:32 at 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
Jul 09 09:24:32 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
Jul 09 09:24:32 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
Jul 09 09:24:32 at 
org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
Jul 09 09:24:32 at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
Jul 09 09:24:32 at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
Jul 09 09:24:32 at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
Jul 09 09:24:32 at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
Jul 09 09:24:32 at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
Jul 09 09:24:32 at 
org.junit.runners.ParentRunner.run(ParentRunner.java:413)
Jul 09 09:24:32 at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
Jul 09 09:24:32 at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
Jul 09 09:24:32 at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
Jul 09 09:24:32 at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
Jul 09 09:24:32 at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
Jul 09 09:24:32 at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
Jul 09 09:24:32 at 
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
Jul 09 09:24:32 at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
Jul 09 09:24:32 

{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23312) Use -Dfast for building e2e tests on AZP

2021-07-08 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-23312:
---

 Summary: Use -Dfast for building e2e tests on AZP
 Key: FLINK-23312
 URL: https://issues.apache.org/jira/browse/FLINK-23312
 Project: Flink
  Issue Type: Improvement
  Components: Test Infrastructure
Affects Versions: 1.13.1
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.14.0


The "e2e" builder in Azure pipelines builds Flink again on top of what the 
"compile" builder is already doing. This unnecessary duplicates a couple of 
checks that are enough to execute once and can be skipped via providing 
{{-Dfast}}.

On my local machine with 32GB RAM, 8 physical cores and a fast NVMe SSD, the 
difference is pretty big:
{code}
time mvn clean install -Dscala-2.12 -DskipTests -pl flink-dist -am
# -> 6:40 min

time mvn clean install -Dscala-2.12 -DskipTests -Dfast -pl flink-dist -am
# -> 5:40 min
{code}

Therefore, I'm proposing to add this parameter to the "e2e" builder's compile 
step.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23311) Improve PojoSerializer test

2021-07-08 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-23311:
---

 Summary: Improve PojoSerializer test
 Key: FLINK-23311
 URL: https://issues.apache.org/jira/browse/FLINK-23311
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Affects Versions: 1.13.1
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.14.0


While working with the PojoSerializer a bit more, I noticed a couple of minor 
things that are off in the current tests:
- the test Pojo does not take {{dumm5}} into account for {{hashCode}} and 
{{equals}} (see the sketch below)
- the error messages are not very helpful (and mix up the order of expected 
and actual values)

I'll create a PR for fixing these things in one go under this ticket.
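
For illustration, a minimal sketch of the first fix; all names other than 
{{dumm5}} are hypothetical:

{code:java}
import java.util.Objects;

/**
 * Hypothetical test Pojo: dumm5 must be part of equals() and hashCode(),
 * otherwise round-trip assertions cannot detect data loss in that field.
 */
public class TestPojo {
    public int dumm1;
    public String dumm5;

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TestPojo)) {
            return false;
        }
        TestPojo that = (TestPojo) o;
        return dumm1 == that.dumm1 && Objects.equals(dumm5, that.dumm5);
    }

    @Override
    public int hashCode() {
        return Objects.hash(dumm1, dumm5);
    }
}
{code}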



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23102) Accessing FlameGraphs while not being enabled returns an exception

2021-06-22 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-23102:
---

 Summary: Accessing FlameGraphs while not being enabled returns an 
exception
 Key: FLINK-23102
 URL: https://issues.apache.org/jira/browse/FLINK-23102
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Web Frontend
Affects Versions: 1.13.1
Reporter: Nico Kruber
 Attachments: image-2021-06-22-17-36-47-730.png

Trying to retrieve the FlameGraph in a job that doesn't have it enabled returns 
this ugly exception:

!image-2021-06-22-17-36-47-730.png!

Instead, it could mention that this feature is not enabled and describe how to 
enable it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23101) Flame Graphs initial view says it is 18800 days in the past

2021-06-22 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-23101:
---

 Summary: Flame Graphs initial view says it is 18800 days in the 
past
 Key: FLINK-23101
 URL: https://issues.apache.org/jira/browse/FLINK-23101
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Web Frontend
Reporter: Nico Kruber
 Attachments: image.png

When you look at the Flame Graphs for a task for the first time, it will show 
an empty space and say that the measurement was ~18800 days in the past (see 
the attached image). Presumably, the initial measurement timestamp defaults to 
0, i.e. the Unix epoch, which is roughly 18800 days ago.

This should rather be something more useful like "no measurement yet" or so...



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22699) Make ConstantArgumentCount public API

2021-05-18 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-22699:
---

 Summary: Make ConstantArgumentCount public API
 Key: FLINK-22699
 URL: https://issues.apache.org/jira/browse/FLINK-22699
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.13.0
Reporter: Nico Kruber


{{ConstantArgumentCount}} is quite useful when implementing custom type 
inference. While the user can, of course, implement an {{ArgumentCount}} as 
well with just a few methods, it feels like this one is the most commonly used 
implementation and could be provided as public API (currently, it's marked 
{{@Internal}}).
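
For reference, this is roughly what a user currently has to write to express a 
fixed argument count via the public {{ArgumentCount}} interface (a minimal 
sketch mimicking {{ConstantArgumentCount.of(2)}}):

{code:java}
import java.util.Optional;

import org.apache.flink.table.types.inference.ArgumentCount;

/** Accepts exactly two arguments. */
public class TwoArguments implements ArgumentCount {
    @Override
    public boolean isValidCount(int count) {
        return count == 2;
    }

    @Override
    public Optional<Integer> getMinCount() {
        return Optional.of(2);
    }

    @Override
    public Optional<Integer> getMaxCount() {
        return Optional.of(2);
    }
}
{code}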



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22405) Support fixed-lengh chars in the LeadLag built-in function

2021-04-22 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-22405:
---

 Summary: Support fixed-lengh chars in the LeadLag built-in function
 Key: FLINK-22405
 URL: https://issues.apache.org/jira/browse/FLINK-22405
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.12.2
Reporter: Nico Kruber


The LeadLag aggregate function does not support the type {{CHAR}}, as in the 
following example (a CAST to VARCHAR works around this). Technically, there 
should be no reason to support STRING/VARCHAR but not CHAR:

{code:sql}
CREATE TEMPORARY VIEW test_cardinality AS
SELECT * FROM ( VALUES
  ('Alice', 'al...@test.com', ARRAY [ 'al...@test.com' ], 'Test Ltd'),
  ('Alice', 'al...@test.com', ARRAY [ 'al...@test.com' ], 'Test Ltd'),
  ('Alice', 'al...@test2.com', ARRAY [ 'al...@test.com', 'al...@test2.com' ], 
'Test Ltd'))
AS t ( name, email, aliases, company );
{code}

{code:sql}
SELECT
  name,
  LEAD(company, 0) AS company
FROM test_cardinality
WHERE CARDINALITY(aliases) >= 2
GROUP BY name;
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21584) Support UNNEST in LEFT JOINs

2021-03-03 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-21584:
---

 Summary: Support UNNEST in LEFT JOINs
 Key: FLINK-21584
 URL: https://issues.apache.org/jira/browse/FLINK-21584
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Affects Versions: 1.12.1
Reporter: Nico Kruber


Currently, UNNEST (for arrays and maps) is only supported in CROSS JOIN 
operations, but you may actually also want this in a LEFT JOIN fashion, in 
which case you would get {{NULL}} values for the expanded fields.

h1. Example
{code:sql}
CREATE TEMPORARY VIEW input ( f1, f2 )
AS VALUES ('A', STR_TO_MAP('')), ('B', STR_TO_MAP('1, 2'));

SELECT * FROM input LEFT JOIN UNNEST(f2);
{code}

h1. Current workaround
{code:sql}
CREATE TEMPORARY VIEW input ( f1, f2 )
AS VALUES ('A', STR_TO_MAP('')), ('B', STR_TO_MAP('1, 2'));

SELECT * FROM input CROSS JOIN UNNEST(f2)
UNION ALL SELECT *, NULLIF('1', '1') AS `KEY`, NULLIF('1', '1') as `VALUE` FROM 
input WHERE f2 IS NULL OR CARDINALITY(f2) = 0;
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21583) Allow comments in CSV format without having to ignore parse errors

2021-03-03 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-21583:
---

 Summary: Allow comments in CSV format without having to ignore 
parse errors
 Key: FLINK-21583
 URL: https://issues.apache.org/jira/browse/FLINK-21583
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
SQL / Ecosystem
Affects Versions: 1.12.1
Reporter: Nico Kruber


Currently, when you pass {{'csv.allow-comments' = 'true'}} to a table 
definition, you also have to set {{'csv.ignore-parse-errors' = 'true'}} to 
actually skip the commented-out lines (and the docs mention this prominently 
as well). This, however, may mask actual parsing errors that you want to be 
notified of.

I would like to propose that {{allow-comments}} also skips the commented-out 
lines automatically, because commented-out lines should not be parsed as data 
anyway.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21569) Flink SQL with CSV file input job hangs

2021-03-02 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-21569:
---

 Summary: Flink SQL with CSV file input job hangs
 Key: FLINK-21569
 URL: https://issues.apache.org/jira/browse/FLINK-21569
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
SQL / Runtime
Affects Versions: 1.12.1
Reporter: Nico Kruber
 Attachments: airports.csv, flights-small2.csv

As an extension of FLINK-21567, I actually also got the job stuck on 
cancellation by doing the following in the SQL client:

* configure SQL client defaults to run with parallelism 2
* execute the following statement

{code}
CREATE TABLE `airports` (
  `IATA_CODE` CHAR(3),
  `AIRPORT` STRING,
  `CITY` STRING,
  `STATE` CHAR(2),
  `COUNTRY` CHAR(3),
  `LATITUDE` DOUBLE NULL,
  `LONGITUDE` DOUBLE NULL,
  PRIMARY KEY (`IATA_CODE`) NOT ENFORCED
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/kaggle-flight-delay/airports.csv',
  'format' = 'csv',
  'csv.allow-comments' = 'true',
  'csv.ignore-parse-errors' = 'true',
  'csv.null-literal' = ''
);

CREATE TABLE `flights` (
  `_YEAR` CHAR(4),
  `_MONTH` CHAR(2),
  `_DAY` CHAR(2),
  `_DAY_OF_WEEK` TINYINT,
  `AIRLINE` CHAR(2),
  `FLIGHT_NUMBER` SMALLINT,
  `TAIL_NUMBER` CHAR(6),
  `ORIGIN_AIRPORT` CHAR(3),
  `DESTINATION_AIRPORT` CHAR(3),
  `_SCHEDULED_DEPARTURE` CHAR(4),
  `SCHEDULED_DEPARTURE` AS TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || 
`_DAY` || ' ' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' || 
SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00'),
  `_DEPARTURE_TIME` CHAR(4),
  `DEPARTURE_DELAY` SMALLINT,
  `DEPARTURE_TIME` AS TIMESTAMPADD(MINUTE, CAST(`DEPARTURE_DELAY` AS INT), 
TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' || 
SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' || 
SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00')),
  `TAXI_OUT` SMALLINT,
  `WHEELS_OFF` CHAR(4),
  `SCHEDULED_TIME` SMALLINT,
  `ELAPSED_TIME` SMALLINT,
  `AIR_TIME` SMALLINT,
  `DISTANCE` SMALLINT,
  `WHEELS_ON` CHAR(4),
  `TAXI_IN` SMALLINT,
  `SCHEDULED_ARRIVAL` CHAR(4),
  `ARRIVAL_TIME` CHAR(4),
  `ARRIVAL_DELAY` SMALLINT,
  `DIVERTED` BOOLEAN,
  `CANCELLED` BOOLEAN,
  `CANCELLATION_REASON` CHAR(1),
  `AIR_SYSTEM_DELAY` SMALLINT,
  `SECURITY_DELAY` SMALLINT,
  `AIRLINE_DELAY` SMALLINT,
  `LATE_AIRCRAFT_DELAY` SMALLINT,
  `WEATHER_DELAY` SMALLINT
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/kaggle-flight-delay/flights-small2.csv',
  'format' = 'csv',
  'csv.null-literal' = ''
);

SELECT `ORIGIN_AIRPORT`, `AIRPORT`, `STATE`, `NUM_DELAYS`
FROM (
  SELECT `ORIGIN_AIRPORT`, `AIRPORT`, `STATE`, COUNT(*) AS `NUM_DELAYS`,
ROW_NUMBER() OVER (ORDER BY COUNT(*) DESC) AS rownum
  FROM flights, airports
  WHERE `ORIGIN_AIRPORT` = `IATA_CODE` AND `DEPARTURE_DELAY` > 0
  GROUP BY `ORIGIN_AIRPORT`, `AIRPORT`, `STATE`)
WHERE rownum <= 10;
{code}

Results are shown in the CLI, but after quitting the result view, the job 
seems stuck in CANCELLING until (at least) one of the TMs shuts itself down 
because a task does not react to the cancelling signal. This appears in its TM 
logs:

{code}
2021-03-02 18:39:19,451 WARN  org.apache.flink.runtime.taskmanager.Task 
   [] - Task 'Source: TableSourceScan(table=[[default_catalog, 
default_database, airports, project=[IATA_CODE, AIRPORT, STATE]]], 
fields=[IATA_CODE, AIRPORT, STATE]) (2/2)#0' did not react to cancelling signal 
for 30 seconds, but is stuck in method:
 sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:653)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
java.lang.Thread.run(Thread.java:748)

...

2021-03-02 18:39:49,447 ERROR 
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Task did not 
exit gracefully within 180 + seconds.
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully 
within 180 + seconds.
at 
org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1685)
 [flink-dist_2.12-1.12.1.jar:1.12.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
2021-03-02 18:39:49,448 ERROR 
org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] - Fatal error 
occurred while executing the TaskManager. Shutting it down...
org.apache.flink.util.FlinkRuntimeException: Task did no

[jira] [Created] (FLINK-21568) Navigating in SQL client can lead to SqlExecutionException

2021-03-02 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-21568:
---

 Summary: Navigating in SQL client can lead to SqlExecutionException
 Key: FLINK-21568
 URL: https://issues.apache.org/jira/browse/FLINK-21568
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.12.1
Reporter: Nico Kruber


Pressing 'p' in the SQL CLI's result browser before any result is available 
will result in the following exception being thrown:
{code}
2021-03-02 18:27:05,153 WARN  org.apache.flink.table.client.cli.CliClient   
   [] - Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Invalid page '1'.
at 
org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult.retrievePage(MaterializedCollectStreamResult.java:177)
 ~[flink-sql-client_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.retrieveResultPage(LocalExecutor.java:415)
 ~[flink-sql-client_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.table.client.cli.CliTableResultView.updatePage(CliTableResultView.java:293)
 ~[flink-sql-client_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.table.client.cli.CliTableResultView.gotoPreviousPage(CliTableResultView.java:381)
 ~[flink-sql-client_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.table.client.cli.CliTableResultView.evaluate(CliTableResultView.java:183)
 ~[flink-sql-client_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.table.client.cli.CliTableResultView.evaluate(CliTableResultView.java:50)
 ~[flink-sql-client_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.table.client.cli.CliView.open(CliView.java:125) 
~[flink-sql-client_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:675) 
~[flink-sql-client_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:323) 
~[flink-sql-client_2.12-1.12.1.jar:1.12.1]
at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_282]
at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:214) 
[flink-sql-client_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:144) 
[flink-sql-client_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:115) 
[flink-sql-client_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) 
[flink-sql-client_2.12-1.12.1.jar:1.12.1]
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21567) CSV Format exception while parsing: ArrayIndexOutOfBoundsException: 4000

2021-03-02 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-21567:
---

 Summary: CSV Format exception while parsing: 
ArrayIndexOutOfBoundsException: 4000
 Key: FLINK-21567
 URL: https://issues.apache.org/jira/browse/FLINK-21567
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.12.1, 1.11.3
Reporter: Nico Kruber
 Attachments: flights-small.csv

I've been trying to play a bit with the data available at 
https://www.kaggle.com/usdot/flight-delays and got the following exception:

{code}
2021-02-16 18:57:37,913 WARN  org.apache.flink.runtime.taskmanager.Task 
   [] - Source: TableSourceScan(table=[[default_catalog, 
default_database, flights, filter=[], project=[ORIGIN_AIRPORT, 
DEPARTURE_DELAY]]], fields=[ORIGIN_AIRPORT, DEPARTURE_DELAY]) -> 
Calc(select=[ORIGIN_AIRPORT], where=[(DEPARTURE_DELAY > 0)]) -> 
LocalHashAggregate(groupBy=[ORIGIN_AIRPORT], select=[ORIGIN_AIRPORT, 
Partial_COUNT(*) AS count1$0]) (1/1)#0 (ebbf1204d875a5a4ace529df0d5ba719) 
switched from RUNNING to FAILED.
java.io.IOException: Failed to deserialize CSV row.
at 
org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:257)
 ~[flink-csv-1.12.1.jar:1.12.1]
at 
org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:162)
 ~[flink-csv-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) 
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
Caused by: java.lang.ArrayIndexOutOfBoundsException: 4000
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder.skipLinesWhenNeeded(CsvDecoder.java:527)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.impl.CsvDecoder.startNewLine(CsvDecoder.java:499)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser._handleObjectRowEnd(CsvParser.java:1067)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser._handleNextEntry(CsvParser.java:858)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser.nextFieldName(CsvParser.java:665)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:250)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:68)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:15)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator.nextValue(MappingIterator.java:280)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:250)
 ~[flink-csv-1.12.1.jar:1.12.1]
... 5 more
{code}

h1. Fully working example:

Using the attached file (derived from the data on flight delays, linked above) 
and the SQL CLI:
{code}
CREATE TABLE `flights` (
  `_YEAR` CHAR(4),
  `_MONTH` CHAR(2),
  `_DAY` CHAR(2),
  `_DAY_OF_WEEK` TINYINT,
  `AIRLINE` CHAR(2),
  `FLIGHT_NUMBER` SMALLINT,
  `TAIL_NUMBER` CHAR(6),
  `ORIGIN_AIRPORT` CHAR(3),
  `DESTINATION_AIRPORT` CHAR(3),
  `_SCHEDULED_DEPARTURE` CHAR(4),
  `SCHEDULED_DEPARTURE` AS TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || 
`_DAY` || ' ' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' || 
SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00'),
  `_DEPARTURE_TIME` CHAR(4),
  `DEPARTURE_DELAY` SMALLINT,
  `DEPARTURE_TIME` AS TIMESTAMPADD(MINUTE, CAST(`DEPARTURE_DELAY` AS INT), 
TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || `_DAY` || ' ' || 
SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' || 
SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00')),
  `TAXI_OUT` SMALLINT,
  `WHEELS_OFF` CHAR(4),
  `SCHEDULED_TIME` SMALLINT,
  `ELA

[jira] [Created] (FLINK-21566) Improve error message for "Unsupported casting"

2021-03-02 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-21566:
---

 Summary: Improve error message for "Unsupported casting"
 Key: FLINK-21566
 URL: https://issues.apache.org/jira/browse/FLINK-21566
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Affects Versions: 1.12.1
Reporter: Nico Kruber


In a situation like the one in FLINK-21565, neither the error message 
{{Unsupported casting from TINYINT to INTERVAL SECOND(3)}} nor the exception 
trace (see FLINK-21565) gives you a good hint on where the error is, 
especially if you have many statements with TINYINTs or operations on these.

The query parser could highlight the location of the error inside the SQL 
statement that the user provided.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21565) Support more integer types in TIMESTAMPADD

2021-03-02 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-21565:
---

 Summary: Support more integer types in TIMESTAMPADD
 Key: FLINK-21565
 URL: https://issues.apache.org/jira/browse/FLINK-21565
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.12.1
Reporter: Nico Kruber


At the moment, {{TIMESTAMPADD}} does not seem to support {{SMALLINT}} or 
{{TINYINT}} types even though these should be perfectly suitable for 
auto-conversion (in contrast to BIGINT or floating-point numbers, where I 
would expect the user to cast appropriately).

It currently fails with the following exception:
{code}
org.apache.flink.table.planner.codegen.CodeGenException: Unsupported casting 
from TINYINT to INTERVAL SECOND(3).
at 
org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.numericCasting(ScalarOperatorGens.scala:2352)
 ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.generateBinaryArithmeticOperator(ScalarOperatorGens.scala:93)
 ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:590)
 ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:529)
 ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
 ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) 
~[flink-table_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526)
 ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) 
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) 
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) 
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.TraversableLike.map(TraversableLike.scala:233) 
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.TraversableLike.map$(TraversableLike.scala:226) 
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517)
 ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
 ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) 
~[flink-table_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526)
 ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) 
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58) 
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51) 
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.TraversableLike.map(TraversableLike.scala:233) 
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.TraversableLike.map$(TraversableLike.scala:226) 
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517)
 ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
 ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) 
~[flink-table_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:155)
 ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:143)
 ~[flink-table-blink_2.12-1.12.1.jar:1.12.1]
at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) 
~[flink

[jira] [Created] (FLINK-21563) Support using computed columns when defining (new) computed columns

2021-03-02 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-21563:
---

 Summary: Support using computed columns when defining (new) 
computed columns
 Key: FLINK-21563
 URL: https://issues.apache.org/jira/browse/FLINK-21563
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.11.3
Reporter: Nico Kruber


To avoid code duplication, it would be nice to be able to use computed columns 
in later computations of new computed columns, e.g.
{code}
CREATE TABLE `flights` (
  `_YEAR` CHAR(4),
  `_MONTH` CHAR(2),
  `_DAY` CHAR(2),
  `_SCHEDULED_DEPARTURE` CHAR(4),
  `SCHEDULED_DEPARTURE` AS TO_TIMESTAMP(`_YEAR` || '-' || `_MONTH` || '-' || 
`_DAY` || ' ' || SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 0 FOR 2) || ':' || 
SUBSTRING(`_SCHEDULED_DEPARTURE` FROM 3) || ':00'),
  `_DEPARTURE_TIME` CHAR(4),
  `DEPARTURE_DELAY` SMALLINT,
  `DEPARTURE_TIME` AS SCHEDULED_DEPARTURE + DEPARTURE_DELAY
)...
{code}

Otherwise, a user would have to repeat these calculations over and over again, 
which is not very maintainable.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21562) Add more informative message on CSV parsing errors

2021-03-02 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-21562:
---

 Summary: Add more informative message on CSV parsing errors
 Key: FLINK-21562
 URL: https://issues.apache.org/jira/browse/FLINK-21562
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
SQL / API
Affects Versions: 1.11.3
Reporter: Nico Kruber


I was parsing a CSV file with comments in it and used {{'csv.allow-comments' = 
'true'}} without also passing {{'csv.ignore-parse-errors' = 'true'}} to the 
table DDL, so as not to hide any actual format errors.
Since I didn't just have strings in my table, this of course stumbled over a 
commented-out line with the following error:

{code}
2021-02-16 17:45:53,055 WARN  org.apache.flink.runtime.taskmanager.Task 
   [] - Source: TableSourceScan(table=[[default_catalog, 
default_database, airports]], fields=[IATA_CODE, AIRPORT, CITY, STATE, COUNTRY, 
LATITUDE, LONGITUDE]) -> SinkConversionToTuple2 -> Sink: SQL Client Stream 
Collect Sink (1/1)#0 (9f3a3965f18ed99ee42580bdb559ba66) switched from RUNNING 
to FAILED.
java.io.IOException: Failed to deserialize CSV row.
at 
org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:257)
 ~[flink-csv-1.12.1.jar:1.12.1]
at 
org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:162)
 ~[flink-csv-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) 
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
Caused by: java.lang.NumberFormatException: empty String
at 
sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1842) 
~[?:1.8.0_275]
at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110) 
~[?:1.8.0_275]
at java.lang.Double.parseDouble(Double.java:538) ~[?:1.8.0_275]
at 
org.apache.flink.formats.csv.CsvToRowDataConverters.convertToDouble(CsvToRowDataConverters.java:203)
 ~[flink-csv-1.12.1.jar:1.12.1]
at 
org.apache.flink.formats.csv.CsvToRowDataConverters.lambda$createNullableConverter$ac6e531e$1(CsvToRowDataConverters.java:113)
 ~[flink-csv-1.12.1.jar:1.12.1]
at 
org.apache.flink.formats.csv.CsvToRowDataConverters.lambda$createRowConverter$18bb1dd$1(CsvToRowDataConverters.java:98)
 ~[flink-csv-1.12.1.jar:1.12.1]
at 
org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:251)
 ~[flink-csv-1.12.1.jar:1.12.1]
... 5 more
{code}

Two things should be improved here:
# commented-out lines should be ignored by default (potentially, FLINK-17133 
addresses this or at least gives the user the power to do so)
# the error message itself is not very informative: "empty String".

This ticket is about the latter. I would suggest adding at least a few more 
pointers to direct the user to the offending place in the CSV file/item/...: 
here, the declared data type could simply be wrong, or the CSV file itself may 
be corrupted, and the user would need to investigate.
What exactly helps probably depends on the actual input connector this format 
is working with: a line number in a CSV file would be best; where that is not 
possible, we could show the whole line or at least a few surrounding fields.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20886) Add the option to get a threaddump on checkpoint timeouts

2021-01-07 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-20886:
---

 Summary: Add the option to get a threaddump on checkpoint timeouts
 Key: FLINK-20886
 URL: https://issues.apache.org/jira/browse/FLINK-20886
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.12.0
Reporter: Nico Kruber


For debugging checkpoint timeouts, I was thinking about the following addition 
to Flink:

When a checkpoint times out and the async thread is still running, create a 
threaddump [1] and either add this to the checkpoint stats, log it, or write it 
out.

This may help identify where the checkpoint is stuck (maybe a lock; it could 
also be in a third-party lib like the FS connectors, ...). It would give us 
some insights into what the thread is currently doing.

Limiting the scope of the threads would be nice but may not be possible in the 
general case since additional threads (spawned by the FS connector lib, or 
otherwise connected) may interact with the async thread(s) by e.g. going 
through the same locks.


[1] https://crunchify.com/how-to-generate-java-thread-dump-programmatically/
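
A minimal sketch of what [1] describes, using only the standard JMX 
{{ThreadMXBean}} API (the helper class itself is hypothetical):

{code:java}
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

/** Hypothetical helper: capture a dump of all live JVM threads as a string. */
public final class ThreadDumps {

    public static String capture() {
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
        StringBuilder dump = new StringBuilder();
        // true/true also reports owned monitors and synchronizers, which is
        // exactly what helps when the async checkpoint thread is stuck on a
        // lock.
        for (ThreadInfo info : threadBean.dumpAllThreads(true, true)) {
            dump.append(info); // ThreadInfo#toString truncates very deep traces
        }
        return dump.toString();
    }
}
{code}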



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20674) Wrong send/received stats with UNION ALL

2020-12-18 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-20674:
---

 Summary: Wrong send/received stats with UNION ALL
 Key: FLINK-20674
 URL: https://issues.apache.org/jira/browse/FLINK-20674
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.11.3, 1.12.0
Reporter: Nico Kruber


When using {{UNION ALL}} to union the same table twice, the reported number of 
records and bytes sent is just half of what the next task receives:

Reproducible with this:
{code}
CREATE TEMPORARY TABLE test (
  `number` SMALLINT
)
WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);


SELECT * FROM (
(SELECT * FROM test)
UNION ALL
(SELECT * FROM test)
)
{code}

Arguably, the use case is not too useful but other combinations may be 
affected, too.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20217) More fine-grained timer processing

2020-11-18 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-20217:
---

 Summary: More fine-grained timer processing
 Key: FLINK-20217
 URL: https://issues.apache.org/jira/browse/FLINK-20217
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Affects Versions: 1.11.2, 1.10.2, 1.12.0
Reporter: Nico Kruber


Timers are currently processed in one big block under the checkpoint lock (in 
{{InternalTimerServiceImpl#advanceWatermark}}). This can be problematic in a 
number of scenarios during checkpointing and can lead to checkpoints timing 
out (even unaligned checkpoints would not help).

If you have a huge number of timers to process when advancing the watermark 
and the task is also back-pressured, the situation may actually be worse since 
you would block on the checkpoint lock and also wait for buffers/credits from 
the receiver.

I propose to make this loop more fine-grained so that it is interruptible by 
checkpoints (see the sketch below), but maybe there is also some other way to 
improve here.
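
To illustrate the idea, a rough sketch with hypothetical names (this is not 
Flink's actual timer service): fire due timers in bounded batches and release 
the checkpoint lock between batches so that a waiting checkpoint can 
interleave.

{code:java}
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

/** Hypothetical, simplified timer service illustrating batched timer firing. */
public class BatchedTimerService {
    private static final int MAX_BATCH = 100;

    private final Object checkpointLock = new Object();
    private final PriorityQueue<Long> timers = new PriorityQueue<>();

    public void registerTimer(long timestamp) {
        synchronized (checkpointLock) {
            timers.add(timestamp);
        }
    }

    public void advanceWatermark(long watermark, LongConsumer onTimer) {
        while (true) {
            synchronized (checkpointLock) {
                int fired = 0;
                while (fired < MAX_BATCH
                        && !timers.isEmpty()
                        && timers.peek() <= watermark) {
                    onTimer.accept(timers.poll());
                    fired++;
                }
                if (fired < MAX_BATCH) {
                    return; // all timers due at this watermark have fired
                }
            }
            // The lock is released here, so a pending checkpoint (which also
            // synchronizes on the checkpoint lock) can run between batches.
        }
    }
}
{code}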



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20104) Web UI checkpoint stats "refresh" button has to be clicked twice

2020-11-12 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-20104:
---

 Summary: Web UI checkpoint stats "refresh" button has to be 
clicked twice
 Key: FLINK-20104
 URL: https://issues.apache.org/jira/browse/FLINK-20104
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Web Frontend
Affects Versions: 1.11.2, 1.12.0
 Environment: Firefox on Linux
Reporter: Nico Kruber


In order to get the UI's checkpoint stats updated, I always have to click the 
refresh button twice - a single click doesn't change anything.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20099) HeapStateBackend checkpoint error hidden under cryptic message

2020-11-11 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-20099:
---

 Summary: HeapStateBackend checkpoint error hidden under cryptic 
message
 Key: FLINK-20099
 URL: https://issues.apache.org/jira/browse/FLINK-20099
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / State Backends
Affects Versions: 1.11.2
Reporter: Nico Kruber
 Attachments: Screenshot_20201112_001331.png

When the memory state backend hits a certain size, it fails to permit 
checkpoints. Even though a very detailed exception is thrown at its source, 
this is neither logged nor shown in the UI:
 * Logs just contain:

{code:java}
00:06:41.462 [jobmanager-future-thread-14] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 
2 by task 8eb303cd3196310cb2671212f4ed013c of job 
c9b7a410bd3143864ca23ba89595d878 at 6a73bcf2-46b6-4735-a616-fdf09ff1471c @ 
localhost (dataPort=-1).
{code}
 * UI: (also see the attached Screenshot_20201112_001331.png)

{code:java}
Failure Message: The job has failed.
{code}
-> this isn't even true: the job is still running fine!

Debugging into {{PendingCheckpoint#abort()}} reveals that the causing 
exception is actually still in there, but the detailed information from it is 
just never used.
For reference, this is what is available there and should be logged or shown:
{code:java}
java.lang.Exception: Could not materialize checkpoint 2 for operator aggregates 
-> (Sink: sink-agg-365, Sink: sink-agg-180, Sink: sink-agg-45, Sink: 
sink-agg-30) (4/4).
at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:191)
at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:138)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Size 
of the state is larger than the maximum permitted memory-backed state. 
Size=6122737 , maxSize=5242880 . Consider using a different state backend, like 
the File System State backend.
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:479)
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:50)
at 
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:102)
... 3 more
Caused by: java.io.IOException: Size of the state is larger than the maximum 
permitted memory-backed state. Size=6122737 , maxSize=5242880 . Consider using 
a different state backend, like the File System State backend.
at 
org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.checkSize(MemCheckpointStreamFactory.java:64)
at 
org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetBytes(MemCheckpointStreamFactory.java:145)
at 
org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetHandle(MemCheckpointStreamFactory.java:126)
at 
org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:77)
at 
org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:199)
at 
org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:476)
... 5 more
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20061) Row constructor unsupported in aggregation function

2020-11-09 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-20061:
---

 Summary: Row constructor unsupported in aggregation function
 Key: FLINK-20061
 URL: https://issues.apache.org/jira/browse/FLINK-20061
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.11.2
Reporter: Nico Kruber


I was trying to use {{ROW}} in a user-defined aggregate function in a query 
like this:
{code}
  SELECT `id`, HOP_END(`timestamp`, INTERVAL '10' MINUTE, INTERVAL '1' MINUTE) 
AS `window_end`,
RowMaxv0(`amount`, ROW(`timestamp`, `amount`, `payload`)) AS `max_amount`
  FROM `input`
  GROUP BY HOP(`timestamp`, INTERVAL '10' MINUTE, INTERVAL '1' MINUTE), `id`;
{code}

Eventually this resulted in an "unsupported" exception from Calcite:
{code}
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
validation failed. null
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
at com.ververica.platform.sql.functions.RowMaxv0.main(RowMaxv0.java:93)
Caused by: java.lang.UnsupportedOperationException
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateColumnListParams(SqlValidatorImpl.java:5689)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:268)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
at 
org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:481)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4255)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3523)
at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
... 5 more
{code}

A workaround for this is to go via a subquery like the following but, 
ultimately, this should result in the same thing (a simple projection).
{code}
  SELECT `id`, HOP_END(`timestamp`, INTERVAL '10' MINUTE, INTERVAL '1' MINUTE) 
AS `window_end`,
RowMaxv0(`amount`, `row`) AS `max_amount`
  FROM (SELECT `id`, `timestamp`, `amount`, ROW(`timestamp`, `amount`, 
`payload`) AS `row` FROM `input`)
  GROUP BY HOP(`timestamp`, INTERVAL '10' MINUTE, INTERVAL '1' MINUTE), `id`
{code}
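
For context, a minimal skeleton of what such a {{RowMaxv0}} function might 
look like; the accumulator layout and the {{Integer}} amount type are 
assumptions, not taken from this ticket:
{code:java}
public class RowMaxv0 extends AggregateFunction<Row, RowMaxv0.MaxAcc> {

    public static class MaxAcc {
        public Integer max; // assumed type of `amount`
        public Row row;     // the ROW(`timestamp`, `amount`, `payload`) argument
    }

    @Override
    public MaxAcc createAccumulator() {
        return new MaxAcc();
    }

    // keeps the row belonging to the maximum amount seen so far
    public void accumulate(MaxAcc acc, Integer amount, Row row) {
        if (amount != null && (acc.max == null || amount > acc.max)) {
            acc.max = amount;
            acc.row = row;
        }
    }

    @Override
    public Row getValue(MaxAcc acc) {
        return acc.row;
    }
}
{code}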



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20059) Outdated SQL docs on aggregate functions' merge

2020-11-09 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-20059:
---

 Summary: Outdated SQL docs on aggregate functions' merge
 Key: FLINK-20059
 URL: https://issues.apache.org/jira/browse/FLINK-20059
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Table SQL / API
Affects Versions: 1.11.2, 1.12.0
Reporter: Nico Kruber


In the Java docs as well as the user docs, the {{merge}} method of an 
aggregation UDF is described as optional, e.g.
{quote}Merges a group of accumulator instances into one accumulator instance. 
This function must be implemented for data stream session window grouping 
aggregates and data set grouping aggregates.{quote}

However, it seems that nowadays this method is required in more cases (I 
stumbled on this for a HOP window in streaming):
{code}
StreamExecGlobalGroupAggregate.scala
  .needMerge(mergedAccOffset, mergedAccOnHeap, mergedAccExternalTypes)
StreamExecGroupWindowAggregateBase.scala
  generator.needMerge(mergedAccOffset = 0, mergedAccOnHeap = false)
StreamExecIncrementalGroupAggregate.scala
  .needMerge(mergedAccOffset, mergedAccOnHeap = true, 
mergedAccExternalTypes)
StreamExecLocalGroupAggregate.scala
  .needMerge(mergedAccOffset = 0, mergedAccOnHeap = true)
{code}
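
For illustration, this is the shape of the {{merge}} method that these code 
paths require; {{MyAcc}} is a made-up accumulator with a count and a sum:
{code:java}
public void merge(MyAcc acc, Iterable<MyAcc> others) {
    for (MyAcc other : others) {
        acc.count += other.count;
        acc.sum += other.sum;
    }
}
{code}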



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19999) State Processor API classes leaking into savepoint

2020-11-05 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-19999:
---

 Summary: State Processor API classes leaking into savepoint
 Key: FLINK-19999
 URL: https://issues.apache.org/jira/browse/FLINK-19999
 Project: Flink
  Issue Type: Bug
  Components: API / State Processor
Affects Versions: 1.11.2
Reporter: Nico Kruber


Currently, any configuration for serializers that you are using when writing a 
State Processor API job will be shared with the serializers that are used for 
writing a savepoint. However, your normal job shouldn't necessarily depend on 
(helper) classes that you only use in the State Processor API job.

By default, for example, {{ExecutionConfig#autoTypeRegistrationEnabled = true}} 
and thus classes like 
{{org.apache.flink.runtime.checkpoint.OperatorSubtaskState}} will be registered 
with Kryo and will thus also be needed when reading the created savepoint if 
you have Kryo serialization in your job.

This particular instance can be worked around by calling 
{{ExecutionConfig#disableAutoTypeRegistration()}} but the problem is probably 
bigger and extends to other type registrations, e.g. POJOs, as well.
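
In code, the workaround is a single call on the environment's 
{{ExecutionConfig}} (sketch):
{code:java}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// do not auto-register encountered types (e.g. OperatorSubtaskState) with Kryo
env.getConfig().disableAutoTypeRegistration();
{code}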



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19972) Provide more details when type serializers are not compatible

2020-11-04 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-19972:
---

 Summary: Provide more details when type serializers are not 
compatible
 Key: FLINK-19972
 URL: https://issues.apache.org/jira/browse/FLINK-19972
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.11.2
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.12.0


Currently, when the type serializer is incompatible, you get exceptions like 
these:
{code:java}
StateMigrationException("For heap backends, the new namespace serializer must 
be compatible.");
StateMigrationException("The new namespace serializer must be compatible.");

StateMigrationException("For heap backends, the new state serializer must not 
be incompatible.");
StateMigrationException("The new state serializer cannot be incompatible.")

StateMigrationException("The new key serializer must be compatible."){code}
which are not really helpful to the user in debugging serializers.

Since we already have the old serializer (snapshot) and the new one available, 
we should add this detail to the exceptions for improved usability.
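
Since both serializers are at hand, a sketch of an enriched message could look 
like this ({{newStateSerializer}} and {{oldSerializerSnapshot}} are 
placeholders for the objects the backend already has):
{code:java}
throw new StateMigrationException(
        "The new state serializer (" + newStateSerializer + ") must not be "
                + "incompatible with the old state serializer ("
                + oldSerializerSnapshot.restoreSerializer() + ").");
{code}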



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19462) Checkpoint statistics for unfinished task snapshots

2020-09-29 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-19462:
---

 Summary: Checkpoint statistics for unfinished task snapshots
 Key: FLINK-19462
 URL: https://issues.apache.org/jira/browse/FLINK-19462
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing, Runtime / Metrics
Reporter: Nico Kruber


If a checkpoint times out, there are currently no stats on the not-yet-finished 
tasks in the Web UI, so you have to crawl into (debug?) logs.

It would be nice to have these incomplete stats in there instead so that you 
know quickly what was going on. I could think of these ways to accomplish this:
 * the checkpoint coordinator could ask the TMs for it after failing the 
checkpoint or
 * the TMs could send the stats when they notice that the checkpoint is aborted

Maybe there are more options, but I think this improvement in general would 
benefit debugging checkpoints.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19381) Fix docs about relocatable savepoints

2020-09-23 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-19381:
---

 Summary: Fix docs about relocatable savepoints
 Key: FLINK-19381
 URL: https://issues.apache.org/jira/browse/FLINK-19381
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.11.2, 1.12.0
Reporter: Nico Kruber


Although savepoints are relocatable since Flink 1.11, the docs still state 
otherwise, for example in 
[https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#triggering-savepoints]

The warning there, as well as the other changes from FLINK-15863, should be 
removed again and potentially replaced with new constraints.

One known constraint is that if task-owned state is used 
({{GenericWriteAheadLog}} sink), savepoints are currently not relocatable yet.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19112) No access to metric group in ScalarFunction when optimizing

2020-09-01 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-19112:
---

 Summary: No access to metric group in ScalarFunction when 
optimizing
 Key: FLINK-19112
 URL: https://issues.apache.org/jira/browse/FLINK-19112
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.11.1
Reporter: Nico Kruber
 Attachments: MetricsGroupBug.java

Under some circumstances, I cannot access {{context.getMetricGroup()}} in a 
{{ScalarFunction}} like this (full job attached):
{code:java}
  public static class MyUDF extends ScalarFunction {
@Override
public void open(FunctionContext context) throws Exception {
  super.open(context);
  context.getMetricGroup();
}

public Integer eval(Integer id) {
  return id;
}
  }
{code}
which leads to this exception:
{code:java}
Exception in thread "main" java.lang.UnsupportedOperationException: 
getMetricGroup is not supported when optimizing
at 
org.apache.flink.table.planner.codegen.ConstantFunctionContext.getMetricGroup(ExpressionReducer.scala:249)
at com.ververica.MetricsGroupBug$MyUDF.open(MetricsGroupBug.java:57)
at ExpressionReducer$2.open(Unknown Source)
at 
org.apache.flink.table.planner.codegen.ExpressionReducer.reduce(ExpressionReducer.scala:118)
at 
org.apache.calcite.rel.rules.ReduceExpressionsRule.reduceExpressionsInternal(ReduceExpressionsRule.java:696)
at 
org.apache.calcite.rel.rules.ReduceExpressionsRule.reduceExpressions(ReduceExpressionsRule.java:618)
at 
org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:303)
at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:562)
at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:427)
at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:264)
at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:223)
at 
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:210)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)
at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:84)
at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
at 
org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565)
at 
org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549)
at com.ververica.MetricsGroupBug.main(MetricsGroupBug.java:50)
{code}
I also tried to work around this with a try-catch, assuming that this method is 
called once during optimisation and another time 
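
Such a try-catch workaround might look like the following sketch (assuming the 
exception can simply be swallowed while the planner reduces expressions):
{code:java}
@Override
public void open(FunctionContext context) throws Exception {
    super.open(context);
    try {
        context.getMetricGroup();
    } catch (UnsupportedOperationException e) {
        // open() is also called by the ExpressionReducer during planning,
        // where no metric group is available
    }
}
{code}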

[jira] [Created] (FLINK-18962) Improve error message if checkpoint directory is not writable

2020-08-14 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-18962:
---

 Summary: Improve error message if checkpoint directory is not 
writable
 Key: FLINK-18962
 URL: https://issues.apache.org/jira/browse/FLINK-18962
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.11.1
Reporter: Nico Kruber


If the checkpoint directory from {{state.checkpoints.dir}} is not writable by 
the user that Flink is running with, checkpoints will be declined, but the real 
cause is not mentioned anywhere:

* the Web UI says: "Cause: The job has failed" (the Flink job is running though)
* the JM log says:
{code}
2020-08-14 12:13:18,820 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
checkpoint 2 (type=CHECKPOINT) @ 159738819 for job 
2c567b14e8d0833404931ef47dfec266.
2020-08-14 12:13:18,921 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Decline 
checkpoint 2 by task 0d4fd75374ad16c8d963679e3c2171ec of job 
2c567b14e8d0833404931ef47dfec266 at a184deea621e3923fbfcb1d899348448 @ 
Nico-PC.lan (dataPort=35531).
{code}
* the TM log says:
{code}
2020-08-14 12:13:14,102 INFO  
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl [] - 
Checkpoint 1 has been notified as aborted, would not trigger any checkpoint.
{code}

And that's it. It should have a real error message indicating that the 
checkpoint (sub)-directory could not be created.
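
One way to surface the real cause would be a pre-flight check along these 
lines (a sketch using Flink's {{FileSystem}} abstraction; placement and 
wording are assumptions):
{code:java}
Path checkpointDir = new Path(checkpointsDirectory); // from state.checkpoints.dir
FileSystem fs = checkpointDir.getFileSystem();
if (!fs.exists(checkpointDir) && !fs.mkdirs(checkpointDir)) {
    throw new IOException("Could not create the checkpoint directory "
            + checkpointDir + " - please check its permissions.");
}
{code}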



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18955) Add snapshot path to job startup message

2020-08-14 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-18955:
---

 Summary: Add snapshot path to job startup message
 Key: FLINK-18955
 URL: https://issues.apache.org/jira/browse/FLINK-18955
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.11.1
Reporter: Nico Kruber


When a job is started from a checkpoint or savepoint (I'm using snapshot as 
the umbrella term below), the {{CheckpointCoordinator}} prints a log line like 
this:
{code}
2020-08-13 13:50:51,418 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring job 
220d8a4953cd40198b6eb3b1ec0cece0 from latest valid checkpoint: Checkpoint 357 @ 
1597326576925 for 220d8a4953cd40198b6eb3b1ec0cece0.
{code}

I propose to add the path to the snapshot to this message because which 
snapshot is taken for restore may actually not be that obvious for the user: 
even if a savepoint was specified in the job start command, e.g. in a 
Kubernetes pod spec, an HA store could overrule the decision and take a more 
recent snapshot instead. If that snapshot is a savepoint, it is not that easy 
to map this to checkpoint IDs and find out which savepoint the job actually 
started from.
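
The extended message could then look like this (the path is made up):
{code}
Restoring job 220d8a4953cd40198b6eb3b1ec0cece0 from latest valid checkpoint: 
Checkpoint 357 @ 1597326576925 for 220d8a4953cd40198b6eb3b1ec0cece0 (path: 
s3://my-bucket/savepoints/savepoint-220d8a-1234567890ab).
{code}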



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18806) Taskmanager doesn't start up with error in config

2020-08-03 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-18806:
---

 Summary: Taskmanager doesn't start up with error in config
 Key: FLINK-18806
 URL: https://issues.apache.org/jira/browse/FLINK-18806
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Scripts
Affects Versions: 1.11.1
Reporter: Nico Kruber


With the following (wrong) configuration setting in {{flink-conf.yaml}}, a 
taskmanager will not start up, basically print nothing on the command line, and 
have no log file to look at:

{code}
taskmanager.memory.managed.fraction: '0.4'
{code}

Console output:
{code}
> ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host Nico-PC.lan.
[ERROR] The execution result is empty.
[ERROR] Could not get JVM parameters and dynamic configurations properly.
{code}
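
For comparison, the taskmanager starts fine when the value is given as a plain 
number, i.e. without quotes:
{code}
taskmanager.memory.managed.fraction: 0.4
{code}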



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18769) Streaming Table job stuck when enabling minibatching

2020-07-30 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-18769:
---

 Summary: Streaming Table job stuck when enabling minibatching
 Key: FLINK-18769
 URL: https://issues.apache.org/jira/browse/FLINK-18769
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.11.1
Reporter: Nico Kruber


The following Table API streaming job is stuck when enabling mini-batching:

{code}
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings =

EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);

// mini-batching is enabled here to reproduce the issue; disabling it
// completely makes the job produce a result
Configuration tableConf = tableEnv.getConfig()
.getConfiguration();
tableConf.setString("table.exec.mini-batch.enabled", "true");
tableConf.setString("table.exec.mini-batch.allow-latency", "5 s");
tableConf.setString("table.exec.mini-batch.size", "5000");
tableConf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

tableEnv.executeSql(
"CREATE TABLE input_table ("
+ "location STRING, "
+ "population INT"
+ ") WITH ("
+ "'connector' = 'kafka', "
+ "'topic' = 'kafka_batching_input', "
+ "'properties.bootstrap.servers' = 'localhost:9092', "
+ "'format' = 'csv', "
+ "'scan.startup.mode' = 'earliest-offset'"
+ ")");

tableEnv.executeSql(
"CREATE TABLE result_table WITH ('connector' = 'print') LIKE 
input_table (EXCLUDING OPTIONS)");

tableEnv
.from("input_table")
.groupBy($("location"))
.select($("location").cast(DataTypes.CHAR(2)).as("location"), 
$("population").sum().as("population"))
.executeInsert("result_table");
{code}

I am using a pre-populated Kafka topic called {{kafka_batching_input}} with 
these elements:
{code}
"Berlin",1
"Berlin",2
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18767) Streaming job stuck when disabling operator chaining

2020-07-30 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-18767:
---

 Summary: Streaming job stuck when disabling operator chaining
 Key: FLINK-18767
 URL: https://issues.apache.org/jira/browse/FLINK-18767
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.11.1, 1.10.1, 1.9.3, 1.8.3
Reporter: Nico Kruber


The following code is stuck sending data from the source to the map operator. 
Two settings seem to have an influence here: {{env.setBufferTimeout(-1);}} and 
{{env.disableOperatorChaining();}} - if I remove either of these, the job works 
as expected.

(I pre-populated my Kafka topic with one element to reproduce easily)

{code}
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

// comment out either of these two settings and the job works
env.setBufferTimeout(-1);
env.disableOperatorChaining(); 

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("topic",
        new SimpleStringSchema(), properties);
consumer.setStartFromEarliest();
DataStreamSource<String> input = env.addSource(consumer);

input
.map((x) -> x)
.print();

env.execute();
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18276) NullPointerException when closing KafkaConsumer

2020-06-12 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-18276:
---

 Summary: NullPointerException when closing KafkaConsumer
 Key: FLINK-18276
 URL: https://issues.apache.org/jira/browse/FLINK-18276
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.10.1, 1.9.3, 1.8.3, 1.11.0
Reporter: Nico Kruber


{code}
WARN  org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher  - 
Error while closing Kafka consumer
java.lang.NullPointerException
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:282)
{code}

{{KafkaConsumerThread#reassignPartitions}} temporarily sets {{consumer}} to 
{{null}} and, if there is an exception (in this case, it was a timeout), the 
{{finally}} block in {{KafkaConsumerThread.run}} fails with an NPE. What is 
more, {{KafkaConsumerThread#reassignPartitions}} puts the original consumer 
into {{consumerTmp}}, which is then never closed and may leak underlying 
(Kafka) resources.
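
A fix could guard the cleanup with a local null check, roughly like this 
sketch (this is not the actual patch; it only follows the description above):
{code:java}
try {
    // ... main consumer loop ...
} finally {
    // reassignPartitions() may have set the consumer field to null temporarily
    final KafkaConsumer<byte[], byte[]> currentConsumer = this.consumer;
    if (currentConsumer != null) {
        currentConsumer.close();
    }
}
{code}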



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18255) Add API annotations to RocksDB user-facing classes

2020-06-11 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-18255:
---

 Summary: Add API annotations to RocksDB user-facing classes
 Key: FLINK-18255
 URL: https://issues.apache.org/jira/browse/FLINK-18255
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.11.0
Reporter: Nico Kruber


Several user-facing classes in {{flink-statebackend-rocksdb}} don't have any 
API annotations, not even {{@PublicEvolving}}. These should be added to clarify 
their usage.
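
Adding them would be a one-line change per class; a sketch (class picked for 
illustration only):
{code:java}
@PublicEvolving
public class RocksDBStateBackend extends AbstractStateBackend {
    // ...
}
{code}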



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18242) Custom OptionsFactory in user code not working when configured via code

2020-06-10 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-18242:
---

 Summary: Custom OptionsFactory in user code not working when 
configured via code
 Key: FLINK-18242
 URL: https://issues.apache.org/jira/browse/FLINK-18242
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.10.1, 1.10.0
Reporter: Nico Kruber
 Attachments: DefaultConfigurableOptionsFactoryWithLog.java

 When I configure a custom {{OptionsFactory}} for RocksDB like this:
{code:java}
Configuration globalConfig = GlobalConfiguration.loadConfiguration();
String checkpointDataUri = 
globalConfig.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
RocksDBStateBackend stateBackend = new RocksDBStateBackend(checkpointDataUri);
stateBackend.setOptions(new DefaultConfigurableOptionsFactoryWithLog());
env.setStateBackend((StateBackend) stateBackend);{code}
it seems to be loaded:
{code:java}
2020-06-10 12:54:20,720 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Using 
predefined options: DEFAULT.
2020-06-10 12:54:20,721 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Using 
application-defined options factory: 
DefaultConfigurableOptionsFactoryWithLog{DefaultConfigurableOptionsFactory{configuredOptions={}}}.
 {code}
but it seems like none of the options defined in there is actually used. Just 
as an example, my factory does set the info log level to {{INFO_LEVEL}} but 
this is what you will see in the created RocksDB instance:
{code:java}
> cat /tmp/flink-io-c95e8f48-0daa-4fb9-a9a7-0e4fb42e9135/*/db/OPTIONS*|grep 
> info_log_level
  info_log_level=HEADER_LEVEL
  info_log_level=HEADER_LEVEL{code}
Together with the bug from FLINK-18241, it seems I cannot re-activate the 
RocksDB log that we disabled in FLINK-15068. FLINK-15747 was aiming at changing 
that particular configuration, but the problem seems broader since 
{{setDbLogDir()}} was actually also ignored.
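
For context, the attached factory presumably does something along these lines 
(method body reconstructed for illustration, not copied from the attachment):
{code:java}
public class DefaultConfigurableOptionsFactoryWithLog extends DefaultConfigurableOptionsFactory {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        currentOptions = super.createDBOptions(currentOptions);
        // re-enable RocksDB's LOG file and redirect it (path is hypothetical)
        currentOptions.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
        currentOptions.setDbLogDir("/tmp/rocksdb-logs");
        return currentOptions;
    }
}
{code}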



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18241) Custom OptionsFactory in user code not working when configured via flink-conf.yaml

2020-06-10 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-18241:
---

 Summary: Custom OptionsFactory in user code not working when 
configured via flink-conf.yaml
 Key: FLINK-18241
 URL: https://issues.apache.org/jira/browse/FLINK-18241
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.10.1, 1.10.0
Reporter: Nico Kruber
 Attachments: DefaultConfigurableOptionsFactoryWithLog.java

It seems like Flink 1.10 broke custom {{OptionsFactory}} definitions via the 
{{state.backend.rocksdb.options-factory}} configuration if the implementation 
resides in the user-code jar file. This is particularly bad for debugging 
RocksDB issues since we disabled its (ever-growing) LOG file in FLINK-15068.
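
For reference, the configuration entry in question looks like this (the class 
name is a placeholder for an implementation shipped in the user jar):
{code}
state.backend.rocksdb.options-factory: com.example.DefaultConfigurableOptionsFactoryWithLog
{code}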

If you look at the stack trace from the error below, you will notice that 
{{StreamExecutionEnvironment}} is not provided with a user-code classloader and 
will use the one of its own class, which is the parent loader and does not know 
about our {{OptionsFactory}}. This exact same code was working with Flink 1.9.3.

(I believe putting the custom {{OptionsFactory}} into a separate jar file 
inside Flink's lib folder may be a workaround but that should ideally not be 
needed).
{code:java}
2020-06-09 16:18:59,409 ERROR 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Could not 
start cluster entrypoint StandaloneJobClusterEntryPoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to 
initialize the cluster entrypoint StandaloneJobClusterEntryPoint.
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:192)
 ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:525)
 [flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1]
at 
org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.main(StandaloneJobClusterEntryPoint.java:116)
 [flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1]
Caused by: org.apache.flink.util.FlinkException: Could not create the 
DispatcherResourceManagerComponent.
at 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:261)
 ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:220)
 ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:174)
 ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1]
at 
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
 ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:173)
 ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1]
... 2 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve the 
JobGraph.
at 
org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory.createFactory(JobDispatcherLeaderProcessFactoryFactory.java:57)
 ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1]
at 
org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory.createDispatcherRunner(DefaultDispatcherRunnerFactory.java:51)
 ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1]
at 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:196)
 ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:220)
 ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:174)
 ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1]
at 
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
 ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:173)
 ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1]
... 2 more
Caused by: org.apache.flink.util.FlinkException: Could not create the JobGraph 
from the provided user code jar.
at 
org.apache.flink.container.entrypoint.ClassPathJobGraphRetriever.retrieveJobGraph(ClassPathJobGraphRetriever.java:114)
 ~[flink-dist_2.12-1.10.1-stream1.jar:1.10.1-stream1]
at 
org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory.createFactory(JobDispatcherLeaderProcessFactoryFactory.java:55

[jira] [Created] (FLINK-17706) Clarify licensing situation

2020-05-14 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-17706:
---

 Summary: Clarify licensing situation
 Key: FLINK-17706
 URL: https://issues.apache.org/jira/browse/FLINK-17706
 Project: Flink
  Issue Type: Sub-task
  Components: Benchmarks
Affects Versions: 1.11.0
Reporter: Nico Kruber
 Fix For: 1.11.0


After enabling the rat plugin, it finds the following files with missing or 
invalid license headers:
{code:java}
  src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java
  
src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java
  src/main/java/org/apache/flink/benchmark/functions/IntLongApplications.java
  src/main/java/org/apache/flink/benchmark/functions/IntegerLongSource.java
  src/main/java/org/apache/flink/benchmark/functions/LongSource.java
  src/main/java/org/apache/flink/benchmark/functions/MultiplyByTwo.java
  src/main/java/org/apache/flink/benchmark/functions/MultiplyIntLongByTwo.java
  src/main/java/org/apache/flink/benchmark/functions/SuccessException.java
  src/main/java/org/apache/flink/benchmark/functions/SumReduce.java
  src/main/java/org/apache/flink/benchmark/functions/SumReduceIntLong.java
  src/main/java/org/apache/flink/benchmark/functions/ValidatingCounter.java
  src/main/java/org/apache/flink/benchmark/functions/QueuingLongSource.java
  src/main/java/org/apache/flink/benchmark/CollectSink.java
  src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
  
src/main/java/org/apache/flink/benchmark/operators/MultiplyByTwoCoStreamMap.java
  src/main/resources/avro/mypojo.avsc
  src/main/resources/protobuf/MyPojo.proto
  src/main/resources/thrift/mypojo.thrift
  save_jmh_result.py {code}
The license should be clarified with the author and all contributors of each 
file.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17705) Add rat license checks

2020-05-14 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-17705:
---

 Summary: Add rat license checks
 Key: FLINK-17705
 URL: https://issues.apache.org/jira/browse/FLINK-17705
 Project: Flink
  Issue Type: Sub-task
  Components: Benchmarks
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.11.0


Before the code from [https://github.com/dataArtisans/flink-benchmarks/] is 
contributed, the licenses should be cleaned up and as a first step, we should 
set up the {{apache-rat-plugin}} similarly to how the Flink main repo uses it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17704) Allow running specific benchmarks from maven directly

2020-05-14 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-17704:
---

 Summary: Allow running specific benchmarks from maven directly
 Key: FLINK-17704
 URL: https://issues.apache.org/jira/browse/FLINK-17704
 Project: Flink
  Issue Type: Improvement
  Components: Benchmarks
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.11.0


Sometimes it would be nice to run a specific benchmark from maven directly. 
Currently this can be done via:
{code:java}
 mvn -Dflink.version=1.11-SNAPSHOT clean package exec:exec 
-Dexec.executable=java -Dexec.args="-jar 
target/flink-hackathon-benchmarks-0.1.jar -rf csv 
org.apache.flink.benchmark.StreamNetworkThroughputBenchmarkExecutor"{code}
but this is quite cumbersome and error-prone. Instead, I propose to simply define 
a property which by default runs all benchmarks but can be overridden on the 
command line to run a specific pattern (that is interpreted by JMH) like this:
{code:java}
mvn -Dflink.version=1.11-SNAPSHOT exec:exec 
-Dbenchmarks="org.apache.flink.benchmark.StreamNetworkThroughputBenchmarkExecutor"
 {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17703) Default execution command fails due to the 'benchmark' profile being inactive

2020-05-14 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-17703:
---

 Summary: Default execution command fails due to the 'benchmark' 
profile being inactive
 Key: FLINK-17703
 URL: https://issues.apache.org/jira/browse/FLINK-17703
 Project: Flink
  Issue Type: Improvement
  Components: Benchmarks
Affects Versions: 1.11.0
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.11.0


FLINK-17057 had some unfortunate side effects: by having the 
{{include-netty-tcnative-dynamic}} profile active by default, the 
{{benchmark}} profile was not active any more. Thus, the following command 
that was typically used for running the benchmarks failed unless the 
{{benchmark}} profile was activated manually like this:
{code:java}
mvn -Dflink.version=1.11-SNAPSHOT clean package exec:exec -P benchmark{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17293) Port training exercises data sets to a generator

2020-04-21 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-17293:
---

 Summary: Port training exercises data sets to a generator
 Key: FLINK-17293
 URL: https://issues.apache.org/jira/browse/FLINK-17293
 Project: Flink
  Issue Type: Improvement
  Components: Documentation / Training / Exercises
Affects Versions: 1.10.1
Reporter: Nico Kruber
Assignee: Nico Kruber


Currently, the training exercises still rely on training data hosted at 
Ververica:
- http://training.ververica.com/trainingData/nycTaxiRides.gz and
- http://training.ververica.com/trainingData/nycTaxiFares.gz

Since this has always been a problem for users (and one additional step), I 
propose to rewrite the training sources to use a data generator instead.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17279) Use gradle build scans

2020-04-20 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-17279:
---

 Summary: Use gradle build scans
 Key: FLINK-17279
 URL: https://issues.apache.org/jira/browse/FLINK-17279
 Project: Flink
  Issue Type: Improvement
  Components: Training Exercises
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.10.1, 1.11.0


Gradle build scans [1] provide quick insight into what happened when a CI 
build fails: a report with detailed info is uploaded to [1].

See this for an example: https://gradle.com/s/g3tdhu47lntoc

[1] https://scans.gradle.com/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17278) Add Travis to the training exercises

2020-04-20 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-17278:
---

 Summary: Add Travis to the training exercises
 Key: FLINK-17278
 URL: https://issues.apache.org/jira/browse/FLINK-17278
 Project: Flink
  Issue Type: Improvement
  Components: Training Exercises
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.10.1, 1.11.0


This will run all the tests and verify code quality.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17277) Apply IntelliJ recommendations to training exercises

2020-04-20 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-17277:
---

 Summary: Apply IntelliJ recommendations to training exercises
 Key: FLINK-17277
 URL: https://issues.apache.org/jira/browse/FLINK-17277
 Project: Flink
  Issue Type: Improvement
  Components: Training Exercises
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.10.1, 1.11.0


IntelliJ has a few recommendations on the original code of the training 
exercises. These should be addressed to serve as good reference code.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17276) Add checkstyle to training exercises

2020-04-20 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-17276:
---

 Summary: Add checkstyle to training exercises
 Key: FLINK-17276
 URL: https://issues.apache.org/jira/browse/FLINK-17276
 Project: Flink
  Issue Type: Improvement
  Components: Training Exercises
Affects Versions: 1.10.0, 1.11.0
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.10.1, 1.11.0


Port Flink's checkstyle to the training exercises and adapt the code 
accordingly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17275) Add core training exercises

2020-04-20 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-17275:
---

 Summary: Add core training exercises
 Key: FLINK-17275
 URL: https://issues.apache.org/jira/browse/FLINK-17275
 Project: Flink
  Issue Type: New Feature
  Components: Training Exercises
Affects Versions: 1.11.0
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.10.1, 1.11.0


Port the core training exercises, their descriptions, solutions, and tests from 
https://github.com/ververica/flink-training-exercises to Apache Flink.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17171) Blink planner fails to compile Table program with POJO source

2020-04-15 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-17171:
---

 Summary: Blink planner fails to compile Table program with POJO 
source
 Key: FLINK-17171
 URL: https://issues.apache.org/jira/browse/FLINK-17171
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner, Table SQL / Runtime
Affects Versions: 1.10.0
Reporter: Nico Kruber
 Attachments: error.log

It seems as if FLINK-13993 made the Table API (Blink planner) unusable for POJO 
sources where the POJO class is in user code.

For 
https://github.com/ververica/lab-sql-vs-datastream/blob/master/src/main/java/com/ververica/LateralTableJoin.java
 I get the following exception when I run it on a Flink 1.10.0 cluster (the 
full stack trace is attached):
{code}
2020-04-15 17:19:15,561 ERROR 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler- Unhandled 
exception.
org.apache.flink.util.FlinkRuntimeException: 
org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue.
...
Caused by: org.codehaus.commons.compiler.CompileException: Line 28, Column 175: 
Cannot determine simple type name "com"
...
at 
org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
{code}

I enabled debug logs and this is what it is trying to compile:
{code}
@Override
public void 
processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
element) throws Exception {
  org.apache.flink.table.dataformat.BaseRow in1 = 
(org.apache.flink.table.dataformat.BaseRow) 
(org.apache.flink.table.dataformat.BaseRow) 
converter$15.toInternal((com.ververica.tables.FactTable.Fact) 
element.getValue());
...
{code}

I use a standalone cluster and submit via the web UI; I also verified that my 
jar file does not contain anything but its compiled classes.

This code is working fine inside the IDE and was also working with Flink 1.10 
and VVP 2.0 which did not use a dedicated class loader for user code.

My guess is that the (generated) code does not have access to 
{{FactTable.Fact}} and the Janino compiler does not produce the right error 
message, instead complaining that it cannot determine the simple type name 
"com".

FLINK-7490 and FLINK-9220 seem related but too old (legacy planner).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17143) Blog feed.xml should only contain excerpts, not full contents

2020-04-14 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-17143:
---

 Summary: Blog feed.xml should only contain excerpts, not full 
contents
 Key: FLINK-17143
 URL: https://issues.apache.org/jira/browse/FLINK-17143
 Project: Flink
  Issue Type: Improvement
  Components: Project Website
Affects Versions: 1.10.0
Reporter: Nico Kruber


The blog's Atom feed at https://flink.apache.org/blog/feed.xml contains the 
whole content of all blog posts while it should probably only contain the 
excerpts (and links to the full versions) as usual.
This would save some unnecessary website traffic (bytes) for users who use the 
feed to get updates.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17057) Add OpenSSL micro-benchmarks

2020-04-08 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-17057:
---

 Summary: Add OpenSSL micro-benchmarks
 Key: FLINK-17057
 URL: https://issues.apache.org/jira/browse/FLINK-17057
 Project: Flink
  Issue Type: New Feature
  Components: Benchmarks
Affects Versions: 1.11.0
Reporter: Nico Kruber
Assignee: Nico Kruber


Our JMH micro-benchmarks currently only run with Java's SSL implementation but 
it would also be nice to have them evaluated with OpenSSL.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17056) JMH main() methods call unrelated benchmarks

2020-04-08 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-17056:
---

 Summary: JMH main() methods call unrelated benchmarks
 Key: FLINK-17056
 URL: https://issues.apache.org/jira/browse/FLINK-17056
 Project: Flink
  Issue Type: Bug
  Components: Benchmarks
Affects Versions: 1.10.0
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.11.0


Each benchmark class is accompanied by an according {{public static void 
main(String[] args)}} method which should run all benchmarks in that class. 
However, it just uses the class' simple name in a regexp like 
{{".*SimpleClassName.*"}} and may thus also match further classes that were 
not intended to run. An example for this is the 
{{StreamNetworkThroughputBenchmarkExecutor}} which also runs benchmarks from 
{{DataSkewStreamNetworkThroughputBenchmarkExecutor}}. Using the canonical name 
instead fixes that behaviour.
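
A sketch of the fixed {{main()}} method, assuming the usual JMH runner 
boilerplate from {{org.openjdk.jmh.runner}}:
{code:java}
public static void main(String[] args) throws RunnerException {
    Options options = new OptionsBuilder()
            // the canonical (fully-qualified) name no longer also matches
            // DataSkewStreamNetworkThroughputBenchmarkExecutor
            .include(".*" + StreamNetworkThroughputBenchmarkExecutor.class.getCanonicalName() + ".*")
            .build();
    new Runner(options).run();
}
{code}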



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16890) Add AvroGeneric benchmark

2020-03-31 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-16890:
---

 Summary: Add AvroGeneric benchmark
 Key: FLINK-16890
 URL: https://issues.apache.org/jira/browse/FLINK-16890
 Project: Flink
  Issue Type: New Feature
  Components: Benchmarks
Reporter: Nico Kruber
Assignee: Nico Kruber


Currently, serialization benchmarks for Avro cover specific records and Avro 
reflect. What is missing is {{GenericRecord}}, which I propose to add to 
{{SerializationFrameworkAllBenchmarks}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16729) Offer an out-of-the-box Set serializer

2020-03-23 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-16729:
---

 Summary: Offer an out-of-the-box Set serializer
 Key: FLINK-16729
 URL: https://issues.apache.org/jira/browse/FLINK-16729
 Project: Flink
  Issue Type: New Feature
  Components: API / Type Serialization System
Affects Versions: 1.10.0
Reporter: Nico Kruber


Currently, Set types are serialized by Kryo by default, since Flink does not 
come with its own SetSerializer (only one for maps). While the MapSerializer 
can easily be adapted to cover sets instead, I think this should be available 
by default to get the maximum performance out of Flink (Kryo is slow!).

When this is added, however, we need to provide a migration path for old state 
(or not use the new SetSerializer by default but offer it as an opt-in). This 
may need further investigation as to whether it is possible to migrate from 
Kryo automatically and whether we can check potential changes to the 
encapsulated entry class.
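
The core of such a serializer could mirror the MapSerializer's length-prefixed 
layout, roughly like this sketch (a helper method only, not the full 
{{TypeSerializer}} implementation):
{code:java}
static <T> void serializeSet(Set<T> set, TypeSerializer<T> elementSerializer,
        DataOutputView target) throws IOException {
    target.writeInt(set.size());
    for (T element : set) {
        elementSerializer.serialize(element, target);
    }
}
{code}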



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16664) Unable to set DataStreamSource parallelism to default (-1)

2020-03-18 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-16664:
---

 Summary: Unable to set DataStreamSource parallelism to default (-1)
 Key: FLINK-16664
 URL: https://issues.apache.org/jira/browse/FLINK-16664
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.10.0
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.10.1, 1.11.0


A hotfix that was part of FLINK-14405 actually breaks setting the parallelism 
to its default value for datastream sources, i.e. using value {{-1}}. This is 
because of a small typo: instead of
{code:java}
OperatorValidationUtils.validateParallelism(parallelism, isParallel);  {code}
this is called in 
org.apache.flink.streaming.api.datastream.DataStreamSource#setParallelism:
{code:java}
OperatorValidationUtils.validateMaxParallelism(parallelism, isParallel); {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16576) State inconsistency on restore with memory state backends

2020-03-12 Thread Nico Kruber (Jira)
Nico Kruber created FLINK-16576:
---

 Summary: State inconsistency on restore with memory state backends
 Key: FLINK-16576
 URL: https://issues.apache.org/jira/browse/FLINK-16576
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.10.0, 1.9.2
Reporter: Nico Kruber
 Fix For: 1.9.3, 1.10.1, 1.11.0


I occasionally see a few state inconsistencies with the {{TopSpeedWindowing}} 
example in Flink. Restore would fail with one of the causes below, but only 
for the memory state backends and only with some combinations of the 
parallelism I took the savepoint with and the parallelism I restore the job 
with:
{code:java}
java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=64, 
endKeyGroup=95} does not contain key group 97 {code}
or
{code:java}
java.lang.NullPointerException
at 
org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:280)
 {code}
or
{code:java}
java.io.IOException: Corrupt stream, found tag: 8
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:217)
 {code}
 

I managed to make it reproducible in a test that I quickly hacked together in 
[https://github.com/NicoK/flink/blob/state.corruption.debug/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingSavepointRestoreITCase.java]
 (please check out the whole repository since I had to change some dependencies).

In a bit more detail, this is what I discovered before, also with a manual 
savepoint on S3:

A savepoint taken with parallelism 2 (p=2) shows the restore failure in three 
different ways (all running in Flink 1.10.0; but I also see it in Flink 1.9):
 * first of all, if I try to restore with p=2, everything is fine
 * if I restore with p=4 I get an exception like the one mentioned above:
{code:java}
2020-03-11 15:53:35,149 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- 
Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, 
PassThroughWindowFunction) -> Sink: Print to Std. Out (3/4) 
(2ecdb03905cc8a376d43b086925452a6) switched from RUNNING to FAILED.
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for EvictingWindowOperator_90bea66de1c231edf33913ecd54406c1_(3/4) from 
any of the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore heap backend
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: java.lang.IllegalArgumentException: KeyGroupRange{startKeyGr
