Re: [PR] [FLINK-33041][docs] Add an article to guide users to migrate their DataSet jobs to DataStream [flink]
WencongLiu commented on code in PR #23362: URL: https://github.com/apache/flink/pull/23362#discussion_r1360096007 ## docs/content/docs/dev/datastream/dataset_migration.md: ## @@ -0,0 +1,758 @@ +--- +title: "How To Migrate From DataSet to DataStream" +weight: 302 +type: docs +--- + + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in +Flink 2.0. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +For most DataSet APIs, users can utilize the DataStream API to get the same calculation results in batch jobs. However, +different DataSet APIs can be implemented with the DataStream API with varying differences in semantics and behavior. All DataSet APIs can be +categorized into four types: + +Category 1: These DataSet APIs can be implemented by DataStream APIs with the same semantics and the same calculation behavior. + +Category 2: These DataSet APIs can be implemented by DataStream APIs with different semantics but the same calculation behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be implemented by DataStream APIs with different semantics and different calculation behavior. This +will involve additional computation and shuffle costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. + +The subsequent sections will first introduce how to set up the execution environment and then provide detailed explanations of how to implement +each category of DataSet APIs using the DataStream APIs, highlighting the specific considerations and challenges associated with each +category. + + +## Setting the execution environment + +To execute a DataSet pipeline with the DataStream API, start by replacing ExecutionEnvironment with StreamExecutionEnvironment. 
+ + + +DataSet +DataStream + + + +// Create the execution environment +ExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +ExecutionEnvironment.createLocalEnvironment(); +// Create the collection environment +new CollectionEnvironment(); +// Create the remote environment +ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile); + + + +// Create the execution environment +StreamExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +StreamExecutionEnvironment.createLocalEnvironment(); +// The collection environment is not supported. +// Create the remote environment +StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFile); + + + + + + +As the source of a DataSet is always bounded, the execution mode must be set to RuntimeExecutionMode.BATCH to make Flink execute in batch mode. + +```java +StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); +executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); +``` + +## Using the streaming sources and sinks + +Sources: The DataStream API uses `DataStreamSource` to read records from external systems, while the DataSet API uses the +`DataSource`. + +Sinks: The DataStream API uses the implementations of `SinkFunction` and `Sink` to write records to external systems, while the +DataSet API uses the `FileOutputFormat`. + +If you are looking for pre-defined source and sink connectors for DataStream, please check the [Connector Docs]({{< ref "docs/connectors/datastream/overview" >}}). + + +## Implement the DataSet API by DataStream + + Category 1 + +For Category 1, the usage of the API in DataStream is almost identical to that in DataSet. This means that implementing these +DataSet APIs with the DataStream API is relatively straightforward and does not require significant modifications or complexity +in the job code. 
+ +### Map + + + +DataSet +DataStream + + + +dataSet.map(new MapFunction(){ +// implement user-defined map logic +}); + + + +dataStream.map(new MapFunction(){ +// implement user-defined map logic +}); + + + + + + +### FlatMap + + + +DataSet +DataStream + + + +dataSet.flatMap(new FlatMapFunction(){ +// implement user-defined flatmap logic +}); + + + +dataStream.flatMap(new FlatMapFunction(){ +// implement user-defined flatmap logic +}); + + + + + +### Filter + + + +DataSet +DataStream + + + +dataSet.filter(new FilterFunction(){ +// implement user-defined filter logic +}); + + + +dataStream.filter(new FilterFunction(){ +// implement
[jira] [Commented] (FLINK-33185) HybridShuffleITCase fails with TimeoutException: Pending slot request timed out in slot pool on AZP
[ https://issues.apache.org/jira/browse/FLINK-33185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775540#comment-17775540 ] Yangze Guo commented on FLINK-33185: Another instance [~Weijie Guo] https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=53749=logs=5c8e7682-d68f-54d1-16a2-a09310218a49=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba > HybridShuffleITCase fails with TimeoutException: Pending slot request timed > out in slot pool on AZP > --- > > Key: FLINK-33185 > URL: https://issues.apache.org/jira/browse/FLINK-33185 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.19.0 >Reporter: Sergey Nuyanzin >Priority: Critical > Labels: test-stability > > This build > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=53519=logs=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3=0c010d0c-3dec-5bf1-d408-7b18988b1b2b=8641 > fails as > {noformat} > Sep 29 05:13:54 Caused by: java.util.concurrent.CompletionException: > java.util.concurrent.CompletionException: > java.util.concurrent.CompletionException: > org.apache.flink.util.FlinkException: Pending slot request with > SlotRequestId{b6e57c09274f4edc50697300bc8859a8} has been released. > Sep 29 05:13:54 at > org.apache.flink.runtime.scheduler.DefaultExecutionDeployer.lambda$assignResource$4(DefaultExecutionDeployer.java:226) > Sep 29 05:13:54 ... 36 more > Sep 29 05:13:54 Caused by: java.util.concurrent.CompletionException: > java.util.concurrent.CompletionException: > org.apache.flink.util.FlinkException: Pending slot request with > SlotRequestId{b6e57c09274f4edc50697300bc8859a8} has been released. 
> Sep 29 05:13:54 at > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) > Sep 29 05:13:54 at > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) > Sep 29 05:13:54 at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607) > Sep 29 05:13:54 at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) > Sep 29 05:13:54 ... 34 more > Sep 29 05:13:54 Caused by: org.apache.flink.util.FlinkException: > org.apache.flink.util.FlinkException: Pending slot request with > SlotRequestId{b6e57c09274f4edc50697300bc8859a8} has been released. > Sep 29 05:13:54 at > org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.releaseSlot(DeclarativeSlotPoolBridge.java:373) > Sep 29 05:13:54 ... 30 more > Sep 29 05:13:54 Caused by: java.util.concurrent.TimeoutException: > java.util.concurrent.TimeoutException: Pending slot request timed out in slot > pool. > Sep 29 05:13:54 ... 30 more > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33203][sql-gateway] Introduce env.java.opts.sql-gateway to spe… [flink]
KarmaGYZ commented on PR #23526: URL: https://github.com/apache/flink/pull/23526#issuecomment-1763772532 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-31275) Flink supports reporting and storage of source/sink tables relationship
[ https://issues.apache.org/jira/browse/FLINK-31275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775527#comment-17775527 ] Zhenqiu Huang commented on FLINK-31275: --- [~zjureel] We have similar requirements. To accelerate the development, I can help with some Jira tickets. > Flink supports reporting and storage of source/sink tables relationship > --- > > Key: FLINK-31275 > URL: https://issues.apache.org/jira/browse/FLINK-31275 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: Fang Yong >Assignee: Fang Yong >Priority: Major > > FLIP-314 has been accepted > https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33041][docs] Add an article to guide users to migrate their DataSet jobs to DataStream [flink]
WencongLiu commented on code in PR #23362: URL: https://github.com/apache/flink/pull/23362#discussion_r1360095973 ## docs/content/docs/dev/datastream/dataset_migration.md: ## @@ -0,0 +1,758 @@ +--- +title: "How To Migrate From DataSet to DataStream" +weight: 302 +type: docs +--- + + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in +Flink 2.0. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +For most DataSet APIs, users can utilize the DataStream API to get the same calculation results in batch jobs. However, +different DataSet APIs can be implemented with the DataStream API with varying differences in semantics and behavior. All DataSet APIs can be +categorized into four types: + +Category 1: These DataSet APIs can be implemented by DataStream APIs with the same semantics and the same calculation behavior. + +Category 2: These DataSet APIs can be implemented by DataStream APIs with different semantics but the same calculation behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be implemented by DataStream APIs with different semantics and different calculation behavior. This +will involve additional computation and shuffle costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. + +The subsequent sections will first introduce how to set up the execution environment and then provide detailed explanations of how to implement +each category of DataSet APIs using the DataStream APIs, highlighting the specific considerations and challenges associated with each +category. + + +## Setting the execution environment + +To execute a DataSet pipeline with the DataStream API, start by replacing ExecutionEnvironment with StreamExecutionEnvironment. 
+ + + +DataSet +DataStream + + + +// Create the execution environment +ExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +ExecutionEnvironment.createLocalEnvironment(); +// Create the collection environment +new CollectionEnvironment(); +// Create the remote environment +ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile); + + + +// Create the execution environment +StreamExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +StreamExecutionEnvironment.createLocalEnvironment(); +// The collection environment is not supported. +// Create the remote environment +StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFile); + + + + + + +As the source of a DataSet is always bounded, the execution mode must be set to RuntimeExecutionMode.BATCH to make Flink execute in batch mode. + +```java
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); +executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); +``` + +## Using the streaming sources and sinks + +Sources: The DataStream API uses `DataStreamSource` to read records from external systems, while the DataSet API uses the +`DataSource`. + +Sinks: The DataStream API uses the implementations of `SinkFunction` and `Sink` to write records to external systems, while the +DataSet API uses the `FileOutputFormat`. + +If you are looking for pre-defined source and sink connectors for DataStream, please check the [Connector Docs]({{< ref "docs/connectors/datastream/overview" >}}). + + +## Implement the DataSet API by DataStream + + Category 1 + +For Category 1, the usage of the API in DataStream is almost identical to that in DataSet. This means that implementing these +DataSet APIs with the DataStream API is relatively straightforward and does not require significant modifications or complexity +in the job code. 
+ +### Map Review Comment: I followed your suggestion on the table structure, and currently all operation names are in the first column of the table. ## docs/content/docs/dev/datastream/dataset_migration.md: ## @@ -0,0 +1,758 @@ +--- +title: "How To Migrate From DataSet to DataStream" +weight: 302 +type: docs +--- + + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in +Flink 2.0. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +For most DataSet APIs, users can utilize the DataStream API to get the same calculation results in batch jobs. However, +different DataSet APIs can be
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
1996fanrui commented on code in PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1360079090 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java: ## @@ -441,15 +442,21 @@ protected Collection queryAggregatedMetricNames( protected abstract Map> queryAllAggregatedMetrics( -AbstractFlinkResource cr, -FlinkService flinkService, -Configuration conf, +Context ctx, Map> filteredVertexMetricNames); -public void cleanup(AbstractFlinkResource cr) { -var resourceId = ResourceID.fromResource(cr); -histories.remove(resourceId); -availableVertexMetricNames.remove(resourceId); +public JobDetailsInfo getJobDetailsInfo( +JobAutoScalerContext context, Duration clientTimeout) throws Exception { Review Comment: Thanks @mateczagany for this comment. IIUC, you mean `ScalingMetricCollector` is using the `RestClusterClient`, and `RestApiMetricsCollector` is totally based on `RestClusterClient`, so these 2 classes can be merged into one class, right? If so, let me try to explain the difference between `RestApiMetricsCollector` and `ScalingMetricCollector`: - `RestApiMetricsCollector` calls `RestClusterClient`, and it's used to fetch specific metrics. - `ScalingMetricCollector` calls `RestClusterClient` but is not used to fetch specific metrics. - `RestClusterClient` is used in `ScalingMetricCollector` to get some job metadata, such as `getJobDetailsInfo` to generate the `JobTopology`, `queryFilteredMetricNames`, and `updateKafkaSourceMaxParallelisms`. - The `JobTopology` is the metadata of the job, and it cannot be fetched from metrics. That means the `RestClusterClient` is needed even if we query specific metrics from another system. Based on these points, it may be better to keep `ScalingMetricCollector` as an abstract class and not remove `RestApiMetricsCollector`. This keeps it easy to fetch specific metrics from another system in the future. 
Also, we can see that `ScalingMetricCollector` also uses the `RestClusterClient` on the current master branch [1]. WDYT? Please correct me if my understanding is wrong, thanks~ [1] https://github.com/apache/flink-kubernetes-operator/blob/305498a9ab2e04ab71a4c2d87f2edb746373df1a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java#L368C45-L368C45 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
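The class split argued for above can be sketched in plain Java. This is a minimal illustration only: apart from the two class names taken from the discussion, every method, signature, and value below is invented for the example and does not reflect the PR's actual code.

```java
import java.util.Map;

// The abstract collector always needs the REST endpoint for job metadata
// (topology, metric names), regardless of where metric values come from.
abstract class ScalingMetricCollector {
    // Illustrative stand-in for metadata lookups such as getJobDetailsInfo.
    String fetchJobTopology() {
        return "topology-from-rest";
    }

    // Only the metric values themselves are backend-specific.
    abstract Map<String, Double> queryAllAggregatedMetrics();
}

// One concrete backend: fetch metric values via the Flink REST API.
// A future subclass could instead query an external metrics system.
class RestApiMetricsCollector extends ScalingMetricCollector {
    @Override
    Map<String, Double> queryAllAggregatedMetrics() {
        return Map.of("busyTimeMsPerSecond", 250.0); // hypothetical value
    }
}

public class CollectorSketch {
    public static void main(String[] args) {
        ScalingMetricCollector collector = new RestApiMetricsCollector();
        System.out.println(collector.fetchJobTopology());
        System.out.println(collector.queryAllAggregatedMetrics().get("busyTimeMsPerSecond"));
    }
}
```

The design point is that the metadata path and the metric-value path stay separate, so swapping the metric backend never removes the REST dependency.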
Re: [PR] [FLINK-31339][tests] Fix unstable tests of flink-end-to-end-tests-sql module [flink]
Jiabao-Sun commented on PR #23507: URL: https://github.com/apache/flink/pull/23507#issuecomment-1763677474 @luoyuxia, @lsyldliu, please take a look when you have time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33116][tests] Fix CliClientTest.testCancelExecutionInteractiveMode fails with NPE [flink]
Jiabao-Sun commented on PR #23515: URL: https://github.com/apache/flink/pull/23515#issuecomment-1763676473 Hi @fsk119, could you help review this when you have time? Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-20281) Window aggregation supports changelog stream input
[ https://issues.apache.org/jira/browse/FLINK-20281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-20281: Priority: Major (was: Not a Priority) > Window aggregation supports changelog stream input > -- > > Key: FLINK-20281 > URL: https://issues.apache.org/jira/browse/FLINK-20281 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Planner, Table SQL / Runtime >Reporter: Jark Wu >Priority: Major > Labels: auto-deprioritized-major, auto-deprioritized-minor > Attachments: screenshot-1.png > > > Currently, window aggregation doesn't support to consume a changelog stream. > This makes it impossible to do a window aggregation on changelog sources > (e.g. Kafka with Debezium format, or upsert-kafka, or mysql-cdc). -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33203][sql-gateway] Introduce env.java.opts.sql-gateway to spe… [flink]
flinkbot commented on PR #23526: URL: https://github.com/apache/flink/pull/23526#issuecomment-1763646179 ## CI report: * 4a70cc843fa8a12097a43567d23e4ce6191aeae5 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32650][protobuf]Added the ability to split flink-protobuf code… [flink]
ljw-hit commented on code in PR #23162: URL: https://github.com/apache/flink/pull/23162#discussion_r1360044268 ## flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigProtoBufCodeSpiltterTest.java: ## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.protobuf; + +import org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema; +import org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema; +import org.apache.flink.formats.protobuf.testproto.BigPbClass; +import org.apache.flink.formats.protobuf.util.PbToRowTypeUtil; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.RowType; + +import org.junit.Test; + +/** + * Test class for below case + * + * + * syntax = "proto3"; + * package org.apache.flink.formats.protobuf.testproto; + * option java_package = "org.apache.flink.formats.protobuf.testproto"; + * option java_outer_classname = "BigPbClass"; + * import "google/protobuf/descriptor.proto"; + * message BigPbMessage { + * + * + * It is valid proto definition. 
+ */ +public class BigProtoBufCodeSpiltterTest { Review Comment: @maosuhan Thank you very much for your review! @libenchao Can you pass this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-33203) FLIP-374: Adding a separate configuration for specifying Java Options of the SQL Gateway
[ https://issues.apache.org/jira/browse/FLINK-33203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33203: --- Labels: pull-request-available (was: ) > FLIP-374: Adding a separate configuration for specifying Java Options of the > SQL Gateway > > > Key: FLINK-33203 > URL: https://issues.apache.org/jira/browse/FLINK-33203 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Gateway >Reporter: Yangze Guo >Assignee: Yangze Guo >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > {color:#00}The SQL Gateway is an essential component of Flink in OLAP > scenarios, and its performance and stability determine the SLA of Flink as an > OLAP service. Just like other components in Flink, we propose adding a > separate configuration option to specify the Java options for the SQL > Gateway. This would allow users to fine-tune the memory settings, garbage > collection behavior, and other relevant Java parameters specific to the SQL > Gateway, ensuring optimal performance and stability in production > environments.{color} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-33203][sql-gateway] Introduce env.java.opts.sql-gateway to spe… [flink]
KarmaGYZ opened a new pull request, #23526: URL: https://github.com/apache/flink/pull/23526 …cify the jvm options of sql gateway ## What is the purpose of the change Introduce env.java.opts.sql-gateway to specify the jvm options of sql gateway. ## Verifying this change Manually launch a sql-gateway and check the jvm args. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? docs -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
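For context, usage of the new option would presumably look like the fragment below. The option name is taken from this PR's title; the JVM flags are purely illustrative assumptions.

```yaml
# flink-conf.yaml — hypothetical usage sketch:
# JVM options applied only to the SQL Gateway process, separate from the
# options used for the JobManager and TaskManager processes.
env.java.opts.sql-gateway: "-Xmx4g -XX:+UseG1GC"
```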
Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]
HuangZhenQiu commented on PR #23511: URL: https://github.com/apache/flink/pull/23511#issuecomment-1763633001 > Thank you @HuangZhenQiu a lot for contributing this. > > After reading the [Avro spec](https://avro.apache.org/docs/1.11.0/spec.html), I think we have wrongly mapped the Avro timestamp. > > Avro spec says: > > > Timestamp (millisecond precision) > > The timestamp-millis logical type represents an instant on the global timeline, independent of a particular time zone or calendar, with a precision of one millisecond. Please note that time zone information gets lost in this process. Upon reading a value back, we can only reconstruct the instant, but not the original representation. In practice, such timestamps are typically displayed to users in their local time zones, therefore they may be displayed differently depending on the execution environment. > > A timestamp-millis logical type annotates an Avro long, where the long stores the number of milliseconds from the unix epoch, 1 January 1970 00:00:00.000 UTC. > > [Consistent timestamp types in Hadoop SQL engines](https://docs.google.com/document/d/1gNRww9mZJcHvUDCXklzjFEQGpefsuR_akCDfWsdE35Q/edit) also pointed out: > > > Timestamps in Avro, Parquet and RCFiles with a binary SerDe have Instant semantics > > So Avro Timestamp is a Java Instant semantic that should map to Flink TIMESTAMP_LTZ, but currently, it maps to TIMESTAMP_NTZ. > > On the contrary, > > > Local timestamp (millisecond precision) > > The local-timestamp-millis logical type represents a timestamp in a local timezone, regardless of what specific time zone is considered local, with a precision of one millisecond. > > A local-timestamp-millis logical type annotates an Avro long, where the long stores the number of milliseconds, from 1 January 1970 00:00:00.000. > > Avro LocalTimestamp is a Java LocalDateTime semantic that should map to Flink TIMESTAMP_NTZ. 
> > If we agree with this behavior, we may need to open a discussion in the dev ML about how to correct the behavior in a backward-compatible or incompatible way. @wuchong Thanks for the feedback regarding the Hadoop alignment doc. Besides this, I am also unclear on how to convert timestamp data to TimestampData, which is the internal representation of RowData. A Flink user can define a dynamic table with the Avro format on a timestamp field with a target timestamp with time zone, but we can't convert the Avro long-typed data to the target timestamp with time zone, as the target Flink type is missing in Converters. I would like to open a discussion in the dev ML after our offline sync. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
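The semantic difference being discussed — an instant on the global timeline (TIMESTAMP_LTZ semantics) versus a zone-less wall-clock time (TIMESTAMP_NTZ semantics) — can be illustrated with plain java.time, independent of Avro or Flink:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class AvroTimestampSemantics {
    public static void main(String[] args) {
        // Avro timestamp-millis stores millis since the unix epoch in UTC.
        long epochMillis = 0L; // 1970-01-01T00:00:00.000 UTC

        // Instant semantics (timestamp-millis -> TIMESTAMP_LTZ): the same
        // stored value renders differently depending on the session zone.
        Instant instant = Instant.ofEpochMilli(epochMillis);
        System.out.println(instant.atZone(ZoneId.of("UTC")).toLocalDateTime());
        System.out.println(instant.atZone(ZoneId.of("Asia/Shanghai")).toLocalDateTime());

        // LocalDateTime semantics (local-timestamp-millis -> TIMESTAMP_NTZ):
        // the value is a fixed wall-clock reading, zone-independent.
        LocalDateTime local = LocalDateTime.ofEpochSecond(epochMillis / 1000, 0, ZoneOffset.UTC);
        System.out.println(local);
    }
}
```

The same stored long yields different displayed wall-clock times under Instant semantics but a single fixed reading under LocalDateTime semantics, which is why the choice of mapping is user-visible and hard to change compatibly.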
Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]
1996fanrui commented on code in PR #677: URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1360032639 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java: ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.autoscaler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.autoscaler.JobAutoScalerContext; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.function.SupplierWithException; + +import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +import javax.annotation.Nullable; + +/** An implementation of JobAutoscalerContext for Kubernetes. 
*/ +public class KubernetesJobAutoScalerContext extends JobAutoScalerContext { + +private final AbstractFlinkResource resource; + +private final KubernetesClient kubernetesClient; + +public KubernetesJobAutoScalerContext( +JobID jobID, Review Comment: Thanks for pointing it out, updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix][docs] Remove the OverrideDefault annotation for option slotmanager.number-of-slots.min [flink]
KarmaGYZ merged PR #23525: URL: https://github.com/apache/flink/pull/23525 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-33277) Upgrading to actions/checkout@v4 requires GLIBC 2.25, 2.27, or 2.28 to be installed, apparently
Matthias Pohl created FLINK-33277: - Summary: Upgrading to actions/checkout@v4 requires GLIBC 2.25, 2.27, or 2.28 to be installed, apparently Key: FLINK-33277 URL: https://issues.apache.org/jira/browse/FLINK-33277 Project: Flink Issue Type: Sub-task Reporter: Matthias Pohl https://github.com/XComp/flink/actions/runs/6525835575/job/17718926296#step:5:64 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-33254) Improve speed of compile step
[ https://issues.apache.org/jira/browse/FLINK-33254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias Pohl resolved FLINK-33254.
-----------------------------------
Resolution: Fixed

> Improve speed of compile step
> -----------------------------
>
> Key: FLINK-33254
> URL: https://issues.apache.org/jira/browse/FLINK-33254
> Project: Flink
> Issue Type: Sub-task
> Reporter: Matthias Pohl
> Priority: Major
>
> There were issues with the compilation step where I initially thought that
> it's due to the parallelization of the compilation (which is odd). This issue
> is about investigating how to do the compilation and forwarding the artifacts
> in the right way to the downstream jobs.
[jira] [Commented] (FLINK-33254) Improve speed of compile step
[ https://issues.apache.org/jira/browse/FLINK-33254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775373#comment-17775373 ]

Matthias Pohl commented on FLINK-33254:
---------------------------------------
The issues above are fixed. I tried to introduce caching of the local Maven repository as well to speed up the compilation. This was a bit tricky because the cache action didn't pick up the local folder. The reason for this is the working directory that's used in a GitHub Actions container setup: by default it will use {{/__w}} as the working directory within the container, which will be mapped to {{/home/runner/work}} on the runner system. We need to use {{/__w}} to refer to the Maven repository within the container to make the caching work (a bit more context can be found [here|https://josh-ops.com/posts/github-container-jobs/]).

> Improve speed of compile step
> -----------------------------
>
> Key: FLINK-33254
> URL: https://issues.apache.org/jira/browse/FLINK-33254
> Project: Flink
> Issue Type: Sub-task
> Reporter: Matthias Pohl
> Priority: Major
>
> There were issues with the compilation step where I initially thought that
> it's due to the parallelization of the compilation (which is odd). This issue
> is about investigating how to do the compilation and forwarding the artifacts
> in the right way to the downstream jobs.
Re: [PR] [FLINK-33171][table planner] Table SQL support Not Equal for TimePoint type and TimeString [flink]
lincoln-lil commented on PR #23478: URL: https://github.com/apache/flink/pull/23478#issuecomment-1763405879

> @lincoln-lil Based on the approach you provided, I attempted to write some test cases. However, there are two types that are not covered:
>
> 1. non comparable types: I don't know how to construct case for this one.
> 2. multiset types: The framework throws an error when using a multiset.
>    ```
>    testSqlApi("f26 = MULTISET['b', 'a', 'b']", "TRUE")
>    ```
>    got exception:
>    ```
>    org.opentest4j.AssertionFailedError: Expression is converted into more than a Calc operation. Use a different test method. ==> expected: but was:
>        at org.apache.flink.table.planner.expressions.utils.ExpressionTestBase.addTestExpr(ExpressionTestBase.scala:324)
>    ```

@fengjiajie for the non comparable types, I think we can ref to the `TypeCheckUtils#isComparable`:
```java
public static boolean isComparable(LogicalType type) {
    return !isRaw(type)
            && !isMap(type)
            && !isMultiset(type)
            && !isRow(type)
            && !isArray(type)
            && !isStructuredType(type);
}
```
for the multiset type, one viable way is using the `COLLECT`[1] aggregate function to construct it.

1. https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/#aggregate-functions
Re: [PR] [FLINK-33171][table planner] Table SQL support Not Equal for TimePoint type and TimeString [flink]
lincoln-lil commented on code in PR #23478: URL: https://github.com/apache/flink/pull/23478#discussion_r1359886599

## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala:

@@ -488,6 +488,18 @@
```
     else if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
       generateComparison(ctx, "!=", left, right, resultType)
     }
+    // support date/time/timestamp not equalTo string.
+    else if (
+      (isTimePoint(left.resultType) && isCharacterString(right.resultType)) ||
```

Review Comment: @fengjiajie just feel free to move forward, and let's fix this issue completely!
Re: [PR] [FLINK-33198][Format/Avro] support local timezone timestamp logic type in AVRO [flink]
wuchong commented on code in PR #23511: URL: https://github.com/apache/flink/pull/23511#discussion_r1359881451

## flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java:

@@ -102,6 +104,13 @@ public final class AvroTestUtils {
```
         .setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
         .setTypeTimestampMicros(
                 Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
+        .setTypeLocalTimestampMillis(
+                LocalDateTime.ofInstant(
```

Review Comment: nit: Please use `LocalDateTime.parse("2014-03-01T12:12:12.32")` to construct a `LocalDateTime`.

## flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java:

@@ -201,6 +204,9 @@ private static TimestampData convertToTimestamp(Object object) {
```
     millis = (Long) object;
 } else if (object instanceof Instant) {
     millis = ((Instant) object).toEpochMilli();
+} else if (object instanceof LocalDateTime) {
+    Instant instant = ((LocalDateTime) object).toInstant(ZoneOffset.UTC);
+    millis = instant.toEpochMilli();
```

Review Comment: `TimestampData.fromLocalDateTime(object)`.
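For readers following the review above: the conversion under discussion flattens a `LocalDateTime` to epoch milliseconds by interpreting it as wall-clock time in UTC. A minimal JDK-only sketch of that interpretation (independent of Flink's `TimestampData`, which the reviewer recommends using instead) also shows the `LocalDateTime.parse(...)` construction style the reviewer suggests:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class LocalTimestampConversion {

    // Flatten a LocalDateTime to epoch millis, interpreting it as UTC
    // wall-clock time (the same interpretation as the snippet under review).
    static long toEpochMillis(LocalDateTime ldt) {
        return ldt.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        // Construction style suggested in the review: parse an ISO-8601 literal.
        LocalDateTime ldt = LocalDateTime.parse("2014-03-01T12:12:12.32");
        long millis = toEpochMillis(ldt);
        // Round-tripping back through UTC recovers the original local timestamp.
        LocalDateTime back =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC);
        System.out.println(millis + " round-trips: " + back.equals(ldt));
    }
}
```

Because the conversion pins the zone to UTC on both sides, it is lossless for the millisecond-precision logical type; the semantic difference only surfaces if a reader later reinterprets the millis in a non-UTC zone.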
[jira] [Updated] (FLINK-33181) Table using `kinesis` connector can not be used for both read & write operations if it's defined with unsupported sink property
[ https://issues.apache.org/jira/browse/FLINK-33181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-33181:
-----------------------------------
Labels: pull-request-available (was: )

> Table using `kinesis` connector can not be used for both read & write
> operations if it's defined with unsupported sink property
> ---------------------------------------------------------------------
>
> Key: FLINK-33181
> URL: https://issues.apache.org/jira/browse/FLINK-33181
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kinesis, Table SQL / Runtime
> Affects Versions: 1.15.4, aws-connector-4.1.0
> Reporter: Khanh Vu
> Assignee: Khanh Vu
> Priority: Major
> Labels: pull-request-available
> Fix For: aws-connector-4.2.0
>
> First, I define a table which uses `kinesis` connector with an unsupported
> property for sink, e.g. `scan.stream.initpos`:
> {code:sql}
> %flink.ssql(type=update)
> -- Create input
> DROP TABLE IF EXISTS `kds_input`;
> CREATE TABLE `kds_input` (
>   `some_string` STRING,
>   `some_int` BIGINT,
>   `time` AS PROCTIME()
> ) WITH (
>   'connector' = 'kinesis',
>   'stream' = 'ExampleInputStream',
>   'aws.region' = 'us-east-1',
>   'scan.stream.initpos' = 'LATEST',
>   'format' = 'csv'
> );
> {code}
> I can read from my table (kds_input) without any issue, but it will throw
> exception if I try to write to the table:
> {code:sql}
> %flink.ssql(type=update)
> -- Use to generate data in the input table
> DROP TABLE IF EXISTS connector_cve_datagen;
> CREATE TABLE connector_cve_datagen(
>   `some_string` STRING,
>   `some_int` BIGINT
> ) WITH (
>   'connector' = 'datagen',
>   'rows-per-second' = '1',
>   'fields.some_string.length' = '2');
> INSERT INTO kds_input SELECT some_string, some_int from connector_cve_datagen
> {code}
> Exception observed:
> {code:java}
> Caused by: org.apache.flink.table.api.ValidationException: Unsupported options found for 'kinesis'.
>
> Unsupported options:
> scan.stream.initpos
>
> Supported options:
> aws.region
> connector
> csv.allow-comments
> csv.array-element-delimiter
> csv.disable-quote-character
> csv.escape-character
> csv.field-delimiter
> csv.ignore-parse-errors
> csv.null-literal
> csv.quote-character
> format
> property-version
> sink.batch.max-size
> sink.fail-on-error
> sink.flush-buffer.size
> sink.flush-buffer.timeout
> sink.partitioner
> sink.partitioner-field-delimiter
> sink.producer.collection-max-count (deprecated)
> sink.producer.collection-max-size (deprecated)
> sink.producer.fail-on-error (deprecated)
> sink.producer.record-max-buffered-time (deprecated)
> sink.requests.max-buffered
> sink.requests.max-inflight
> stream
>     at org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:624)
>     at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validate(FactoryUtil.java:914)
>     at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:978)
>     at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validateExcept(FactoryUtil.java:938)
>     at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:978)
>     at org.apache.flink.connector.kinesis.table.KinesisDynamicTableSinkFactory.createDynamicTableSink(KinesisDynamicTableSinkFactory.java:65)
>     at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:259)
>     ... 36 more
> {code}
Re: [PR] [FLINK-33181] Allow a table definition can be used to read & write data to. [flink-connector-aws]
boring-cyborg[bot] commented on PR #105: URL: https://github.com/apache/flink-connector-aws/pull/105#issuecomment-1763380621

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
[PR] [FLINK-33181] Allow a table definition can be used to read & write data to. [flink-connector-aws]
vtkhanh opened a new pull request, #105: URL: https://github.com/apache/flink-connector-aws/pull/105

## Purpose of the change

Allow a table to be used as both a source and a sink when it's defined with both consumer & producer options. This is achieved by not verifying an option if it's *indicated* as a consumer option (by the `scan.` prefix) or a producer option (by the `sink.` prefix) when creating a source/sink based on the table definition.

## Verifying this change

This change added tests and can be verified as follows:

- *Added unit tests*

## Significant changes

*(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)*

- [ ] Dependencies have been added or upgraded
- [ ] Public API has been changed (Public API is any class annotated with `@Public(Evolving)`)
- [ ] Serializers have been changed
- [ ] New feature has been introduced
  - If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)
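The prefix-based validation skipping that the PR description outlines can be illustrated with a small, hypothetical helper (the helper name and structure are illustrative only, not taken from the actual patch): when building the sink, options carrying the consumer prefix `scan.` are excluded from validation, and vice versa for the source.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PrefixedOptionFilter {

    // Return a copy of the table options without entries whose keys carry the
    // given prefix. Hypothetical sketch of the idea in the PR description;
    // the real change lives in the Kinesis table factory's validation logic.
    static Map<String, String> withoutPrefix(Map<String, String> options, String prefix) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : options.entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> tableOptions = new LinkedHashMap<>();
        tableOptions.put("connector", "kinesis");
        tableOptions.put("aws.region", "us-east-1");
        tableOptions.put("scan.stream.initpos", "LATEST"); // consumer-only option
        tableOptions.put("sink.fail-on-error", "true");    // producer-only option

        // When creating the sink, ignore consumer-prefixed options so that
        // the FLINK-33181 table definition validates for both read and write.
        Map<String, String> sinkView = withoutPrefix(tableOptions, "scan.");
        System.out.println(sinkView.keySet());
    }
}
```

With this filtering, the `scan.stream.initpos` option from the FLINK-33181 reproduction never reaches the sink factory's unconsumed-keys check, which is what previously triggered the `ValidationException`.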
[jira] [Closed] (FLINK-33030) Add python 3.11 support
[ https://issues.apache.org/jira/browse/FLINK-33030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Márton Balassi closed FLINK-33030.
----------------------------------
Resolution: Implemented

[{{2da9a96}}|https://github.com/apache/flink/commit/2da9a9639216b8c48850ee714065f090a80dcd65] in master

> Add python 3.11 support
> -----------------------
>
> Key: FLINK-33030
> URL: https://issues.apache.org/jira/browse/FLINK-33030
> Project: Flink
> Issue Type: New Feature
> Components: API / Python
> Affects Versions: 1.19.0
> Reporter: Gabor Somogyi
> Assignee: Gabor Somogyi
> Priority: Major
> Labels: pull-request-available
Re: [PR] [FLINK-33030][python]Add python 3.11 support [flink]
mbalassi merged PR #23417: URL: https://github.com/apache/flink/pull/23417