[jira] [Created] (FLINK-20267) JaasModule prevents Flink from starting if working directory is a symbolic link
Till Rohrmann created FLINK-20267:
----------------------------------

Summary: JaasModule prevents Flink from starting if working directory is a symbolic link
Key: FLINK-20267
URL: https://issues.apache.org/jira/browse/FLINK-20267
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Affects Versions: 1.12.0
Reporter: Till Rohrmann
Fix For: 1.12.0

[~AHeise] reported that starting Flink on EMR fails with

{code}
java.lang.RuntimeException: unable to generate a JAAS configuration file
	at org.apache.flink.runtime.security.modules.JaasModule.generateDefaultConfigFile(JaasModule.java:170)
	at org.apache.flink.runtime.security.modules.JaasModule.install(JaasModule.java:94)
	at org.apache.flink.runtime.security.SecurityUtils.installModules(SecurityUtils.java:78)
	at org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:59)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1045)
Caused by: java.nio.file.FileAlreadyExistsException: /tmp
	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
	at sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)
	at java.nio.file.Files.createDirectory(Files.java:674)
	at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781)
	at java.nio.file.Files.createDirectories(Files.java:727)
	at org.apache.flink.runtime.security.modules.JaasModule.generateDefaultConfigFile(JaasModule.java:162)
	... 4 more
{code}

The problem is that on EMR {{/tmp}} is a symbolic link. With FLINK-19252 we introduced the [creation of the working directory|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java#L162] in order to create the default JAAS config file. The startup process now fails if the path of the working directory is a symbolic link rather than a plain directory, because {{Files.createDirectories}} apparently cannot deal with symbolic links (see the sketch below).

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
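A minimal sketch of the kind of guard that would avoid this failure, assuming the fix is simply to skip directory creation when the (possibly symlinked) working directory already resolves to a directory. This is not the actual Flink patch, and the path is only illustrative:

{code:java}
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class SymlinkTolerantDirCreation {

	public static void main(String[] args) throws Exception {
		// On EMR, /tmp is a symlink to a directory. Files.createDirectories() checks an
		// already-existing entry with NOFOLLOW_LINKS semantics and therefore rethrows
		// FileAlreadyExistsException for a symlink, even though it points to a directory.
		Path workingDir = Paths.get("/tmp"); // illustrative; Flink derives this from its configuration

		// Only attempt creation if the target does not already resolve to a directory.
		// Files.isDirectory() follows symlinks by default, so a link to a directory passes.
		if (!Files.isDirectory(workingDir)) {
			Files.createDirectories(workingDir);
		}
	}
}
{code}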
[jira] [Created] (FLINK-20266) New FileSource prevents IntelliJ from stopping spawned JVM when running a job
Till Rohrmann created FLINK-20266:
----------------------------------

Summary: New FileSource prevents IntelliJ from stopping spawned JVM when running a job
Key: FLINK-20266
URL: https://issues.apache.org/jira/browse/FLINK-20266
Project: Flink
Issue Type: Bug
Components: Connectors / FileSystem
Affects Versions: 1.12.0
Reporter: Till Rohrmann
Fix For: 1.12.0

When trying out the new {{FileSource}} I noticed that the jobs which I started from my IDE won't properly terminate. To be more precise, the spawned JVM for the jobs wouldn't properly terminate. I cannot really tell what the {{FileSource}} does differently, but when not using this source, the JVM terminates properly (a minimal sketch of such a job follows below). The thread dump of the hanging JVM is

{code}
2020-11-20 18:20:02
Full thread dump OpenJDK 64-Bit Server VM (11.0.2+9 mixed mode):

Threads class SMR info:
_java_thread_list=0x7fb5bc15f1b0, length=19, elements={
0x7fb60d807000, 0x7fb60d80c000, 0x7fb60d80f000, 0x7fb60d809000,
0x7fb60d81a000, 0x7fb61f00b000, 0x7fb63d80e000, 0x7fb61d826800,
0x7fb61d829800, 0x7fb61e80, 0x7fb63d95d800, 0x7fb63e2f8800,
0x7fb5ba37a800, 0x7fb5afe1a800, 0x7fb61dff6800, 0x7fb63da49800,
0x7fb63e8d0800, 0x7fb5be001000, 0x7fb5bb8a4000
}

"Reference Handler" #2 daemon prio=10 os_prio=31 cpu=10.05ms elapsed=86.35s tid=0x7fb60d807000 nid=0x4b03 waiting on condition [0x736e9000]
   java.lang.Thread.State: RUNNABLE
	at java.lang.ref.Reference.waitForReferencePendingList(java.base@11.0.2/Native Method)
	at java.lang.ref.Reference.processPendingReferences(java.base@11.0.2/Reference.java:241)
	at java.lang.ref.Reference$ReferenceHandler.run(java.base@11.0.2/Reference.java:213)

"Finalizer" #3 daemon prio=8 os_prio=31 cpu=0.90ms elapsed=86.35s tid=0x7fb60d80c000 nid=0x3803 in Object.wait() [0x737ec000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(java.base@11.0.2/Native Method)
	- waiting on <0x000600204780> (a java.lang.ref.ReferenceQueue$Lock)
	at java.lang.ref.ReferenceQueue.remove(java.base@11.0.2/ReferenceQueue.java:155)
	- waiting to re-lock in wait() <0x000600204780> (a java.lang.ref.ReferenceQueue$Lock)
	at java.lang.ref.ReferenceQueue.remove(java.base@11.0.2/ReferenceQueue.java:176)
	at java.lang.ref.Finalizer$FinalizerThread.run(java.base@11.0.2/Finalizer.java:170)

"Signal Dispatcher" #4 daemon prio=9 os_prio=31 cpu=0.31ms elapsed=86.34s tid=0x7fb60d80f000 nid=0x4203 runnable [0x]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #5 daemon prio=9 os_prio=31 cpu=2479.36ms elapsed=86.34s tid=0x7fb60d809000 nid=0x3f03 waiting on condition [0x]
   java.lang.Thread.State: RUNNABLE
   No compile task

"C1 CompilerThread0" #8 daemon prio=9 os_prio=31 cpu=1412.88ms elapsed=86.34s tid=0x7fb60d81a000 nid=0x3d03 waiting on condition [0x]
   java.lang.Thread.State: RUNNABLE
   No compile task

"Sweeper thread" #9 daemon prio=9 os_prio=31 cpu=42.82ms elapsed=86.34s tid=0x7fb61f00b000 nid=0xa803 runnable [0x]
   java.lang.Thread.State: RUNNABLE

"Common-Cleaner" #10 daemon prio=8 os_prio=31 cpu=3.25ms elapsed=86.29s tid=0x7fb63d80e000 nid=0x5703 in Object.wait() [0x73cfb000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait(java.base@11.0.2/Native Method)
	- waiting on <0x000600205aa0> (a java.lang.ref.ReferenceQueue$Lock)
	at java.lang.ref.ReferenceQueue.remove(java.base@11.0.2/ReferenceQueue.java:155)
	- waiting to re-lock in wait() <0x000600205aa0> (a java.lang.ref.ReferenceQueue$Lock)
	at jdk.internal.ref.CleanerImpl.run(java.base@11.0.2/CleanerImpl.java:148)
	at java.lang.Thread.run(java.base@11.0.2/Thread.java:834)
	at jdk.internal.misc.InnocuousThread.run(java.base@11.0.2/InnocuousThread.java:134)

"JDWP Transport Listener: dt_socket" #11 daemon prio=10 os_prio=31 cpu=43.46ms elapsed=86.27s tid=0x7fb61d826800 nid=0xa603 runnable [0x]
   java.lang.Thread.State: RUNNABLE

"JDWP Event Helper Thread" #12 daemon prio=10 os_prio=31 cpu=220.06ms elapsed=86.27s tid=0x7fb61d829800 nid=0x5e03 runnable [0x]
   java.lang.Thread.State: RUNNABLE

"JDWP Command Reader" #13 daemon prio=10 os_prio=31 cpu=27.26ms elapsed=86.27s tid=0x7fb61e80 nid=0x6103 runnable [0x]
   java.lang.Thread.State: RUNNABLE

"Service Thread" #14 daemon prio=9 os_prio=31 cpu=0.06ms elapsed=86.19s tid=0x7fb63d95d800 nid=0xa203 runnable [0x]
   java.lang.Thread.State: RUNNABLE

"ForkJoinPool.commonPool-worker-19" #25 daemon prio=1 os_prio=31 cpu=2.00ms elapsed=84.76s tid=0x7fb
{code}
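For context, a minimal job of the kind that exhibits the problem might look like the sketch below; the input path and the use of {{TextLineFormat}} are assumptions for illustration, not the reporter's actual job:

{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileSourceTerminationRepro {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Bounded read of a local directory using the new FLIP-27 FileSource.
		FileSource<String> source =
				FileSource.forRecordStreamFormat(new TextLineFormat(), new Path("/tmp/input"))
						.build();

		env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source")
				.print();

		// The job finishes, but the spawned JVM reportedly keeps running afterwards
		// when the job is started from the IDE.
		env.execute("file-source-termination-repro");
	}
}
{code}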
[jira] [Created] (FLINK-20265) Extend invocation protocol to allow functions to indicate incomplete state context
Tzu-Li (Gordon) Tai created FLINK-20265:
----------------------------------------

Summary: Extend invocation protocol to allow functions to indicate incomplete state context
Key: FLINK-20265
URL: https://issues.apache.org/jira/browse/FLINK-20265
Project: Flink
Issue Type: Sub-task
Components: Stateful Functions
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
Fix For: statefun-2.3.0

Currently, users declare the states a function will access in a module YAML definition file. The modules are loaded once when starting a StateFun cluster, meaning that the state specifications remain static throughout the cluster's execution lifetime.

We propose that state specifications should be declared by the functions themselves via the language SDKs, instead of being declared in the module YAMLs. The state specifications, now living in the functions, can be made discoverable by the StateFun runtime through the invocation request-reply protocol.

A brief, simplified sketch of the extended protocol (see also the code sketch below):
- StateFun dispatches an invocation request with states {A, B}.
- The function receives the request, but since it requires {A, B, C, D}, it responds with an IncompleteInvocationContext response indicating that the state values for C and D are missing.
- StateFun receives this response and registers new Flink state handles for {C, D}.
- Then a new invocation request with the same input messages, but "patched" with the new states so that it contains all values for {A, B, C, D}, is resent to the function.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
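As a rough, self-contained model of the handshake described above: none of the types below are actual StateFun classes (the real protocol is defined by the request-reply protobuf messages); they only mirror the four protocol steps.

{code:java}
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class IncompleteContextHandshake {

	static class Response {
		final Set<String> missingStates; // empty set => invocation succeeded
		Response(Set<String> missingStates) { this.missingStates = missingStates; }
		boolean isIncompleteContext() { return !missingStates.isEmpty(); }
	}

	/** Plays the function side: it declares and requires states {A, B, C, D}. */
	static Response invokeFunction(Map<String, String> providedStates) {
		Set<String> required = new HashSet<>(Arrays.asList("A", "B", "C", "D"));
		required.removeAll(providedStates.keySet());
		return new Response(required);
	}

	public static void main(String[] args) {
		// 1. The runtime dispatches with the states it knows about: {A, B}.
		Map<String, String> states = new HashMap<>();
		states.put("A", "valueA");
		states.put("B", "valueB");

		Response response = invokeFunction(states);

		if (response.isIncompleteContext()) {
			// 2./3. The function reports {C, D} as missing; the runtime registers handles for them.
			for (String missing : response.missingStates) {
				states.put(missing, null); // stands in for registering a new Flink state handle
			}
			// 4. The same input is resent, now patched with all declared states {A, B, C, D}.
			response = invokeFunction(states);
		}

		System.out.println("invocation complete: " + !response.isIncompleteContext());
	}
}
{code}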
[jira] [Created] (FLINK-20264) Zero-downtime / dynamic function upgrades in Stateful Functions
Tzu-Li (Gordon) Tai created FLINK-20264:
----------------------------------------

Summary: Zero-downtime / dynamic function upgrades in Stateful Functions
Key: FLINK-20264
URL: https://issues.apache.org/jira/browse/FLINK-20264
Project: Flink
Issue Type: Task
Components: Stateful Functions
Reporter: Tzu-Li (Gordon) Tai
Fix For: statefun-2.3.0

Currently, because functions can be executed as stateless deployments separate from the StateFun runtime, they can easily be upgraded with zero downtime. However, there are still some restrictions on what can be done without restarting the StateFun processes:
* Existing functions can't be upgraded to declare new persisted state.
* New functions can't be added to an existing StateFun application and have messages routed to them.

The end goal of this epic is to enable the above operations for function deployments without the need to restart the StateFun runtime. Further details can be found in the subtasks of this JIRA.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-20263) Improve exception when metadata name mismatches
Dawid Wysakowicz created FLINK-20263:
-------------------------------------

Summary: Improve exception when metadata name mismatches
Key: FLINK-20263
URL: https://issues.apache.org/jira/browse/FLINK-20263
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.12.0
Reporter: Dawid Wysakowicz
Fix For: 1.12.0

I'd suggest slightly improving the exception message when there is a mismatch in the metadata field name. It would be nice to provide an example of valid syntax. Right now we get:

{code}
org.apache.flink.table.api.ValidationException: Invalid metadata key 'tstmp' in column 'tstmp' of table 'default_catalog.default_database.pageviews_per_region'. The DynamicTableSink class 'org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink' supports the following metadata keys for writing:
headers
timestamp
{code}

It would be nice to have something like:

{code}
org.apache.flink.table.api.ValidationException: Invalid metadata key 'tstmp' in column 'tstmp' of table 'default_catalog.default_database.pageviews_per_region'. The DynamicTableSink class 'org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink' supports the following metadata keys for writing:
headers
timestamp

Example: tstmp TIMESTAMP(3) METADATA FROM 'timestamp'
{code}

This would let users figure out the syntax error more easily.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-20262) Building flink-dist docker image does not work without python2
Dawid Wysakowicz created FLINK-20262:
-------------------------------------

Summary: Building flink-dist docker image does not work without python2
Key: FLINK-20262
URL: https://issues.apache.org/jira/browse/FLINK-20262
Project: Flink
Issue Type: Bug
Components: Build System
Affects Versions: 1.11.2, 1.12.0
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz
Fix For: 1.12.0, 1.11.4

The script {{common_docker.sh}} in the function {{start_file_server}} tests for the existence of {{python3}}, but then executes the command using {{python}}:

{code}
command -v python3 >/dev/null 2>&1
if [[ $? -eq 0 ]]; then
  python ${TEST_INFRA_DIR}/python3_fileserver.py &
  return
fi
{code}

Moreover, the script {{python3_fileserver.py}} uses the python2 module {{SocketServer}}, which does not exist in python3. It should use {{socketserver}} instead.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
Re: [ANNOUNCE] Apache Flink 1.12.0, release candidate #1
Thanks a lot for creating the first release candidate Dian!

It is quite difficult to keep track of all the testing / fixing activities, so I'll try to provide a short summary. The most critical bugs found so far are:

1. Iterative batch jobs are deadlocking: https://issues.apache.org/jira/browse/FLINK-19964
2. Unaligned checkpoints are unstable: https://issues.apache.org/jira/browse/FLINK-20145
3. The new Kafka source is not working: https://issues.apache.org/jira/browse/FLINK-20157

1. is fixed, 2. is almost merged and 3. has some open PRs already.

There are still some other bugs that would be nice to address, but I would like to create a new release candidate early next week (ideally Monday), because some critical issues are now showing up multiple times in the testing.

The testing seems to make good progress (as tracked in https://issues.apache.org/jira/browse/FLINK-20112); some testing tasks have finished already, others have early results.

On Mon, Nov 9, 2020 at 3:25 PM Dian Fu wrote:

> Hi all,
>
> The RC1 for Apache Flink 1.12.0 has been created. This is still a
> preview-only release candidate to drive the current testing efforts and so
> no official votes will take place. It has all the artifacts that we would
> typically have for a release, except for the release note and the website
> pull request for the release announcement.
>
> It includes the following:
> * the preview source release and binary convenience releases [1], which
>   are signed with the key with fingerprint
>   6B6291A8502BA8F0913AE04DDEB95B05BF075300 [2],
> * all artifacts that would normally be deployed to the Maven Central
>   Repository [3]
> * source code tag "release-1.12.0-rc1" [4]
>
> To test with these artifacts, you can create a settings.xml file with the
> content shown below [5]. This settings file can be referenced in your maven
> commands via --settings /path/to/settings.xml. This is useful for creating
> a quickstart project based on the staged release and also for building
> against the staged jars.
>
> Happy testing!
>
> Regards,
> Robert & Dian
>
> [1] https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc1/
> [2] https://dist.apache.org/repos/dist/release/flink/KEYS
> [3] https://repository.apache.org/content/repositories/orgapacheflink-1402/
> [4] https://github.com/apache/flink/releases/tag/release-1.12.0-rc1
> [5]
> <settings>
>   <activeProfiles>
>     <activeProfile>flink-1.12.0</activeProfile>
>   </activeProfiles>
>   <profiles>
>     <profile>
>       <id>flink-1.12.0</id>
>       <repositories>
>         <repository>
>           <id>flink-1.12.0</id>
>           <url>https://repository.apache.org/content/repositories/orgapacheflink-1402/</url>
>         </repository>
>         <repository>
>           <id>archetype</id>
>           <url>https://repository.apache.org/content/repositories/orgapacheflink-1402/</url>
>         </repository>
>       </repositories>
>     </profile>
>   </profiles>
> </settings>
[jira] [Created] (FLINK-20261) Uncaught exception in ExecutorNotifier due to split assignment broken by failed task
Andrey Zagrebin created FLINK-20261:
------------------------------------

Summary: Uncaught exception in ExecutorNotifier due to split assignment broken by failed task
Key: FLINK-20261
URL: https://issues.apache.org/jira/browse/FLINK-20261
Project: Flink
Issue Type: Bug
Components: Connectors / FileSystem
Affects Versions: 1.12.0
Reporter: Andrey Zagrebin

While trying to extend FileSourceTextLinesITCase::testContinuousTextFileSource with a recovery test after a TM failure (TestingMiniCluster::terminateTaskExecutor, [branch|https://github.com/azagrebin/flink/tree/FLINK-20118-it]), I encountered the following chain of events (a simplified code sketch follows below):
* SourceCoordinatorContext::assignSplits schedules an async assignment (all reader tasks alive)
* TestingMiniCluster::terminateTaskExecutor is called while doing writeFile in a loop of testContinuousTextFileSource
* this causes a graceful TaskExecutor::onStop shutdown
* this causes the TM/RM disconnect and failing slot allocations in the JM by the RM
* this eventually causes SourceCoordinatorContext::unregisterSourceReader
* the actual assignment starts (SourceCoordinatorContext::assignSplits: callInCoordinatorThread)
* the registeredReaders.containsKey(subtaskId) check fails with an IllegalArgumentException, which is uncaught in the single-thread executor
* this forces the thread pool to recreate the single thread
* which calls CoordinatorExecutorThreadFactory::newThread
* this fails the expected condition of single thread creation with an IllegalStateException, which is uncaught
* which calls FatalExitExceptionHandler and exits the JVM abruptly

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
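To make the last few steps of this chain concrete, here is a small, self-contained simplification: the thread factory enforces a single-thread invariant and installs an exit-on-uncaught-exception handler, roughly mirroring CoordinatorExecutorThreadFactory and FatalExitExceptionHandler. This is not Flink's actual code; all names and the exit code are illustrative.

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class CoordinatorExecutorCrashSketch {

	public static void main(String[] args) {
		ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
			private boolean threadCreated = false;

			@Override
			public synchronized Thread newThread(Runnable r) {
				if (threadCreated) {
					// mirrors the "coordinator thread may only be created once" invariant
					throw new IllegalStateException("coordinator thread already created");
				}
				threadCreated = true;
				Thread t = new Thread(r, "source-coordinator");
				// stands in for FatalExitExceptionHandler: any uncaught exception kills the JVM
				t.setUncaughtExceptionHandler((thread, e) -> {
					System.err.println("FATAL: uncaught exception in " + thread.getName() + ": " + e);
					System.exit(-17);
				});
				return t;
			}
		});

		// Stands in for the failed precondition check registeredReaders.containsKey(subtaskId):
		// the IllegalArgumentException is uncaught and kills the single worker thread, the pool
		// then asks the factory for a replacement thread, and the factory's IllegalStateException
		// reaches the fatal handler above, terminating the JVM abruptly.
		coordinatorExecutor.execute(() -> {
			throw new IllegalArgumentException("reader 0 is not registered");
		});
	}
}
{code}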
[jira] [Created] (FLINK-20260) YAML for SQL client only supports legacy interfaces
Ingo Bürk created FLINK-20260:
------------------------------

Summary: YAML for SQL client only supports legacy interfaces
Key: FLINK-20260
URL: https://issues.apache.org/jira/browse/FLINK-20260
Project: Flink
Issue Type: Improvement
Affects Versions: 1.11.2
Reporter: Ingo Bürk

In the YAML configuration for the SQL client, users can define tables. However, when this YAML is parsed, only the legacy interfaces (such as TableSourceFactory) are supported, and using connectors that implement e.g. DynamicTableSourceFactory will instead result in an error on startup of the client.

If defining tables in the YAML is to continue to be supported, it should also be made to support the newer interfaces (or it should be clearly documented that this is not the case).

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-20259) Add explanation that "configured values" for JM/TM memory sizes include automatically derived values
Xintong Song created FLINK-20259:
---------------------------------

Summary: Add explanation that "configured values" for JM/TM memory sizes include automatically derived values
Key: FLINK-20259
URL: https://issues.apache.org/jira/browse/FLINK-20259
Project: Flink
Issue Type: Improvement
Components: Runtime / Web Frontend
Reporter: Xintong Song
Fix For: 1.12.0

The column title `Configured Values` might be a bit confusing. Not all of the values are explicitly configured by users; some could be automatically derived from the users' configuration. I would suggest adding a bit of explanation (e.g., an (i) icon with some hidden text) for both JM and TM.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-20258) Configured memory sizes on the JM metrics page should be displayed with proper units.
Xintong Song created FLINK-20258:
---------------------------------

Summary: Configured memory sizes on the JM metrics page should be displayed with proper units.
Key: FLINK-20258
URL: https://issues.apache.org/jira/browse/FLINK-20258
Project: Flink
Issue Type: Improvement
Components: Runtime / Web Frontend
Reporter: Xintong Song
Fix For: 1.12.0

Configured memory sizes for JM are displayed in bytes. It would be better to use proper units here, same as what we do for TM.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-20257) Flink web-ui metrics page: metric selector does not show the full name of the metrics
meiminda created FLINK-20257:
-----------------------------

Summary: Flink web-ui metrics page: metric selector does not show the full name of the metrics
Key: FLINK-20257
URL: https://issues.apache.org/jira/browse/FLINK-20257
Project: Flink
Issue Type: Improvement
Components: Runtime / Web Frontend
Affects Versions: 1.11.2
Reporter: meiminda
Attachments: image-2020-11-20-16-56-26-496.png, image-2020-11-20-17-01-16-128.png

On the Flink web-ui metrics page, the "add metric" selector does not show the full name of the metrics, so when I want to find a metric with a long name I have to try them one by one.

!image-2020-11-20-16-56-26-496.png!

It would be good to at least achieve an effect like the one shown below.

!image-2020-11-20-17-01-16-128.png!

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-20256) UDAF type inference will fail if accumulator contains MapView with Pojo value type
Caizhi Weng created FLINK-20256:
---------------------------------

Summary: UDAF type inference will fail if accumulator contains MapView with Pojo value type
Key: FLINK-20256
URL: https://issues.apache.org/jira/browse/FLINK-20256
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Reporter: Caizhi Weng
Fix For: 1.12.0

To reproduce this bug, add the following test to {{FunctionITCase.java}}.

{code:java}
public static class MyPojo implements Serializable {
	public String a;
	public int b;

	public MyPojo(String s) {
		this.a = s;
		this.b = s.length();
	}
}

public static class MyAcc implements Serializable {
	public MapView<String, MyPojo> view = new MapView<>();

	public MyAcc() {}

	public void add(String a, String b) {
		try {
			view.put(a, new MyPojo(b));
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}
}

public static class TestUDAF extends AggregateFunction<String, MyAcc> {
	@Override
	public MyAcc createAccumulator() {
		return new MyAcc();
	}

	public void accumulate(MyAcc acc, String value) {
		if (value != null) {
			acc.add(value, value);
		}
	}

	@Override
	public String getValue(MyAcc acc) {
		return "test";
	}
}

@Test
public void myTest() throws Exception {
	String ddl = "create function MyACC as '" + TestUDAF.class.getName() + "'";
	tEnv().executeSql(ddl).await();
	try (CloseableIterator<Row> it = tEnv().executeSql("SELECT MyACC('123')").collect()) {
		while (it.hasNext()) {
			System.out.println(it.next());
		}
	}
}
{code}

And we'll get the following exception stack:

{code}
java.lang.ClassCastException: org.apache.flink.table.types.AtomicDataType cannot be cast to org.apache.flink.table.types.KeyValueDataType
	at org.apache.flink.table.planner.typeutils.DataViewUtils$MapViewSpec.getKeyDataType(DataViewUtils.java:257)
	at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$addReusableStateDataViews$1$$anonfun$22.apply(AggsHandlerCodeGenerator.scala:1231)
	at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$addReusableStateDataViews$1$$anonfun$22.apply(AggsHandlerCodeGenerator.scala:1231)
	at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$.org$apache$flink$table$planner$codegen$agg$AggsHandlerCodeGenerator$$addReusableDataViewSerializer(AggsHandlerCodeGenerator.scala:1294)
	at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$addReusableStateDataViews$1.apply(AggsHandlerCodeGenerator.scala:1228)
	at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$addReusableStateDataViews$1.apply(AggsHandlerCodeGenerator.scala:1211)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$.addReusableStateDataViews(AggsHandlerCodeGenerator.scala:1211)
	at org.apache.flink.table.planner.codegen.agg.ImperativeAggCodeGen.<init>(ImperativeAggCodeGen.scala:112)
	at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$3.apply(AggsHandlerCodeGenerator.scala:233)
	at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$3.apply(AggsHandlerCodeGenerator.scala:214)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
	at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.initialAggregateInformation(AggsHandlerCodeGenerator.scala:214)
	at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateAggsHandler(AggsHandlerCodeGenerator.scala:325)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:143)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:52)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNo
{code}