[jira] [Commented] (FLINK-31856) Add support for Opensearch Connector REST client customization
[ https://issues.apache.org/jira/browse/FLINK-31856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731895#comment-17731895 ] Michael Hempel-Jørgensen commented on FLINK-31856: -- [~reta] we have tested the PR and it solves our use case, so that is great (y) The main improvement we could suggest is that we had to copy the whole call to builder.setHttpClientConfigCallback(), i.e. lines 45-75 in DefaultRestClientFactory, in order to add a call to httpClientBuilder.addInterceptorFirst(); I'm not sure if anything can be done to alleviate that so we can somehow inherit the default behaviour, but in general the PR is still a great improvement. > Add support for Opensearch Connector REST client customization > -- > > Key: FLINK-31856 > URL: https://issues.apache.org/jira/browse/FLINK-31856 > Project: Flink > Issue Type: Improvement > Components: Connectors / Opensearch >Affects Versions: opensearch-1.0.0 >Reporter: Michael Hempel-Jørgensen >Assignee: Andriy Redko >Priority: Minor > Labels: pull-request-available > > It is not currently possible to customise the Opensearch REST client in all > of the connectors. > We are currently using the OpensearchSink in > [connector/opensearch/sink/OpensearchSink.java|https://github.com/apache/flink-connector-opensearch/blob/main/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java] > and need to be able to authenticate/authorise with Opensearch using OAuth2 > and therefore need to be able to pass a bearer token to the bulk index calls. > The access token will expire and change during the job's lifetime, so it must > be possible to handle this, i.e. giving a token when building the sink is not > enough. > For reference see the mailing list discussion: > https://lists.apache.org/thread/9rvwhzjwzm6yq9mg481sdxqx9nqr1x5g -- This message was sent by Atlassian Jira (v8.20.10#820010)
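Since the token expires during the job's lifetime, the expiring-token requirement can be decoupled from the REST client wiring: an interceptor registered via setHttpClientConfigCallback(...) and httpClientBuilder.addInterceptorFirst(...) would read the current token from a supplier on every request. The names below (RefreshingTokenSupplier, fetchToken) are hypothetical; this is a minimal stdlib-only sketch of such a supplier, not the connector's actual code:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

/**
 * Hypothetical sketch: caches an OAuth2 bearer token and re-fetches it once
 * its assumed lifetime has elapsed. A real HttpRequestInterceptor would call
 * get() per request and set the Authorization header.
 */
final class RefreshingTokenSupplier implements Supplier<String> {
    private final Supplier<String> fetchToken; // e.g. a call to the OAuth2 token endpoint
    private final Duration lifetime;           // assumed (conservative) token lifetime
    private String cached;
    private Instant fetchedAt = Instant.EPOCH;

    RefreshingTokenSupplier(Supplier<String> fetchToken, Duration lifetime) {
        this.fetchToken = fetchToken;
        this.lifetime = lifetime;
    }

    @Override
    public synchronized String get() {
        // Re-fetch once the cached token has (conservatively) expired.
        if (cached == null || Instant.now().isAfter(fetchedAt.plus(lifetime))) {
            cached = fetchToken.get();
            fetchedAt = Instant.now();
        }
        return cached;
    }
}
```

With something like this, the interceptor stays a one-liner and the default callback behaviour would not need to be copied wholesale.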
[jira] [Commented] (FLINK-32320) Same correlate can not be reused due to the different correlationId
[ https://issues.apache.org/jira/browse/FLINK-32320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731876#comment-17731876 ] Benchao Li commented on FLINK-32320: [~aitozi] Then you need to open a dedicated Calcite issue, give a clear definition and description of the problem on the Calcite side, and move the discussion there. Note that in Calcite, a clear description of the problem is more important than the way to fix it. > Same correlate can not be reused due to the different correlationId > --- > > Key: FLINK-32320 > URL: https://issues.apache.org/jira/browse/FLINK-32320 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Aitozi >Priority: Major > > As described in SubplanReuserTest > {code:java} > @Test > def testSubplanReuseOnCorrelate(): Unit = { > util.addFunction("str_split", new StringSplit()) > val sqlQuery = > """ > |WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, > '-')) AS T(v)) > |SELECT * FROM r r1, r r2 WHERE r1.v = r2.v > """.stripMargin > // TODO the sub-plan of Correlate should be reused, > // however the digests of Correlates are different > util.verifyExecPlan(sqlQuery) > } > {code} > This will produce the plan > {code:java} > HashJoin(joinType=[InnerJoin], where=[(f0 = f00)], select=[a, b, c, f0, a0, > b0, c0, f00], build=[right]) > :- Exchange(distribution=[hash[f0]]) > : +- Correlate(invocation=[str_split($cor0.c, _UTF-16LE'-')], > correlate=[table(str_split($cor0.c,'-'))], select=[a,b,c,f0], > rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, > VARCHAR(2147483647) f0)], joinType=[INNER]) > : +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, > source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) > +- Exchange(distribution=[hash[f0]]) >+- Correlate(invocation=[str_split($cor1.c, _UTF-16LE'-')], > correlate=[table(str_split($cor1.c,'-'))], select=[a,b,c,f0], > rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, > VARCHAR(2147483647) f0)], joinType=[INNER]) > +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, > source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) > {code} > The Correlate node can not be reused due to the different correlation id.
[GitHub] [flink-connector-kafka] yuxiqian opened a new pull request, #32: [FLINK-32289][docs] Fix incorrect metadata column type in examples
yuxiqian opened a new pull request, #32: URL: https://github.com/apache/flink-connector-kafka/pull/32 This PR fixes an incorrect metadata column type in the Kafka connector examples. ## Brief change log Changed the timestamp metadata column type from TIMESTAMP to TIMESTAMP_LTZ (with local time zone) in the [Kafka SQL connector examples](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/kafka/#how-to-create-a-kafka-table) in both the English and Chinese docs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
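The fix matters because a Kafka record timestamp is epoch milliseconds, i.e. an absolute instant: a zone-less TIMESTAMP silently pins it to one wall-clock reading, while TIMESTAMP_LTZ keeps the instant and renders it in the session time zone. An illustrative stdlib-only sketch of the distinction (not connector code):

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class EpochMillisDemo {
    public static void main(String[] args) {
        long kafkaTimestamp = 0L; // epoch millis, as carried on a Kafka record
        // TIMESTAMP_LTZ-like view: one unambiguous point in time.
        Instant instant = Instant.ofEpochMilli(kafkaTimestamp);

        // The same instant reads differently per zone; a zone-less TIMESTAMP
        // column would freeze one of these readings and lose the instant.
        LocalDateTime utc = LocalDateTime.ofInstant(instant, ZoneId.of("UTC"));
        LocalDateTime shanghai = LocalDateTime.ofInstant(instant, ZoneId.of("Asia/Shanghai"));
        System.out.println(utc);      // prints 1970-01-01T00:00
        System.out.println(shanghai); // prints 1970-01-01T08:00
    }
}
```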
[GitHub] [flink] Myasuka commented on a diff in pull request #22767: FLIP-296: Extend watermark-related features for SQL
Myasuka commented on code in PR #22767: URL: https://github.com/apache/flink/pull/22767#discussion_r1227481717 ## docs/content.zh/docs/dev/table/concepts/time_attributes.md: ## @@ -272,6 +272,99 @@ GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE); ``` + Advanced watermark features in SQL +In previous versions, many advanced watermark features (such as watermark alignment) were easy to use through the DataStream API but hard to use in SQL, so in 1.18 we extended these features to make them available in SQL as well. + +{{< hint warning >}} +**Note:** Only source connectors that implement the `SupportsWatermarkPushDown` interface (e.g. kafka, pulsar) can use these advanced features. If a source connector does not implement the `SupportsWatermarkPushDown` interface but the job is configured with these options, the job runs normally, but the options take no effect. +{{< /hint >}} + +# I. Configuring the watermark emit strategy +Flink has two watermark emit strategies: + +- on-periodic: emit watermarks periodically +- on-event: emit a watermark for each event + +In the DataStream API, users choose the strategy through the WatermarkGenerator interface ([Custom WatermarkGenerator]({{< ref "docs/dev/datastream/event-time/generating_watermarks" >}}#自定义-watermarkgenerator)); for SQL jobs, watermarks are emitted periodically by default, with a default interval of 200ms that can be changed via the `pipeline.auto-watermark-interval` option. To emit a watermark for each event, configure the source table as follows: + +```sql +-- configure in table options +CREATE TABLE user_actions ( + ... + user_action_time TIMESTAMP(3), + WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND +) WITH ( + 'scan.watermark.emit.strategy'='on-event', + ... +); +``` + +Alternatively, it can be configured with an `OPTIONS` hint: +```sql +-- use 'OPTIONS' hint +select ... from source_table /*+ OPTIONS('scan.watermark.emit.strategy'='on-periodic') */ +``` + +# II. Configuring source idle timeout + +If one partition/split of a source sends no event data for a while, the `WatermarkGenerator` gets no new data to generate watermarks from; we call such a source an idle input or idle source. This is a problem when some other partitions are still sending event data, because a downstream operator computes its watermark as the minimum of the watermarks of all upstream parallel sources; since the idle split/partition computes no new watermark, the job's watermark does not advance. If a source idle timeout is configured, a partition/split that sends no event data within the timeout is marked idle and ignored when the downstream computes the new watermark, so the watermark can keep advancing. + +In SQL, the `table.exec.source.idle-timeout` option defines a global timeout that takes effect for every source. To set a different idle timeout per source, set it directly on the source table: + +```sql +-- configure in table options +CREATE TABLE user_actions ( + ...
+ user_action_time TIMESTAMP(3), + WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND +) WITH ( + 'scan.watermark.idle-timeout'='1min', + ... +); +``` + +Or use an `OPTIONS` hint: +```sql +-- use 'OPTIONS' hint +select ... from source_table /*+ OPTIONS('scan.watermark.idle-timeout'='1min') */ +``` + +# III. Watermark alignment +Due to factors such as data distribution or machine load, different partitions/splits of the same source, or different sources, may be consumed at different speeds. If there are stateful operators downstream, they may need to buffer more of the faster-consumed data in state while waiting for the slower data, so the state can grow very large; inconsistent consumption rates may also cause more severe out-of-orderness, which may affect the accuracy of window computations. In these scenarios, the watermark alignment feature ensures that the watermark of one split/partition of a source table does not advance too far ahead of the others, avoiding the problems above. Note that watermark alignment affects job performance, depending on how much the data consumption of the different source tables diverges. + +For SQL jobs, watermark alignment can be configured on the source table: + +```sql +-- configure in table options +CREATE TABLE user_actions ( +... +user_action_time TIMESTAMP(3), + WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND +) WITH ( +'scan.watermark.alignment.group'='alignment-group-1', +'scan.watermark.alignment.max-drift'='1min', +'scan.watermark.alignment.update-interval'='1s', +... +); +``` + +And of course the `OPTIONS` hint still works: + +```sql +-- use 'OPTIONS' hint +select ...
from source_table /*+ OPTIONS('scan.watermark.alignment.group'='alignment-group-1', 'scan.watermark.alignment.max-drift'='1min', 'scan.watermark.alignment.update-interval'='1s') */ +``` + +There are three options: + +- `scan.watermark.alignment.group` sets the alignment group name; sources in the same group will be aligned +- `scan.watermark.alignment.max-drift` sets the maximum range a split/partition is allowed to drift from the alignment time +- `scan.watermark.alignment.update-interval` sets how often the alignment time is computed; optional, defaulting to 1s + +{{< hint warning >}} +**Note:** If a source connector does not implement [FLIP-217](https://cwiki.apache.org/confluence/display/FLINK/FLIP-217%3A+Support+watermark+alignment+of+source+splits) and the watermark alignment feature is used, the job throws an exception at runtime. Users can set pipeline.watermark-arignment.Allow-naligned-Source-Splits=true to disable watermark alignment for source splits; in that case, watermark alignment only works properly when the number of splits equals the parallelism of the source. Review Comment: Wrong parameters, it should be `pipeline.watermark-alignment.allow-unaligned-source-splits` ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/watermark/WatermarkParams.java: ## @@ -26,27 +38,93 @@ public void setEmitStrategy(WatermarkEmitStrategy emitStrategy) { this.emitStrategy = emitStrategy; } +public String getAlignGroupName() { +return alignGroupName; +} + +public void setAlignGroupName(String alignGroupName) { +this.alignGroupName = alignGroupName; +} + +public Duration getAlignMaxDrift() { +return alignMaxDrift; +} + +public void setAlignMaxDrift(Duration alignMaxDrift) { +this.alignMaxDrift = alignMaxDrift; +} + +public Duration getAlignUpdateInterval() { +return alignUpdateInterval; +} + +public void setAlignUpdateInterval(Duration alignUpdateInterval) { +this.alignUpdateInterval = alignUpdateInterval; +} + +public boolean alignWatermarkEnabled() { +return !StringUtils.isNullOrWhitespaceOnly(alignGroupName) +&&
[GitHub] [flink] flinkbot commented on pull request #22768: [FLINK-32309][sql-gateway] Use independent resource manager for table environment
flinkbot commented on PR #22768: URL: https://github.com/apache/flink/pull/22768#issuecomment-1588509273 ## CI report: * f4d5c01b209db532ed82e2cd9d50941abe5dfb01 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-32309) Shared classpaths and jars manager for jobs in sql gateway cause confliction
[ https://issues.apache.org/jira/browse/FLINK-32309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32309: --- Labels: pull-request-available (was: ) > Shared classpaths and jars manager for jobs in sql gateway cause confliction > > > Key: FLINK-32309 > URL: https://issues.apache.org/jira/browse/FLINK-32309 > Project: Flink > Issue Type: Bug > Components: Table SQL / Gateway >Affects Versions: 1.18.0 >Reporter: Fang Yong >Assignee: FangYong >Priority: Major > Labels: pull-request-available > > Currently, all jobs in the same session of sql gateway share the resource > manager which provides the classpath for jobs. After a job is performed, its > classpath and jars remain in the shared resource manager and are used by > the next jobs. This may cause too many unnecessary jars in a job, or even cause > conflicts
[GitHub] [flink] FangYongs opened a new pull request, #22768: [FLINK-32309][sql-gateway] Use independent resource manager for table environment
FangYongs opened a new pull request, #22768: URL: https://github.com/apache/flink/pull/22768 ## What is the purpose of the change This PR aims to create a new resource manager for the table environment in sql gateway ## Brief change log - Create a new resource manager which will share the classloader and local path with the previous manager - Create a table environment for each operation in the gateway with the new resource manager - Add jars- and functions-related operations in the gateway which will be performed on the session resource manager. ## Verifying this change This change added tests and can be verified as follows: - Updated `SqlGatewayServiceITCase.testListUserDefinedFunctions` to test `create` functions. - Updated `SqlGatewayServiceITCase.testConfigureSessionWithLegalStatement` to check that the jars are empty if a query does not use a udf ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) no - The serializers: (yes / no / don't know) no - The runtime per-record code paths (performance sensitive): (yes / no / don't know) no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) no - The S3 file system connector: (yes / no / don't know) no ## Documentation - Does this pull request introduce a new feature? (yes / no) no - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[jira] [Created] (FLINK-32321) Temporal Join job missing condition after “ON”
macdoor615 created FLINK-32321: -- Summary: Temporal Join job missing condition after “ON” Key: FLINK-32321 URL: https://issues.apache.org/jira/browse/FLINK-32321 Project: Flink Issue Type: Bug Components: Table SQL / Gateway Affects Versions: 1.17.1 Reporter: macdoor615 Fix For: 1.17.2 We have a SQL job like this {code:java} select ... from prod_kafka.f_alarm_tag_dev /*+ OPTIONS('scan.startup.mode' = 'latest-offset') */ as f left join mysql_bnpmp.gem_bnpmp.f_a_alarm_filter /*+ OPTIONS('lookup.cache.max-rows' = '5000', 'lookup.cache.ttl' = '30s') */ FOR SYSTEM_TIME AS OF f.proctime ff on ff.rule_type = 0 and f.ne_ip = ff.ip {code} We submitted it to a flink 1.17.1 cluster with sql-gateway and found that the job detail is missing the lookup condition (rule_type=0) {code:java} +- [1196]:LookupJoin(table=[mysql_bnpmp.gem_bnpmp.f_a_alarm_filter], joinType=[LeftOuterJoin], lookup=[ip=ne_ip], select=[event_id, {code} We submitted the same sql to a flink 1.17.0 cluster with sql-gateway. There the (rule_type=0) lookup condition is present {code:java} +- [3]:LookupJoin(table=[mysql_bnpmp.gem_bnpmp.f_a_alarm_filter], joinType=[LeftOuterJoin], lookup=[rule_type=0, ip=ne_ip], where=[(rule_type = 0)], select=[event_id, severity,{code}
[GitHub] [flink] leonardBang commented on pull request #22760: [FLINK-32289][kafka] Fix incorrect metadata column type in docs
leonardBang commented on PR #22760: URL: https://github.com/apache/flink/pull/22760#issuecomment-1588469379 @yuxiqian Could you also open a PR for the https://github.com/apache/flink-connector-kafka repo, as we plan to move the Kafka connector there from the Flink main repo?
[GitHub] [flink] leonardBang merged pull request #22760: [FLINK-32289][kafka] Fix incorrect metadata column type in docs
leonardBang merged PR #22760: URL: https://github.com/apache/flink/pull/22760
[jira] [Commented] (FLINK-32320) Same correlate can not be reused due to the different correlationId
[ https://issues.apache.org/jira/browse/FLINK-32320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731832#comment-17731832 ] Aitozi commented on FLINK-32320: [~libenchao] Thanks for your attention. I just made a quick fix on the Calcite side when creating a new correlationId: if it is in the same scope with the same identifier, the same correlationId as before is reused. It works as expected. What do you think of this solution?
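The quick fix described above amounts to memoizing id creation on a (scope, identifier) key instead of incrementing a counter unconditionally. A minimal stdlib sketch of that idea (class and method names are hypothetical, not Calcite's actual SqlToRelConverter code):

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical id factory: identical (scope, identifier) pairs reuse the same id. */
final class CorrelationIdFactory {
    private final Map<String, Integer> byScopeAndIdentifier = new HashMap<>();
    private int nextId = 0;

    /** Returns "$corN", handing back the same N when the same scope/identifier asks again. */
    synchronized String idFor(String scope, String identifier) {
        // NUL byte as separator so ("ab","c") and ("a","bc") cannot collide.
        int id = byScopeAndIdentifier.computeIfAbsent(scope + "\u0000" + identifier, k -> nextId++);
        return "$cor" + id;
    }
}
```

With ids assigned this way, the two expansions of the WITH clause in the test above would both carry $cor0, their Correlate digests would match, and the subplan reuser could merge them.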
[jira] [Commented] (FLINK-32319) flink can't the partition of network after restart
[ https://issues.apache.org/jira/browse/FLINK-32319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731830#comment-17731830 ] Wencong Liu commented on FLINK-32319: - If this exception occurs, it indicates that the upstream vertex initializes its ResultPartition more slowly than the downstream vertex requests it. This happens when the upstream vertex is short of CPU/memory resources. We should give the LocalInputChannel more chances to request the partition, so we can increase "taskmanager.network.request-backoff.max" to raise the maximum backoff from 10s to a bigger value. [~1026688210] > flink can't the partition of network after restart > -- > > Key: FLINK-32319 > URL: https://issues.apache.org/jira/browse/FLINK-32319 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.17.1 > Environment: centos 7. > jdk 8. > flink1.17.1 application mode on yarn > flink configuration : > ``` > $internal.application.program-argssql2 > $internal.deployment.config-dir /data/home/flink/wgcn/flink-1.17.1/conf > $internal.yarn.log-config-file > /data/home/flink/wgcn/flink-1.17.1/conf/log4j.properties > akka.ask.timeout 100s > blob.server.port 15402 > classloader.check-leaked-classloader false > classloader.resolve-order parent-first > env.java.opts.taskmanager -XX:+UseG1GC -XX:MaxGCPauseMillis=1000 > execution.attachedtrue > execution.checkpointing.aligned-checkpoint-timeout10 min > execution.checkpointing.externalized-checkpoint-retention > RETAIN_ON_CANCELLATION > execution.checkpointing.interval 10 min > execution.checkpointing.min-pause 10 min > execution.savepoint-restore-mode NO_CLAIM > execution.savepoint.ignore-unclaimed-statefalse > execution.shutdown-on-attached-exit false > execution.target embedded > high-availability zookeeper > high-availability.cluster-id application_1684133071014_7202676 > high-availability.storageDir hdfs:///user/flink/recovery > high-availability.zookeeper.path.root /flink > high-availability.zookeeper.quorumx > 
internal.cluster.execution-mode NORMAL > internal.io.tmpdirs.use-local-default true > io.tmp.dirs > /data1/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data2/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data3/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data4/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data5/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data6/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data7/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data8/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data9/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data10/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data11/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data12/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676 > jobmanager.execution.failover-strategyregion > jobmanager.memory.heap.size 9261023232b > jobmanager.memory.jvm-metaspace.size 268435456b > jobmanager.memory.jvm-overhead.max1073741824b > jobmanager.memory.jvm-overhead.min1073741824b > jobmanager.memory.off-heap.size 134217728b > jobmanager.memory.process.size10240m > jobmanager.rpc.address > jobmanager.rpc.port 31332 > metrics.reporter.promgateway.deleteOnShutdown true > metrics.reporter.promgateway.factory.class > org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory > metrics.reporter.promgateway.hostUrl :9091 > metrics.reporter.promgateway.interval 60 SECONDS > metrics.reporter.promgateway.jobName join_phase3_v7 > metrics.reporter.promgateway.randomJobNameSuffix false > parallelism.default 128 > pipeline.classpaths > pipeline.jars > 
file:/data2/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676/container_e16_1684133071014_7202676_01_02/frauddetection-0.1.jar > rest.address > rest.bind-address x > rest.bind-port5-50500 > rest.flamegraph.enabled true > restart-strategy.failure-rate.delay 10 s > restart-strategy.failure-rate.failure-rate-interval 1 min > restart-strategy.failure-rate.max-failures-per-interval 6 > restart-strategy.type exponential-delay > state.backend.typefilesystem > state.checkpoints.dir hdfs://xx/user/flink/checkpoints-data/wgcn > state.checkpoints.num-retained3 > taskmanager.memory.managed.fraction 0 > taskmanager.memory.network.max600mb > taskmanager.memory.process.size 10240m >
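The advice above to raise taskmanager.network.request-backoff.max works because the partition request is retried with exponentially growing waits until the configured maximum is reached, so a larger cap means more retries before the request finally fails. A rough stdlib sketch of such a doubling schedule (assumed values: 100 ms initial, 10 s cap; the details of Flink's actual backoff implementation may differ):

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionRequestBackoff {
    /** Doubling backoff starting at initialMs, stopping once the cap would be exceeded. */
    public static List<Long> schedule(long initialMs, long maxMs) {
        List<Long> waits = new ArrayList<>();
        for (long backoff = initialMs; backoff <= maxMs; backoff *= 2) {
            waits.add(backoff);
        }
        return waits;
    }

    public static void main(String[] args) {
        // Assumed defaults: 100 ms initial, 10 s max.
        System.out.println(schedule(100, 10_000)); // prints [100, 200, 400, 800, 1600, 3200, 6400]
    }
}
```

Raising the cap (e.g. to 60 s) simply appends more, longer waits to this schedule, giving a slow upstream vertex more time to initialize its ResultPartition.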
[jira] [Commented] (FLINK-32320) Same correlate can not be reused due to the different correlationId
[ https://issues.apache.org/jira/browse/FLINK-32320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731829#comment-17731829 ] Benchao Li commented on FLINK-32320: You are correct, this will miss some opportunities for reusing common table expressions, including a {{VIEW}} used multiple times.
[GitHub] [flink] KarmaGYZ commented on a diff in pull request #22137: [FLINK-31233][Rest] Return 404 when task manager stdout/log files not exist.
KarmaGYZ commented on code in PR #22137: URL: https://github.com/apache/flink/pull/22137#discussion_r1227428286 ## flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandlerTest.java: ## @@ -116,12 +85,11 @@ public class AbstractTaskManagerFileHandlerTest extends TestLogger { private TransientBlobKey transientBlobKey2; -@BeforeClass +@BeforeAll public static void setup() throws IOException, HandlerRequestException { Review Comment: Why do we need to keep the public modifier? Same as below. ## flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java: ## @@ -151,34 +151,12 @@ protected CompletableFuture respondToRequest( }, ctx.executor()); -return resultFuture.whenComplete( +return resultFuture.handle( (Void ignored, Throwable throwable) -> { if (throwable != null) { -log.error( -"Failed to transfer file from TaskExecutor {}.", -taskManagerId, -throwable); -fileBlobKeys.invalidate(taskManagerId); - -final Throwable strippedThrowable = - ExceptionUtils.stripCompletionException(throwable); - -if (strippedThrowable instanceof UnknownTaskExecutorException) { -throw new CompletionException( -new NotFoundException( -String.format( -"Failed to transfer file from TaskExecutor %s because it was unknown.", -taskManagerId), -strippedThrowable)); -} else { -throw new CompletionException( -new FlinkException( -String.format( -"Failed to transfer file from TaskExecutor %s.", -taskManagerId), -strippedThrowable)); -} +handleException(ctx, httpRequest, throwable, taskManagerId); } +return ignored; Review Comment: Not sure why we need to return here?
## flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TestingChannelHandlerContext.java: ## @@ -46,11 +48,21 @@ class TestingChannelHandlerContext implements ChannelHandlerContext { final File outputFile; final ChannelPipeline pipeline = new TestingChannelPipeline(); +HttpResponse httpResponse; +byte[] responseData; Review Comment: Ditto. private modifier. ## flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandlerTest.java: ## @@ -165,6 +168,81 @@ void testFileCacheExpiration() throws Exception { assertThat(FileUtils.readFileUtf8(outputFile)).isEqualTo(fileContent2); } +@Test +void testStdoutFileHandlerHandleFileNotFoundException() throws Exception { +final Time cacheEntryDuration = Time.milliseconds(1000L); +TestingTaskManagerStdoutFileHandler testingTaskManagerStdoutFileHandler = +createTestTaskManagerStdoutFileHandler( +cacheEntryDuration, new FileNotFoundException("file not found")); +final File outputFile = TempDirUtils.newFile(temporaryFolder.toPath()); +final TestingChannelHandlerContext testingContext = +new TestingChannelHandlerContext(outputFile); + +CompletableFuture handleFuture = +testingTaskManagerStdoutFileHandler.respondToRequest( +testingContext, HTTP_REQUEST, handlerRequest, null); +assertThat(handleFuture).isCompleted(); +assertThat(testingContext.getHttpResponse()) +.isNotNull() +.satisfies( +httpResponse -> + assertThat(httpResponse.status()).isEqualTo(HttpResponseStatus.OK)); +assertThat(testingContext.getResponseData()) +.isNotNull() +.satisfies( +data -> +assertThat(new String(data, "UTF-8")) +.isEqualTo( + TaskManagerStdoutFileHandler.FILE_NOT_FOUND_INFO)); +} + +@Test +void testStdoutFileHandlerHandleOtherException() throws Exception { +
[GitHub] [flink] flinkbot commented on pull request #22767: FLIP-296: Extend watermark-related features for SQL
flinkbot commented on PR #22767: URL: https://github.com/apache/flink/pull/22767#issuecomment-1588429310 ## CI report: * 26d4747481a26ae4b192b7744e48a6757ddb2391 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink-connector-jdbc] WenDing-Y commented on pull request #49: [FLINK-32068] connector jdbc support clickhouse
WenDing-Y commented on PR #49: URL: https://github.com/apache/flink-connector-jdbc/pull/49#issuecomment-1588427649 @MartijnVisser the PR is ready
[GitHub] [flink] yuchengxin opened a new pull request, #22767: Feature/watermark params
yuchengxin opened a new pull request, #22767: URL: https://github.com/apache/flink/pull/22767 ## What is the purpose of the change The PR proposes to extend watermark-related features in SQL layer, these fetures have been implemented on the datastream API, but not available in sql yet. The extening features as follows: 1. configurable watermark emit strategy in sql 2. dealing with idle sources in sql 3. configurable watermark alignment in sql ## Brief change log - Added `org.apache.flink.table.watermark.WatermarkParams` to express the parameters of extended watermark-related features - Added and validated releated params in `org.apache.flink.table.factories.FactoryUtil` - Parse out the watermark-related configs from hint or table options in `org.apache.flink.table.planner.plan.rules.logical.PushWatermarkIntoTableSourceScanRuleBase` - Handle watermark-related features in `org.apache.flink.table.planner.plan.abilities.source.WatermarkPushDownSpec` ## Verifying this change This change added tests and can be verified as follows: - Added test cases in `org.apache.flink.table.planner.plan.rules.logical.PushWatermarkIntoTableSourceScanRuleTest` to test use these extended features with sql hint or tabe options - The serialization and deserialization of `DynamicTableSourceSpec` is covered by existing tests: `org.apache.flink.table.planner.plan.nodes.exec.serde.DynamicTableSourceSpecSerdeTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) yes - The serializers: (yes / no / don't know) no - The runtime per-record code paths (performance sensitive): (yes / no / don't know) no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) no - The S3 file system connector: (yes / no / don't know) no ## 
Documentation - Does this pull request introduce a new feature? (yes / no) yes - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) docs -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
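Although the PR body above does not show usage, the three listed features map naturally to per-table watermark options. A hedged sketch of how they might be enabled through a dynamic-table `OPTIONS` hint — the option keys below are assumptions inferred from the PR description and may differ from what was eventually merged:

```sql
-- Hypothetical option keys; verify against the merged Flink documentation.
SELECT *
FROM my_source /*+ OPTIONS(
  'scan.watermark.emit.strategy'       = 'on-periodic',  -- 1. emit strategy
  'scan.watermark.idle-timeout'        = '1min',         -- 2. idle sources
  'scan.watermark.alignment.group'     = 'group-1',      -- 3. alignment
  'scan.watermark.alignment.max-drift' = '1min'
) */;
```

The same keys could presumably also be set as table options in the `CREATE TABLE ... WITH (...)` clause, per the PR's "hint or table options" wording.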
[jira] [Commented] (FLINK-32319) flink can't the partition of network after restart
[ https://issues.apache.org/jira/browse/FLINK-32319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731827#comment-17731827 ] wgcn commented on FLINK-32319: -- hi~ [~Wencong Liu] thanks for your response, I will try the config, I have a question. I just roughly looked at the meaning of this config. Why does this config need to be set so large? Is it related to parallel reading? We are using version 1.12 of Flink in our production environment, and I have never paid attention to this config before. Is this because some mechanisms were added after version 1.12 > flink can't the partition of network after restart > -- > > Key: FLINK-32319 > URL: https://issues.apache.org/jira/browse/FLINK-32319 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.17.1 > Environment: centos 7. > jdk 8. > flink1.17.1 application mode on yarn > flink configuration : > ``` > $internal.application.program-argssql2 > $internal.deployment.config-dir /data/home/flink/wgcn/flink-1.17.1/conf > $internal.yarn.log-config-file > /data/home/flink/wgcn/flink-1.17.1/conf/log4j.properties > akka.ask.timeout 100s > blob.server.port 15402 > classloader.check-leaked-classloader false > classloader.resolve-order parent-first > env.java.opts.taskmanager -XX:+UseG1GC -XX:MaxGCPauseMillis=1000 > execution.attachedtrue > execution.checkpointing.aligned-checkpoint-timeout10 min > execution.checkpointing.externalized-checkpoint-retention > RETAIN_ON_CANCELLATION > execution.checkpointing.interval 10 min > execution.checkpointing.min-pause 10 min > execution.savepoint-restore-mode NO_CLAIM > execution.savepoint.ignore-unclaimed-statefalse > execution.shutdown-on-attached-exit false > execution.target embedded > high-availability zookeeper > high-availability.cluster-id application_1684133071014_7202676 > high-availability.storageDir hdfs:///user/flink/recovery > high-availability.zookeeper.path.root /flink > high-availability.zookeeper.quorumx > 
internal.cluster.execution-mode NORMAL > internal.io.tmpdirs.use-local-default true > io.tmp.dirs > /data1/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data2/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data3/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data4/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data5/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data6/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data7/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data8/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data9/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data10/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data11/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data12/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676 > jobmanager.execution.failover-strategyregion > jobmanager.memory.heap.size 9261023232b > jobmanager.memory.jvm-metaspace.size 268435456b > jobmanager.memory.jvm-overhead.max1073741824b > jobmanager.memory.jvm-overhead.min1073741824b > jobmanager.memory.off-heap.size 134217728b > jobmanager.memory.process.size10240m > jobmanager.rpc.address > jobmanager.rpc.port 31332 > metrics.reporter.promgateway.deleteOnShutdown true > metrics.reporter.promgateway.factory.class > org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory > metrics.reporter.promgateway.hostUrl :9091 > metrics.reporter.promgateway.interval 60 SECONDS > metrics.reporter.promgateway.jobName join_phase3_v7 > metrics.reporter.promgateway.randomJobNameSuffix false > parallelism.default 128 > pipeline.classpaths > pipeline.jars > 
file:/data2/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676/container_e16_1684133071014_7202676_01_02/frauddetection-0.1.jar > rest.address > rest.bind-address x > rest.bind-port5-50500 > rest.flamegraph.enabled true > restart-strategy.failure-rate.delay 10 s > restart-strategy.failure-rate.failure-rate-interval 1 min > restart-strategy.failure-rate.max-failures-per-interval 6 > restart-strategy.type exponential-delay > state.backend.typefilesystem > state.checkpoints.dir hdfs://xx/user/flink/checkpoints-data/wgcn > state.checkpoints.num-retained3 > taskmanager.memory.managed.fraction 0 > taskmanager.memory.network.max600mb > taskmanager.memory.process.size 10240m >
[jira] [Updated] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover
[ https://issues.apache.org/jira/browse/FLINK-32316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-32316: Description: When we tried the SourceAlignment feature, we found that a duplicated announceCombinedWatermark task is scheduled after a JobManager failover automatically recovers the job from a checkpoint. The reason, I think, is that the announceCombinedWatermark task should be scheduled in the SourceCoordinator::start function, not in the SourceCoordinator constructor (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] ), because when the JobManager encounters a failover and automatically recovers the job, it creates the SourceCoordinator twice: * The first one: when the JobMaster is created, it creates the DefaultExecutionGraph, which initializes the first SourceCoordinator but does not start it. * The second one: the JobMaster calls the restoreLatestCheckpointedStateInternal method, which resets the old SourceCoordinator and initializes a new one; but because the first SourceCoordinator was never started (a SourceCoordinator is started before SchedulerBase::startScheduling), the first SourceCoordinator is never fully closed. 
And we also found another problem: announceCombinedWatermark may throw an exception (such as "subtask 25 is not ready yet to receive events", e.g. when that subtask is undergoing failover), which stops the periodic task from ever running again (a ThreadPoolExecutor will not schedule a periodic task again once it has thrown an exception). I think we should increase the robustness of the announceCombinedWatermark function to cover this case (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199] ) was: When we try SourceAlignment feature, we found there will be a duplicated announceCombinedWatermark task will be scheduled after JobManager failover and auto recover job from checkpoint. The reason i think is we should schedule announceCombinedWatermark task during SourceCoordinator::start function not in SourceCoordinator construct function (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] ), because when jobManager encounter failover and auto recover job, it will create SourceCoordinator twice: * The first one is when JobMaster is created it will create the DefaultExecutionGraph, this will init the first sourceCoordinator but will not start it. * The Second one is JobMaster call restoreLatestCheckpointedStateInternal method, which will be reset old sourceCoordinator and initialize a new one, but because the first sourceCoordinator is not started(SourceCoordinator will be started before SchedulerBase::startScheduling), so the first SourceCoordinator will not be fully closed. 
> Duplicated announceCombinedWatermark task maybe scheduled if jobmanager > failover > > > Key: FLINK-32316 > URL: https://issues.apache.org/jira/browse/FLINK-32316 > Project: Flink > Issue Type: Bug >Affects Versions: 1.16.0 >Reporter: Cai Liuyang >Priority: Major > > When we try SourceAlignment feature, we found there will be a duplicated > announceCombinedWatermark task will be scheduled after JobManager failover > and auto recover job from checkpoint. > The reason i think is we should schedule announceCombinedWatermark task > during SourceCoordinator::start function not in SourceCoordinator construct > function (see > [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] > ), because when jobManager encounter failover and auto recover job, it will > create SourceCoordinator twice: > * The first one is when JobMaster is created it will create the > DefaultExecutionGraph, this will init the first sourceCoordinator but will > not start it. > * The Second one is JobMaster call restoreLatestCheckpointedStateInternal > method, which will be reset old sourceCoordinator and initialize a new one, > but because the first sourceCoordinator is not started(SourceCoordinator will > be started before SchedulerBase::startScheduling), so the first > SourceCoordinator will not be fully closed. > > And we also found there is another problem that announceCombinedWatermark may > throw a exception (like "subtask 25 is not ready yet to receive events" , > this subtask maybe under failover) lead the period task not running any more > (ThreadPoolExecutor will not schedule the period task if it throw a > exception), i think we should increase the robustness of > announceCombinedWatermark
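The second problem described above is a general JDK behaviour: a ScheduledThreadPoolExecutor silently cancels a fixed-rate task the first time it throws. A minimal, self-contained sketch of the guard the reporter proposes — class and names here are illustrative, not Flink code:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class GuardedPeriodicTask {

    /**
     * Schedules a fixed-rate task whose first run throws, and reports
     * whether later runs still happened. Returns true when the try/catch
     * guard kept the schedule alive.
     */
    static boolean runDemo() {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        // Without the try/catch, the first exception would silently cancel
        // every future execution of this periodic task.
        pool.scheduleAtFixedRate(() -> {
            try {
                if (runs.incrementAndGet() == 1) {
                    // Simulate "subtask 25 is not ready yet to receive events".
                    throw new IllegalStateException("subtask not ready");
                }
            } catch (Exception e) {
                // Log and swallow so the periodic schedule survives.
                System.out.println("suppressed: " + e.getMessage());
            }
        }, 0, 20, TimeUnit.MILLISECONDS);

        try {
            Thread.sleep(300); // let several periods elapse
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdownNow();
        return runs.get() >= 2; // runs continued after the first failure
    }

    public static void main(String[] args) {
        System.out.println("task survived failure: " + runDemo());
    }
}
```

With the catch block removed, `runs` would stay at 1: the executor retains the exception in the task's Future and never reschedules it.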
[jira] [Commented] (FLINK-28076) StreamFaultToleranceTestBase runs into timeout
[ https://issues.apache.org/jira/browse/FLINK-28076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731828#comment-17731828 ] Rui Fan commented on FLINK-28076: - {quote}[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49865=logs=4d4a0d10-fca2-5507-8eed-c07f0bdf4887=7b25afdf-cc6c-566f-5459-359dc2585798=17896] {quote} Thanks [~Sergey Nuyanzin] for reporting this. This log failed to upload, not sure why it timed out. > StreamFaultToleranceTestBase runs into timeout > -- > > Key: FLINK-28076 > URL: https://issues.apache.org/jira/browse/FLINK-28076 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.16.0, 1.17.0 >Reporter: Matthias Pohl >Priority: Critical > Labels: test-stability > > [Build > #36259|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36259=logs=5c8e7682-d68f-54d1-16a2-a09310218a49=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba=12054] > got stuck in a {{StreamFaultToleranceTestBase}} subclass: > {code} > "main" #1 prio=5 os_prio=0 tid=0x7eff8c00b800 nid=0xf1a waiting on > condition [0x7eff92eca000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x82213498> (a > java.util.concurrent.CompletableFuture$Signaller) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) > at > java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) > at > java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) > at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > at > org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult(TestUtils.java:92) > at > org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase.runCheckpointedProgram(StreamFaultToleranceTestBase.java:136) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > [...] > {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32320) Same correlate can not be reused due to the different correlationId
[ https://issues.apache.org/jira/browse/FLINK-32320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731823#comment-17731823 ] Aitozi commented on FLINK-32320: In production, multi-sink jobs are very common; if the table produced by a UDTF is queried multiple times, the function is executed multiple times (as shown in the plan), which leads to bad performance. After some research, I think this is caused by the following: during the sqlToRel process, the table function SqlNode is converted via `toRel` multiple times, which leads to different correlationIds. > Same correlate can not be reused due to the different correlationId > --- > > Key: FLINK-32320 > URL: https://issues.apache.org/jira/browse/FLINK-32320 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Aitozi >Priority: Major > > As describe in SubplanReuserTest > {code:java} > @Test > def testSubplanReuseOnCorrelate(): Unit = { > util.addFunction("str_split", new StringSplit()) > val sqlQuery = > """ > |WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, > '-')) AS T(v)) > |SELECT * FROM r r1, r r2 WHERE r1.v = r2.v > """.stripMargin > // TODO the sub-plan of Correlate should be reused, > // however the digests of Correlates are different > util.verifyExecPlan(sqlQuery) > } > {code} > This will produce the plan > {code:java} > HashJoin(joinType=[InnerJoin], where=[(f0 = f00)], select=[a, b, c, f0, a0, > b0, c0, f00], build=[right]) > :- Exchange(distribution=[hash[f0]]) > : +- Correlate(invocation=[str_split($cor0.c, _UTF-16LE'-')], > correlate=[table(str_split($cor0.c,'-'))], select=[a,b,c,f0], > rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, > VARCHAR(2147483647) f0)], joinType=[INNER]) > : +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, > source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) > +- Exchange(distribution=[hash[f0]]) >+- Correlate(invocation=[str_split($cor1.c, _UTF-16LE'-')], > 
correlate=[table(str_split($cor1.c,'-'))], select=[a,b,c,f0], > rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, > VARCHAR(2147483647) f0)], joinType=[INNER]) > +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, > source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) > {code} > The Correlate node can not be reused due to the different correlation id. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32320) Same correlate can not be reused due to the different correlationId
Aitozi created FLINK-32320: -- Summary: Same correlate can not be reused due to the different correlationId Key: FLINK-32320 URL: https://issues.apache.org/jira/browse/FLINK-32320 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: Aitozi As describe in SubplanReuserTest {code:java} @Test def testSubplanReuseOnCorrelate(): Unit = { util.addFunction("str_split", new StringSplit()) val sqlQuery = """ |WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, '-')) AS T(v)) |SELECT * FROM r r1, r r2 WHERE r1.v = r2.v """.stripMargin // TODO the sub-plan of Correlate should be reused, // however the digests of Correlates are different util.verifyExecPlan(sqlQuery) } {code} This will produce the plan {code:java} HashJoin(joinType=[InnerJoin], where=[(f0 = f00)], select=[a, b, c, f0, a0, b0, c0, f00], build=[right]) :- Exchange(distribution=[hash[f0]]) : +- Correlate(invocation=[str_split($cor0.c, _UTF-16LE'-')], correlate=[table(str_split($cor0.c,'-'))], select=[a,b,c,f0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER]) : +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +- Exchange(distribution=[hash[f0]]) +- Correlate(invocation=[str_split($cor1.c, _UTF-16LE'-')], correlate=[table(str_split($cor1.c,'-'))], select=[a,b,c,f0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) {code} The Correlate node can not be reused due to the different correlation id. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32319) flink can't the partition of network after restart
[ https://issues.apache.org/jira/browse/FLINK-32319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731822#comment-17731822 ] Wencong Liu commented on FLINK-32319: - Hi [~1026688210] , maybe you could try this config "taskmanager.network.request-backoff.max: 2" because its default value is 1. > flink can't the partition of network after restart > -- > > Key: FLINK-32319 > URL: https://issues.apache.org/jira/browse/FLINK-32319 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.17.1 > Environment: centos 7. > jdk 8. > flink1.17.1 application mode on yarn > flink configuration : > ``` > $internal.application.program-argssql2 > $internal.deployment.config-dir /data/home/flink/wgcn/flink-1.17.1/conf > $internal.yarn.log-config-file > /data/home/flink/wgcn/flink-1.17.1/conf/log4j.properties > akka.ask.timeout 100s > blob.server.port 15402 > classloader.check-leaked-classloader false > classloader.resolve-order parent-first > env.java.opts.taskmanager -XX:+UseG1GC -XX:MaxGCPauseMillis=1000 > execution.attachedtrue > execution.checkpointing.aligned-checkpoint-timeout10 min > execution.checkpointing.externalized-checkpoint-retention > RETAIN_ON_CANCELLATION > execution.checkpointing.interval 10 min > execution.checkpointing.min-pause 10 min > execution.savepoint-restore-mode NO_CLAIM > execution.savepoint.ignore-unclaimed-statefalse > execution.shutdown-on-attached-exit false > execution.target embedded > high-availability zookeeper > high-availability.cluster-id application_1684133071014_7202676 > high-availability.storageDir hdfs:///user/flink/recovery > high-availability.zookeeper.path.root /flink > high-availability.zookeeper.quorumx > internal.cluster.execution-mode NORMAL > internal.io.tmpdirs.use-local-default true > io.tmp.dirs > 
/data1/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data2/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data3/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data4/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data5/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data6/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data7/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data8/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data9/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data10/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data11/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data12/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676 > jobmanager.execution.failover-strategyregion > jobmanager.memory.heap.size 9261023232b > jobmanager.memory.jvm-metaspace.size 268435456b > jobmanager.memory.jvm-overhead.max1073741824b > jobmanager.memory.jvm-overhead.min1073741824b > jobmanager.memory.off-heap.size 134217728b > jobmanager.memory.process.size10240m > jobmanager.rpc.address > jobmanager.rpc.port 31332 > metrics.reporter.promgateway.deleteOnShutdown true > metrics.reporter.promgateway.factory.class > org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory > metrics.reporter.promgateway.hostUrl :9091 > metrics.reporter.promgateway.interval 60 SECONDS > metrics.reporter.promgateway.jobName join_phase3_v7 > metrics.reporter.promgateway.randomJobNameSuffix false > parallelism.default 128 > pipeline.classpaths > pipeline.jars > file:/data2/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676/container_e16_1684133071014_7202676_01_02/frauddetection-0.1.jar > rest.address > rest.bind-address x > rest.bind-port5-50500 > 
rest.flamegraph.enabled true > restart-strategy.failure-rate.delay 10 s > restart-strategy.failure-rate.failure-rate-interval 1 min > restart-strategy.failure-rate.max-failures-per-interval 6 > restart-strategy.type exponential-delay > state.backend.typefilesystem > state.checkpoints.dir hdfs://xx/user/flink/checkpoints-data/wgcn > state.checkpoints.num-retained3 > taskmanager.memory.managed.fraction 0 > taskmanager.memory.network.max600mb > taskmanager.memory.process.size 10240m > taskmanager.memory.segment-size 128kb > taskmanager.network.memory.buffers-per-channel8 > taskmanager.network.memory.floating-buffers-per-gate 800 > taskmanager.numberOfTaskSlots 2 > web.port 0 > web.tmpdir
[jira] [Commented] (FLINK-32318) [flink-operator] missing s3 plugin in folder plugins
[ https://issues.apache.org/jira/browse/FLINK-32318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731817#comment-17731817 ] Natea Eshetu Beshada commented on FLINK-32318: -- Hi [~luismacosta], I believe you need to update your dockerfile to include something like this {code:java} COPY --from=build /app/flink-kubernetes-plugins/target/dependency/flink-s3-fs-presto* /opt/flink/plugins/s3/ {code} > [flink-operator] missing s3 plugin in folder plugins > > > Key: FLINK-32318 > URL: https://issues.apache.org/jira/browse/FLINK-32318 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.5.0 >Reporter: Luís Costa >Priority: Minor > > Greetings, > I'm trying to configure [Flink's Kubernetes HA > services|https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/ha/kubernetes_ha/] > for flink operator jobs, but got an error regarding s3 plugin: _"Could not > find a file system implementation for scheme 's3'. The scheme is directly > supported by Flink through the following plugin(s): flink-s3-fs-hadoop, > flink-s3-fs-presto"_ > {code:java} > 2023-06-12 10:05:16,981 INFO akka.remote.Remoting > [] - Starting remoting > 2023-06-12 10:05:17,194 INFO akka.remote.Remoting > [] - Remoting started; listening on addresses > :[akka.tcp://flink@10.4.125.209:6123] > 2023-06-12 10:05:17,377 INFO > org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils[] - Actor > system started at akka.tcp://flink@10.4.125.209:6123 > 2023-06-12 10:05:18,175 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting > KubernetesApplicationClusterEntrypoint down with application status FAILED. > Diagnostics org.apache.flink.util.FlinkException: Could not create the ha > services from the instantiated HighAvailabilityServicesFactory > org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory. 
> at > org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:299) > at > org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:285) > at > org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:145) > at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:439) > at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:382) > at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:282) > at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:232) > at > org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) > at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:229) > at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:729) > at > org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:86) > Caused by: java.io.IOException: Could not create FileSystem for highly > available storage path > (s3://td-infra-stg-us-east-1-s3-flinkoperator/flink-data/ha/flink-basic-example-xpto) > at > org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:102) > at > org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:86) > at > org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:41) > at > org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:296) > ... 
10 more > Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: > Could not find a file system implementation for scheme 's3'. The scheme is > directly supported by Flink through the following plugin(s): > flink-s3-fs-hadoop, flink-s3-fs-presto. Please ensure that each plugin > resides within its own subfolder within the plugins directory. See > https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ > for more information. If you want to use a Hadoop file system for that > scheme, please add the scheme to the configuration > fs.allowed-fallback-filesystems. For a full list of supported file systems, > please see
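As the comment above suggests, the fix is to copy one of the named filesystem jars into its own subfolder under the plugins directory. A minimal sketch of a custom image, assuming an official Flink base image that ships the jar under /opt/flink/opt (the tag and paths here are illustrative):

```dockerfile
FROM flink:1.17
# Each filesystem plugin must live in its own subfolder under
# /opt/flink/plugins so the plugin classloader can isolate it.
RUN mkdir -p /opt/flink/plugins/s3-fs-presto && \
    cp /opt/flink/opt/flink-s3-fs-presto-*.jar /opt/flink/plugins/s3-fs-presto/
```

The same layout applies to flink-s3-fs-hadoop; only one of the two is needed for the `s3://` scheme used by the HA storage path.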
[jira] [Updated] (FLINK-32319) flink can't the partition of network after restart
[ https://issues.apache.org/jira/browse/FLINK-32319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wgcn updated FLINK-32319: - Environment: centos 7. jdk 8. flink1.17.1 application mode on yarn flink configuration : ``` $internal.application.program-args sql2 $internal.deployment.config-dir /data/home/flink/wgcn/flink-1.17.1/conf $internal.yarn.log-config-file /data/home/flink/wgcn/flink-1.17.1/conf/log4j.properties akka.ask.timeout100s blob.server.port15402 classloader.check-leaked-classloaderfalse classloader.resolve-order parent-first env.java.opts.taskmanager -XX:+UseG1GC -XX:MaxGCPauseMillis=1000 execution.attached true execution.checkpointing.aligned-checkpoint-timeout 10 min execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION execution.checkpointing.interval10 min execution.checkpointing.min-pause 10 min execution.savepoint-restore-modeNO_CLAIM execution.savepoint.ignore-unclaimed-state false execution.shutdown-on-attached-exit false execution.targetembedded high-availability zookeeper high-availability.cluster-idapplication_1684133071014_7202676 high-availability.storageDirhdfs:///user/flink/recovery high-availability.zookeeper.path.root /flink high-availability.zookeeper.quorum x internal.cluster.execution-mode NORMAL internal.io.tmpdirs.use-local-default true io.tmp.dirs 
/data1/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data2/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data3/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data4/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data5/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data6/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data7/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data8/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data9/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data10/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data11/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data12/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676 jobmanager.execution.failover-strategy region jobmanager.memory.heap.size 9261023232b jobmanager.memory.jvm-metaspace.size268435456b jobmanager.memory.jvm-overhead.max 1073741824b jobmanager.memory.jvm-overhead.min 1073741824b jobmanager.memory.off-heap.size 134217728b jobmanager.memory.process.size 10240m jobmanager.rpc.address jobmanager.rpc.port 31332 metrics.reporter.promgateway.deleteOnShutdown true metrics.reporter.promgateway.factory.class org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory metrics.reporter.promgateway.hostUrl:9091 metrics.reporter.promgateway.interval 60 SECONDS metrics.reporter.promgateway.jobNamejoin_phase3_v7 metrics.reporter.promgateway.randomJobNameSuffixfalse parallelism.default 128 pipeline.classpaths pipeline.jars file:/data2/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676/container_e16_1684133071014_7202676_01_02/frauddetection-0.1.jar rest.address rest.bind-address x rest.bind-port 5-50500 rest.flamegraph.enabled true restart-strategy.failure-rate.delay 
10 s restart-strategy.failure-rate.failure-rate-interval 1 min restart-strategy.failure-rate.max-failures-per-interval 6 restart-strategy.type exponential-delay state.backend.type filesystem state.checkpoints.dir hdfs://xx/user/flink/checkpoints-data/wgcn state.checkpoints.num-retained 3 taskmanager.memory.managed.fraction 0 taskmanager.memory.network.max 600mb taskmanager.memory.process.size 10240m taskmanager.memory.segment-size 128kb taskmanager.network.memory.buffers-per-channel 8 taskmanager.network.memory.floating-buffers-per-gate800 taskmanager.numberOfTaskSlots 2 web.port0 web.tmpdir /tmp/flink-web-1b87445e-2761-4f16-97a1-8d4fc6fa8534 yarn.application-attempt-failures-validity-interval 6 yarn.application-attempts 3 yarn.application.name join_phase3_v7 yarn.heartbeat.container-request-interval 700 ``` was: centos 7. jdk 8. flink1.17.1 application mode on yarn > flink can't the partition of network after restart > -- > > Key: FLINK-32319 > URL: https://issues.apache.org/jira/browse/FLINK-32319 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.17.1 > Environment: centos 7. > jdk 8. > flink1.17.1 application mode on
[jira] [Updated] (FLINK-32319) flink can't the partition of network after restart
[ https://issues.apache.org/jira/browse/FLINK-32319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wgcn updated FLINK-32319: - Attachment: image-2023-06-13-07-14-48-958.png Component/s: Runtime / Network Affects Version/s: 1.17.1 Description: flink can't the partition of network after restart, lead that job can not restoring !image-2023-06-13-07-14-48-958.png! Environment: centos 7. jdk 8. flink1.17.1 application mode on yarn > flink can't the partition of network after restart > -- > > Key: FLINK-32319 > URL: https://issues.apache.org/jira/browse/FLINK-32319 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.17.1 > Environment: centos 7. > jdk 8. > flink1.17.1 application mode on yarn >Reporter: wgcn >Priority: Major > Attachments: image-2023-06-13-07-14-48-958.png > > > flink can't the partition of network after restart, lead that job can not > restoring > !image-2023-06-13-07-14-48-958.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32319) flink can't the partition of network after restart
wgcn created FLINK-32319: Summary: flink can't the partition of network after restart Key: FLINK-32319 URL: https://issues.apache.org/jira/browse/FLINK-32319 Project: Flink Issue Type: Bug Reporter: wgcn -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] architgyl commented on a diff in pull request #22509: [FLINK-31983] Add yarn Acls capability to Flink containers
architgyl commented on code in PR #22509: URL: https://github.com/apache/flink/pull/22509#discussion_r1227304307 ## flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java: ## @@ -60,6 +60,12 @@ class YARNSessionFIFOITCase extends YarnTestBase { private static final Logger log = LoggerFactory.getLogger(YARNSessionFIFOITCase.class); +protected static final String VIEW_ACLS = "user group"; +protected static final String MODIFY_ACLS = "admin groupAdmin"; +protected static final String VIEW_ACLS_WITH_WILDCARD = "user,* group"; +protected static final String MODIFY_ACLS_WITH_WILDCARD = "admin,* groupAdmin"; Review Comment: @becketqin addressed the comments. * makes sense rather than `user1,user2,*`. Can you please review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-training] ness-marichamyramachandran opened a new pull request, #62: FLK-1036 - Flink Level 1 - Module 1 Lab Exercises
ness-marichamyramachandran opened a new pull request, #62: URL: https://github.com/apache/flink-training/pull/62 RideCleansingExercise - Changes made to filter the rides only in New York City area. RidesAndFaresExercise - Below Changes have been made: 1. Added NYCFilter to filter the rides only from New York City area. 2. Implemented EnrichmentFunction to maintain the state of the rides to get its corresponding fare. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
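The RideCleansingExercise described above filters rides down to the New York City area. A minimal sketch of such a filter predicate, assuming an approximate NYC bounding box (the coordinate bounds and method names here are illustrative stand-ins, not the training project's actual `GeoUtils`):

```java
// Toy version of the "keep only NYC rides" check from the training exercise.
// The bounding-box coordinates are assumed approximations for illustration.
public class NycFilterSketch {
    static final double LON_WEST = -74.05, LON_EAST = -73.70;
    static final double LAT_SOUTH = 40.55, LAT_NORTH = 41.00;

    public static boolean isInNYC(double lon, double lat) {
        return lon >= LON_WEST && lon <= LON_EAST
                && lat >= LAT_SOUTH && lat <= LAT_NORTH;
    }

    // The exercise's FilterFunction would apply this to each ride:
    // keep it only if both start and end fall inside the box.
    public static boolean keepRide(double startLon, double startLat,
                                   double endLon, double endLat) {
        return isInNYC(startLon, startLat) && isInNYC(endLon, endLat);
    }

    public static void main(String[] args) {
        // A ride within Manhattan is kept; one ending in Chicago is dropped.
        System.out.println(keepRide(-73.99, 40.75, -73.98, 40.76)); // true
        System.out.println(keepRide(-73.99, 40.75, -87.63, 41.88)); // false
    }
}
```

In the real exercise this predicate would be wrapped in a Flink `FilterFunction<TaxiRide>` applied to the ride stream.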
[GitHub] [flink-training] ness-marichamyramachandran closed pull request #62: FLK-1036 - Flink Level 1 - Module 1 Lab Exercises
ness-marichamyramachandran closed pull request #62: FLK-1036 - Flink Level 1 - Module 1 Lab Exercises URL: https://github.com/apache/flink-training/pull/62
[jira] [Updated] (FLINK-31990) Use Flink Configuration to specify KDS Source configuration object
[ https://issues.apache.org/jira/browse/FLINK-31990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31990: --- Labels: pull-request-available (was: ) > Use Flink Configuration to specify KDS Source configuration object > -- > > Key: FLINK-31990 > URL: https://issues.apache.org/jira/browse/FLINK-31990 > Project: Flink > Issue Type: Sub-task >Reporter: Hong Liang Teoh >Priority: Major > Labels: pull-request-available > > *What* > Use the Flink Configuration object to standardise the method of specifying > configurations for the KDS source. > > Also include validations: > - Check that region in config matches ARN. ARN should take priority. > > *Why* > We want to standardise error messages + source serialization methods > implemented by Flink on the Flink Configuration objects. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-aws] hlteoh37 opened a new pull request, #77: [FLINK-31990][Connectors/Kinesis] Use Configuration object instead of…
hlteoh37 opened a new pull request, #77: URL: https://github.com/apache/flink-connector-aws/pull/77 … properties in KDS Source ## Purpose of the change Use Configuration object for specifying Source config instead of Properties. ## Verifying this change This change is already covered by existing tests, such as *(please describe tests)*. ## Significant changes *(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)* - [ ] Dependencies have been added or upgraded - [ ] Public API has been changed (Public API is any class annotated with `@Public(Evolving)`) - [ ] Serializers have been changed - [ ] New feature has been introduced - If yes, how is this documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
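One of the validations FLINK-31990 lists is that the region in the configuration must match the stream ARN, with the ARN taking priority. A JDK-only sketch of that check, assuming the standard Kinesis ARN layout `arn:aws:kinesis:REGION:ACCOUNT:stream/NAME` (the method names are hypothetical, not the connector's actual API):

```java
public class ArnRegionCheckSketch {

    /** Extracts the region field from a Kinesis stream ARN,
     *  i.e. the 4th colon-separated field of arn:aws:kinesis:REGION:ACCOUNT:stream/NAME. */
    public static String regionFromArn(String arn) {
        String[] parts = arn.split(":");
        if (parts.length < 6 || !"arn".equals(parts[0])) {
            throw new IllegalArgumentException("Not a valid ARN: " + arn);
        }
        return parts[3];
    }

    /** ARN takes priority; a mismatching configured region is rejected here. */
    public static String resolveRegion(String configuredRegion, String streamArn) {
        String arnRegion = regionFromArn(streamArn);
        if (configuredRegion != null && !configuredRegion.equals(arnRegion)) {
            throw new IllegalArgumentException(
                    "Configured region " + configuredRegion
                            + " does not match ARN region " + arnRegion);
        }
        return arnRegion;
    }

    public static void main(String[] args) {
        System.out.println(resolveRegion("us-east-1",
                "arn:aws:kinesis:us-east-1:012345678901:stream/your_stream_name")); // us-east-1
    }
}
```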
[GitHub] [flink] architgyl closed pull request #22766: Yarn acl
architgyl closed pull request #22766: Yarn acl URL: https://github.com/apache/flink/pull/22766
[GitHub] [flink] architgyl opened a new pull request, #22766: Yarn acl
architgyl opened a new pull request, #22766: URL: https://github.com/apache/flink/pull/22766 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] architgyl closed pull request #22765: Yarn acl
architgyl closed pull request #22765: Yarn acl URL: https://github.com/apache/flink/pull/22765
[GitHub] [flink] architgyl opened a new pull request, #22765: Yarn acl
architgyl opened a new pull request, #22765: URL: https://github.com/apache/flink/pull/22765 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] pvary commented on a diff in pull request #22694: [FLINK-32223][runtime][security] Add Hive delegation token support
pvary commented on code in PR #22694: URL: https://github.com/apache/flink/pull/22694#discussion_r1227217764 ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveDelegationTokenProvider.java: ## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.security.token; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.security.token.DelegationTokenProvider; +import org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenConverter; +import org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider; +import org.apache.flink.runtime.util.HadoopUtils; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.Optional; + +/** Delegation token provider for Hive. 
*/ +@Internal +public class HiveDelegationTokenProvider implements DelegationTokenProvider { + +private static final Logger LOG = LoggerFactory.getLogger(HiveDelegationTokenProvider.class); + +org.apache.hadoop.conf.Configuration hiveConf; + +private KerberosLoginProvider kerberosLoginProvider; + +private static final Text TOKEN_ALIAS = new Text("hive.server2.delegation.token"); + +@Override +public String serviceName() { +return "HiveServer2"; +} + +@Override +public void init(Configuration configuration) throws Exception { +hiveConf = getHiveConfiguration(configuration); +kerberosLoginProvider = new KerberosLoginProvider(configuration); +} + +private org.apache.hadoop.conf.Configuration getHiveConfiguration(Configuration conf) { +try { +org.apache.hadoop.conf.Configuration hadoopConf = +HadoopUtils.getHadoopConfiguration(conf); +hiveConf = new HiveConf(hadoopConf, HiveConf.class); +} catch (Exception | NoClassDefFoundError e) { +LOG.warn("Fail to create Hive Configuration", e); +} +return hiveConf; +} + +@Override +public boolean delegationTokensRequired() throws Exception { +/** + * The general rule how a provider/receiver must behave is the following: The provider and + * the receiver must be added to the classpath together with all the additionally required + * dependencies. + * + * This null check is required because the Hive provider is always on classpath but Hive + * jars are optional. Such case configuration is not able to be loaded. This construct is + * intended to be removed when Hive provider/receiver pair can be externalized (namely if a + * provider/receiver throws an exception then workload must be stopped). 
+ */ +if (hiveConf == null) { +LOG.debug( +"Hive is not available (not packaged with this application), hence no " ++ "tokens will be acquired."); +return false; +} +try { +if (!HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())) { +return false; +} +} catch (IOException e) { +LOG.debug("Hadoop Kerberos is not enabled."); +return false; +} +return !hiveConf.getTrimmed("hive.metastore.uris", "").isEmpty() +&& kerberosLoginProvider.isLoginPossible(false); +} + +@Override +public ObtainedDelegationTokens obtainDelegationTokens() throws Exception { +UserGroupInformation freshUGI = kerberosLoginProvider.doLoginAndReturnUGI(); +
[GitHub] [flink-connector-jdbc] MartijnVisser merged pull request #57: [BP-3.1][FLINK-30790][Connector/JDBC] Add DatabaseExtension with TableManaged or testing.
MartijnVisser merged PR #57: URL: https://github.com/apache/flink-connector-jdbc/pull/57
[GitHub] [flink-connector-jdbc] snuyanzin opened a new pull request, #57: [BP-3.1][FLINK-30790][Connector/JDBC] Add DatabaseExtension with TableManaged or testing.
snuyanzin opened a new pull request, #57: URL: https://github.com/apache/flink-connector-jdbc/pull/57 This is a cherry-pick backport of https://github.com/apache/flink-connector-jdbc/pull/22 to 3.1
[jira] [Closed] (FLINK-32313) CrateDB relies on flink-shaded in flink-connector-jdbc
[ https://issues.apache.org/jira/browse/FLINK-32313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser closed FLINK-32313. -- Fix Version/s: jdbc-3.2.0 Assignee: Sergey Nuyanzin Resolution: Fixed Fixed in main: 28016fdec16621e37b0e42322b1a542ca79343f8 > CrateDB relies on flink-shaded in flink-connector-jdbc > -- > > Key: FLINK-32313 > URL: https://issues.apache.org/jira/browse/FLINK-32313 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Reporter: Martijn Visser >Assignee: Sergey Nuyanzin >Priority: Blocker > Labels: pull-request-available > Fix For: jdbc-3.2.0 > > > See > https://github.com/apache/flink-connector-jdbc/blob/main/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalog.java#L27 > - JDBC shouldn't rely on flink-shaded. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-jdbc] boring-cyborg[bot] commented on pull request #56: [FLINK-32313] Remove usage of flink-shaded from the code, add deprecation on checkstyle level
boring-cyborg[bot] commented on PR #56: URL: https://github.com/apache/flink-connector-jdbc/pull/56#issuecomment-1587890117 Awesome work, congrats on your first merged pull request!
[GitHub] [flink-connector-jdbc] MartijnVisser merged pull request #56: [FLINK-32313] Remove usage of flink-shaded from the code, add deprecation on checkstyle level
MartijnVisser merged PR #56: URL: https://github.com/apache/flink-connector-jdbc/pull/56
[GitHub] [flink] hanyuzheng7 commented on pull request #22717: [FLINK-31665] [table] Add ARRAY_CONCAT function
hanyuzheng7 commented on PR #22717: URL: https://github.com/apache/flink/pull/22717#issuecomment-1587886884 @snuyanzin @bvarghese1 Regarding case 2 (different types) and case 3 (wrong/incompatible types): when an array contains null elements, I haven't found an effective way to handle this. For example, `array[array[1,2]]` and `array[array[1,null]]` have the data types `ARRAY<ARRAY<INT NOT NULL>> NOT NULL` and `ARRAY<ARRAY<INT>> NOT NULL` respectively. They are clearly not the same data type, so we cannot concatenate them. Previously they could be concatenated only because I didn't check whether their data types matched; once the check is in place, `array[array[1,2]]` and `array[array[1,null]]` are different types, so the result of `array_concat(array[array[1,2]], array[array[1,null]])` should not be `[array[1,2], array[1,null]]` — it should be null. The current version already handles cases like `array_concat(ARRAY['2'], ARRAY[1])`, returning null because the data types differ. If we instead wanted `array_concat(array[array[1,2]], array[array[1,null]])` to return `[array[1,2], array[1,null]]`, it becomes much harder, because we would need to remove all the NOT NULL constraints from the data types. So far I have only found a way to remove the outermost one, e.g. changing `ARRAY<ARRAY<INT NOT NULL>> NOT NULL` to `ARRAY<ARRAY<INT NOT NULL>>`; I cannot find a way to remove the nested constraints in this version of Flink — in the worst case the array is deeply nested, like `array[array[array[array[array[array[1,2]]]]]]`. So there are two ways to handle cases 2 and 3: (1) disallow null elements in any array, or (2) strictly require identical input types, so that e.g. `array[array[1,2]]` and `array[array[1,null]]` are different data types and the result is null. The newest version uses the second way. Is that OK with you? Thank you.
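The "strict" behaviour described above (option 2: refuse to concatenate arrays whose data types differ) can be illustrated outside Flink with plain Java arrays, using the runtime array class as a rough stand-in for the SQL data type. This is only an illustration of the semantics, not Flink's implementation:

```java
import java.util.Arrays;

public class StrictConcatSketch {

    /** Concatenates two arrays only when they have exactly the same runtime type;
     *  otherwise returns null, mirroring array_concat(ARRAY['2'], ARRAY[1]) -> NULL. */
    public static Object[] concatStrict(Object[] a, Object[] b) {
        if (!a.getClass().equals(b.getClass())) {
            return null; // incompatible element types
        }
        Object[] out = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }

    public static void main(String[] args) {
        // Same type: concatenation succeeds.
        System.out.println(Arrays.toString(
                concatStrict(new Integer[] {1, 2}, new Integer[] {3}))); // [1, 2, 3]
        // Different types: result is null, as in the strict SQL semantics.
        System.out.println(concatStrict(new String[] {"2"}, new Integer[] {1})); // null
    }
}
```

Note that Java's runtime classes do not carry SQL nullability, which is exactly the dimension the comment above says is hard to reconcile in Flink's type system.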
[jira] [Updated] (FLINK-31980) Implement support for EFO in new Source
[ https://issues.apache.org/jira/browse/FLINK-31980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31980: --- Labels: pull-request-available (was: ) > Implement support for EFO in new Source > --- > > Key: FLINK-31980 > URL: https://issues.apache.org/jira/browse/FLINK-31980 > Project: Flink > Issue Type: Sub-task >Reporter: Hong Liang Teoh >Priority: Major > Labels: pull-request-available > > Implement support for reading from Kinesis Stream using Enhanced Fan Out > mechanism -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-aws] hlteoh37 opened a new pull request, #76: [FLINK-31980] Implement support for EFO in Kinesis consumer
hlteoh37 opened a new pull request, #76: URL: https://github.com/apache/flink-connector-aws/pull/76 ## Purpose of the change Implement support for EFO in the new KDS source ## Verifying this change This change added tests and can be verified as follows: - Added unit tests - Manually verified by running the Kinesis connector on a local Flink cluster. ## Significant changes *(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)* - [ ] Dependencies have been added or upgraded - [ ] Public API has been changed (Public API is any class annotated with `@Public(Evolving)`) - [ ] Serializers have been changed - [ ] New feature has been introduced - If yes, how is this documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-31813) Initial implementation of Kinesis Source using FLIP-27
[ https://issues.apache.org/jira/browse/FLINK-31813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731720#comment-17731720 ] Danny Cranmer commented on FLINK-31813: --- Merged commit [{{43e1295}}|https://github.com/apache/flink-connector-aws/commit/43e1295e402ed59e6db3689f6bd16b69e6f4e12e] into apache:main > Initial implementation of Kinesis Source using FLIP-27 > -- > > Key: FLINK-31813 > URL: https://issues.apache.org/jira/browse/FLINK-31813 > Project: Flink > Issue Type: Sub-task >Reporter: Hong Liang Teoh >Assignee: Hong Liang Teoh >Priority: Major > Fix For: aws-connector-4.2.0 > > > Implement a base implementation of the Kinesis source based on FLIP-27 > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-31813) Initial implementation of Kinesis Source using FLIP-27
[ https://issues.apache.org/jira/browse/FLINK-31813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer reassigned FLINK-31813: - Assignee: Hong Liang Teoh > Initial implementation of Kinesis Source using FLIP-27 > -- > > Key: FLINK-31813 > URL: https://issues.apache.org/jira/browse/FLINK-31813 > Project: Flink > Issue Type: Sub-task >Reporter: Hong Liang Teoh >Assignee: Hong Liang Teoh >Priority: Major > Fix For: aws-connector-4.2.0 > > > Implement a base implementation of the Kinesis source based on FLIP-27 > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-31813) Initial implementation of Kinesis Source using FLIP-27
[ https://issues.apache.org/jira/browse/FLINK-31813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Danny Cranmer resolved FLINK-31813. --- Resolution: Done > Initial implementation of Kinesis Source using FLIP-27 > -- > > Key: FLINK-31813 > URL: https://issues.apache.org/jira/browse/FLINK-31813 > Project: Flink > Issue Type: Sub-task >Reporter: Hong Liang Teoh >Assignee: Hong Liang Teoh >Priority: Major > Fix For: aws-connector-4.2.0 > > > Implement a base implementation of the Kinesis source based on FLIP-27 > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-aws] dannycranmer merged pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27
dannycranmer merged PR #49: URL: https://github.com/apache/flink-connector-aws/pull/49
[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #49: [FLINK-24438] Add Kinesis connector using FLIP-27
dannycranmer commented on code in PR #49: URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1227015927 ## flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceBuilder.java: ## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.source; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner; +import org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory; +import org.apache.flink.connector.kinesis.source.enumerator.assigner.UniformShardAssigner; +import org.apache.flink.connector.kinesis.source.serialization.KinesisDeserializationSchema; + +import java.util.Properties; + +/** + * Builder to construct the {@link KinesisStreamsSource}. + * + * The following example shows the minimum setup to create a {@link KinesisStreamsSource} that + * reads String values from a Kinesis Data Streams stream with ARN of + * arn:aws:kinesis:us-east-1:012345678901:stream/your_stream_name. 
+ * + * {@code + * KinesisStreamsSource kdsSource = + * KinesisStreamsSource.builder() + * .setStreamArn("arn:aws:kinesis:us-east-1:012345678901:stream/your_stream_name") + * .setDeserializationSchema(new SimpleStringSchema()) Review Comment: I think properties (`consumerConfig`) are required too -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
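The review comment points out that the consumer properties (`consumerConfig`) are required in addition to the stream ARN and deserialization schema shown in the javadoc example. A minimal, hypothetical sketch of a builder that fails fast when a required setting is missing (class and method names are modeled on the snippet above and are not the connector's actual API):

```java
import java.util.Properties;

/** Hypothetical builder sketch illustrating required-field validation at build() time. */
public class SourceBuilderSketch {
    private String streamArn;
    private String deserializationSchema;
    private Properties consumerConfig;

    public SourceBuilderSketch setStreamArn(String arn) { this.streamArn = arn; return this; }
    public SourceBuilderSketch setDeserializationSchema(String s) { this.deserializationSchema = s; return this; }
    public SourceBuilderSketch setConsumerConfig(Properties p) { this.consumerConfig = p; return this; }

    /** Fail fast when a required setting was not supplied. */
    public String build() {
        if (streamArn == null) throw new IllegalStateException("streamArn is required");
        if (deserializationSchema == null) throw new IllegalStateException("deserializationSchema is required");
        if (consumerConfig == null) throw new IllegalStateException("consumerConfig is required");
        return "source(" + streamArn + ")";
    }

    public static void main(String[] args) {
        try {
            // Mirrors the javadoc example above, which omits the consumer config.
            new SourceBuilderSketch()
                    .setStreamArn("arn:aws:kinesis:us-east-1:012345678901:stream/your_stream_name")
                    .setDeserializationSchema("SimpleStringSchema")
                    .build();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // consumerConfig is required
        }
    }
}
```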
[GitHub] [flink] reswqa commented on a diff in pull request #22743: [FLINK-26515][test] Add exception handling for RetryingExecutorTest#testDiscardOnTimeout
reswqa commented on code in PR #22743: URL: https://github.com/apache/flink/pull/22743#discussion_r1227024446 ## flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java: ## @@ -118,6 +123,7 @@ public void handleFailure(Throwable throwable) {} Thread.sleep(10); } } +assertThat(unexpectedException.get()).isNull(); Review Comment: nit: ```suggestion assertThat(unexpectedException).hasValue(null); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32318) [flink-operator] missing s3 plugin in folder plugins
[ https://issues.apache.org/jira/browse/FLINK-32318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luís Costa updated FLINK-32318: --- Description: Greetings, I'm trying to configure [Flink's Kubernetes HA services|https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/ha/kubernetes_ha/] for flink operator jobs, but got an error regarding s3 plugin: _"Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop, flink-s3-fs-presto"_ {code:java} 2023-06-12 10:05:16,981 INFO akka.remote.Remoting [] - Starting remoting 2023-06-12 10:05:17,194 INFO akka.remote.Remoting [] - Remoting started; listening on addresses :[akka.tcp://flink@10.4.125.209:6123] 2023-06-12 10:05:17,377 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils[] - Actor system started at akka.tcp://flink@10.4.125.209:6123 2023-06-12 10:05:18,175 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting KubernetesApplicationClusterEntrypoint down with application status FAILED. Diagnostics org.apache.flink.util.FlinkException: Could not create the ha services from the instantiated HighAvailabilityServicesFactory org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory. 
at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:299) at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:285) at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:145) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:439) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:382) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:282) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:232) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:229) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:729) at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:86) Caused by: java.io.IOException: Could not create FileSystem for highly available storage path (s3://td-infra-stg-us-east-1-s3-flinkoperator/flink-data/ha/flink-basic-example-xpto) at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:102) at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:86) at org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:41) at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:296) ... 
10 more Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop, flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/. at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515) at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409) at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274) at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:99) ... 13 more {code} Looking into the job container, I can see that the s3 plugins are in folder _/opt/flink/opt_ instead of {_}/opt/flink/plugins/s3{_}, as mentioned [here|https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/] {code:java} root@flink-basic-example-xpto1-86bb9b9d44-hksq8:/opt/flink# cd plugins/
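The remedy the exception message asks for is to give each filesystem jar its own subfolder under `plugins/` rather than leaving it in `opt/`. A minimal sketch of that layout fix is below; the jar name, plugin folder name, and helper class are illustrative, not taken from the ticket.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Sketch: place a filesystem plugin jar in its own plugins/ subfolder, as the error requires. */
public class PluginLayout {

    /** Moves jarName from <flinkHome>/opt into <flinkHome>/plugins/<pluginDir>/ and returns the new path. */
    public static Path installPlugin(Path flinkHome, String jarName, String pluginDir) throws IOException {
        Path source = flinkHome.resolve("opt").resolve(jarName);
        Path targetDir = flinkHome.resolve("plugins").resolve(pluginDir);
        Files.createDirectories(targetDir);
        return Files.move(source, targetDir.resolve(jarName), StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        // Simulate a Flink home with the jar still in opt/, as observed in the container.
        Path home = Files.createTempDirectory("flink");
        Files.createDirectories(home.resolve("opt"));
        Files.createFile(home.resolve("opt").resolve("flink-s3-fs-presto-1.17.1.jar"));
        Path installed = installPlugin(home, "flink-s3-fs-presto-1.17.1.jar", "s3-fs-presto");
        System.out.println(Files.exists(installed)); // prints "true"
    }
}
```

The official Flink container images can also produce this layout at startup (for example via an `ENABLE_BUILT_IN_PLUGINS` environment variable), though the exact mechanism depends on the image version.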
[jira] [Closed] (FLINK-32213) Add get off heap buffer in memory segment
[ https://issues.apache.org/jira/browse/FLINK-32213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo closed FLINK-32213. -- Fix Version/s: 1.18.0 Resolution: Done master (1.18) via 81cab8f486997ef666128cce4903c24d44ac7534. > Add get off heap buffer in memory segment > - > > Key: FLINK-32213 > URL: https://issues.apache.org/jira/browse/FLINK-32213 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Affects Versions: 1.18.0 >Reporter: Fang Yong >Assignee: Fang Yong >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > When a Flink job writes data to a data lake such as Paimon, Iceberg, or Hudi, the > sink writes data to a writer buffer first, then flushes the data to the file > system. To manage the writer buffer better, we'd like to allocate segments > from managed memory in Flink and get an off-heap buffer to create the writer buffer -- This message was sent by Atlassian Jira (v8.20.10#820010)
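The buffer-then-flush pattern the ticket describes can be sketched with a direct (off-heap) `ByteBuffer`. This is an illustration of the idea only; Flink's actual `MemorySegment` API differs, and the class and method names here are hypothetical.

```java
import java.nio.ByteBuffer;

/**
 * Illustration only: a writer buffer backed by off-heap memory, the pattern the
 * ticket describes for data-lake sinks (buffer records first, then flush to the
 * file system). Not Flink's MemorySegment API.
 */
public class OffHeapWriterBuffer {
    private final ByteBuffer offHeap;

    public OffHeapWriterBuffer(int capacityBytes) {
        // allocateDirect reserves memory outside the JVM heap, analogous to off-heap segments
        this.offHeap = ByteBuffer.allocateDirect(capacityBytes);
    }

    /** Buffer a record in off-heap memory first ... */
    public void write(byte[] record) {
        offHeap.put(record);
    }

    /** ... then hand the filled region to the file-system writer. */
    public ByteBuffer flush() {
        offHeap.flip();
        return offHeap;
    }

    public static void main(String[] args) {
        OffHeapWriterBuffer buf = new OffHeapWriterBuffer(64);
        buf.write(new byte[] {1, 2, 3});
        System.out.println(buf.flush().remaining()); // prints "3"
    }
}
```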
[jira] [Created] (FLINK-32318) [flink-operator] missing s3 plugin in folder plugins
Luís Costa created FLINK-32318: -- Summary: [flink-operator] missing s3 plugin in folder plugins Key: FLINK-32318 URL: https://issues.apache.org/jira/browse/FLINK-32318 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.5.0 Reporter: Luís Costa Greetings, I'm trying to configure [Flink's Kubernetes HA services|https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/ha/kubernetes_ha/] for Flink operator jobs, but got an error regarding the s3 plugin: _"Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop, flink-s3-fs-presto"_ {code:java} 2023-06-12 10:05:16,981 INFO akka.remote.Remoting [] - Starting remoting 2023-06-12 10:05:17,194 INFO akka.remote.Remoting [] - Remoting started; listening on addresses :[akka.tcp://flink@10.4.125.209:6123] 2023-06-12 10:05:17,377 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils[] - Actor system started at akka.tcp://flink@10.4.125.209:6123 2023-06-12 10:05:18,175 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting KubernetesApplicationClusterEntrypoint down with application status FAILED. Diagnostics org.apache.flink.util.FlinkException: Could not create the ha services from the instantiated HighAvailabilityServicesFactory org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory. 
at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:299) at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:285) at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:145) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:439) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:382) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:282) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:232) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:229) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:729) at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:86) Caused by: java.io.IOException: Could not create FileSystem for highly available storage path (s3://td-infra-stg-us-east-1-s3-flinkoperator/flink-data/ha/flink-basic-example-xpto) at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:102) at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:86) at org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:41) at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:296) ... 
10 more Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop, flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/. at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515) at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409) at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274) at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:99) ... 13 more {code} Looking into the container, I can see that the s3 plugins are in folder _/opt/flink/opt_ instead of _/opt/flink/plugins/s3_, as mentioned [here|https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/]
[jira] [Updated] (FLINK-32317) Enrich metadata in CR error field
[ https://issues.apache.org/jira/browse/FLINK-32317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32317: --- Labels: pull-request-available (was: ) > Enrich metadata in CR error field > - > > Key: FLINK-32317 > URL: https://issues.apache.org/jira/browse/FLINK-32317 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.5.0 >Reporter: Daren Wong >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.6.0 > > > CR Error field is improved in > https://issues.apache.org/jira/browse/FLINK-29708. > The error field is more structured with exception type, stackTrace, > additionalMetadata, etc. > > This ticket is a proposal to expose a config > ("kubernetes.operator.exception.metadata.mapper") to enrich the > additionalMetadata further. > > The config consists of key-value pairs, for example: > {code:java} > kubernetes.operator.exception.metadata.mapper: IOException:Found > IOException,403:Found 403 error code{code} > The key is a REGEX string that will be used to match against the whole stack > trace and if found, the value will be added to additionalMetadata. 
For > example: > {code:java} > apiVersion: flink.apache.org/v1beta1 > kind: FlinkSessionJob > > name: basic-session-job-example > namespace: default > resourceVersion: "70206149" > uid: 916ea8f5-0821-4839-9953-2db9678c3fc9 > spec: > deploymentName: basic-session-deployment-example > job: > args: [] > jarURI: https://test-s3.s3.amazonaws.com/doubleExecute.jar > parallelism: 4 > state: running > upgradeMode: stateless > status: > error: > '{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.io.IOException: > Server returned HTTP response code: 403 for URL: > https://test-s3.s3.amazonaws.com/doubleExecute.jar","additionalMetadata":{"exceptionMapper":["Found > 403 error code","Found > IOException"]},"throwableList":[{"type":"java.io.IOException","message":"Server > returned HTTP response code: 403 for URL: > https://test-s3.s3.amazonaws.com/doubleExecute.jar","additionalMetadata":{"exceptionMapper":["Found > 403 error code"]}}]}' > ... > {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
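The proposed mapper boils down to matching each configured regex key against the stack trace and collecting the associated values as additional metadata. Below is a standalone sketch of that logic; the class and method names are hypothetical, not the operator's actual implementation, and the config parsing is simplified (values may not contain commas, and only the first colon separates key from value).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/**
 * Sketch of the proposed "kubernetes.operator.exception.metadata.mapper" behaviour:
 * regex keys from the config are matched against the whole stack trace, and the
 * values of matching keys are collected as additional metadata.
 */
public class ExceptionMetadataMapper {

    public static List<String> map(String mapperConfig, String stackTrace) {
        List<String> metadata = new ArrayList<>();
        for (String pair : mapperConfig.split(",")) {
            // Config format per entry: <regex>:<value>, e.g. "IOException:Found IOException"
            String[] kv = pair.split(":", 2);
            if (Pattern.compile(kv[0]).matcher(stackTrace).find()) {
                metadata.add(kv[1]);
            }
        }
        return metadata;
    }

    public static void main(String[] args) {
        String config = "IOException:Found IOException,403:Found 403 error code";
        String trace = "java.io.IOException: Server returned HTTP response code: 403 for URL: ...";
        System.out.println(map(config, trace)); // prints "[Found IOException, Found 403 error code]"
    }
}
```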
[GitHub] [flink-kubernetes-operator] darenwkt opened a new pull request, #620: [FLINK-32317] Enrich metadata in CR error field
darenwkt opened a new pull request, #620: URL: https://github.com/apache/flink-kubernetes-operator/pull/620 ## What is the purpose of the change CR Error field is improved in https://issues.apache.org/jira/browse/FLINK-29708. The error field is more structured with exception type, stackTrace, additionalMetadata, etc. This ticket is a proposal to expose a config ("kubernetes.operator.exception.metadata.mapper") to enrich the additionalMetadata further. The config consists of key-value pairs, for example: ``` kubernetes.operator.exception.metadata.mapper: IOException:Found IOException,403:Found 403 error code ``` The key is a REGEX string that will be used to match against the whole stack trace and if found, the value will be added to additionalMetadata. For example: ``` apiVersion: flink.apache.org/v1beta1 kind: FlinkSessionJob name: basic-session-job-example namespace: default resourceVersion: "70206149" uid: 916ea8f5-0821-4839-9953-2db9678c3fc9 spec: deploymentName: basic-session-deployment-example job: args: [] jarURI: https://test-s3.s3.amazonaws.com/doubleExecute.jar parallelism: 4 state: running upgradeMode: stateless status: error: '{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.io.IOException: Server returned HTTP response code: 403 for URL: https://test-s3.s3.amazonaws.com/doubleExecute.jar","additionalMetadata":{"exceptionMapper":["Found 403 error code","Found IOException"]},"throwableList":[{"type":"java.io.IOException","message":"Server returned HTTP response code: 403 for URL: https://test-s3.s3.amazonaws.com/doubleExecute.jar","additionalMetadata":{"exceptionMapper":["Found 403 error code"]}}]}' ... ``` ## Brief change log - Added new operator config for user to specify REGEX to extract info from exception and enrich CR error field with it. 
## Verifying this change - Added unit test - Sanity test by building image and testing in minikube ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changes to the `CustomResourceDescriptors`: (no) - Core observer or reconciler logic that is regularly executed: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (docs) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #617: [FLINK-32271] Report RECOMMENDED_PARALLELISM as an autoscaler metric V2
morhidi commented on code in PR #617: URL: https://github.com/apache/flink-kubernetes-operator/pull/617#discussion_r1226911859 ## flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java: ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.operator.autoscaler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.OperatorTestBase; +import org.apache.flink.kubernetes.operator.TestUtils; +import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; +import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; +import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; +import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; +import org.apache.flink.kubernetes.operator.utils.EventCollector; +import org.apache.flink.kubernetes.operator.utils.EventRecorder; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import lombok.Getter; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM; +import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM; 
+import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +/** Test for recommended parallelism. */ +@EnableKubernetesMockClient(crud = true) +public class RecommendedParallelismTest extends OperatorTestBase { Review Comment: Thanks @mxm for your comprehensive review and feedback. I agree we can/should be more careful with tracking internal decisions upstream. I'll follow up on that, @gyfora, thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-docker] daniel-packard commented on pull request #158: [ADD-SYMLINKS] Add symlinks for FLINK_VERSION to FLINK_RELEASE jars
daniel-packard commented on PR #158: URL: https://github.com/apache/flink-docker/pull/158#issuecomment-1587654571 Hey @zentol - Right now we have two proposed changes on the table: 1. add symlinks from e.g. `1.16.jar` -> `1.16.2.jar` 2. add support for pattern matching e.g. `1.16.*.jar` I think we should try to decide between: - implement 1, - implement 2, - implement _both_, - implement _neither_ Both of the proposed changes would let the consumer of `flink:1.16` docker images ignore the detail of the patch version (e.g. `1.16.2`). So I think the first question we should answer is... Is it a reasonable request that "consumers of `flink:1.16` docker images" should be able to "ignore the detail of the patch version"? In my mind, the answer is "yes" - because if I wanted to worry about patch version, I would use a more specific docker base image (`flink:1.16.2`). But it's possible that safety/stability _demands_ that devs are aware of changing patch versions, in which case the answer would be no. But then I might ask why `flink:1.16` is even an option for a base image if we still have to worry about patch version changes? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
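Proposal 1 (the minor-version symlink) is small enough to sketch directly. The snippet below shows the idea with `java.nio.file`; the jar naming scheme is illustrative, and symlink creation requires a file system and permissions that allow it.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/**
 * Sketch of option 1 from the discussion: a minor-version symlink pointing at the
 * patch-version jar, so consumers can reference flink-1.16.jar regardless of the
 * patch release. File names are illustrative.
 */
public class VersionSymlink {

    /** Creates <dir>/flink-<minor>.jar as a relative symlink to <dir>/flink-<patch>.jar. */
    public static Path link(Path dir, String minor, String patch) throws IOException {
        Path target = dir.resolve("flink-" + patch + ".jar");
        Path alias = dir.resolve("flink-" + minor + ".jar");
        // Relative link, e.g. flink-1.16.jar -> flink-1.16.2.jar, so the dir stays relocatable
        return Files.createSymbolicLink(alias, target.getFileName());
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("jars");
        Files.createFile(dir.resolve("flink-1.16.2.jar"));
        Path alias = link(dir, "1.16", "1.16.2");
        // On systems that permit symlink creation this reports a symlink to the patch jar
        System.out.println(Files.isSymbolicLink(alias));
    }
}
```

A later patch release would only need to repoint the one symlink, which is the property that lets `flink:1.16` consumers ignore the patch version.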
[GitHub] [flink] hanyuzheng7 commented on pull request #22730: [FLINK-32257]-table-Add-ARRAY_MAX-function
hanyuzheng7 commented on PR #22730: URL: https://github.com/apache/flink/pull/22730#issuecomment-1587632051 @dawidwys Yes, "concatenates" is a mistake; I have already fixed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] dawidwys commented on a diff in pull request #22730: [FLINK-32257]-table-Add-ARRAY_MAX-function
dawidwys commented on code in PR #22730: URL: https://github.com/apache/flink/pull/22730#discussion_r1226894432 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java: ## @@ -272,6 +281,61 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) "org.apache.flink.table.runtime.functions.scalar.ArrayUnionFunction") .build(); +public static final BuiltInFunctionDefinition ARRAY_MAX = +BuiltInFunctionDefinition.newBuilder() +.name("ARRAY_MAX") +.kind(SCALAR) +.inputTypeStrategy( +new InputTypeStrategy() { Review Comment: Yes, we should have that in a separate class. BTW, I don't think the current implementation checks comparability in the right way. You can take a look at (and probably reuse a lot of) https://github.com/confluentinc/flink/blob/ff7a0f6f25748933729bc94038444b3d4e233315/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ComparableTypeStrategy.java#L59 I'd try to extract as much of the logic from there as possible and reuse it. In the end, I believe we should strive for something similar to: https://github.com/confluentinc/flink/blob/ff7a0f6f25748933729bc94038444b3d4e233315/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ConstraintArgumentTypeStrategy.java#L35 where the constraint is applied to the array's argument.
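The review above asks for a comparability constraint on ARRAY_MAX's array argument. As a standalone sketch (hypothetical names, not Flink's actual InputTypeStrategy or runtime code), the Java generic bound `T extends Comparable<T>` plays the same role the planner-side constraint would: element types without an ordering are rejected up front. The loop assumes the common SQL semantics of skipping NULL elements.

```java
import java.util.List;

public class ArrayMaxSketch {
    // The bound `T extends Comparable<T>` is the compile-time analogue of the
    // comparability constraint the type strategy should enforce at planning
    // time: non-comparable element types cannot even be passed in.
    static <T extends Comparable<T>> T arrayMax(List<T> array) {
        T max = null;
        for (T element : array) {
            if (element == null) {
                continue; // NULL elements are skipped (assumed semantics)
            }
            if (max == null || element.compareTo(max) > 0) {
                max = element;
            }
        }
        return max; // null for an empty or all-NULL array
    }
}
```

The point of the separate constraint class the reviewer links to is that this check happens once, during validation, instead of being re-implemented ad hoc in each function.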
[jira] [Created] (FLINK-32317) Enrich metadata in CR error field
Daren Wong created FLINK-32317: -- Summary: Enrich metadata in CR error field Key: FLINK-32317 URL: https://issues.apache.org/jira/browse/FLINK-32317 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.5.0 Reporter: Daren Wong Fix For: kubernetes-operator-1.6.0 CR Error field is improved in https://issues.apache.org/jira/browse/FLINK-29708. The error field is more structured with exception type, stackTrace, additionalMetadata, etc. This ticket is a proposal to expose a config ("kubernetes.operator.exception.metadata.mapper") to enrich the additionalMetadata further. The config consists of key-value pairs, for example: {code:java} kubernetes.operator.exception.metadata.mapper: IOException:Found IOException,403:Found 403 error code{code} The key is a REGEX string that will be used to match against the whole stack trace and if found, the value will be added to additionalMetadata. For example: {code:java} apiVersion: flink.apache.org/v1beta1 kind: FlinkSessionJob name: basic-session-job-example namespace: default resourceVersion: "70206149" uid: 916ea8f5-0821-4839-9953-2db9678c3fc9 spec: deploymentName: basic-session-deployment-example job: args: [] jarURI: https://test-s3.s3.amazonaws.com/doubleExecute.jar parallelism: 4 state: running upgradeMode: stateless status: error: '{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.io.IOException: Server returned HTTP response code: 403 for URL: https://test-s3.s3.amazonaws.com/doubleExecute.jar","additionalMetadata":{"exceptionMapper":["Found 403 error code","Found IOException"]},"throwableList":[{"type":"java.io.IOException","message":"Server returned HTTP response code: 403 for URL: https://test-s3.s3.amazonaws.com/doubleExecute.jar","additionalMetadata":{"exceptionMapper":["Found 403 error code"]}}]}' ... {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
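The proposed mapper can be sketched in plain Java. The parsing rules below are illustrative only (a naive split on `,` and `:`; the real operator config parsing may differ, e.g. for labels that themselves contain commas or colons):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class ExceptionMetadataMapper {
    // Parse "REGEX:label,REGEX:label" pairs as in the proposed
    // kubernetes.operator.exception.metadata.mapper config value.
    static Map<Pattern, String> parse(String config) {
        Map<Pattern, String> mappers = new LinkedHashMap<>();
        for (String pair : config.split(",")) {
            String[] kv = pair.split(":", 2); // split only on the first ':'
            mappers.put(Pattern.compile(kv[0]), kv[1]);
        }
        return mappers;
    }

    // Collect every label whose regex matches somewhere in the stack trace;
    // these would be appended to additionalMetadata in the CR error field.
    static List<String> enrich(String stackTrace, Map<Pattern, String> mappers) {
        List<String> labels = new ArrayList<>();
        mappers.forEach((pattern, label) -> {
            if (pattern.matcher(stackTrace).find()) {
                labels.add(label);
            }
        });
        return labels;
    }
}
```

With the example config, a stack trace containing `java.io.IOException ... 403` would yield both "Found IOException" and "Found 403 error code", matching the `exceptionMapper` entries shown in the status above.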
[GitHub] [flink] dawidwys commented on pull request #22730: [FLINK-32257]-table-Add-ARRAY_MAX-function
dawidwys commented on PR #22730: URL: https://github.com/apache/flink/pull/22730#issuecomment-1587592181 Thank you @hanyuzheng7 for the contribution. Could you please fill in the description template correctly? It helps reviewers tremendously. You have a nice overview of what your changes do, but it would be really helpful if you put it in the correct sections and filled in the rest.
[jira] [Closed] (FLINK-32300) Support get object for result set
[ https://issues.apache.org/jira/browse/FLINK-32300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Benchao Li closed FLINK-32300. -- Fix Version/s: 1.18.0 Resolution: Fixed Fixed via https://github.com/apache/flink/commit/c7afa323582888ec90941e091f72b6ef3a599b13 (master) [~zjureel] Thanks for your PR! > Support get object for result set > - > > Key: FLINK-32300 > URL: https://issues.apache.org/jira/browse/FLINK-32300 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / JDBC >Affects Versions: 1.18.0 >Reporter: Fang Yong >Assignee: Fang Yong >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > Support get object for result set -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] libenchao closed pull request #22757: [FLINK-32300][jdbc-driver] Support get object for result set
libenchao closed pull request #22757: [FLINK-32300][jdbc-driver] Support get object for result set URL: https://github.com/apache/flink/pull/22757
[jira] [Comment Edited] (FLINK-32311) ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement and DefaultLeaderElectionService.onGrantLeadership fell into dead lock
[ https://issues.apache.org/jira/browse/FLINK-32311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731623#comment-17731623 ] Matthias Pohl edited comment on FLINK-32311 at 6/12/23 2:10 PM: This can, indeed, happen in the new implementation (even with the {{MultipleComponentLeaderElectionDriver}} implementation which is not used in the test run right now) because we call close on the driver within the lock. The old {{DefaultLeaderElectionService}} implementation didn't do that (see [DefaultLeaderElectionService:113|https://github.com/apache/flink/blob/release-1.17/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java#L113] in {{release-1.17}}). The {{DefaultMultipleComponentLeaderElectionService}} implementation does close the driver in a lock, though. But it doesn't rely on the lock when processing the event (e.g. in [DefaultMultipleComponentLeaderElectionService:152|https://github.com/apache/flink/blob/e3cd3b311c1c8a6a0e0cdc849d7c951ef8beea5c/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java#L152]). I considered this a bug in the implementation {{MultipleComponent*}} implementation initially: The event handling processing is done in a single thread to avoid locking. But the close method can be still called from another thread. was (Author: mapohl): This can, indeed, happen in the new implementation (even with the {{MultipleComponentLeaderElectionDriver}} implementation which is not used in the test run right now) because we call close on the driver within the lock. The old {{DefaultLeaderElectionService}} implementation didn't do that (see [DefaultLeaderElectionService:113|https://github.com/apache/flink/blob/release-1.17/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java#L113] in {{release-1.17}}). 
The {{DefaultMultipleComponentLeaderElectionService}} implementation does close the driver in a lock, though. But it doesn't rely on the lock when processing the event (e.g. in [DefaultMultipleComponentLeaderElectionService:152|https://github.com/apache/flink/blob/e3cd3b311c1c8a6a0e0cdc849d7c951ef8beea5c/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java#L152]). I considered this a bug in the implementation {{MultipleComponent*}} implementation initially: The event handling processing is done in a single thread to avoid locking. But the close method can be called from another thread. > ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement and > DefaultLeaderElectionService.onGrantLeadership fell into dead lock > - > > Key: FLINK-32311 > URL: https://issues.apache.org/jira/browse/FLINK-32311 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.18.0 >Reporter: Sergey Nuyanzin >Assignee: Matthias Pohl >Priority: Critical > Labels: test-stability > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49750=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8] > > there are 2 threads one locked {{0xe3a8a1e8}} and waiting for > {{0xe3a89c18}} > {noformat} > 2023-06-08T01:18:54.5609123Z Jun 08 01:18:54 > "ForkJoinPool-50-worker-25-EventThread" #956 daemon prio=5 os_prio=0 > tid=0x7f9374253800 nid=0x6a4e waiting for monitor entry > [0x7f94b63e1000] > 2023-06-08T01:18:54.5609820Z Jun 08 01:18:54java.lang.Thread.State: > BLOCKED (on object monitor) > 2023-06-08T01:18:54.5610557Z Jun 08 01:18:54 at > org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.runInLeaderEventThread(DefaultLeaderElectionService.java:425) > 2023-06-08T01:18:54.5611459Z Jun 08 01:18:54 - waiting to lock > <0xe3a89c18> (a java.lang.Object) > 2023-06-08T01:18:54.5612198Z Jun 08 01:18:54 at > 
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.onGrantLeadership(DefaultLeaderElectionService.java:300) > 2023-06-08T01:18:54.5613110Z Jun 08 01:18:54 at > org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver.isLeader(ZooKeeperLeaderElectionDriver.java:153) > 2023-06-08T01:18:54.5614070Z Jun 08 01:18:54 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch$$Lambda$1649/586959400.accept(Unknown > Source) > 2023-06-08T01:18:54.5615014Z Jun 08 01:18:54 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager.lambda$forEach$0(MappingListenerManager.java:92) >
[jira] [Comment Edited] (FLINK-32311) ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement and DefaultLeaderElectionService.onGrantLeadership fell into dead lock
[ https://issues.apache.org/jira/browse/FLINK-32311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731623#comment-17731623 ] Matthias Pohl edited comment on FLINK-32311 at 6/12/23 2:08 PM: This can, indeed, happen in the new implementation (even with the {{MultipleComponentLeaderElectionDriver}} implementation which is not used in the test run right now) because we call close on the driver within the lock. The old {{DefaultLeaderElectionService}} implementation didn't do that (see [DefaultLeaderElectionService:113|https://github.com/apache/flink/blob/release-1.17/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java#L113] in {{release-1.17}}). The {{DefaultMultipleComponentLeaderElectionService}} implementation does close the driver in a lock, though. But it doesn't rely on the lock when processing the event (e.g. in [DefaultMultipleComponentLeaderElectionService:152|https://github.com/apache/flink/blob/e3cd3b311c1c8a6a0e0cdc849d7c951ef8beea5c/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java#L152]). I considered this a bug in the implementation {{MultipleComponent*}} implementation initially: The event handling processing is done in a single thread to avoid locking. But the close method can be called from another thread. was (Author: mapohl): This can, indeed, happen in the new implementation (even with the {{MultipleComponentLeaderElectionDriver}} implementation which is not used in the test run right now) because we call close on the driver within the lock. The old {{DefaultLeaderElectionService}} implementation didn't do that (see [DefaultLeaderElectionService:113|https://github.com/apache/flink/blob/release-1.17/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java#L113] in {{release-1.17}}). 
The {{DefaultMultipleComponentLeaderElectionService}} implementation does close the driver in a lock, though. But it doesn't rely on the lock when processing the event (e.g. in [DefaultMultipleComponentLeaderElectionService:152|https://github.com/apache/flink/blob/e3cd3b311c1c8a6a0e0cdc849d7c951ef8beea5c/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java#L152]). I considered this a bug in the implementation {{MultipleComponent*}} implementation initially: The event handling processing is done in a single thread to avoid locking. But the close method can be called from another thread. But that thought might have been wrong: It should be enough to run the event triggering (rather than the event handling) and the close method in the lock. The close method shuts down the event processing entirely (which includes interrupting any outstanding event processing tasks); no events can be triggered afterwards anymore. I'm gonna go ahead and come up with a proposal here. 
> ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement and > DefaultLeaderElectionService.onGrantLeadership fell into dead lock > - > > Key: FLINK-32311 > URL: https://issues.apache.org/jira/browse/FLINK-32311 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.18.0 >Reporter: Sergey Nuyanzin >Assignee: Matthias Pohl >Priority: Critical > Labels: test-stability > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49750=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8] > > there are 2 threads one locked {{0xe3a8a1e8}} and waiting for > {{0xe3a89c18}} > {noformat} > 2023-06-08T01:18:54.5609123Z Jun 08 01:18:54 > "ForkJoinPool-50-worker-25-EventThread" #956 daemon prio=5 os_prio=0 > tid=0x7f9374253800 nid=0x6a4e waiting for monitor entry > [0x7f94b63e1000] > 2023-06-08T01:18:54.5609820Z Jun 08 01:18:54java.lang.Thread.State: > BLOCKED (on object monitor) > 2023-06-08T01:18:54.5610557Z Jun 08 01:18:54 at > org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.runInLeaderEventThread(DefaultLeaderElectionService.java:425) > 2023-06-08T01:18:54.5611459Z Jun 08 01:18:54 - waiting to lock > <0xe3a89c18> (a java.lang.Object) > 2023-06-08T01:18:54.5612198Z Jun 08 01:18:54 at > org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.onGrantLeadership(DefaultLeaderElectionService.java:300) > 2023-06-08T01:18:54.5613110Z Jun 08 01:18:54 at > org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver.isLeader(ZooKeeperLeaderElectionDriver.java:153) >
[jira] [Commented] (FLINK-32311) ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement and DefaultLeaderElectionService.onGrantLeadership fell into dead lock
[ https://issues.apache.org/jira/browse/FLINK-32311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731623#comment-17731623 ] Matthias Pohl commented on FLINK-32311: --- This can, indeed, happen in the new implementation (even with the {{MultipleComponentLeaderElectionDriver}} implementation, which is not used in the test run right now) because we call close on the driver within the lock. The old {{DefaultLeaderElectionService}} implementation didn't do that (see [DefaultLeaderElectionService:113|https://github.com/apache/flink/blob/release-1.17/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java#L113] in {{release-1.17}}). The {{DefaultMultipleComponentLeaderElectionService}} implementation does close the driver in a lock, though. But it doesn't rely on the lock when processing the event (e.g. in [DefaultMultipleComponentLeaderElectionService:152|https://github.com/apache/flink/blob/e3cd3b311c1c8a6a0e0cdc849d7c951ef8beea5c/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java#L152]). I initially considered this a bug in the {{MultipleComponent*}} implementation: The event handling is done in a single thread to avoid locking. But the close method can be called from another thread. But that thought might have been wrong: It should be enough to run the event triggering (rather than the event handling) and the close method in the lock. The close method shuts down the event processing entirely (which includes interrupting any outstanding event processing tasks); no events can be triggered afterwards anymore. I'm gonna go ahead and come up with a proposal here. 
> ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement and > DefaultLeaderElectionService.onGrantLeadership fell into dead lock > - > > Key: FLINK-32311 > URL: https://issues.apache.org/jira/browse/FLINK-32311 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.18.0 >Reporter: Sergey Nuyanzin >Assignee: Matthias Pohl >Priority: Critical > Labels: test-stability > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49750=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8] > > there are 2 threads one locked {{0xe3a8a1e8}} and waiting for > {{0xe3a89c18}} > {noformat} > 2023-06-08T01:18:54.5609123Z Jun 08 01:18:54 > "ForkJoinPool-50-worker-25-EventThread" #956 daemon prio=5 os_prio=0 > tid=0x7f9374253800 nid=0x6a4e waiting for monitor entry > [0x7f94b63e1000] > 2023-06-08T01:18:54.5609820Z Jun 08 01:18:54java.lang.Thread.State: > BLOCKED (on object monitor) > 2023-06-08T01:18:54.5610557Z Jun 08 01:18:54 at > org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.runInLeaderEventThread(DefaultLeaderElectionService.java:425) > 2023-06-08T01:18:54.5611459Z Jun 08 01:18:54 - waiting to lock > <0xe3a89c18> (a java.lang.Object) > 2023-06-08T01:18:54.5612198Z Jun 08 01:18:54 at > org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.onGrantLeadership(DefaultLeaderElectionService.java:300) > 2023-06-08T01:18:54.5613110Z Jun 08 01:18:54 at > org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver.isLeader(ZooKeeperLeaderElectionDriver.java:153) > 2023-06-08T01:18:54.5614070Z Jun 08 01:18:54 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch$$Lambda$1649/586959400.accept(Unknown > Source) > 2023-06-08T01:18:54.5615014Z Jun 08 01:18:54 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager.lambda$forEach$0(MappingListenerManager.java:92) > 
2023-06-08T01:18:54.5616259Z Jun 08 01:18:54 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager$$Lambda$1640/1393625763.run(Unknown > Source) > 2023-06-08T01:18:54.5617137Z Jun 08 01:18:54 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager$$Lambda$1633/2012730699.execute(Unknown > Source) > 2023-06-08T01:18:54.5618047Z Jun 08 01:18:54 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager.forEach(MappingListenerManager.java:89) > 2023-06-08T01:18:54.5618994Z Jun 08 01:18:54 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.StandardListenerManager.forEach(StandardListenerManager.java:89) > 2023-06-08T01:18:54.5620071Z Jun 08 01:18:54 at >
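The shape Matthias proposes — keep event *triggering* and final cleanup under the lock, but shut the event thread down *outside* it — can be modelled with a small standalone sketch. The names below are illustrative, not Flink's actual classes:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class LeaderEventSketch {
    private final Object lock = new Object();
    private final ExecutorService eventThread = Executors.newSingleThreadExecutor();
    private final AtomicBoolean granted = new AtomicBoolean(false);

    // Event triggering happens under the lock; the handling itself runs on
    // the single event thread, which also needs the lock briefly.
    void onGrantLeadership() {
        synchronized (lock) {
            eventThread.submit(() -> {
                synchronized (lock) {
                    granted.set(true);
                }
            });
        }
    }

    // The proposed fix: wait for in-flight event tasks OUTSIDE the lock, so
    // a queued task that still needs the lock cannot deadlock against close()
    // the way the stack trace above shows.
    void close() {
        eventThread.shutdown(); // no new events can be triggered afterwards
        try {
            eventThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        synchronized (lock) {
            // cleanup that still needs the lock runs only once the event
            // thread is quiet
        }
    }

    boolean isGranted() {
        return granted.get();
    }
}
```

If `close()` instead held the lock while awaiting termination, the queued event task blocked on the same lock could never finish, reproducing the deadlock between `onGrantLeadership` and close described in this ticket.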
[GitHub] [flink] hanyuzheng7 commented on pull request #22717: [FLINK-31665] [table] Add ARRAY_CONCAT function
hanyuzheng7 commented on PR #22717: URL: https://github.com/apache/flink/pull/22717#issuecomment-1587333488 @snuyanzin For this case, `SELECT array_concat(ARRAY[1], ARRAY[CAST(NULL AS INT)]); -- returns [1, 0]`, other methods such as array_union have the same problem; I have not been able to solve it so far.
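A minimal sketch of one plausible cause of the surprising `[1, 0]` result (an illustration of the failure mode, not Flink's actual array code): if the nullable boxed elements are copied into a primitive array anywhere in the pipeline, NULL has no representation there and silently degrades to the element type's default.

```java
public class NullElementSketch {
    // A primitive int[] cannot hold NULL, so a conversion like this one has
    // to pick a placeholder; 0 is the int default, which would explain a
    // NULL element surfacing as 0 in the concatenated result.
    static int[] toPrimitive(Integer[] boxed) {
        int[] out = new int[boxed.length];
        for (int i = 0; i < boxed.length; i++) {
            out[i] = boxed[i] == null ? 0 : boxed[i]; // NULL silently becomes 0
        }
        return out;
    }
}
```

Keeping the element type nullable end to end (boxed elements plus a null mask, rather than a bare primitive array) is the usual way to avoid this class of bug.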
[jira] [Updated] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover
[ https://issues.apache.org/jira/browse/FLINK-32316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-32316: Description: When we tried the SourceAlignment feature, we found that a duplicated announceCombinedWatermark task is scheduled after the JobManager fails over and automatically recovers the job from a checkpoint. I think the reason is that the announceCombinedWatermark task should be scheduled in the SourceCoordinator::start function, not in the SourceCoordinator constructor (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] ), because when the JobManager fails over and recovers the job, it creates the SourceCoordinator twice: * The first time is when the JobMaster is created: it builds the DefaultExecutionGraph, which initializes the first sourceCoordinator but does not start it. * The second time is when the JobMaster calls the restoreLatestCheckpointedStateInternal method, which resets the old sourceCoordinator and initializes a new one; but because the first sourceCoordinator was never started (a SourceCoordinator is started before SchedulerBase::startScheduling), the first SourceCoordinator is never fully closed. was: When we try SourceAlignment feature, we found there will be a duplicated announceCombinedWatermark task will be scheduled after JobManager failover, and auto recover job from checkpoint. 
The reason i think is we should schedule announceCombinedWatermark task during SourceCoordinator::start function not in SourceCoordinator construct function (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] ), because when jobManager encounter failover and auto recover job, it will create SourceCoordinator twice: * The first one is when JobMaster is created it will create the DefaultExecutionGraph, this will init the first sourceCoordinator but will not start it. * The Second one is JobMaster call restoreLatestCheckpointedStateInternal method, which will be reset old sourceCoordinator and initialize a new one, but because the first sourceCoordinator is not started(SourceCoordinator will be started before SchedulerBase::startScheduling), so the first SourceCoordinator will not be fully closed. > Duplicated announceCombinedWatermark task maybe scheduled if jobmanager > failover > > > Key: FLINK-32316 > URL: https://issues.apache.org/jira/browse/FLINK-32316 > Project: Flink > Issue Type: Bug >Affects Versions: 1.16.0 >Reporter: Cai Liuyang >Priority: Major > > When we try SourceAlignment feature, we found there will be a duplicated > announceCombinedWatermark task will be scheduled after JobManager failover > and auto recover job from checkpoint. > The reason i think is we should schedule announceCombinedWatermark task > during SourceCoordinator::start function not in SourceCoordinator construct > function (see > [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] > ), because when jobManager encounter failover and auto recover job, it will > create SourceCoordinator twice: > * The first one is when JobMaster is created it will create the > DefaultExecutionGraph, this will init the first sourceCoordinator but will > not start it. 
> * The Second one is JobMaster call restoreLatestCheckpointedStateInternal > method, which will be reset old sourceCoordinator and initialize a new one, > but because the first sourceCoordinator is not started(SourceCoordinator will > be started before SchedulerBase::startScheduling), so the first > SourceCoordinator will not be fully closed. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
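The proposed fix — schedule the periodic announce task in start() rather than in the constructor — can be illustrated with a minimal standalone sketch (the names are hypothetical, not the actual SourceCoordinator code). A coordinator that is created during graph construction but never started then owns no periodic task, so the checkpoint-restore path creating a second instance cannot leave a duplicate running:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CoordinatorSketch {
    static final AtomicInteger announcements = new AtomicInteger();

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> announceTask;

    // Nothing is scheduled in the constructor. Only an explicitly started
    // coordinator runs the periodic "announce combined watermark" task.
    void start() {
        announceTask = scheduler.scheduleAtFixedRate(
                announcements::incrementAndGet, 0, 10, TimeUnit.MILLISECONDS);
    }

    boolean hasAnnounceTask() {
        return announceTask != null;
    }

    void close() {
        if (announceTask != null) {
            announceTask.cancel(true);
        }
        scheduler.shutdownNow();
    }
}
```

Had the scheduling call lived in the constructor instead, both instances in the failover scenario would run the task, and the first, never-started and never-fully-closed instance would keep announcing forever — exactly the duplication this ticket describes.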
[jira] [Updated] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover
[ https://issues.apache.org/jira/browse/FLINK-32316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cai Liuyang updated FLINK-32316: Description: When we tried the SourceAlignment feature, we found that a duplicated announceCombinedWatermark task is scheduled after the JobManager fails over and automatically recovers the job from a checkpoint. I think the reason is that the announceCombinedWatermark task should be scheduled in the SourceCoordinator::start function, not in the SourceCoordinator constructor (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] ), because when the JobManager fails over and recovers the job, it creates the SourceCoordinator twice: * The first time is when the JobMaster is created: it builds the DefaultExecutionGraph, which initializes the first sourceCoordinator but does not start it. * The second time is when the JobMaster calls the restoreLatestCheckpointedStateInternal method, which resets the old sourceCoordinator and initializes a new one; but because the first sourceCoordinator was never started (a SourceCoordinator is started before SchedulerBase::startScheduling), the first SourceCoordinator is never fully closed. was: When we try SourceAlignment feature, we found there will be a duplicated announceCombinedWatermark task will be scheduled after JobManager failover, and auto recover job from checkpoint. The reason i think is we should schedule announceCombinedWatermark task during SourceCoordinator::start function not in SourceCoordinator construct function (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] ), because when jobManager encounter failover and auto recover job, it will create SourceCoordinator twice: * The first one is when JobMaster is create it will create the DefaultExecutionGraph. 
* The Second one is JobMaster call restoreLatestCheckpointedStateInternal method, which will be reset old sourceCoordinator and initialize a new one, but because the first sourceCoordinator is not started(SourceCoordinator will be started before SchedulerBase::startScheduling, so the first SourceCoordinator will not be fully closed). > Duplicated announceCombinedWatermark task maybe scheduled if jobmanager > failover > > > Key: FLINK-32316 > URL: https://issues.apache.org/jira/browse/FLINK-32316 > Project: Flink > Issue Type: Bug >Affects Versions: 1.16.0 >Reporter: Cai Liuyang >Priority: Major > > When we try SourceAlignment feature, we found there will be a duplicated > announceCombinedWatermark task will be scheduled after JobManager failover, > and auto recover job from checkpoint. > The reason i think is we should schedule announceCombinedWatermark task > during SourceCoordinator::start function not in SourceCoordinator construct > function (see > [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] > ), because when jobManager encounter failover and auto recover job, it will > create SourceCoordinator twice: > * The first one is when JobMaster is created it will create the > DefaultExecutionGraph, this will init the first sourceCoordinator but will > not start it. > * The Second one is JobMaster call restoreLatestCheckpointedStateInternal > method, which will be reset old sourceCoordinator and initialize a new one, > but because the first sourceCoordinator is not started(SourceCoordinator will > be started before SchedulerBase::startScheduling), so the first > SourceCoordinator will not be fully closed. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31665) Add ARRAY_CONCAT supported in SQL & Table API
[ https://issues.apache.org/jira/browse/FLINK-31665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731607#comment-17731607 ] Mara Steiu commented on FLINK-31665: I personally think that ARRAY_CONCAT is the most intuitive option, and the fact that other vendors like BQ use it also helps. > Add ARRAY_CONCAT supported in SQL & Table API > - > > Key: FLINK-31665 > URL: https://issues.apache.org/jira/browse/FLINK-31665 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: jackylau >Assignee: Hanyu Zheng >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > This is an implementation of ARRAY_CONCAT. > The array_concat() function concatenates one or more arrays, creating an > array that contains all the elements in the first array followed by all the > elements in the second array ... followed by all the elements in the nth array. > h3. Brief change log > ARRAY_CONCAT for Table API and SQL > Syntax: > {code:java} > ARRAY_CONCAT(arr1, tail...){code} > Arguments: > array: at least one ARRAY to be handled. > Returns: > The function returns NULL if any input argument is NULL. > Examples: > {code:java} > Flink SQL> SELECT array_concat(array[1, 2, 3, null, 3], array[3], array[5]); > [1, 2, 3, null, 3, 3, 5]{code} > see also: > Google Cloud BigQuery: > [https://cloud.google.com/bigquery/docs/reference/standard-sql/array_functions#array_concat]
[jira] [Commented] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover
[ https://issues.apache.org/jira/browse/FLINK-32316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731608#comment-17731608 ] Cai Liuyang commented on FLINK-32316: - [~pnowojski] please take a look? thks~ > Duplicated announceCombinedWatermark task maybe scheduled if jobmanager > failover > > > Key: FLINK-32316 > URL: https://issues.apache.org/jira/browse/FLINK-32316 > Project: Flink > Issue Type: Bug >Affects Versions: 1.16.0 >Reporter: Cai Liuyang >Priority: Major > > When we try SourceAlignment feature, we found there will be a duplicated > announceCombinedWatermark task will be scheduled after JobManager failover, > and auto recover job from checkpoint. > The reason i think is we should schedule announceCombinedWatermark task > during SourceCoordinator::start function not in SourceCoordinator construct > function (see > [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] > ), because when jobManager encounter failover and auto recover job, it will > create SourceCoordinator twice: > * The first one is when JobMaster is create it will create the > DefaultExecutionGraph. > * The Second one is JobMaster call restoreLatestCheckpointedStateInternal > method, which will be reset old sourceCoordinator and initialize a new one, > but because the first sourceCoordinator is not started(SourceCoordinator will > be started before SchedulerBase::startScheduling, so the first > SourceCoordinator will not be fully closed). > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-32222) Cassandra Source uses DataInputDeserializer and DataOutputSerializer non public apis
[ https://issues.apache.org/jira/browse/FLINK-32222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Etienne Chauchot resolved FLINK-32222. -- Fix Version/s: cassandra-3.2.0 Resolution: Fixed merged to master: 4f1cdfa > Cassandra Source uses DataInputDeserializer and DataOutputSerializer non > public apis > > > Key: FLINK-32222 > URL: https://issues.apache.org/jira/browse/FLINK-32222 > Project: Flink > Issue Type: Technical Debt >Reporter: Etienne Chauchot >Assignee: Etienne Chauchot >Priority: Major > Labels: pull-request-available > Fix For: cassandra-3.2.0 > > > In the class _CassandraSplitSerializer_, the usage of these non-public APIs violates > _ConnectorRules#CONNECTOR_CLASSES_ONLY_DEPEND_ON_PUBLIC_API_ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-31806) Prod architecture tests didn't detect non-public API usage
[ https://issues.apache.org/jira/browse/FLINK-31806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Etienne Chauchot resolved FLINK-31806. -- Fix Version/s: 1.18.0 Resolution: Fixed merged: 28abe81 > Prod architecture tests didn't detect non-public API usage > -- > > Key: FLINK-31806 > URL: https://issues.apache.org/jira/browse/FLINK-31806 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / Cassandra, Tests >Affects Versions: cassandra-3.0.0, 1.18.0 >Reporter: Chesnay Schepler >Assignee: Etienne Chauchot >Priority: Critical > Labels: pull-request-available > Fix For: 1.18.0 > > > FLINK-31805 wasn't detected by the production architecture tests. > Not sure if this is an issue on the cassandra or Flink side. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32316) Duplicated announceCombinedWatermark task may be scheduled after JobManager failover
Cai Liuyang created FLINK-32316: --- Summary: Duplicated announceCombinedWatermark task may be scheduled after JobManager failover Key: FLINK-32316 URL: https://issues.apache.org/jira/browse/FLINK-32316 Project: Flink Issue Type: Bug Affects Versions: 1.16.0 Reporter: Cai Liuyang When we tried the SourceAlignment feature, we found that a duplicated announceCombinedWatermark task may be scheduled after a JobManager failover automatically recovers the job from a checkpoint. The reason, I think, is that we should schedule the announceCombinedWatermark task in the SourceCoordinator::start function, not in the SourceCoordinator constructor (see [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149] ), because when the JobManager encounters a failover and automatically recovers the job, it creates the SourceCoordinator twice: * The first one is created when the JobMaster creates the DefaultExecutionGraph. * The second one is created when the JobMaster calls the restoreLatestCheckpointedStateInternal method, which resets the old sourceCoordinator and initializes a new one. Because the first sourceCoordinator is never started (SourceCoordinator is started before SchedulerBase::startScheduling), the first SourceCoordinator is never fully closed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
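The fix the reporter suggests, moving the periodic scheduling out of the constructor and into start(), can be sketched as follows. `CoordinatorSketch` and its members are a hypothetical illustration of the pattern, not Flink's actual SourceCoordinator:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CoordinatorSketch implements AutoCloseable {
    // Counts how many periodic announce tasks were ever scheduled.
    static final AtomicInteger SCHEDULED_TASKS = new AtomicInteger();

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> announceTask;

    // The constructor deliberately schedules nothing: an instance that is
    // created but never started (the first coordinator in the failover
    // scenario described above) must not leave a periodic task behind.
    public CoordinatorSketch() {}

    public void start() {
        SCHEDULED_TASKS.incrementAndGet();
        announceTask = executor.scheduleAtFixedRate(
                this::announceCombinedWatermark, 0, 100, TimeUnit.MILLISECONDS);
    }

    private void announceCombinedWatermark() {
        // placeholder for the periodic watermark announcement
    }

    @Override
    public void close() {
        if (announceTask != null) {
            announceTask.cancel(false);
        }
        executor.shutdownNow();
    }

    public static void main(String[] args) throws Exception {
        // Failover path: the first coordinator is constructed but replaced
        // before start(); only the second one is started, so only one
        // periodic task exists.
        try (CoordinatorSketch first = new CoordinatorSketch();
             CoordinatorSketch second = new CoordinatorSketch()) {
            second.start();
            System.out.println("scheduled tasks: " + SCHEDULED_TASKS.get());
        }
    }
}
```

With scheduling in the constructor instead, both instances would increment the counter, reproducing the duplicate-task symptom.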
[jira] [Created] (FLINK-32315) Support local file upload in K8s mode
Paul Lin created FLINK-32315: Summary: Support local file upload in K8s mode Key: FLINK-32315 URL: https://issues.apache.org/jira/browse/FLINK-32315 Project: Flink Issue Type: New Feature Components: Client / Job Submission, Deployment / Kubernetes Reporter: Paul Lin Currently, Flink assumes all resources are locally accessible in the pods, which requires users to prepare the resources by mounting storage, downloading resources with init containers, or rebuilding images for each execution. We could make things much easier by introducing a built-in file distribution mechanism based on Flink-supported filesystems. It would be implemented in two steps: 1. KubernetesClusterDescriptor uploads all local resources to remote storage via the Flink filesystem (skipped if the resources are already remote). 2. KubernetesApplicationClusterEntrypoint and KubernetesTaskExecutorRunner download the resources and put them on the classpath during startup. The 2nd step is mostly done by [FLINK-28915|https://issues.apache.org/jira/browse/FLINK-28915], thus this issue focuses on the upload part. -- This message was sent by Atlassian Jira (v8.20.10#820010)
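The core decision of step 1, upload local URIs and pass already-remote ones through, might look like this minimal sketch. `resolveResources` and the uploader callback are hypothetical illustrations, not Flink API:

```java
import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class LocalFileShipperSketch {

    // For each user resource: local file:// URIs (or scheme-less paths) are
    // uploaded via the supplied uploader and replaced by their remote
    // location; URIs that are already remote (s3://, hdfs://, ...) are
    // passed through unchanged, matching the "skip if already remote" rule.
    static List<URI> resolveResources(List<URI> resources, Function<URI, URI> uploadToRemote) {
        List<URI> resolved = new ArrayList<>();
        for (URI resource : resources) {
            String scheme = resource.getScheme();
            boolean isLocal = scheme == null || "file".equals(scheme);
            resolved.add(isLocal ? uploadToRemote.apply(resource) : resource);
        }
        return resolved;
    }

    public static void main(String[] args) {
        List<URI> in = List.of(
                URI.create("file:///tmp/udf.jar"),
                URI.create("s3://bucket/libs/dep.jar"));
        // Fake uploader: pretend everything lands under s3://bucket/uploaded/
        List<URI> out = resolveResources(in,
                local -> URI.create("s3://bucket/uploaded/" + new File(local.getPath()).getName()));
        System.out.println(out);
    }
}
```

In a real implementation the uploader would write through a Flink-supported filesystem, and step 2 (the FLINK-28915 download path) would consume the rewritten remote URIs.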
[GitHub] [flink] flinkbot commented on pull request #22764: [FLINK-32314][rpc] Ignore classloading errors after actorsystem shutdown
flinkbot commented on PR #22764: URL: https://github.com/apache/flink/pull/22764#issuecomment-1587277700 ## CI report: * 95de63ef5c02258d5f8fff9654daada0b49b2652 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zentol opened a new pull request, #22764: [FLINK-32314][rpc] Ignore classloading errors after actorsystem shutdown
zentol opened a new pull request, #22764: URL: https://github.com/apache/flink/pull/22764 Stray threads in akka/netty can run into classloading errors because they aren't shut down (for some reason) despite the actor system having shut down. Ignore errors in this specific situation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32314) Ignore class-loading errors after RPC system shutdown
[ https://issues.apache.org/jira/browse/FLINK-32314?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32314: --- Labels: pull-request-available (was: ) > Ignore class-loading errors after RPC system shutdown > - > > Key: FLINK-32314 > URL: https://issues.apache.org/jira/browse/FLINK-32314 > Project: Flink > Issue Type: Improvement > Components: Runtime / RPC, Tests >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > In tests we occasionally see the akka rpc service throwing class loading > errors _after_ it was shut down. > AFAICT our shutdown procedure is correct, and it's just akka shutting down > some things asynchronously. > I couldn't figure out why/what is still running, so as a bandaid I suggest to > ignore classloading errors after the rpc service shutdown has completed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32314) Ignore class-loading errors after RPC system shutdown
Chesnay Schepler created FLINK-32314: Summary: Ignore class-loading errors after RPC system shutdown Key: FLINK-32314 URL: https://issues.apache.org/jira/browse/FLINK-32314 Project: Flink Issue Type: Improvement Components: Runtime / RPC, Tests Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.18.0 In tests we occasionally see the akka rpc service throwing class loading errors _after_ it was shut down. AFAICT our shutdown procedure is correct, and it's just akka shutting down some things asynchronously. I couldn't figure out why/what is still running, so as a bandaid I suggest to ignore classloading errors after the rpc service shutdown has completed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
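The bandaid described above, ignoring classloading errors only once the RPC service shutdown has completed, can be sketched as a small error filter. `RpcErrorFilterSketch` is a hypothetical illustration, not Flink's actual handler:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class RpcErrorFilterSketch {
    private final AtomicBoolean shutDown = new AtomicBoolean(false);

    // Called when the RPC service shutdown procedure has finished.
    public void completeShutdown() {
        shutDown.set(true);
    }

    // Returns true if the error should be surfaced. Classloading errors
    // reported by stray akka/netty threads *after* shutdown are swallowed;
    // everything before shutdown, and every non-classloading error, is
    // still reported.
    public boolean shouldReport(Throwable t) {
        boolean isClassLoadingError =
                t instanceof NoClassDefFoundError || t instanceof ClassNotFoundException;
        return !(shutDown.get() && isClassLoadingError);
    }

    public static void main(String[] args) {
        RpcErrorFilterSketch filter = new RpcErrorFilterSketch();
        System.out.println(filter.shouldReport(new NoClassDefFoundError("x"))); // before shutdown
        filter.completeShutdown();
        System.out.println(filter.shouldReport(new NoClassDefFoundError("x"))); // after shutdown
        System.out.println(filter.shouldReport(new RuntimeException("x")));     // not classloading
    }
}
```

The key property is the conjunction: the error class alone is not enough to suppress, which keeps real classloading bugs visible during normal operation.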
[GitHub] [flink-connector-jdbc] snuyanzin opened a new pull request, #56: [FLINK-32313] Remove usage of flink-shaded from the code, add deprecation on checkstyle level
snuyanzin opened a new pull request, #56: URL: https://github.com/apache/flink-connector-jdbc/pull/56 After this change, any code that imports `flink-shaded` will fail compilation during the checkstyle task. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32313) CrateDB relies on flink-shaded in flink-connector-jdbc
[ https://issues.apache.org/jira/browse/FLINK-32313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32313: --- Labels: pull-request-available (was: ) > CrateDB relies on flink-shaded in flink-connector-jdbc > -- > > Key: FLINK-32313 > URL: https://issues.apache.org/jira/browse/FLINK-32313 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Reporter: Martijn Visser >Priority: Blocker > Labels: pull-request-available > > See > https://github.com/apache/flink-connector-jdbc/blob/main/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalog.java#L27 > - JDBC shouldn't rely on flink-shaded. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-32308) RestClusterClient submit job to remote cluster
[ https://issues.apache.org/jira/browse/FLINK-32308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser closed FLINK-32308. -- Resolution: Invalid [~SpongebobZ] This request for help should be asked via the User mailing list, Stackoverflow or Slack. Jira is for bugs or new feature requests > RestClusterClient submit job to remote cluster > -- > > Key: FLINK-32308 > URL: https://issues.apache.org/jira/browse/FLINK-32308 > Project: Flink > Issue Type: Bug > Components: Runtime / REST >Affects Versions: 1.14.5 >Reporter: Spongebob >Priority: Not a Priority > > I just used `RestClusterClient` submit job to remote cluster, but out of my > expectation it submitted to local cluster instead. Could you help with me > {code:java} > String host = "x.x.x.x"; > int port = 8081; > Configuration flinkConfiguration = new Configuration(); > flinkConfiguration.setString(JobManagerOptions.ADDRESS, host); > flinkConfiguration.setInteger(JobManagerOptions.PORT, 6123); > flinkConfiguration.setInteger(RestOptions.PORT, port); > RestClusterClient clusterClient = new > RestClusterClient<>(flinkConfiguration, StandaloneClusterId.getInstance()); > String s = clusterClient.getWebInterfaceURL(); > List extraJars = new ArrayList<>(); > extraJars.add(new File("C:\\Users\\extend.jar").toURI().toURL()); > PackagedProgram packagedProgram = PackagedProgram.newBuilder() > .setConfiguration(flinkConfiguration) > .setJarFile(new File("F:\\data.jar")) > .setEntryPointClassName("MyApplication") > .setUserClassPaths(extraJars) > .build(); > JobID jobID = JobID.generate(); > JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, > flinkConfiguration, 2, jobID, false); > clusterClient.submitJob(jobGraph); > System.out.println(jobID); {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-32311) ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement and DefaultLeaderElectionService.onGrantLeadership fell into dead lock
[ https://issues.apache.org/jira/browse/FLINK-32311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl reassigned FLINK-32311: - Assignee: Matthias Pohl > ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement and > DefaultLeaderElectionService.onGrantLeadership fell into dead lock > - > > Key: FLINK-32311 > URL: https://issues.apache.org/jira/browse/FLINK-32311 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.18.0 >Reporter: Sergey Nuyanzin >Assignee: Matthias Pohl >Priority: Critical > Labels: test-stability > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49750=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8] > > there are 2 threads one locked {{0xe3a8a1e8}} and waiting for > {{0xe3a89c18}} > {noformat} > 2023-06-08T01:18:54.5609123Z Jun 08 01:18:54 > "ForkJoinPool-50-worker-25-EventThread" #956 daemon prio=5 os_prio=0 > tid=0x7f9374253800 nid=0x6a4e waiting for monitor entry > [0x7f94b63e1000] > 2023-06-08T01:18:54.5609820Z Jun 08 01:18:54java.lang.Thread.State: > BLOCKED (on object monitor) > 2023-06-08T01:18:54.5610557Z Jun 08 01:18:54 at > org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.runInLeaderEventThread(DefaultLeaderElectionService.java:425) > 2023-06-08T01:18:54.5611459Z Jun 08 01:18:54 - waiting to lock > <0xe3a89c18> (a java.lang.Object) > 2023-06-08T01:18:54.5612198Z Jun 08 01:18:54 at > org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.onGrantLeadership(DefaultLeaderElectionService.java:300) > 2023-06-08T01:18:54.5613110Z Jun 08 01:18:54 at > org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver.isLeader(ZooKeeperLeaderElectionDriver.java:153) > 2023-06-08T01:18:54.5614070Z Jun 08 01:18:54 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch$$Lambda$1649/586959400.accept(Unknown > Source) > 
2023-06-08T01:18:54.5615014Z Jun 08 01:18:54 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager.lambda$forEach$0(MappingListenerManager.java:92) > 2023-06-08T01:18:54.5616259Z Jun 08 01:18:54 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager$$Lambda$1640/1393625763.run(Unknown > Source) > 2023-06-08T01:18:54.5617137Z Jun 08 01:18:54 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager$$Lambda$1633/2012730699.execute(Unknown > Source) > 2023-06-08T01:18:54.5618047Z Jun 08 01:18:54 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager.forEach(MappingListenerManager.java:89) > 2023-06-08T01:18:54.5618994Z Jun 08 01:18:54 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.StandardListenerManager.forEach(StandardListenerManager.java:89) > 2023-06-08T01:18:54.5620071Z Jun 08 01:18:54 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch.setLeadership(LeaderLatch.java:711) > 2023-06-08T01:18:54.5621198Z Jun 08 01:18:54 - locked <0xe3a8a1e8> > (a > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch) > 2023-06-08T01:18:54.5622072Z Jun 08 01:18:54 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch.checkLeadership(LeaderLatch.java:597) > 2023-06-08T01:18:54.5622991Z Jun 08 01:18:54 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch.access$600(LeaderLatch.java:64) > 2023-06-08T01:18:54.5623988Z Jun 08 01:18:54 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch$7.processResult(LeaderLatch.java:648) > 2023-06-08T01:18:54.5624965Z Jun 08 01:18:54 at > 
org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.sendToBackgroundCallback(CuratorFrameworkImpl.java:926) > 2023-06-08T01:18:54.5626218Z Jun 08 01:18:54 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.processBackgroundOperation(CuratorFrameworkImpl.java:683) > 2023-06-08T01:18:54.5627369Z Jun 08 01:18:54 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.WatcherRemovalFacade.processBackgroundOperation(WatcherRemovalFacade.java:152) > 2023-06-08T01:18:54.5628353Z Jun 08 01:18:54 at > org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.GetChildrenBuilderImpl$2.processResult(GetChildrenBuilderImpl.java:187) >
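The thread dump above is a classic lock-ordering cycle: one thread holds the LeaderLatch monitor (0xe3a8a1e8) while waiting for the service lock (0xe3a89c18), while the other path holds them in the opposite order. The standard remedy, acquiring both locks in one global order everywhere, can be sketched as follows; all class and method names here are hypothetical illustrations, not the actual Flink/Curator code:

```java
public class LockOrderingSketch {
    private static final Object SERVICE_LOCK = new Object();
    private static final Object LATCH_LOCK = new Object();

    // Both code paths acquire SERVICE_LOCK before LATCH_LOCK. With a single
    // global acquisition order, the cycle from the dump (thread A holding
    // the latch monitor while waiting for the service lock, thread B the
    // reverse) cannot form.
    static void grantLeadership() {
        synchronized (SERVICE_LOCK) {
            synchronized (LATCH_LOCK) {
                // update leadership state
            }
        }
    }

    static void revokeLeadership() {
        synchronized (SERVICE_LOCK) {
            synchronized (LATCH_LOCK) {
                // clear leadership state
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Hammer both paths concurrently; with consistent ordering this
        // always terminates.
        Thread t1 = new Thread(() -> { for (int i = 0; i < 10_000; i++) grantLeadership(); });
        Thread t2 = new Thread(() -> { for (int i = 0; i < 10_000; i++) revokeLeadership(); });
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println("completed without deadlock");
    }
}
```

Alternatives used in practice include not calling listeners while holding a lock, or deferring the cross-component call to a separate executor, which is closer to what leader-election code typically does.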
[jira] [Created] (FLINK-32313) CrateDB relies on flink-shaded in flink-connector-jdbc
Martijn Visser created FLINK-32313: -- Summary: CrateDB relies on flink-shaded in flink-connector-jdbc Key: FLINK-32313 URL: https://issues.apache.org/jira/browse/FLINK-32313 Project: Flink Issue Type: Bug Components: Connectors / JDBC Reporter: Martijn Visser See https://github.com/apache/flink-connector-jdbc/blob/main/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalog.java#L27 - JDBC shouldn't rely on flink-shaded. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32313) CrateDB relies on flink-shaded in flink-connector-jdbc
[ https://issues.apache.org/jira/browse/FLINK-32313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731579#comment-17731579 ] Martijn Visser commented on FLINK-32313: [~matriv] This is a blocker for JDBC, can you make sure that no Flink-Shaded is used by CrateDB? > CrateDB relies on flink-shaded in flink-connector-jdbc > -- > > Key: FLINK-32313 > URL: https://issues.apache.org/jira/browse/FLINK-32313 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Reporter: Martijn Visser >Priority: Blocker > > See > https://github.com/apache/flink-connector-jdbc/blob/main/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalog.java#L27 > - JDBC shouldn't rely on flink-shaded. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32312) SSLConnectionSocketFactory produced no output for 900 seconds
[ https://issues.apache.org/jira/browse/FLINK-32312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731578#comment-17731578 ] Sergey Nuyanzin commented on FLINK-32312: - https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49645=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=15678 > SSLConnectionSocketFactory produced no output for 900 seconds > - > > Key: FLINK-32312 > URL: https://issues.apache.org/jira/browse/FLINK-32312 > Project: Flink > Issue Type: Bug > Components: Build System / CI >Affects Versions: 1.18.0 >Reporter: Sergey Nuyanzin >Priority: Major > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49688=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=15175 > {noformat} > "main" #1 prio=5 os_prio=0 tid=0x7fb46c00b800 nid=0x184 runnable > [0x7fb473251000] > Jun 06 09:53:49java.lang.Thread.State: RUNNABLE > Jun 06 09:53:49 at java.net.PlainSocketImpl.socketConnect(Native Method) > Jun 06 09:53:49 at > java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) > Jun 06 09:53:49 at > java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) > Jun 06 09:53:49 at > java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) > Jun 06 09:53:49 at > java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) > Jun 06 09:53:49 at java.net.Socket.connect(Socket.java:607) > Jun 06 09:53:49 at > org.apache.maven.wagon.providers.http.httpclient.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:368) > Jun 06 09:53:49 at > org.apache.maven.wagon.providers.http.httpclient.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142) > Jun 06 09:53:49 at > 
org.apache.maven.wagon.providers.http.httpclient.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:376) > Jun 06 09:53:49 at > org.apache.maven.wagon.providers.http.httpclient.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393) > Jun 06 09:53:49 at > org.apache.maven.wagon.providers.http.httpclient.impl.execchain.MainClientExec.execute(MainClientExec.java:236) > Jun 06 09:53:49 at > org.apache.maven.wagon.providers.http.httpclient.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) > Jun 06 09:53:49 at > org.apache.maven.wagon.providers.http.httpclient.impl.execchain.RetryExec.execute(RetryExec.java:89) > Jun 06 09:53:49 at > org.apache.maven.wagon.providers.http.httpclient.impl.execchain.RedirectExec.execute(RedirectExec.java:110) > Jun 06 09:53:49 at > org.apache.maven.wagon.providers.http.httpclient.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) > Jun 06 09:53:49 at > org.apache.maven.wagon.providers.http.httpclient.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) > Jun 06 09:53:49 at > org.apache.maven.wagon.providers.http.wagon.shared.AbstractHttpClientWagon.execute(AbstractHttpClientWagon.java:1005) > Jun 06 09:53:49 at > org.apache.maven.wagon.providers.http.wagon.shared.AbstractHttpClientWagon.fillInputData(AbstractHttpClientWagon.java:1162) > Jun 06 09:53:49 at > org.apache.maven.wagon.providers.http.wagon.shared.AbstractHttpClientWagon.fillInputData(AbstractHttpClientWagon.java:1140) > Jun 06 09:53:49 at > org.apache.maven.wagon.StreamWagon.getInputStream(StreamWagon.java:126) > Jun 06 09:53:49 at > org.apache.maven.wagon.StreamWagon.getIfNewer(StreamWagon.java:88) > ... > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32312) SSLConnectionSocketFactory produced no output for 900 seconds
Sergey Nuyanzin created FLINK-32312: --- Summary: SSLConnectionSocketFactory produced no output for 900 seconds Key: FLINK-32312 URL: https://issues.apache.org/jira/browse/FLINK-32312 Project: Flink Issue Type: Bug Components: Build System / CI Affects Versions: 1.18.0 Reporter: Sergey Nuyanzin https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49688=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=15175 {noformat} "main" #1 prio=5 os_prio=0 tid=0x7fb46c00b800 nid=0x184 runnable [0x7fb473251000] Jun 06 09:53:49java.lang.Thread.State: RUNNABLE Jun 06 09:53:49 at java.net.PlainSocketImpl.socketConnect(Native Method) Jun 06 09:53:49 at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) Jun 06 09:53:49 at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) Jun 06 09:53:49 at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) Jun 06 09:53:49 at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) Jun 06 09:53:49 at java.net.Socket.connect(Socket.java:607) Jun 06 09:53:49 at org.apache.maven.wagon.providers.http.httpclient.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:368) Jun 06 09:53:49 at org.apache.maven.wagon.providers.http.httpclient.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142) Jun 06 09:53:49 at org.apache.maven.wagon.providers.http.httpclient.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:376) Jun 06 09:53:49 at org.apache.maven.wagon.providers.http.httpclient.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393) Jun 06 09:53:49 at org.apache.maven.wagon.providers.http.httpclient.impl.execchain.MainClientExec.execute(MainClientExec.java:236) Jun 06 09:53:49 at org.apache.maven.wagon.providers.http.httpclient.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) Jun 06 
09:53:49 at org.apache.maven.wagon.providers.http.httpclient.impl.execchain.RetryExec.execute(RetryExec.java:89) Jun 06 09:53:49 at org.apache.maven.wagon.providers.http.httpclient.impl.execchain.RedirectExec.execute(RedirectExec.java:110) Jun 06 09:53:49 at org.apache.maven.wagon.providers.http.httpclient.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) Jun 06 09:53:49 at org.apache.maven.wagon.providers.http.httpclient.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) Jun 06 09:53:49 at org.apache.maven.wagon.providers.http.wagon.shared.AbstractHttpClientWagon.execute(AbstractHttpClientWagon.java:1005) Jun 06 09:53:49 at org.apache.maven.wagon.providers.http.wagon.shared.AbstractHttpClientWagon.fillInputData(AbstractHttpClientWagon.java:1162) Jun 06 09:53:49 at org.apache.maven.wagon.providers.http.wagon.shared.AbstractHttpClientWagon.fillInputData(AbstractHttpClientWagon.java:1140) Jun 06 09:53:49 at org.apache.maven.wagon.StreamWagon.getInputStream(StreamWagon.java:126) Jun 06 09:53:49 at org.apache.maven.wagon.StreamWagon.getIfNewer(StreamWagon.java:88) ... {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-32299) Upload python jar when sql contains python udf jar
[ https://issues.apache.org/jira/browse/FLINK-32299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shengkai Fang reassigned FLINK-32299: - Assignee: FangYong > Upload python jar when sql contains python udf jar > -- > > Key: FLINK-32299 > URL: https://issues.apache.org/jira/browse/FLINK-32299 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Gateway, Table SQL / Runtime >Reporter: Shengkai Fang >Assignee: FangYong >Priority: Major > > Currently, the sql gateway always uploads the python jar when submitting jobs. > However, it is not required for every sql job. We should add the python jar > to PipelineOptions.JARS only when a user job contains a python udf. -- This message was sent by Atlassian Jira (v8.20.10#820010)
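The proposed behavior, adding the Python jar to the pipeline jars only when the job actually contains a Python UDF, reduces to a small conditional. The class name, method, and jar path below are hypothetical, not the gateway's actual code:

```java
import java.util.ArrayList;
import java.util.List;

public class PythonJarSketch {
    // Hypothetical location of the Python support jar.
    static final String PYTHON_JAR = "file:///opt/flink/opt/flink-python.jar";

    // Add the Python jar to the pipeline jars only when the job actually
    // contains a Python UDF, instead of unconditionally on every submission.
    static List<String> pipelineJars(List<String> userJars, boolean containsPythonUdf) {
        List<String> jars = new ArrayList<>(userJars);
        if (containsPythonUdf && !jars.contains(PYTHON_JAR)) {
            jars.add(PYTHON_JAR);
        }
        return jars;
    }
}
```

The harder part of the actual fix is detecting whether a statement references a Python UDF at planning time; the sketch only shows the resulting jar-list decision.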
[jira] [Assigned] (FLINK-32309) Shared classpaths and jars manager for jobs in sql gateway causes conflicts
[ https://issues.apache.org/jira/browse/FLINK-32309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shengkai Fang reassigned FLINK-32309: - Assignee: FangYong > Shared classpaths and jars manager for jobs in sql gateway causes conflicts > > > Key: FLINK-32309 > URL: https://issues.apache.org/jira/browse/FLINK-32309 > Project: Flink > Issue Type: Bug > Components: Table SQL / Gateway >Affects Versions: 1.18.0 >Reporter: Fang Yong >Assignee: FangYong >Priority: Major > > Currently, all jobs in the same session of the sql gateway share the resource > manager, which provides the classpath for jobs. After a job is performed, its > classpath and jars remain in the shared resource manager and are used by > the next jobs. This may add many unnecessary jars to a job or even cause > conflicts. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-27284) KafkaSinkITCase$IntegrationTests.testScaleUp failed on azures due to failed to create topic
[ https://issues.apache.org/jira/browse/FLINK-27284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731575#comment-17731575 ] Sergey Nuyanzin commented on FLINK-27284: - https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49702=logs=aa18c3f6-13b8-5f58-86bb-c1cffb239496=502fb6c0-30a2-5e49-c5c2-a00fa3acb203=36363 > KafkaSinkITCase$IntegrationTests.testScaleUp failed on azures due to failed > to create topic > --- > > Key: FLINK-27284 > URL: https://issues.apache.org/jira/browse/FLINK-27284 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka >Affects Versions: 1.15.0, 1.16.0, 1.17.2 >Reporter: Yun Gao >Assignee: Qingsheng Ren >Priority: Minor > Labels: auto-deprioritized-major, test-stability > > {code:java} > 2022-04-17T06:38:39.4884418Z Apr 17 06:38:39 [ERROR] Tests run: 10, Failures: > 0, Errors: 1, Skipped: 0, Time elapsed: 97.71 s <<< FAILURE! - in > org.apache.flink.connector.kafka.sink.KafkaSinkITCase$IntegrationTests > 2022-04-17T06:38:39.4885911Z Apr 17 06:38:39 [ERROR] > org.apache.flink.connector.kafka.sink.KafkaSinkITCase$IntegrationTests.testScaleUp(TestEnvironment, > DataStreamSinkExternalContext, CheckpointingMode)[2] Time elapsed: 30.115 s > <<< ERROR! 
> 2022-04-17T06:38:39.4887050Z Apr 17 06:38:39 java.lang.RuntimeException: > Cannot create topic 'kafka-single-topic-4486440447887382037' > 2022-04-17T06:38:39.4889332Z Apr 17 06:38:39 at > org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContext.createTopic(KafkaSinkExternalContext.java:108) > 2022-04-17T06:38:39.4891038Z Apr 17 06:38:39 at > org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContext.createSink(KafkaSinkExternalContext.java:136) > 2022-04-17T06:38:39.4892936Z Apr 17 06:38:39 at > org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase.tryCreateSink(SinkTestSuiteBase.java:567) > 2022-04-17T06:38:39.4894388Z Apr 17 06:38:39 at > org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase.restartFromSavepoint(SinkTestSuiteBase.java:258) > 2022-04-17T06:38:39.4895903Z Apr 17 06:38:39 at > org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase.testScaleUp(SinkTestSuiteBase.java:201) > 2022-04-17T06:38:39.4897144Z Apr 17 06:38:39 at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > 2022-04-17T06:38:39.4898432Z Apr 17 06:38:39 at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > 2022-04-17T06:38:39.4899803Z Apr 17 06:38:39 at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > 2022-04-17T06:38:39.4900985Z Apr 17 06:38:39 at > java.base/java.lang.reflect.Method.invoke(Method.java:566) > 2022-04-17T06:38:39.4902266Z Apr 17 06:38:39 at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725) > 2022-04-17T06:38:39.4903521Z Apr 17 06:38:39 at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > 2022-04-17T06:38:39.4904835Z Apr 17 06:38:39 at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > 
2022-04-17T06:38:39.4906422Z Apr 17 06:38:39 at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) > 2022-04-17T06:38:39.4907505Z Apr 17 06:38:39 at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) > 2022-04-17T06:38:39.4908355Z Apr 17 06:38:39 at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:92) > 2022-04-17T06:38:39.4909242Z Apr 17 06:38:39 at > org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) > 2022-04-17T06:38:39.4910144Z Apr 17 06:38:39 at > org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) > 2022-04-17T06:38:39.4911103Z Apr 17 06:38:39 at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > 2022-04-17T06:38:39.4912013Z Apr 17 06:38:39 at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > 2022-04-17T06:38:39.4913109Z Apr 17 06:38:39 at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > 2022-04-17T06:38:39.4913983Z Apr 17 06:38:39 at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) >
[jira] [Commented] (FLINK-32036) TableEnvironmentTest.test_explain is unstable on azure ci
[ https://issues.apache.org/jira/browse/FLINK-32036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731573#comment-17731573 ] Sergey Nuyanzin commented on FLINK-32036: - https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49702=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=c67e71ed-6451-5d26-8920-5a8cf9651901=24547 > TableEnvironmentTest.test_explain is unstable on azure ci > - > > Key: FLINK-32036 > URL: https://issues.apache.org/jira/browse/FLINK-32036 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.17.1 >Reporter: Sergey Nuyanzin >Priority: Critical > Labels: test-stability > > it's failed on ci (1.17 branch so far) > {noformat} > May 07 01:51:35 === FAILURES > === > May 07 01:51:35 __ TableEnvironmentTest.test_explain > ___ > May 07 01:51:35 > May 07 01:51:35 self = > testMethod=test_explain> > May 07 01:51:35 > May 07 01:51:35 def test_explain(self): > May 07 01:51:35 schema = RowType() \ > May 07 01:51:35 .add('a', DataTypes.INT()) \ > May 07 01:51:35 .add('b', DataTypes.STRING()) \ > May 07 01:51:35 .add('c', DataTypes.STRING()) > May 07 01:51:35 t_env = self.t_env > May 07 01:51:35 t = t_env.from_elements([], schema) > May 07 01:51:35 result = t.select(t.a + 1, t.b, t.c) > May 07 01:51:35 > May 07 01:51:35 > actual = result.explain() > May 07 01:51:35 > May 07 01:51:35 pyflink/table/tests/test_table_environment_api.py:66 > {noformat} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48766=logs=bf5e383b-9fd3-5f02-ca1c-8f788e2e76d3=85189c57-d8a0-5c9c-b61d-fc05cfac62cf=25029 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32049) CoordinatedSourceRescaleITCase.testDownscaling fails on AZP
[ https://issues.apache.org/jira/browse/FLINK-32049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sergey Nuyanzin updated FLINK-32049:
------------------------------------
    Affects Version/s: 1.18.0

> CoordinatedSourceRescaleITCase.testDownscaling fails on AZP
> -----------------------------------------------------------
>
>                 Key: FLINK-32049
>                 URL: https://issues.apache.org/jira/browse/FLINK-32049
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.18.0, 1.17.1
>            Reporter: Sergey Nuyanzin
>           Priority: Critical
>              Labels: test-stability
>
> CoordinatedSourceRescaleITCase.testDownscaling fails with
> {noformat}
> May 08 03:19:14 [ERROR] Failures:
> May 08 03:19:14 [ERROR] CoordinatedSourceRescaleITCase.testDownscaling:75->resumeCheckpoint:107
> May 08 03:19:14 Multiple Failures (1 failure)
> May 08 03:19:14 -- failure 1 --
> May 08 03:19:14 [Any cause contains message 'successfully restored checkpoint']
> May 08 03:19:14 Expecting any element of:
> May 08 03:19:14   [org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> May 08 03:19:14     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> May 08 03:19:14     at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> May 08 03:19:14     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> May 08 03:19:14     ...(45 remaining lines not displayed - this can be changed with Assertions.setMaxStackTraceElementsDisplayed),
> May 08 03:19:14   org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
> May 08 03:19:14     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
> May 08 03:19:14     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
> May 08 03:19:14     at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
> May 08 03:19:14     ...(35 remaining lines not displayed - this can be changed with Assertions.setMaxStackTraceElementsDisplayed),
> May 08 03:19:14   java.lang.IllegalStateException: This executor has been registered.
> May 08 03:19:14     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> May 08 03:19:14     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.registerSubtask(ChannelStateWriteRequestExecutorImpl.java:341)
> May 08 03:19:14     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory.getOrCreateExecutor(ChannelStateWriteRequestExecutorFactory.java:63)
> May 08 03:19:14     ...(17 remaining lines not displayed - this can be changed with Assertions.setMaxStackTraceElementsDisplayed)]
> May 08 03:19:14 to satisfy the given assertions requirements but none did:
> May 08 03:19:14
> May 08 03:19:14 org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> May 08 03:19:14     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> May 08 03:19:14     at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> May 08 03:19:14     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> May 08 03:19:14     ...(45 remaining lines not displayed - this can be changed with Assertions.setMaxStackTraceElementsDisplayed)
> May 08 03:19:14 error:
> May 08 03:19:14 Expecting throwable message:
> May 08 03:19:14   "Job execution failed."
> May 08 03:19:14 to contain:
> May 08 03:19:14   "successfully restored checkpoint"
> May 08 03:19:14 but did not.
> May 08 03:19:14
> {noformat}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48772&view=logs&j=fc7981dc-d266-55b0-5fff-f0d0a2294e36&t=1a9b228a-3e0e-598f-fc81-c321539dfdbf&l=7191

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
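The "[Any cause contains message '…']" line in the report above comes from an assertion that walks each candidate throwable's cause chain and checks whether any link's message contains a given fragment (the test itself uses Flink's Java test assertions). A minimal Python sketch of that kind of check, with an invented helper name purely for illustration:

```python
def any_cause_contains(exc, fragment):
    """Walk the exception cause chain; return True if any link's
    message contains the given fragment."""
    seen = set()
    while exc is not None and id(exc) not in seen:
        seen.add(id(exc))
        if fragment in str(exc):
            return True
        # Follow explicit chaining first, then implicit context.
        exc = exc.__cause__ or exc.__context__
    return False

# Build a chain resembling the report: the interesting message is
# buried as a cause of the top-level "Job execution failed." error.
try:
    try:
        raise RuntimeError("successfully restored checkpoint")
    except RuntimeError as inner:
        raise RuntimeError("Job execution failed.") from inner
except RuntimeError as e:
    chained = e

print(any_cause_contains(chained, "successfully restored checkpoint"))  # True
print(any_cause_contains(chained, "missing message"))                   # False
```

The failure above is the negative case: no throwable in the chain carried the expected "successfully restored checkpoint" message.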
[jira] [Commented] (FLINK-32049) CoordinatedSourceRescaleITCase.testDownscaling fails on AZP
[ https://issues.apache.org/jira/browse/FLINK-32049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17731571#comment-17731571 ]

Sergey Nuyanzin commented on FLINK-32049:
-----------------------------------------

Same for testUpscaling
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49750&view=logs&j=fc7981dc-d266-55b0-5fff-f0d0a2294e36&t=1a9b228a-3e0e-598f-fc81-c321539dfdbf&l=7575

> CoordinatedSourceRescaleITCase.testDownscaling fails on AZP
> -----------------------------------------------------------
>
>                 Key: FLINK-32049
>                 URL: https://issues.apache.org/jira/browse/FLINK-32049
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.17.1
>            Reporter: Sergey Nuyanzin
>           Priority: Critical
>              Labels: test-stability
>
> CoordinatedSourceRescaleITCase.testDownscaling fails with
> {noformat}
> May 08 03:19:14 [ERROR] Failures:
> May 08 03:19:14 [ERROR] CoordinatedSourceRescaleITCase.testDownscaling:75->resumeCheckpoint:107
> May 08 03:19:14 Multiple Failures (1 failure)
> May 08 03:19:14 -- failure 1 --
> May 08 03:19:14 [Any cause contains message 'successfully restored checkpoint']
> May 08 03:19:14 Expecting any element of:
> May 08 03:19:14   [org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> May 08 03:19:14     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> May 08 03:19:14     at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> May 08 03:19:14     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> May 08 03:19:14     ...(45 remaining lines not displayed - this can be changed with Assertions.setMaxStackTraceElementsDisplayed),
> May 08 03:19:14   org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
> May 08 03:19:14     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
> May 08 03:19:14     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
> May 08 03:19:14     at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
> May 08 03:19:14     ...(35 remaining lines not displayed - this can be changed with Assertions.setMaxStackTraceElementsDisplayed),
> May 08 03:19:14   java.lang.IllegalStateException: This executor has been registered.
> May 08 03:19:14     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> May 08 03:19:14     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.registerSubtask(ChannelStateWriteRequestExecutorImpl.java:341)
> May 08 03:19:14     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory.getOrCreateExecutor(ChannelStateWriteRequestExecutorFactory.java:63)
> May 08 03:19:14     ...(17 remaining lines not displayed - this can be changed with Assertions.setMaxStackTraceElementsDisplayed)]
> May 08 03:19:14 to satisfy the given assertions requirements but none did:
> May 08 03:19:14
> May 08 03:19:14 org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> May 08 03:19:14     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> May 08 03:19:14     at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> May 08 03:19:14     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> May 08 03:19:14     ...(45 remaining lines not displayed - this can be changed with Assertions.setMaxStackTraceElementsDisplayed)
> May 08 03:19:14 error:
> May 08 03:19:14 Expecting throwable message:
> May 08 03:19:14   "Job execution failed."
> May 08 03:19:14 to contain:
> May 08 03:19:14   "successfully restored checkpoint"
> May 08 03:19:14 but did not.
> May 08 03:19:14
> {noformat}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=48772&view=logs&j=fc7981dc-d266-55b0-5fff-f0d0a2294e36&t=1a9b228a-3e0e-598f-fc81-c321539dfdbf&l=7191

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-28440) EventTimeWindowCheckpointingITCase failed with restore
[ https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sergey Nuyanzin updated FLINK-28440:
------------------------------------
    Affects Version/s: 1.18.0

> EventTimeWindowCheckpointingITCase failed with restore
> ------------------------------------------------------
>
>                 Key: FLINK-28440
>                 URL: https://issues.apache.org/jira/browse/FLINK-28440
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.16.0, 1.17.0, 1.18.0
>            Reporter: Huang Xingbo
>            Assignee: Yanfei Lei
>           Priority: Critical
>              Labels: auto-deprioritized-critical, pull-request-available, test-stability
>             Fix For: 1.18.0, 1.16.3
>
>         Attachments: image-2023-02-01-00-51-54-506.png, image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png, image-2023-02-01-16-47-23-756.png, image-2023-02-01-16-57-43-889.png, image-2023-02-02-10-52-56-599.png, image-2023-02-03-10-09-07-586.png, image-2023-02-03-12-03-16-155.png, image-2023-02-03-12-03-56-614.png
>
>
> {code:java}
> Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
> 	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665)
> 	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> 	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_0a448493b4782967b150582570326227_(2/4) from any of the 1 provided restore options.
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
> 	... 11 more
> Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced (No such file or directory)
> 	at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
> 	at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
> 	at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
> 	at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:96)
> 	at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:75)
> 	at org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:92)
> 	at org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> 	... 13 more
> Caused by: java.io.FileNotFoundException: /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced (No such file or directory)
> 	at java.io.FileInputStream.open0(Native Method)
> 	at
[jira] [Commented] (FLINK-28440) EventTimeWindowCheckpointingITCase failed with restore
[ https://issues.apache.org/jira/browse/FLINK-28440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17731569#comment-17731569 ]

Sergey Nuyanzin commented on FLINK-28440:
-----------------------------------------

Redirected to here from FLINK-30107
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49750&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba&l=8702

> EventTimeWindowCheckpointingITCase failed with restore
> ------------------------------------------------------
>
>                 Key: FLINK-28440
>                 URL: https://issues.apache.org/jira/browse/FLINK-28440
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.16.0, 1.17.0
>            Reporter: Huang Xingbo
>            Assignee: Yanfei Lei
>           Priority: Critical
>              Labels: auto-deprioritized-critical, pull-request-available, test-stability
>             Fix For: 1.18.0, 1.16.3
>
>         Attachments: image-2023-02-01-00-51-54-506.png, image-2023-02-01-01-10-01-521.png, image-2023-02-01-01-19-12-182.png, image-2023-02-01-16-47-23-756.png, image-2023-02-01-16-57-43-889.png, image-2023-02-02-10-52-56-599.png, image-2023-02-03-10-09-07-586.png, image-2023-02-03-12-03-16-155.png, image-2023-02-03-12-03-56-614.png
>
>
> {code:java}
> Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
> 	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:722)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:698)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:665)
> 	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> 	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_0a448493b4782967b150582570326227_(2/4) from any of the 1 provided restore options.
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
> 	... 11 more
> Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: /tmp/junit1835099326935900400/junit1113650082510421526/52ee65b7-033f-4429-8ddd-adbe85e27ced (No such file or directory)
> 	at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
> 	at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.advance(StateChangelogHandleStreamHandleReader.java:87)
> 	at org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamHandleReader$1.hasNext(StateChangelogHandleStreamHandleReader.java:69)
> 	at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:96)
> 	at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:75)
> 	at org.apache.flink.state.changelog.ChangelogStateBackend.restore(ChangelogStateBackend.java:92)
> 	at org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:136)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> 	... 13 more
> Caused by:
[jira] [Updated] (FLINK-32311) ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement and DefaultLeaderElectionService.onGrantLeadership fell into dead lock
[ https://issues.apache.org/jira/browse/FLINK-32311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sergey Nuyanzin updated FLINK-32311:
------------------------------------
    Fix Version/s: (was: 1.18.0)

> ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement and DefaultLeaderElectionService.onGrantLeadership fell into dead lock
> -----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32311
>                 URL: https://issues.apache.org/jira/browse/FLINK-32311
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Sergey Nuyanzin
>           Priority: Major
>              Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49750&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8
>
> There are two threads: one has locked {{0xe3a8a1e8}} and is waiting for {{0xe3a89c18}}.
> {noformat}
> 2023-06-08T01:18:54.5609123Z Jun 08 01:18:54 "ForkJoinPool-50-worker-25-EventThread" #956 daemon prio=5 os_prio=0 tid=0x7f9374253800 nid=0x6a4e waiting for monitor entry [0x7f94b63e1000]
> 2023-06-08T01:18:54.5609820Z Jun 08 01:18:54    java.lang.Thread.State: BLOCKED (on object monitor)
> 2023-06-08T01:18:54.5610557Z Jun 08 01:18:54 	at org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.runInLeaderEventThread(DefaultLeaderElectionService.java:425)
> 2023-06-08T01:18:54.5611459Z Jun 08 01:18:54 	- waiting to lock <0xe3a89c18> (a java.lang.Object)
> 2023-06-08T01:18:54.5612198Z Jun 08 01:18:54 	at org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.onGrantLeadership(DefaultLeaderElectionService.java:300)
> 2023-06-08T01:18:54.5613110Z Jun 08 01:18:54 	at org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver.isLeader(ZooKeeperLeaderElectionDriver.java:153)
> 2023-06-08T01:18:54.5614070Z Jun 08 01:18:54 	at org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch$$Lambda$1649/586959400.accept(Unknown Source)
> 2023-06-08T01:18:54.5615014Z Jun 08 01:18:54 	at org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager.lambda$forEach$0(MappingListenerManager.java:92)
> 2023-06-08T01:18:54.5616259Z Jun 08 01:18:54 	at org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager$$Lambda$1640/1393625763.run(Unknown Source)
> 2023-06-08T01:18:54.5617137Z Jun 08 01:18:54 	at org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager$$Lambda$1633/2012730699.execute(Unknown Source)
> 2023-06-08T01:18:54.5618047Z Jun 08 01:18:54 	at org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager.forEach(MappingListenerManager.java:89)
> 2023-06-08T01:18:54.5618994Z Jun 08 01:18:54 	at org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.StandardListenerManager.forEach(StandardListenerManager.java:89)
> 2023-06-08T01:18:54.5620071Z Jun 08 01:18:54 	at org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch.setLeadership(LeaderLatch.java:711)
> 2023-06-08T01:18:54.5621198Z Jun 08 01:18:54 	- locked <0xe3a8a1e8> (a org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch)
> 2023-06-08T01:18:54.5622072Z Jun 08 01:18:54 	at org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch.checkLeadership(LeaderLatch.java:597)
> 2023-06-08T01:18:54.5622991Z Jun 08 01:18:54 	at org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch.access$600(LeaderLatch.java:64)
> 2023-06-08T01:18:54.5623988Z Jun 08 01:18:54 	at org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch$7.processResult(LeaderLatch.java:648)
> 2023-06-08T01:18:54.5624965Z Jun 08 01:18:54 	at org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.sendToBackgroundCallback(CuratorFrameworkImpl.java:926)
> 2023-06-08T01:18:54.5626218Z Jun 08 01:18:54 	at org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.processBackgroundOperation(CuratorFrameworkImpl.java:683)
> 2023-06-08T01:18:54.5627369Z Jun 08 01:18:54 	at org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.WatcherRemovalFacade.processBackgroundOperation(WatcherRemovalFacade.java:152)
> 2023-06-08T01:18:54.5628353Z Jun 08 01:18:54 	at org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.GetChildrenBuilderImpl$2.processResult(GetChildrenBuilderImpl.java:187)
> 2023-06-08T01:18:54.5629281Z Jun 08 01:18:54 	at
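The dump above shows one half of a classic lock-ordering deadlock: the Curator event thread already holds the LeaderLatch monitor ({{0xe3a8a1e8}}) and is blocked waiting for DefaultLeaderElectionService's lock ({{0xe3a89c18}}), while, per the issue summary, the other side acquires those locks in the opposite order. A minimal Python sketch of that hazard, deliberately using non-blocking acquires so it terminates instead of hanging; all names are illustrative stand-ins, not Flink or Curator APIs:

```python
import threading

latch_lock = threading.Lock()    # stands in for the LeaderLatch monitor
service_lock = threading.Lock()  # stands in for the election service's lock

results = []
holding_latch = threading.Event()
event_tried = threading.Event()
main_done = threading.Event()

def curator_event_thread():
    # This side takes the latch monitor first, then needs the service lock,
    # mirroring setLeadership() calling into onGrantLeadership().
    with latch_lock:
        holding_latch.set()
        # Non-blocking acquire so the demo terminates; a real
        # synchronized block would wait here forever.
        got = service_lock.acquire(blocking=False)
        if got:
            service_lock.release()
        results.append(("event thread got service lock", got))
        event_tried.set()
        main_done.wait()  # keep holding latch_lock until the other side has tried

# The other side takes the service lock first, then wants the latch monitor:
# the opposite acquisition order is what makes the deadlock possible.
service_lock.acquire()
t = threading.Thread(target=curator_event_thread)
t.start()
holding_latch.wait()
got = latch_lock.acquire(blocking=False)
if got:
    latch_lock.release()
results.append(("service side got latch lock", got))
event_tried.wait()
main_done.set()
service_lock.release()
t.join()
print(results)  # both acquisitions fail: the circular wait the dump reports
```

With blocking acquires, each thread would wait forever on the lock the other holds, which is exactly the BLOCKED state the thread dump captures; the usual fix is to enforce a single acquisition order or to deliver listener callbacks outside the held monitor.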