[jira] [Created] (FLINK-20267) JaasModule prevents Flink from starting if working directory is a symbolic link

2020-11-20 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-20267:
-

 Summary: JaasModule prevents Flink from starting if working 
directory is a symbolic link
 Key: FLINK-20267
 URL: https://issues.apache.org/jira/browse/FLINK-20267
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.12.0
Reporter: Till Rohrmann
 Fix For: 1.12.0


[~AHeise] reported that starting Flink on EMR fails with

{code}
java.lang.RuntimeException: unable to generate a JAAS configuration file
    at org.apache.flink.runtime.security.modules.JaasModule.generateDefaultConfigFile(JaasModule.java:170)
    at org.apache.flink.runtime.security.modules.JaasModule.install(JaasModule.java:94)
    at org.apache.flink.runtime.security.SecurityUtils.installModules(SecurityUtils.java:78)
    at org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:59)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1045)
Caused by: java.nio.file.FileAlreadyExistsException: /tmp
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
    at sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)
    at java.nio.file.Files.createDirectory(Files.java:674)
    at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781)
    at java.nio.file.Files.createDirectories(Files.java:727)
    at org.apache.flink.runtime.security.modules.JaasModule.generateDefaultConfigFile(JaasModule.java:162)
    ... 4 more
{code}

The problem is that on EMR {{/tmp}} is a symbolic link. With FLINK-19252 we introduced the [creation of the working directory|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java#L162] in order to create the default JAAS config file. Consequently, the start-up process fails if the working directory path is a symbolic link: {{Files.createDirectories}} checks an already existing path without following links, so a symlink that points to a directory is rejected with the {{FileAlreadyExistsException}} shown above.
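
A minimal sketch of a possible workaround (not necessarily the fix that should go into Flink): skip the directory creation when the path already resolves to a directory, since {{Files.isDirectory}} follows symbolic links by default:

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

final class WorkingDirSketch {

    // Create the working directory unless a directory (or a symlink to one)
    // already exists there. Files.isDirectory follows symlinks by default, so
    // an EMR-style /tmp symlink is accepted instead of triggering the
    // FileAlreadyExistsException from the stack trace above.
    static void ensureDirectory(Path workingDir) throws IOException {
        if (!Files.isDirectory(workingDir)) {
            Files.createDirectories(workingDir);
        }
    }

    public static void main(String[] args) throws IOException {
        ensureDirectory(Paths.get("/tmp"));
    }
}
{code}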





[jira] [Created] (FLINK-20266) New FileSource prevents IntelliJ from stopping spawned JVM when running a job

2020-11-20 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-20266:
-

 Summary: New FileSource prevents IntelliJ from stopping spawned 
JVM when running a job
 Key: FLINK-20266
 URL: https://issues.apache.org/jira/browse/FLINK-20266
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.12.0
Reporter: Till Rohrmann
 Fix For: 1.12.0


When trying out the new {{FileSource}} I noticed that the jobs I started from my IDE won't properly terminate. To be more precise, the JVM spawned for the job would not terminate. I cannot really tell what the {{FileSource}} does differently, but when not using this source, the JVM terminates properly.
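
As a diagnostic aid, here is a minimal sketch (not part of the original report) that prints the live non-daemon threads, i.e. the threads that keep a JVM from exiting:

{code:java}
import java.util.Map;

final class NonDaemonThreads {
    // A JVM exits only once all non-daemon threads have terminated; calling
    // this from the hanging application shows what keeps it alive.
    static void dump() {
        for (Map.Entry<Thread, StackTraceElement[]> e
                : Thread.getAllStackTraces().entrySet()) {
            Thread t = e.getKey();
            if (t.isAlive() && !t.isDaemon()) {
                System.err.println(t.getName() + " (" + t.getState() + ")");
            }
        }
    }
}
{code}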

The stack trace of the hanging JVM is

{code}
2020-11-20 18:20:02
Full thread dump OpenJDK 64-Bit Server VM (11.0.2+9 mixed mode):

Threads class SMR info:
_java_thread_list=0x7fb5bc15f1b0, length=19, elements={
0x7fb60d807000, 0x7fb60d80c000, 0x7fb60d80f000, 0x7fb60d809000,
0x7fb60d81a000, 0x7fb61f00b000, 0x7fb63d80e000, 0x7fb61d826800,
0x7fb61d829800, 0x7fb61e80, 0x7fb63d95d800, 0x7fb63e2f8800,
0x7fb5ba37a800, 0x7fb5afe1a800, 0x7fb61dff6800, 0x7fb63da49800,
0x7fb63e8d0800, 0x7fb5be001000, 0x7fb5bb8a4000
}

"Reference Handler" #2 daemon prio=10 os_prio=31 cpu=10.05ms elapsed=86.35s 
tid=0x7fb60d807000 nid=0x4b03 waiting on condition  [0x736e9000]
   java.lang.Thread.State: RUNNABLE
at 
java.lang.ref.Reference.waitForReferencePendingList(java.base@11.0.2/Native 
Method)
at 
java.lang.ref.Reference.processPendingReferences(java.base@11.0.2/Reference.java:241)
at 
java.lang.ref.Reference$ReferenceHandler.run(java.base@11.0.2/Reference.java:213)

"Finalizer" #3 daemon prio=8 os_prio=31 cpu=0.90ms elapsed=86.35s 
tid=0x7fb60d80c000 nid=0x3803 in Object.wait()  [0x737ec000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(java.base@11.0.2/Native Method)
- waiting on <0x000600204780> (a java.lang.ref.ReferenceQueue$Lock)
at 
java.lang.ref.ReferenceQueue.remove(java.base@11.0.2/ReferenceQueue.java:155)
- waiting to re-lock in wait() <0x000600204780> (a 
java.lang.ref.ReferenceQueue$Lock)
at 
java.lang.ref.ReferenceQueue.remove(java.base@11.0.2/ReferenceQueue.java:176)
at 
java.lang.ref.Finalizer$FinalizerThread.run(java.base@11.0.2/Finalizer.java:170)

"Signal Dispatcher" #4 daemon prio=9 os_prio=31 cpu=0.31ms elapsed=86.34s 
tid=0x7fb60d80f000 nid=0x4203 runnable  [0x]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #5 daemon prio=9 os_prio=31 cpu=2479.36ms elapsed=86.34s 
tid=0x7fb60d809000 nid=0x3f03 waiting on condition  [0x]
   java.lang.Thread.State: RUNNABLE
   No compile task

"C1 CompilerThread0" #8 daemon prio=9 os_prio=31 cpu=1412.88ms elapsed=86.34s 
tid=0x7fb60d81a000 nid=0x3d03 waiting on condition  [0x]
   java.lang.Thread.State: RUNNABLE
   No compile task

"Sweeper thread" #9 daemon prio=9 os_prio=31 cpu=42.82ms elapsed=86.34s 
tid=0x7fb61f00b000 nid=0xa803 runnable  [0x]
   java.lang.Thread.State: RUNNABLE

"Common-Cleaner" #10 daemon prio=8 os_prio=31 cpu=3.25ms elapsed=86.29s 
tid=0x7fb63d80e000 nid=0x5703 in Object.wait()  [0x73cfb000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(java.base@11.0.2/Native Method)
- waiting on <0x000600205aa0> (a java.lang.ref.ReferenceQueue$Lock)
at 
java.lang.ref.ReferenceQueue.remove(java.base@11.0.2/ReferenceQueue.java:155)
- waiting to re-lock in wait() <0x000600205aa0> (a 
java.lang.ref.ReferenceQueue$Lock)
at 
jdk.internal.ref.CleanerImpl.run(java.base@11.0.2/CleanerImpl.java:148)
at java.lang.Thread.run(java.base@11.0.2/Thread.java:834)
at 
jdk.internal.misc.InnocuousThread.run(java.base@11.0.2/InnocuousThread.java:134)

"JDWP Transport Listener: dt_socket" #11 daemon prio=10 os_prio=31 cpu=43.46ms 
elapsed=86.27s tid=0x7fb61d826800 nid=0xa603 runnable  [0x]
   java.lang.Thread.State: RUNNABLE

"JDWP Event Helper Thread" #12 daemon prio=10 os_prio=31 cpu=220.06ms 
elapsed=86.27s tid=0x7fb61d829800 nid=0x5e03 runnable  [0x]
   java.lang.Thread.State: RUNNABLE

"JDWP Command Reader" #13 daemon prio=10 os_prio=31 cpu=27.26ms elapsed=86.27s 
tid=0x7fb61e80 nid=0x6103 runnable  [0x]
   java.lang.Thread.State: RUNNABLE

"Service Thread" #14 daemon prio=9 os_prio=31 cpu=0.06ms elapsed=86.19s 
tid=0x7fb63d95d800 nid=0xa203 runnable  [0x]
   java.lang.Thread.State: RUNNABLE

"ForkJoinPool.commonPool-worker-19" #25 daemon prio=1 os_prio=31 cpu=2.00ms 
elapsed=84.76s tid=0x7fb

[jira] [Created] (FLINK-20265) Extend invocation protocol to allow functions to indicate incomplete state context

2020-11-20 Thread Tzu-Li (Gordon) Tai (Jira)
Tzu-Li (Gordon) Tai created FLINK-20265:
---

 Summary: Extend invocation protocol to allow functions to indicate 
incomplete state context
 Key: FLINK-20265
 URL: https://issues.apache.org/jira/browse/FLINK-20265
 Project: Flink
  Issue Type: Sub-task
  Components: Stateful Functions
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
 Fix For: statefun-2.3.0


Currently, users declare the states a function will access with a module YAML 
definition file. The modules are loaded once when starting a StateFun cluster, 
meaning that the state specifications remain static throughout the cluster's 
execution lifetime.

We propose that state specifications should be declared by the functions themselves via the language SDKs, instead of being declared in the module YAMLs.

The state specifications, now living in the functions, can be made discoverable 
by the StateFun runtime through the invocation request-reply protocol.

Brief simplified sketch of the extended protocol (a code sketch follows the list):
- StateFun dispatches an invocation request with states {A, B}.

- The function receives the request, but since it requires {A, B, C, D}, it responds with an IncompleteInvocationContext response indicating that the state values for C and D are missing.

- StateFun receives this response and registers new Flink state handles for {C, D}.

- Then a new invocation request with the same input messages, but "patched" with the new states so that it contains all values for {A, B, C, D}, is resent to the function.
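
A simplified, hypothetical sketch of this loop on the StateFun side (the type and method names are illustrative, not the actual SDK/protobuf API):

{code:java}
import java.util.Set;

// Illustrative stand-ins for the real request-reply protocol messages.
interface Response {
    boolean isIncompleteContext();
    Set<String> missingStates(); // states the function declared but StateFun lacks
}

interface FunctionEndpoint {
    Response invoke(Set<String> statesAttached);
}

final class DispatchSketch {
    // Dispatch an invocation; if the function reports an incomplete state
    // context, register the missing Flink state handles and resend the same
    // input messages with all state values attached.
    static Response dispatch(FunctionEndpoint fn, Set<String> knownStates) {
        Response response = fn.invoke(knownStates);
        while (response.isIncompleteContext()) {
            for (String state : response.missingStates()) {
                // registerFlinkStateHandle(state);  // illustrative placeholder
                knownStates.add(state);
            }
            response = fn.invoke(knownStates);
        }
        return response;
    }
}
{code}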







[jira] [Created] (FLINK-20264) Zero-downtime / dynamic function upgrades in Stateful Functions

2020-11-20 Thread Tzu-Li (Gordon) Tai (Jira)
Tzu-Li (Gordon) Tai created FLINK-20264:
---

 Summary: Zero-downtime / dynamic function upgrades in Stateful 
Functions
 Key: FLINK-20264
 URL: https://issues.apache.org/jira/browse/FLINK-20264
 Project: Flink
  Issue Type: Task
  Components: Stateful Functions
Reporter: Tzu-Li (Gordon) Tai
 Fix For: statefun-2.3.0


Currently, because functions can be executed as stateless deployments separate from the StateFun runtime, they can easily be upgraded with zero downtime.

However, up to now there are still some restrictions on what can be done without restarting StateFun processes:

* Can't upgrade existing functions to declare new persisted state
* Can't add new functions to an existing StateFun application and have messages routed to them

The end goal of this epic is to enable the above operations for function 
deployments, without the need to restart the StateFun runtime. Further details 
can be found in subtasks of this JIRA.





[jira] [Created] (FLINK-20263) Improve exception on metadata name mismatch

2020-11-20 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-20263:


 Summary: Improve exception on metadata name mismatch
 Key: FLINK-20263
 URL: https://issues.apache.org/jira/browse/FLINK-20263
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.12.0
Reporter: Dawid Wysakowicz
 Fix For: 1.12.0


I'd suggest slightly improving the exception message when there is a mismatch in the field name. It would be nice to provide an example of valid syntax.

Right now we get:
{code}
org.apache.flink.table.api.ValidationException: Invalid metadata key 'tstmp' in 
column 'tstmp' of table 
'default_catalog.default_database.pageviews_per_region'. The DynamicTableSink 
class 'org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink' 
supports the following metadata keys for writing:
headers
timestamp
{code}

It would be nice to have something like:

{code}
org.apache.flink.table.api.ValidationException: Invalid metadata key 'tstmp' in 
column 'tstmp' of table 
'default_catalog.default_database.pageviews_per_region'. The DynamicTableSink 
class 'org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink' 
supports the following metadata keys for writing:
headers
timestamp

Example:
tstmp TIMESTAMP(3) METADATA FROM 'timestamp'
{code}

This would make it easier for users to figure out the error in their syntax.





[jira] [Created] (FLINK-20262) Building flink-dist docker image does not work without python2

2020-11-20 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-20262:


 Summary: Building flink-dist docker image does not work without 
python2
 Key: FLINK-20262
 URL: https://issues.apache.org/jira/browse/FLINK-20262
 Project: Flink
  Issue Type: Bug
  Components: Build System
Affects Versions: 1.11.2, 1.12.0
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz
 Fix For: 1.12.0, 1.11.4


The script {{common_docker.sh}} in the function {{start_file_server}} tests for the existence of {{python3}}, but executes the command using {{python}}:

{code}
command -v python3 >/dev/null 2>&1
if [[ $? -eq 0 ]]; then
  python ${TEST_INFRA_DIR}/python3_fileserver.py &
  return
fi
{code}

The script {{python3_fileserver.py}} uses the python2 module {{SocketServer}}, which does not exist in python3. It should use {{socketserver}} instead.
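
A minimal sketch of the corrected check, invoking the interpreter that was actually probed for (the file server itself additionally needs to import {{socketserver}} instead of {{SocketServer}} to run under python3):

{code}
# run the same interpreter whose existence was just tested
if command -v python3 >/dev/null 2>&1; then
  python3 ${TEST_INFRA_DIR}/python3_fileserver.py &
  return
fi
{code}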





Re: [ANNOUNCE] Apache Flink 1.12.0, release candidate #1

2020-11-20 Thread Robert Metzger
Thanks a lot for creating the first release candidate Dian!

It is quite difficult to keep track of all the testing / fixing activities, so
I'll try to provide a short summary:

The most critical bugs found so far are:
1. Iterative Batch jobs are deadlocking:
https://issues.apache.org/jira/browse/FLINK-19964
2. Unaligned checkpoints are unstable:
https://issues.apache.org/jira/browse/FLINK-20145
3. New Kafka Source is not working:
https://issues.apache.org/jira/browse/FLINK-20157

1. is fixed, 2. is almost merged, and 3. has some open PRs already.

There are still some other bugs that would be nice to address, but I would
like to create a new release candidate early next week (ideally Monday),
because some critical issues are showing up multiple times in the testing
now.
The testing seems to make good progress (as tracked in
https://issues.apache.org/jira/browse/FLINK-20112), some testing tasks have
finished already, others have early results.






On Mon, Nov 9, 2020 at 3:25 PM Dian Fu  wrote:

> Hi all,
>
> The RC1 for Apache Flink 1.12.0 has been created. This is still a
> preview-only release candidate to drive the current testing efforts and so
> no official votes will take place. It has all the artifacts that we would
> typically have for a release, except for the release note and the website
> pull request for the release announcement.
>
> It includes the following:
>* the preview source release and binary convenience releases [1], which
> are signed with the key with fingerprint
> 6B6291A8502BA8F0913AE04DDEB95B05BF075300 [2],
>* all artifacts that would normally be deployed to the Maven Central
> Repository [3]
>* source code tag "release-1.12.0-rc1" [4]
>
> To test with these artifacts, you can create a settings.xml file with the
> content shown below [5]. This settings file can be referenced in your maven
> commands via --settings /path/to/settings.xml. This is useful for creating
> a quickstart project based on the staged release and also for building
> against the staged jars.
>
> Happy testing!
>
> Regards,
> Robert & Dian
>
> [1] https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc1/
> [2] https://dist.apache.org/repos/dist/release/flink/KEYS
> [3]
> https://repository.apache.org/content/repositories/orgapacheflink-1402/
> [4] https://github.com/apache/flink/releases/tag/release-1.12.0-rc1
> [5]
> <settings>
>   <activeProfiles>
>     <activeProfile>flink-1.12.0</activeProfile>
>   </activeProfiles>
>   <profiles>
>     <profile>
>       <id>flink-1.12.0</id>
>       <repositories>
>         <repository>
>           <id>flink-1.12.0</id>
>           <url>https://repository.apache.org/content/repositories/orgapacheflink-1402/</url>
>         </repository>
>         <repository>
>           <id>archetype</id>
>           <url>https://repository.apache.org/content/repositories/orgapacheflink-1402/</url>
>         </repository>
>       </repositories>
>     </profile>
>   </profiles>
> </settings>


[jira] [Created] (FLINK-20261) Uncaught exception in ExecutorNotifier due to split assignment broken by failed task

2020-11-20 Thread Andrey Zagrebin (Jira)
Andrey Zagrebin created FLINK-20261:
---

 Summary: Uncaught exception in ExecutorNotifier due to split 
assignment broken by failed task
 Key: FLINK-20261
 URL: https://issues.apache.org/jira/browse/FLINK-20261
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.12.0
Reporter: Andrey Zagrebin


While trying to extend FileSourceTextLinesITCase::testContinuousTextFileSource with a recovery test after a TM failure (TestingMiniCluster::terminateTaskExecutor, [branch|https://github.com/azagrebin/flink/tree/FLINK-20118-it]), I encountered the following case:
* SourceCoordinatorContext::assignSplits schedules an async assignment (all reader tasks alive)
* TestingMiniCluster::terminateTaskExecutor is called while doing writeFile in a loop of testContinuousTextFileSource
* this causes a graceful TaskExecutor::onStop shutdown
* which causes a TM/RM disconnect and failing slot allocations in the JM by the RM
* which eventually causes SourceCoordinatorContext::unregisterSourceReader
* the actual assignment starts (SourceCoordinatorContext::assignSplits: callInCoordinatorThread)
* the registeredReaders.containsKey(subtaskId) check fails with an IllegalArgumentException, which is uncaught in the single-thread executor
* this forces the thread pool to recreate the single thread
* which calls CoordinatorExecutorThreadFactory::newThread
* this fails the expected condition of single-thread creation with an IllegalStateException, which is also uncaught
* which calls FatalExitExceptionHandler and exits the JVM abruptly (see the sketch below)
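
A hypothetical sketch (not the actual fix) of guarding the coordinator-thread work, so that an assignment for an already unregistered reader is dropped and any other Throwable is surfaced as a job failure instead of escaping into the single-thread executor:

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

final class GuardedAssignmentSketch {
    final Map<Integer, Object> registeredReaders = new ConcurrentHashMap<>();

    void assignSplits(Executor coordinatorExecutor, int subtaskId,
                      Runnable doAssign, Consumer<Throwable> failJob) {
        coordinatorExecutor.execute(() -> {
            try {
                if (!registeredReaders.containsKey(subtaskId)) {
                    return; // reader gone (e.g. TM shut down): drop the assignment
                }
                doAssign.run();
            } catch (Throwable t) {
                failJob.accept(t); // fail the job instead of killing the executor thread
            }
        });
    }
}
{code}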





[jira] [Created] (FLINK-20260) YAML for SQL client only supports legacy interfaces

2020-11-20 Thread Ingo Bürk (Jira)
Ingo Bürk created FLINK-20260:
-

 Summary: YAML for SQL client only supports legacy interfaces
 Key: FLINK-20260
 URL: https://issues.apache.org/jira/browse/FLINK-20260
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.11.2
Reporter: Ingo Bürk


In the YAML configuration for the SQL client, users can define tables. However, when this YAML is parsed, only the legacy interfaces (e.g. TableSourceFactory) are supported, and using connectors that implement e.g. DynamicTableSourceFactory will instead result in an error on startup of the client.

If defining tables in the YAML is to remain supported, it should also support the newer interfaces (or it should be clearly documented that this is not the case).





[jira] [Created] (FLINK-20259) Add explanation that "configured values" for JM/TM memory sizes include automatically derived values

2020-11-20 Thread Xintong Song (Jira)
Xintong Song created FLINK-20259:


 Summary: Add explanation that "configured values" for JM/TM memory 
sizes include automatically derived values
 Key: FLINK-20259
 URL: https://issues.apache.org/jira/browse/FLINK-20259
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Reporter: Xintong Song
 Fix For: 1.12.0


The column title `Configured Values` might be a bit confusing. Not all of the values are explicitly configured by users; some are automatically derived from the user's configuration. I would suggest adding a brief explanation (e.g., an (i) icon with some hidden text) for both the JM and the TM.





[jira] [Created] (FLINK-20258) Configured memory sizes on the JM metrics page should be displayed with proper units.

2020-11-20 Thread Xintong Song (Jira)
Xintong Song created FLINK-20258:


 Summary: Configured memory sizes on the JM metrics page should be 
displayed with proper units.
 Key: FLINK-20258
 URL: https://issues.apache.org/jira/browse/FLINK-20258
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Reporter: Xintong Song
 Fix For: 1.12.0


Configured memory sizes for the JM are displayed in bytes. It would be better to use proper units here, the same as we do for the TM.
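
For illustration, a minimal sketch (hypothetical helper, not the actual web frontend code) of the kind of unit formatting meant here:

{code:java}
final class MemoryFormat {
    // Render a byte count with the largest unit that keeps the value >= 1,
    // e.g. 1073741824 -> "1.00 GB", similar to what the TM page shows.
    static String formatBytes(long bytes) {
        String[] units = {"B", "KB", "MB", "GB", "TB"};
        double value = bytes;
        int unit = 0;
        while (value >= 1024 && unit < units.length - 1) {
            value /= 1024;
            unit++;
        }
        return String.format("%.2f %s", value, units[unit]);
    }
}
{code}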

 





[jira] [Created] (FLINK-20257) Flink web-ui metrics page: the metrics selector does not show the full metric name

2020-11-20 Thread meiminda (Jira)
meiminda created FLINK-20257:


 Summary: Flink web-ui metrics page: the metrics selector does not show the full metric name
 Key: FLINK-20257
 URL: https://issues.apache.org/jira/browse/FLINK-20257
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Affects Versions: 1.11.2
Reporter: meiminda
 Attachments: image-2020-11-20-16-56-26-496.png, 
image-2020-11-20-17-01-16-128.png

In the metrics selector on the Flink web-ui metrics page, the full metric name is not visible, so when I want to find a metric with a long name I have to try the entries one by one.

!image-2020-11-20-16-56-26-496.png!

At the least, it should achieve an effect as shown below:

!image-2020-11-20-17-01-16-128.png!





[jira] [Created] (FLINK-20256) UDAF type inference will fail if accumulator contains MapView with Pojo value type

2020-11-20 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-20256:
---

 Summary: UDAF type inference will fail if accumulator contains 
MapView with Pojo value type
 Key: FLINK-20256
 URL: https://issues.apache.org/jira/browse/FLINK-20256
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: Caizhi Weng
 Fix For: 1.12.0


To reproduce this bug, add the following test to {{FunctionITCase.java}}.

{code:java}
public static class MyPojo implements Serializable {
    public String a;
    public int b;

    public MyPojo(String s) {
        this.a = s;
        this.b = s.length();
    }
}

public static class MyAcc implements Serializable {

    public MapView<String, MyPojo> view = new MapView<>();

    public MyAcc() {}

    public void add(String a, String b) {
        try {
            view.put(a, new MyPojo(b));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

public static class TestUDAF extends AggregateFunction<String, MyAcc> {

    @Override
    public MyAcc createAccumulator() {
        return new MyAcc();
    }

    public void accumulate(MyAcc acc, String value) {
        if (value != null) {
            acc.add(value, value);
        }
    }

    @Override
    public String getValue(MyAcc acc) {
        return "test";
    }
}

@Test
public void myTest() throws Exception {
    String ddl = "create function MyACC as '" + TestUDAF.class.getName() + "'";
    tEnv().executeSql(ddl).await();
    try (CloseableIterator<Row> it = tEnv().executeSql("SELECT MyACC('123')").collect()) {
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
{code}

We'll then get the following exception stack:
{code}
java.lang.ClassCastException: org.apache.flink.table.types.AtomicDataType cannot be cast to org.apache.flink.table.types.KeyValueDataType

    at org.apache.flink.table.planner.typeutils.DataViewUtils$MapViewSpec.getKeyDataType(DataViewUtils.java:257)
    at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$addReusableStateDataViews$1$$anonfun$22.apply(AggsHandlerCodeGenerator.scala:1231)
    at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$addReusableStateDataViews$1$$anonfun$22.apply(AggsHandlerCodeGenerator.scala:1231)
    at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$.org$apache$flink$table$planner$codegen$agg$AggsHandlerCodeGenerator$$addReusableDataViewSerializer(AggsHandlerCodeGenerator.scala:1294)
    at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$addReusableStateDataViews$1.apply(AggsHandlerCodeGenerator.scala:1228)
    at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$addReusableStateDataViews$1.apply(AggsHandlerCodeGenerator.scala:1211)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$.addReusableStateDataViews(AggsHandlerCodeGenerator.scala:1211)
    at org.apache.flink.table.planner.codegen.agg.ImperativeAggCodeGen.<init>(ImperativeAggCodeGen.scala:112)
    at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$3.apply(AggsHandlerCodeGenerator.scala:233)
    at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$3.apply(AggsHandlerCodeGenerator.scala:214)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.initialAggregateInformation(AggsHandlerCodeGenerator.scala:214)
    at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateAggsHandler(AggsHandlerCodeGenerator.scala:325)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:143)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:52)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNo