[jira] [Comment Edited] (FLINK-34108) Add URL_ENCODE and URL_DECODE function
[ https://issues.apache.org/jira/browse/FLINK-34108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845659#comment-17845659 ] chesterxu edited comment on FLINK-34108 at 5/12/24 2:35 AM: I would like to support it, please assign to me. was (Author: JIRAUSER302535): These two functions are indeed useful in some Flink cases. I can support it, please assign to me. > Add URL_ENCODE and URL_DECODE function > -- > > Key: FLINK-34108 > URL: https://issues.apache.org/jira/browse/FLINK-34108 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API > Reporter: Martijn Visser > Priority: Major > > Add URL_ENCODE and URL_DECODE function > URL_ENCODE(str) - Translates a string into > 'application/x-www-form-urlencoded' format using a specific encoding scheme. > URL_DECODE(str) - Decodes a string in 'application/x-www-form-urlencoded' > format using a specific encoding scheme. > Related ticket from Calcite: CALCITE-5825 -- This message was sent by Atlassian Jira (v8.20.10#820010)
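For readers unfamiliar with the 'application/x-www-form-urlencoded' format named in the ticket, the following is a minimal sketch of the round trip using the JDK's `java.net.URLEncoder` and `java.net.URLDecoder`. The class name `UrlCodecSketch`, the choice of UTF-8, and the "NULL in, NULL out" behaviour are assumptions made for illustration; the ticket does not say how the Flink functions are implemented or how they treat NULL or malformed input.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, not part of Flink: illustrates form-urlencoding only. */
final class UrlCodecSketch {

    /** Encodes a string as application/x-www-form-urlencoded, assuming UTF-8. */
    static String urlEncode(String value) {
        if (value == null) {
            return null; // assumption: SQL-style NULL in, NULL out
        }
        try {
            return URLEncoder.encode(value, StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a compliant JVM
            throw new IllegalStateException(e);
        }
    }

    /** Decodes an application/x-www-form-urlencoded string, assuming UTF-8. */
    static String urlDecode(String value) {
        if (value == null) {
            return null; // assumption: SQL-style NULL in, NULL out
        }
        try {
            return URLDecoder.decode(value, StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String encoded = urlEncode("a b&c=d");
        System.out.println(encoded);            // a+b%26c%3Dd
        System.out.println(urlDecode(encoded)); // a b&c=d
    }
}
```

Note that this format encodes a space as `+`, which differs from percent-encoding a URL path, where a space becomes `%20`.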
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#issuecomment-2106039721 I addressed all comments, except those where I explained why I didn't. I also upgraded to Flink 1.18 and the newer AsyncSinkBase API. I cannot split the AMP signer and the connector into separate PRs at this point, because of the interdependency with the sample application. Also, I strongly recommend keeping the sample application in the repository, because it serves both as documentation for the user and as a way to test the connector (and optionally the signer) in an actual environment. The sample application is not supposed to be released as an artifact, but just to stay in the repo as source. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597510944 ## amp-request-signer/pom.xml: ## @@ -0,0 +1,58 @@ + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + +org.apache.flink +flink-connector-prometheus-parent +1.0.0-SNAPSHOT + + +Flink : Connectors : Prometheus : AMP request signer +org.apache.flink.connector.prometheus Review Comment: Fixed. Now `groupId` is `org.apache.flink` and `artifactId` is `flink-connector-prometheus-amp-request-signer`
Re: [PR] [FLINK-35305](https://issues.apache.org/jira/browse/FLINK-35305)Amazon SQS Sink Connector [flink-connector-aws]
vahmed-hamdy commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1597498488 ## flink-connector-aws/pom.xml: ## @@ -18,8 +18,8 @@ specific language governing permissions and limitations under the License. --> http://maven.apache.org/POM/4.0.0; -xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; Review Comment: change not needed ## flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkBuilder.java: ## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; + +import software.amazon.awssdk.http.Protocol; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; + +import java.util.Optional; +import java.util.Properties; + +import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION; +import static software.amazon.awssdk.http.Protocol.HTTP1_1; + +/** + * Builder to construct {@link SqsSink}. + * + * The following example shows the minimum setup to create a {@link SqsSink} that + * writes String values to a SQS named sqsUrl. + * + * {@code + * Properties sinkProperties = new Properties(); + * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); + * + * SqsSink sqsSink = + * SqsSink.builder() + * .setElementConverter(elementConverter) Review Comment: This setter in the java doc doesn't exist. ## flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsExceptionClassifiersTest.java: ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; + +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.awscore.exception.AwsErrorDetails; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.services.sqs.model.ResourceNotFoundException; +import software.amazon.awssdk.services.sqs.model.SqsException; + +import static org.junit.jupiter.api.Assertions.assertFalse; + +/** Unit tests for {@link SqsExceptionClassifiers}. */ +public class SqsExceptionClassifiersTest { + +private final FatalExceptionClassifier classifier = +FatalExceptionClassifier.createChain( + SqsExceptionClassifiers.getAccessDeniedExceptionClassifier(), + SqsExceptionClassifiers.getNotAuthorizedExceptionClassifier(), + SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier()); + +@Test +public void shouldClassifyNotAuthorizedAsFatal() { +AwsServiceException sqsException = +SqsException.builder() +.awsErrorDetails( + AwsErrorDetails.builder().errorCode("NotAuthorized").build()) +.build(); + +// isFatal returns `true` if an exception is non-fatal +assertFalse(classifier.isFatal(sqsException, ex -> {})); +} + +@Test +public void
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597510275 ## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteRetryStrategy.java: ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink.http; + +import org.apache.flink.connector.prometheus.sink.SinkMetrics; + +import org.apache.hc.client5.http.HttpRequestRetryStrategy; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.util.TimeValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLException; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.ConnectException; +import java.net.NoRouteToHostException; +import java.net.UnknownHostException; +import java.util.List; + +/** Retry strategy for the http client. 
*/ +public class RemoteWriteRetryStrategy implements HttpRequestRetryStrategy { +private static final Logger LOG = LoggerFactory.getLogger(RemoteWriteRetryStrategy.class); + +private static final List> NON_RETRIABLE_EXCEPTIONS = +List.of( +InterruptedIOException.class, +UnknownHostException.class, +ConnectException.class, +NoRouteToHostException.class, +SSLException.class); + +private final long initialRetryDelayMs; +private final long maxRetryDelayMs; +private final int maxRetryCount; + +private final SinkMetrics counters; + +public RemoteWriteRetryStrategy(RetryConfiguration retryConfiguration, SinkMetrics counters) { +this.initialRetryDelayMs = retryConfiguration.getInitialRetryDelayMS(); +this.maxRetryDelayMs = retryConfiguration.getMaxRetryDelayMS(); +this.maxRetryCount = retryConfiguration.getMaxRetryCount(); +this.counters = counters; +} + +@Override +public boolean retryRequest( +HttpRequest httpRequest, IOException e, int execCount, HttpContext httpContext) { +// Retry on any IOException except those considered non-retriable +var retry = +(execCount <= maxRetryCount) && !(NON_RETRIABLE_EXCEPTIONS.contains(e.getClass())); +LOG.debug( +"{} retry on {}, at execution {}", +(retry) ? "DO" : "DO NOT", +e.getClass(), +execCount); +countRetry(retry); +return retry; +} + +@Override +public boolean retryRequest(HttpResponse httpResponse, int execCount, HttpContext httpContext) { +var retry = +(execCount <= maxRetryCount) +&& RemoteWriteResponseClassifier.isRetriableErrorResponse(httpResponse); +LOG.debug( +"{} retry on response {} {}, at execution {}", +(retry) ? "DO" : "DO NOT", +httpResponse.getCode(), +httpResponse.getReasonPhrase(), +execCount); +countRetry(retry); +return retry; +} + +@Override +public TimeValue getRetryInterval( +HttpResponse httpResponse, int execCount, HttpContext httpContext) { +long calculatedDelay = initialRetryDelayMs << (execCount - 1); Review Comment: I don't see much value in making the backoff strategy more sophisticated. 
The Prometheus Remote-Write spec just says it "MUST use a backoff algorithm to prevent overwhelming the server". Tweaking `initialRetryDelayMs` and `maxRetryDelayMs` will be enough to make the strategy more or less aggressive. Also, I don't see much value in adding jitter. There is one writer thread per subtask, and it's very unlikely they will be synchronized when writing to the endpoint.
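The doubling delay discussed in this review thread can be sketched in isolation as follows. This is a standalone illustration, not the connector's code: the class and method names are hypothetical, and the cap at the maximum delay plus the overflow guard are assumptions, since the quoted diff is cut off right after the shift expression.

```java
/** Hypothetical sketch of a capped exponential backoff; not the connector's implementation. */
final class BackoffSketch {

    /**
     * Returns initialRetryDelayMs * 2^(execCount - 1), capped at maxRetryDelayMs.
     * execCount is assumed to be 1-based (first execution = 1).
     */
    static long retryDelayMs(long initialRetryDelayMs, long maxRetryDelayMs, int execCount) {
        if (initialRetryDelayMs <= 0 || maxRetryDelayMs < initialRetryDelayMs) {
            throw new IllegalArgumentException("Expected 0 < initial <= max");
        }
        int shift = Math.max(0, execCount - 1);
        // Guard against overflow: shifting by this many bits would reach the sign bit,
        // so the uncapped delay necessarily exceeds any representable maximum.
        if (shift >= Long.numberOfLeadingZeros(initialRetryDelayMs)) {
            return maxRetryDelayMs;
        }
        long delay = initialRetryDelayMs << shift;
        return Math.min(delay, maxRetryDelayMs);
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 8; attempt++) {
            System.out.println(attempt + " -> " + retryDelayMs(100, 5000, attempt) + " ms");
        }
    }
}
```

With an initial delay of 100 ms and a cap of 5000 ms, the sequence is 100, 200, 400, 800, 1600, 3200, then 5000 for every later attempt, which shows how the two parameters alone control how aggressive the strategy is.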
Re: [PR] [hotfix] Make weekly green again [flink-connector-opensearch]
snuyanzin merged PR #46: URL: https://github.com/apache/flink-connector-opensearch/pull/46
Re: [PR] [hotfix] Make weekly green again [flink-connector-opensearch]
snuyanzin commented on PR #46: URL: https://github.com/apache/flink-connector-opensearch/pull/46#issuecomment-2106034504 proof link to green nightly https://github.com/apache/flink-connector-opensearch/actions/runs/9046382776
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597508286 ## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkWriter.java: ## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration; +import org.apache.flink.connector.prometheus.sink.prometheus.Remote; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; + +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.core5.io.CloseMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xerial.snappy.Snappy; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; + +/** Writer, taking care of batching the {@link PrometheusTimeSeries} and handling retries. */ +public class PrometheusSinkWriter extends AsyncSinkWriter { + +/** + * * Batching of this sink is in terms of Samples, not bytes. The goal is adaptively increase + * the number of Samples in each batch, a WriteRequest sent to Prometheus, to a configurable + * number. This is the parameter maxBatchSizeInBytes. + * + * getSizeInBytes(requestEntry) returns the number of Samples (not bytes) and + * maxBatchSizeInBytes is actually in terms of Samples (not bytes). + * + * In AsyncSinkWriter, maxBatchSize is in terms of requestEntries (TimeSeries). But because + * each TimeSeries contains 1+ Samples, we set maxBatchSize = maxBatchSizeInBytes. + * + * maxRecordSizeInBytes is also calculated in the same unit assumed by getSizeInBytes(...). + * In our case is the max number of Samples in a single TimeSeries sent to the Sink. 
We are + * limiting the number of Samples in each TimeSeries to the max batch size, setting + * maxRecordSizeInBytes = maxBatchSizeInBytes. + */ +private static final Logger LOG = LoggerFactory.getLogger(PrometheusSinkWriter.class); + +private final SinkMetrics metrics; +private final CloseableHttpAsyncClient asyncHttpClient; +private final PrometheusRemoteWriteHttpRequestBuilder requestBuilder; +private final SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig; + +public PrometheusSinkWriter( +ElementConverter elementConverter, +Sink.InitContext context, +int maxInFlightRequests, +int maxBufferedRequests, +int maxBatchSizeInSamples, +long maxTimeInBufferMS, +String prometheusRemoteWriteUrl, +CloseableHttpAsyncClient asyncHttpClient, +SinkMetrics metrics, +PrometheusRequestSigner requestSigner, +String httpUserAgent, +SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig) { +this( +elementConverter, +context, +maxInFlightRequests, +maxBufferedRequests, +maxBatchSizeInSamples, +maxTimeInBufferMS, +prometheusRemoteWriteUrl, +asyncHttpClient, +metrics, +requestSigner, +httpUserAgent, +errorHandlingBehaviorConfig, +Collections.emptyList()); +} + +public PrometheusSinkWriter( +ElementConverter elementConverter, +Sink.InitContext context, +int maxInFlightRequests, +int maxBufferedRequests, +int maxBatchSizeInSamples, +long maxTimeInBufferMS, +String
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597505926 ## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkWriter.java: ## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration; +import org.apache.flink.connector.prometheus.sink.prometheus.Remote; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; + +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.core5.io.CloseMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xerial.snappy.Snappy; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; + +/** Writer, taking care of batching the {@link PrometheusTimeSeries} and handling retries. */ +public class PrometheusSinkWriter extends AsyncSinkWriter { Review Comment: Not much to test here, except the initialization that is coupled with the PrometheusSink initialization. I created a test similar to what is done in the Kinesis Sink. https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597488570 ## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusStateSerializer.java: ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * Serializes/deserializes the sink request-entry, the protobuf-generated {@link Types.TimeSeries}, + * using protobuf. 
+ */ +public class PrometheusStateSerializer extends AsyncSinkWriterStateSerializer { +private static final int VERSION = 1; + +// Copied from AsyncSinkWriterStateSerializer.DATA_IDENTIFIER +private static final long DATA_IDENTIFIER = -1; + +@Override +protected void serializeRequestToStream(Types.TimeSeries request, DataOutputStream out) +throws IOException { +byte[] serializedRequest = request.toByteArray(); +out.write(serializedRequest); +} + +@Override +protected Types.TimeSeries deserializeRequestFromStream(long requestSize, DataInputStream in) +throws IOException { +// The size written into the serialized stat is the size of the protobuf-serialized +// time-series +byte[] requestData = new byte[(int) requestSize]; +in.read(requestData); +return Types.TimeSeries.parseFrom(requestData); +} + +@Override +public int getVersion() { +return VERSION; +} + +/** + * Overrides the original implementation that assumes the serialized size is the value returned + * by {@link PrometheusSinkWriter#getSizeInBytes(Types.TimeSeries)} + * + * Most of the code is copied from the original implementation of + * AsyncSinkWriterStateSerializer.serialize(). + * + * The state is serialized in form of + * [DATA_IDENTIFIER,NUM_OF_ELEMENTS,SIZE1,REQUEST1,SIZE2,REQUEST2], where REQUESTn is the + * Protobuf-serialized representation of a {@link Types.TimeSeries TimeSeries}. In this + * implementation SIZEn is the size of the Protobuf serialization, in bytes, that does not match + * the "size" of a {@link RequestEntryWrapper}. 
+ * + * @param bufferedRequestState The buffered request state to be serialized + * @return serialized buffered request state + * @throws IOException + */ +@Override +public byte[] serialize(BufferedRequestState bufferedRequestState) +throws IOException { +Collection> bufferState = +bufferedRequestState.getBufferedRequestEntries(); + +try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); +final DataOutputStream out = new DataOutputStream(baos)) { + +out.writeLong(DATA_IDENTIFIER); // DATA_IDENTIFIER +out.writeInt(bufferState.size()); // NUM_OF_ELEMENTS + +for (RequestEntryWrapper wrapper : bufferState) { +// In the serialized state we write the size of the serialized representation, +// rather than the size +// held in RequestEntryWrapper, that is the output of +// AsyncSinkWriter.getSizeInBytes() +long requestEntrySize = + RequestEntrySizeUtils.requestSerializedSize(wrapper.getRequestEntry()); +out.writeLong(requestEntrySize); // SIZEn +serializeRequestToStream(wrapper.getRequestEntry(), out); // REQUESTn +} + +return baos.toByteArray(); +} +} + +/** + * Overrides the original
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597488524 ## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusStateSerializer.java: ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * Serializes/deserializes the sink request-entry, the protobuf-generated {@link Types.TimeSeries}, + * using protobuf. 
+ */ +public class PrometheusStateSerializer extends AsyncSinkWriterStateSerializer { +private static final int VERSION = 1; + +// Copied from AsyncSinkWriterStateSerializer.DATA_IDENTIFIER +private static final long DATA_IDENTIFIER = -1; + +@Override +protected void serializeRequestToStream(Types.TimeSeries request, DataOutputStream out) +throws IOException { +byte[] serializedRequest = request.toByteArray(); +out.write(serializedRequest); +} + +@Override +protected Types.TimeSeries deserializeRequestFromStream(long requestSize, DataInputStream in) +throws IOException { +// The size written into the serialized stat is the size of the protobuf-serialized +// time-series +byte[] requestData = new byte[(int) requestSize]; +in.read(requestData); +return Types.TimeSeries.parseFrom(requestData); +} + +@Override +public int getVersion() { +return VERSION; +} + +/** + * Overrides the original implementation that assumes the serialized size is the value returned + * by {@link PrometheusSinkWriter#getSizeInBytes(Types.TimeSeries)} + * + * Most of the code is copied from the original implementation of + * AsyncSinkWriterStateSerializer.serialize(). + * + * The state is serialized in form of + * [DATA_IDENTIFIER,NUM_OF_ELEMENTS,SIZE1,REQUEST1,SIZE2,REQUEST2], where REQUESTn is the + * Protobuf-serialized representation of a {@link Types.TimeSeries TimeSeries}. In this + * implementation SIZEn is the size of the Protobuf serialization, in bytes, that does not match Review Comment: As above, it means "SIZE". Clarified the comment
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597488453 ## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusStateSerializer.java: ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * Serializes/deserializes the sink request-entry, the protobuf-generated {@link Types.TimeSeries}, + * using protobuf. 
+ */ +public class PrometheusStateSerializer extends AsyncSinkWriterStateSerializer { +private static final int VERSION = 1; + +// Copied from AsyncSinkWriterStateSerializer.DATA_IDENTIFIER +private static final long DATA_IDENTIFIER = -1; + +@Override +protected void serializeRequestToStream(Types.TimeSeries request, DataOutputStream out) +throws IOException { +byte[] serializedRequest = request.toByteArray(); +out.write(serializedRequest); +} + +@Override +protected Types.TimeSeries deserializeRequestFromStream(long requestSize, DataInputStream in) +throws IOException { +// The size written into the serialized stat is the size of the protobuf-serialized +// time-series +byte[] requestData = new byte[(int) requestSize]; +in.read(requestData); +return Types.TimeSeries.parseFrom(requestData); +} + +@Override +public int getVersion() { +return VERSION; +} + +/** + * Overrides the original implementation that assumes the serialized size is the value returned + * by {@link PrometheusSinkWriter#getSizeInBytes(Types.TimeSeries)} + * + * Most of the code is copied from the original implementation of + * AsyncSinkWriterStateSerializer.serialize(). + * + * The state is serialized in form of + * [DATA_IDENTIFIER,NUM_OF_ELEMENTS,SIZE1,REQUEST1,SIZE2,REQUEST2], where REQUESTn is the Review Comment: It actually means "REQUEST". Fixed the comment to make it clear.
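The length-prefixed layout described in the quoted Javadoc, [DATA_IDENTIFIER, NUM_OF_ELEMENTS, SIZE1, REQUEST1, SIZE2, REQUEST2], can be sketched without Flink or protobuf on the classpath. In this illustration plain `byte[]` payloads stand in for the protobuf-serialized time series, and the class and method names are hypothetical; it is a sketch of the framing only, not the connector's serializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of the [ID, COUNT, SIZE1, REQUEST1, ...] framing; byte[] stands in for protobuf. */
final class LengthPrefixedStateSketch {

    private static final long DATA_IDENTIFIER = -1L;

    static byte[] serialize(List<byte[]> entries) {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(baos)) {
            out.writeLong(DATA_IDENTIFIER);      // DATA_IDENTIFIER
            out.writeInt(entries.size());        // NUM_OF_ELEMENTS
            for (byte[] entry : entries) {
                out.writeLong(entry.length);     // SIZEn: serialized size in bytes
                out.write(entry);                // REQUESTn
            }
            out.flush();
            return baos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static List<byte[]> deserialize(byte[] state) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(state))) {
            long identifier = in.readLong();
            if (identifier != DATA_IDENTIFIER) {
                throw new IOException("Unexpected data identifier: " + identifier);
            }
            int count = in.readInt();
            List<byte[]> entries = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                long size = in.readLong();
                if (size < 0 || size > Integer.MAX_VALUE) {
                    throw new IOException("Invalid entry size: " + size);
                }
                byte[] payload = new byte[(int) size];
                in.readFully(payload); // blocks until the whole entry is read, or throws EOFException
                entries.add(payload);
            }
            return entries;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<byte[]> input = Arrays.asList(new byte[] {1, 2, 3}, new byte[0]);
        byte[] state = serialize(input);
        System.out.println(state.length + " bytes, " + deserialize(state).size() + " entries");
    }
}
```

The sketch uses `readFully` rather than `read(byte[])` because `InputStream.read(byte[])` is allowed to return after filling only part of the buffer, while `readFully` either fills it completely or fails.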
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597486440 ## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusRemoteWriteHttpRequestBuilder.java: ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.util.Preconditions; + +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpHeaders; + +import java.util.HashMap; +import java.util.Map; + +/** Builds the POST request to the Remote-Write endpoint for a given binary payload. 
*/ +public class PrometheusRemoteWriteHttpRequestBuilder { + +private static final ContentType CONTENT_TYPE = ContentType.create("application/x-protobuf"); + +public static final String DEFAULT_USER_AGENT = "Flink-Prometheus"; +private final String prometheusRemoteWriteUrl; +private final PrometheusRequestSigner requestSigner; + +private final Map fixedHeaders; + +public PrometheusRemoteWriteHttpRequestBuilder( +String prometheusRemoteWriteUrl, +PrometheusRequestSigner requestSigner, +String httpUserAgent) { +Preconditions.checkNotNull(httpUserAgent, "User-Agent not specified"); + +this.prometheusRemoteWriteUrl = prometheusRemoteWriteUrl; +this.requestSigner = requestSigner; +this.fixedHeaders = +Map.of( +HttpHeaders.CONTENT_ENCODING, +"snappy", +"X-Prometheus-Remote-Write-Version", +"0.1.0", +HttpHeaders.USER_AGENT, +httpUserAgent); +} + +public SimpleHttpRequest buildHttpRequest(byte[] httpRequestBody) { +Map headers = new HashMap<>(fixedHeaders); +if (requestSigner != null) { +requestSigner.addSignatureHeaders(headers, httpRequestBody); +} + +var builder = Review Comment: Is it still the case for 1.18? Anyhow, I removed java 11 features -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
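The quoted diff builds its fixed headers with `Map.of` and declares a local with `var`, neither of which exists on Java 8. Assuming Java 8 compatibility was the reviewer's concern (the original review comment is not shown here), a minimal sketch of the same header map without those features could look like this. The class and method names are hypothetical, and the header names are written as string literals instead of the `HttpHeaders` constants so the sketch has no dependencies.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; names are illustrative and not the connector's API. */
public class FixedHeadersSketch {

    /** Builds the immutable headers sent with every Remote-Write request. */
    public static Map<String, String> fixedHeaders(String httpUserAgent) {
        if (httpUserAgent == null) {
            throw new NullPointerException("User-Agent not specified");
        }
        Map<String, String> headers = new HashMap<>();
        headers.put("Content-Encoding", "snappy");
        headers.put("X-Prometheus-Remote-Write-Version", "0.1.0");
        headers.put("User-Agent", httpUserAgent);
        // Java 8 replacement for Map.of(...): wrap a populated map as read-only.
        return Collections.unmodifiableMap(headers);
    }

    /** Returns a mutable per-request copy, e.g. for a signer to add headers to. */
    public static Map<String, String> headersForRequest(Map<String, String> fixed) {
        return new HashMap<>(fixed);
    }
}
```

The per-request copy mirrors `new HashMap<>(fixedHeaders)` in the diff: a signer can add its headers without touching the shared map.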
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597482298 ## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java: ## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.annotation.Public; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * Pojo used as sink input, containing a single TimeSeries: a list of Labels and a list of Samples. + * + * metricName is mapped in Prometheus to the value of the mandatory label named '__name__' + * labels. The other labels, as key/value, are appended after the '__name__' label. + */ +@Public +public class PrometheusTimeSeries implements Serializable { +/** A single Label. 
*/ +public static class Label implements Serializable { +private final String name; +private final String value; + +public Label(String name, String value) { Review Comment: I added further explanations in the README about the behavior and the responsibility of sending well-formed data. In any case, with the default error-handling behavior, if the application sends malformed data, the sink throws an exception when Prometheus rejects the write. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
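Since the sink leaves the responsibility for well-formed data with the application, a job may want to check label names before building a `PrometheusTimeSeries`. The sketch below is an assumption-laden illustration, not something taken from the PR: `LabelNameCheckSketch` is a hypothetical helper, and the pattern reflects the classic Prometheus label-name rule `[a-zA-Z_][a-zA-Z0-9_]*`. A given Prometheus backend or version may accept more than this.

```java
import java.util.regex.Pattern;

/** Hypothetical application-side check; not part of the connector. */
public class LabelNameCheckSketch {
    // Classic Prometheus label-name rule (assumed; newer servers may be more permissive).
    private static final Pattern LABEL_NAME = Pattern.compile("[a-zA-Z_][a-zA-Z0-9_]*");

    /** Returns true if the name is non-null and matches the classic rule. */
    public static boolean isValidLabelName(String name) {
        return name != null && LABEL_NAME.matcher(name).matches();
    }
}
```

Rejecting bad names in the application fails early, before the default error handling throws on a rejected write.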
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597479531 ## prometheus-connector/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusStateSerializerTest.java: ## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class PrometheusStateSerializerTest { + +private static final ElementConverter +ELEMENT_CONVERTER = new PrometheusTimeSeriesConverter(); + +private static PrometheusTimeSeries getTestTimeSeries(int i) { +return PrometheusTimeSeries.builder() +.withMetricName("metric-name") +.addLabel("dimension-a", "value-" + i) +.addSample(i + 42.0, i + 1L) +.addSample(i + 3.14, i + 2L) +.build(); +} + +// This method uses the same implementation as PrometheusSinkWriter.getSizeInBytes() to extract +// the requestEntry "size" +// (i.e. the number of Samples). This is the "size" used in RequestEntryWrapper +// see +// https://github.com/apache/flink/blob/69e812688b43be9a0c4f79e6af81bc2d1d8a873e/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterStateSerializer.java#L60 Review Comment: Reviewing this, I put back the link to the implementation in GitHub. What we want to show is the implementation, not the javadoc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597479280 ## msf-amp-example/README.md: ## @@ -0,0 +1,68 @@ +## Sample application: Amazon Managed Service for Apache Flink and Amazon Managed Prometheus Review Comment: Updated to 1.18.1
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597478707 ## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/prometheus/Remote.java: ## @@ -0,0 +1,6661 @@ +/* Review Comment: I think we discussed this. Generating the Java classes from proto cannot be done with a pure Java solution included in the project. It requires ProtoBuf binaries. (e.g. see https://github.com/javiroman/jremotewrite/blob/e1b5d9865d42254b32f2da6ddfd6956569577601/pom.xml#L210) I am adding the *.proto file for reference and a section to the README to explain
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1:
URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597475338

##
amp-request-signer/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AWS4SignerForAuthorizationHeader.java:
##
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.aws;
+
+import com.amazonaws.util.BinaryUtils;
+
+import java.net.URL;
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * Sample AWS4 signer demonstrating how to sign requests to Amazon S3 using an 'Authorization'
+ * header.
+ */
+public class AWS4SignerForAuthorizationHeader extends AWS4SignerBase {

Review Comment:
It's silly, but this "just works". There are no specs. This class has been copied into a gazillion OSS projects, including big ones (try searching for "Sample AWS4 signer demonstrating how to sign requests to Amazon S3" on GitHub). Any unit test would have to be written by reverse-engineering the implementation, so it would be useless.
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597473538 ## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java: ## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.annotation.Public; +import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration; +import org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; + +import java.util.Collection; + +/** Sink implementation accepting {@link PrometheusTimeSeries} as inputs. 
*/ +@Public +public class PrometheusSink extends AsyncSinkBase { +private final String prometheusRemoteWriteUrl; +private final PrometheusAsyncHttpClientBuilder clientBuilder; +private final PrometheusRequestSigner requestSigner; +private final int maxBatchSizeInSamples; +private final String httpUserAgent; +private final SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig; + +protected PrometheusSink( Review Comment: Added a configurable metric group that gets appended after the `Sink__Writer` of AsyncSinkBase. I can't see how I can add a prefix -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597458387 ## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallback.java: ## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.connector.prometheus.sink.errorhandling.OnErrorBehavior; +import org.apache.flink.connector.prometheus.sink.errorhandling.PrometheusSinkWriteException; +import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration; +import org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseClassifier; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; + +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; + +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_DROPPED; +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_NON_RETRIABLE_DROPPED; +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_OUT; +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_RETRY_LIMIT_DROPPED; +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_WRITE_REQUESTS_OUT; +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_WRITE_REQUESTS_PERMANENTLY_FAILED; + +/** + * Callback handling the outcome of the http async request. + * + * This class implements the error handling behaviour, based on the configuration in {@link + * SinkWriterErrorHandlingBehaviorConfiguration}. Depending on the condition, the sink may throw an + * exception and cause the job to fail, or log the condition to WARN, increment the counters and + * continue with the next request. + * + * In any case, every write-request either entirely succeed or fail. Partial failures are not + * handled. 
+ * + * In no condition a write-request is re-queued for the AsyncSink to reprocess: this would cause + * out of order writes that would be rejected by Prometheus. + * + * Note that the http async client retries, based on the configured retry policy. The callback is + * called with an outcome of *completed* either when the request has succeeded or the max retry + * limit has been exceeded. It is responsibility of the callback distinguishing between these + * conditions. + */ +class HttpResponseCallback implements FutureCallback { +private static final Logger LOG = LoggerFactory.getLogger(HttpResponseCallback.class); + +private final int timeSeriesCount; +private final long sampleCount; +private final Consumer> reQueuedResult; +private final SinkMetrics metrics; +private final SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig; + +public HttpResponseCallback( +int timeSeriesCount, +long sampleCount, +SinkMetrics metrics, +SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig, +Consumer> reQueuedResult) { +this.timeSeriesCount = timeSeriesCount; +this.sampleCount = sampleCount; +this.reQueuedResult = reQueuedResult; +this.metrics = metrics; +this.errorHandlingBehaviorConfig = errorHandlingBehaviorConfig; +} + +/** + * The completed outcome is invoked every time the http client successfully receives a valid + * http response, regardless of the status code. + * + * This method classifies the responses and implements the behaviour expected by the + * Remote-Write specifications. In case of error, the behaviour is determined by the error + * handling configuration. + */ +@Override +public void completed(SimpleHttpResponse response) { Review Comment: Done. One nested if (no
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597456357 ## prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallback.java: ## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.prometheus.sink; + +import org.apache.flink.connector.prometheus.sink.errorhandling.OnErrorBehavior; +import org.apache.flink.connector.prometheus.sink.errorhandling.PrometheusSinkWriteException; +import org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration; +import org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseClassifier; +import org.apache.flink.connector.prometheus.sink.prometheus.Types; + +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; + +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_DROPPED; +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_NON_RETRIABLE_DROPPED; +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_OUT; +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_RETRY_LIMIT_DROPPED; +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_WRITE_REQUESTS_OUT; +import static org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_WRITE_REQUESTS_PERMANENTLY_FAILED; + +/** + * Callback handling the outcome of the http async request. + * + * This class implements the error handling behaviour, based on the configuration in {@link + * SinkWriterErrorHandlingBehaviorConfiguration}. Depending on the condition, the sink may throw an + * exception and cause the job to fail, or log the condition to WARN, increment the counters and + * continue with the next request. + * + * In any case, every write-request either entirely succeed or fail. Partial failures are not + * handled. 
+ * + * In no condition a write-request is re-queued for the AsyncSink to reprocess: this would cause + * out of order writes that would be rejected by Prometheus. + * + * Note that the http async client retries, based on the configured retry policy. The callback is + * called with an outcome of *completed* either when the request has succeeded or the max retry + * limit has been exceeded. It is responsibility of the callback distinguishing between these + * conditions. + */ +class HttpResponseCallback implements FutureCallback { +private static final Logger LOG = LoggerFactory.getLogger(HttpResponseCallback.class); + +private final int timeSeriesCount; +private final long sampleCount; +private final Consumer> reQueuedResult; +private final SinkMetrics metrics; +private final SinkWriterErrorHandlingBehaviorConfiguration errorHandlingBehaviorConfig; + +public HttpResponseCallback( +int timeSeriesCount, +long sampleCount, +SinkMetrics metrics, Review Comment: Not exactly a minor change, because it has impact in many places. But definitely a good idea. Implemented. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
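The Javadoc above says the callback has to tell a successful write apart from the various failure cases after the HTTP client has finished retrying. As a rough illustration of that classification step, here is a minimal sketch. It is not the PR's `RemoteWriteResponseClassifier`, which is not shown in this thread: the class, the enum, and the exact status-code mapping are assumptions, loosely based on the Remote-Write guidance that 5xx (and optionally 429) responses are retriable while other 4xx responses are not.

```java
/** Hypothetical classifier; the mapping below is an assumption, not the PR's code. */
public class ResponseClassifierSketch {

    public enum Outcome {
        SUCCESS,
        RETRIABLE_ERROR,
        NON_RETRIABLE_ERROR,
        UNHANDLED
    }

    /** Maps an HTTP status code to the outcome the callback should act on. */
    public static Outcome classify(int statusCode) {
        if (statusCode >= 200 && statusCode < 300) {
            return Outcome.SUCCESS;
        }
        // Throttling and server errors: the client retries these, so seeing one
        // in the callback means the retry limit was exhausted.
        if (statusCode == 429 || (statusCode >= 500 && statusCode < 600)) {
            return Outcome.RETRIABLE_ERROR;
        }
        // Other client errors, e.g. malformed or out-of-order data.
        if (statusCode >= 400 && statusCode < 500) {
            return Outcome.NON_RETRIABLE_ERROR;
        }
        return Outcome.UNHANDLED;
    }
}
```

With a mapping like this, the callback can look up the configured behaviour for each outcome: throw and fail the job, or log at WARN, increment the dropped-sample counters and continue, without ever re-queuing the request.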
[jira] [Resolved] (FLINK-35198) Support the execution of refresh materialized table
[ https://issues.apache.org/jira/browse/FLINK-35198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

dalongliu resolved FLINK-35198.
-------------------------------
Resolution: Fixed

> Support the execution of refresh materialized table
> ---------------------------------------------------
>
> Key: FLINK-35198
> URL: https://issues.apache.org/jira/browse/FLINK-35198
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Affects Versions: 1.20.0
> Reporter: dalongliu
> Assignee: xuyang
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.20.0
>
>
> {code:SQL}
> ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name REFRESH
> [PARTITION (key1=val1, key2=val2, ...)]
> {code}

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35198) Support the execution of refresh materialized table
[ https://issues.apache.org/jira/browse/FLINK-35198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845610#comment-17845610 ]

dalongliu commented on FLINK-35198:
-----------------------------------
Merged in master: 9fe8d7bf870987bf43bad63078e2590a38e4faf6

> Support the execution of refresh materialized table
> ---------------------------------------------------
>
> Key: FLINK-35198
> URL: https://issues.apache.org/jira/browse/FLINK-35198
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Affects Versions: 1.20.0
> Reporter: dalongliu
> Assignee: xuyang
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.20.0
>
>
> {code:SQL}
> ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name REFRESH
> [PARTITION (key1=val1, key2=val2, ...)]
> {code}

--
This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]
lsyldliu merged PR #24760: URL: https://github.com/apache/flink/pull/24760
Re: [PR] [FLINK-35040] Revert `commons-io` to 2.11.0 [flink]
1996fanrui commented on PR #24652: URL: https://github.com/apache/flink/pull/24652#issuecomment-2105732689 It seems that reverting it doesn't make sense, so I am closing this PR for now.
Re: [PR] [FLINK-35040] Revert `commons-io` to 2.11.0 [flink]
1996fanrui closed pull request #24652: [FLINK-35040] Revert `commons-io` to 2.11.0 URL: https://github.com/apache/flink/pull/24652
[jira] [Resolved] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
[ https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rui Fan resolved FLINK-35041.
-----------------------------
Fix Version/s: 1.20.0
Resolution: Fixed

IIUC, this issue has been fixed in the master branch, so I will close it for now. If it still happens, feel free to re-open it.

> IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
> --------------------------------------------------------------------------
>
> Key: FLINK-35041
> URL: https://issues.apache.org/jira/browse/FLINK-35041
> Project: Flink
> Issue Type: Bug
> Components: Build System / CI
> Affects Versions: 1.20.0
> Reporter: Weijie Guo
> Assignee: Rui Fan
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.20.0
>
>
> {code:java}
> Apr 08 03:22:45 03:22:45.450 [ERROR] org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration -- Time elapsed: 0.034 s <<< FAILURE!
> Apr 08 03:22:45 org.opentest4j.AssertionFailedError:
> Apr 08 03:22:45
> Apr 08 03:22:45 expected: false
> Apr 08 03:22:45 but was: true
> Apr 08 03:22:45 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Apr 08 03:22:45 at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Apr 08 03:22:45 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(K.java:45)
> Apr 08 03:22:45 at org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34)
> Apr 08 03:22:45 at org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211)
> Apr 08 03:22:45 at java.lang.reflect.Method.invoke(Method.java:498)
> Apr 08 03:22:45 at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Apr 08 03:22:45 at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Apr 08 03:22:45 at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Apr 08 03:22:45 at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Apr 08 03:22:45 at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=9238]
>

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
[ https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845606#comment-17845606 ]

Rui Fan commented on FLINK-35041:
---------------------------------
Merged to master (1.20.0) via: 86c8304d735581518ce666be4896b7d5e48a1e42

> IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
> --------------------------------------------------------------------------
>
> Key: FLINK-35041
> URL: https://issues.apache.org/jira/browse/FLINK-35041
> Project: Flink
> Issue Type: Bug
> Components: Build System / CI
> Affects Versions: 1.20.0
> Reporter: Weijie Guo
> Assignee: Rui Fan
> Priority: Blocker
> Labels: pull-request-available
>
> {code:java}
> Apr 08 03:22:45 03:22:45.450 [ERROR] org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration -- Time elapsed: 0.034 s <<< FAILURE!
> Apr 08 03:22:45 org.opentest4j.AssertionFailedError:
> Apr 08 03:22:45
> Apr 08 03:22:45 expected: false
> Apr 08 03:22:45 but was: true
> Apr 08 03:22:45 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Apr 08 03:22:45 at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Apr 08 03:22:45 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(K.java:45)
> Apr 08 03:22:45 at org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34)
> Apr 08 03:22:45 at org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211)
> Apr 08 03:22:45 at java.lang.reflect.Method.invoke(Method.java:498)
> Apr 08 03:22:45 at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Apr 08 03:22:45 at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Apr 08 03:22:45 at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Apr 08 03:22:45 at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Apr 08 03:22:45 at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=9238]
>

--
This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35041][test] Fix the IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed [flink]
1996fanrui merged PR #24770: URL: https://github.com/apache/flink/pull/24770 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35041][test] Fix the IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed [flink]
1996fanrui commented on PR #24770: URL: https://github.com/apache/flink/pull/24770#issuecomment-2105730845 Thanks for the review! CI is green, merging~
Re: [PR] [hotfix][test] Fix InstantiationUtilTest cannot find assertFalse [flink]
1996fanrui merged PR #24772: URL: https://github.com/apache/flink/pull/24772
Re: [PR] [hotfix][test] Fix InstantiationUtilTest cannot find assertFalse [flink]
1996fanrui commented on PR #24772: URL: https://github.com/apache/flink/pull/24772#issuecomment-2105730686 Thanks for the quick review! CI is green, merging.
Re: [PR] [hotfix][docs] Fix dead links in documentations [flink-cdc]
leonardBang merged PR #3314: URL: https://github.com/apache/flink-cdc/pull/3314
Re: [PR] [FLINK-35041][test] Fix the IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed [flink]
masteryhx commented on code in PR #24770: URL: https://github.com/apache/flink/pull/24770#discussion_r1597415073 ## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java: ## @@ -343,7 +343,11 @@ private static StreamStateHandle createDummySegmentFileStateHandle(Random rnd) { private static StreamStateHandle createDummySegmentFileStateHandle( Random rnd, boolean isEmpty) { return isEmpty -? TestingSegmentFileStateHandle.EMPTY_INSTANCE +? new TestingSegmentFileStateHandle( +new Path(UUID.randomUUID().toString()), Review Comment: Thanks for correcting this. There is no strong reason for it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597414330 ## prometheus-connector/README.md: ## @@ -0,0 +1,189 @@ +## Flink Prometheus connector (sink) + +Implementation of the Prometheus sink connector for DataStream API. + +The sink writes to Prometheus using the Remote-Write interface, based on [Remote-Write specifications version 1.0](https://prometheus.io/docs/concepts/remote_write_spec/) + +### Guarantees and input restrictions + +Due to the strict [ordering](https://prometheus.io/docs/concepts/remote_write_spec/#ordering) and [format](https://prometheus.io/docs/concepts/remote_write_spec/#labels) requirements +of Prometheus Remote-Write, the sink guarantees that input data are written to Prometheus only if input data are in order and well-formed. + +For efficiency, the connector does not do any validation. +If input is out of order or malformed, the write request is rejected by Prometheus and data is discarded by the sink. +The connector will log a warning and count rejected data in custom metrics, but the data is discarded. + +The sink receives as input time-series, each containing one or more samples. +To optimise the write throughput, input time-series are batched, in the order they are received, and written with a single write-request. + +If a write-request contains any out-of-order or malformed data, **the entire request is rejected** and all time series are discarded. +The reason is Remote-Write specifications [explicitly forbids retrying](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff) of rejected write requests (4xx responses). +and the Prometheus response does not contain enough information to efficiently partially retry the write, discarding the offending data. + +### Responsibilities of the application + +It is responsibility of the application sending the data to the sink in the correct order and format. + +1. Input time-series must be well-formed, e.g. 
only valid and non-duplicated labels, +samples in timestamp order (see [Labels and Ordering](https://prometheus.io/docs/concepts/remote_write_spec/#labels) in Prometheus Remote-Write specs). +2. Input time-series with identical labels are sent to the sink in timestamp order. +3. If sink parallelism > 1 is used, the input stream must be partitioned so that all time-series with identical labels go to the same sink subtask. A `KeySelector` is provided to partition input correctly (see [Partitioning](#partitioning), below). + + + Sink input objects + +To help send well-formed data to the sink, the connector expects [`PrometheusTimeSeries`](./src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java) POJOs as input. + +Each `PrometheusTimeSeries` instance maps 1-to-1 to a [remote-write `TimeSeries`](https://prometheus.io/docs/concepts/remote_write_spec/#protocol). Each object contains: +* exactly one `metricName`, mapped to the special `__name__` label +* optionally, any number of additional labels { k: String, v: String } +* one or more `Samples` { value: double, timestamp: long } - must be in timestamp order + +`PrometheusTimeSeries` provides a builder interface. + +```java + +// List<Tuple2<Double, Long>> samples = ... + +PrometheusTimeSeries.Builder tsBuilder = PrometheusTimeSeries.builder() +.withMetricName("CPU") // mapped to `__name__` label +.addLabel("InstanceID", instanceId) +.addLabel("AccountID", accountId); + +for (Tuple2<Double, Long> sample : samples) { +tsBuilder.addSample(sample.f0, sample.f1); +} + +PrometheusTimeSeries ts = tsBuilder.build(); +``` + + +**Important**: for efficiency, the builder does not reorder the samples. It is the responsibility of the application to **add samples in timestamp order**. + +### Batching, blocking writes and retry + +The sink batches multiple time-series into a single write-request, retaining the order. + +Batching is based on the number of samples. 
Each write-request contains up to 500 samples, with a max buffering time of 5 seconds +(both configurable). The number of time-series doesn't matter. + +As per the [Prometheus Remote-Write specifications](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff), +the sink retries 5xx and 429 responses. Retrying is blocking, to retain sample ordering, and uses an exponential backoff. + +The exponential backoff starts with an initial delay (default 30 ms) and increases it exponentially up to a max retry +delay (default 5 sec). It continues retrying until the max number of retries is reached (default: retry forever). + +On a non-retriable error response (4xx, except 429) or a non-retriable exception, or on reaching the retry limit, +**the entire write-request**, containing the batch of time-series, **is dropped**. + +Every dropped request is logged at WARN level, including the reason provided by the remote-write endpoint. Review Comment: Added explanation -- This is an
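The retry behaviour described in the README excerpt above (retry on 5xx and 429, start at 30 ms, cap at 5 seconds) can be sketched roughly as follows. This is only an illustration, not the connector's code: the class and method names are hypothetical, and the doubling factor is an assumption, since the README only says the delay grows "exponentially".

```java
// Hypothetical sketch of a capped exponential backoff; not the connector's implementation.
final class BackoffSketch {

    // Delay before retry number `attempt` (0-based): the initial delay doubled
    // `attempt` times, capped at maxMillis. The factor of 2 is an assumption.
    static long delayMillis(int attempt, long initialMillis, long maxMillis) {
        if (attempt < 0 || initialMillis <= 0 || maxMillis < initialMillis) {
            throw new IllegalArgumentException("invalid backoff parameters");
        }
        long delay = initialMillis;
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            // Guard against overflow before doubling.
            delay = (delay > maxMillis / 2) ? maxMillis : delay * 2;
        }
        return delay;
    }

    // Per the README text: 5xx and 429 are retried, any other 4xx is not.
    static boolean isRetriable(int httpStatus) {
        return httpStatus == 429 || (httpStatus >= 500 && httpStatus < 600);
    }

    public static void main(String[] args) {
        // Defaults quoted in the README: 30 ms initial delay, 5 s max delay.
        for (int attempt = 0; attempt < 10; attempt++) {
            System.out.println("retry " + attempt + ": wait "
                    + delayMillis(attempt, 30, 5_000) + " ms");
        }
    }
}
```

Because retrying is blocking, a caller would sleep for the computed delay on the writing thread before re-sending the same request, which is what preserves sample ordering.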
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597412742 ## prometheus-connector/README.md: ## @@ -0,0 +1,189 @@ +## Flink Prometheus connector (sink) + +Implementation of the Prometheus sink connector for DataStream API. + +The sink writes to Prometheus using the Remote-Write interface, based on [Remote-Write specifications version 1.0](https://prometheus.io/docs/concepts/remote_write_spec/) + +### Guarantees and input restrictions + +Due to the strict [ordering](https://prometheus.io/docs/concepts/remote_write_spec/#ordering) and [format](https://prometheus.io/docs/concepts/remote_write_spec/#labels) requirements +of Prometheus Remote-Write, the sink guarantees that input data are written to Prometheus only if input data are in order and well-formed. + +For efficiency, the connector does not do any validation. +If input is out of order or malformed, the write request is rejected by Prometheus and data is discarded by the sink. Review Comment: Sorry, understood the point. Adding an explanation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597412465 ## prometheus-connector/README.md: ## @@ -0,0 +1,189 @@ +## Flink Prometheus connector (sink) + +Implementation of the Prometheus sink connector for DataStream API. + +The sink writes to Prometheus using the Remote-Write interface, based on [Remote-Write specifications version 1.0](https://prometheus.io/docs/concepts/remote_write_spec/) + +### Guarantees and input restrictions + +Due to the strict [ordering](https://prometheus.io/docs/concepts/remote_write_spec/#ordering) and [format](https://prometheus.io/docs/concepts/remote_write_spec/#labels) requirements +of Prometheus Remote-Write, the sink guarantees that input data are written to Prometheus only if input data are in order and well-formed. + +For efficiency, the connector does not do any validation. +If input is out of order or malformed, the write request is rejected by Prometheus and data is discarded by the sink. Review Comment: Configuration is extensively documented below, in the README -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35197][table] Support the execution of supsend materialized table in continuous refresh mode [flink]
lsyldliu commented on code in PR #24765: URL: https://github.com/apache/flink/pull/24765#discussion_r1597412228 ## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ## @@ -197,6 +220,29 @@ void testCreateMaterializedTableInContinuousMode() throws Exception { .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED); assertThat(actualMaterializedTable.getRefreshHandlerDescription()).isNotEmpty(); assertThat(actualMaterializedTable.getSerializedRefreshHandler()).isNotEmpty(); + +ContinuousRefreshHandler continuousRefreshHandler = +ContinuousRefreshHandlerSerializer.INSTANCE.deserialize( +actualMaterializedTable.getSerializedRefreshHandler(), +getClass().getClassLoader()); +// check the background job is running +String describeJobDDL = +String.format("DESCRIBE JOB '%s'", continuousRefreshHandler.getJobId()); +OperationHandle describeJobHandle = +service.executeStatement(sessionHandle, describeJobDDL, -1, new Configuration()); +awaitOperationTermination(service, sessionHandle, describeJobHandle); +List jobResults = fetchAllResults(sessionHandle, describeJobHandle); + assertThat(jobResults.get(0).getString(2).toString()).isEqualTo("RUNNING"); + +// get checkpoint config from the materialized table +CheckpointConfigInfo checkpointConfigInfo = +getCheckpointConfigInfo(clusterClient, continuousRefreshHandler.getJobId()); Review Comment: ok -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597412205 ## pom.xml: ## @@ -0,0 +1,120 @@ + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + +org.apache.flink +flink-connector-parent +1.0.0 + + +org.apache.flink +flink-connector-prometheus-parent +1.0.0-SNAPSHOT +pom + +Flink Prometheus + +prometheus-connector +amp-request-signer +example-datastream-job + + + + +The Apache Software License, Version 2.0 +https://www.apache.org/licenses/LICENSE-2.0.txt +repo + + + + +UTF-8 +11 +${target.java.version} +${target.java.version} +1.17.0 +3.22.2 +5.2.1 +1.12.570 +2.17.1 + + + + + +com.amazonaws +aws-java-sdk-bom +${aws.sdkv1.version} Review Comment: Correction: it is still required. But I am moving any reference to AWS SDK dependency to the AMP Signer module. This is used to sign the AMP request only. There is no working example using SDK2 in Java. We managed to make it work with SDKv1. I'd suggest to keep it as-is for the moment and consider upgrading the signer in a future version. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597410882 ## pom.xml: ## @@ -0,0 +1,120 @@ + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + +org.apache.flink +flink-connector-parent +1.0.0 + + +org.apache.flink +flink-connector-prometheus-parent +1.0.0-SNAPSHOT +pom + +Flink Prometheus + +prometheus-connector +amp-request-signer +example-datastream-job + + + + +The Apache Software License, Version 2.0 +https://www.apache.org/licenses/LICENSE-2.0.txt +repo + + + + +UTF-8 +11 +${target.java.version} +${target.java.version} +1.17.0 +3.22.2 +5.2.1 +1.12.570 +2.17.1 + + + + + +com.amazonaws +aws-java-sdk-bom +${aws.sdkv1.version} Review Comment: This is actually no longer needed. The AMP request signer no longer uses any SDK I am removing it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core with,Package: core [flink]
1996fanrui commented on PR #24685: URL: https://github.com/apache/flink/pull/24685#issuecomment-2105650954 > @1996fanrui PTAL Thanks @GOODBOY008 for the ping. This PR is huge; I will try to review it next week.
Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core with,Package: util [flink]
1996fanrui commented on PR #24670: URL: https://github.com/apache/flink/pull/24670#issuecomment-2105650755 > @Jiabao-Sun @1996fanrui I will open a hotfix to master. Sorry, I didn't notice it. I started fixing it after I received the CI alert.
Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core with,Package: core [flink]
GOODBOY008 commented on PR #24685: URL: https://github.com/apache/flink/pull/24685#issuecomment-2105650442 @1996fanrui PTAL
Re: [PR] [hotfix][test] Fix InstantiationUtilTest cannot find assertFalse [flink]
flinkbot commented on PR #24772: URL: https://github.com/apache/flink/pull/24772#issuecomment-2105650048 ## CI report: * 5a8d66f2dfcff0d756fa9d77285f135dadc00ab2 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [hotfix][test] Fix InstantiationUtilTest cannot find assertFalse [flink]
1996fanrui opened a new pull request, #24772: URL: https://github.com/apache/flink/pull/24772 ## What is the purpose of the change The master branch fails to build. https://github.com/apache/flink/pull/24670 was not rebased onto master before merging. In the meantime, another PR added `InstantiationUtilTest#testHasNullaryConstructorFalse`, which uses `assertFalse`, while https://github.com/apache/flink/pull/24670 removed the `assertFalse` import, so the master branch no longer compiles. ## Brief change log Migrate the `assertFalse` call to `assertThat`.
Re: [PR] [FLINK-35197][table] Support the execution of supsend materialized table in continuous refresh mode [flink]
hackergin commented on code in PR #24765: URL: https://github.com/apache/flink/pull/24765#discussion_r1597406989 ## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ## @@ -197,6 +220,29 @@ void testCreateMaterializedTableInContinuousMode() throws Exception { .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED); assertThat(actualMaterializedTable.getRefreshHandlerDescription()).isNotEmpty(); assertThat(actualMaterializedTable.getSerializedRefreshHandler()).isNotEmpty(); + +ContinuousRefreshHandler continuousRefreshHandler = +ContinuousRefreshHandlerSerializer.INSTANCE.deserialize( +actualMaterializedTable.getSerializedRefreshHandler(), +getClass().getClassLoader()); +// check the background job is running +String describeJobDDL = +String.format("DESCRIBE JOB '%s'", continuousRefreshHandler.getJobId()); +OperationHandle describeJobHandle = +service.executeStatement(sessionHandle, describeJobDDL, -1, new Configuration()); +awaitOperationTermination(service, sessionHandle, describeJobHandle); +List jobResults = fetchAllResults(sessionHandle, describeJobHandle); + assertThat(jobResults.get(0).getString(2).toString()).isEqualTo("RUNNING"); + +// get checkpoint config from the materialized table +CheckpointConfigInfo checkpointConfigInfo = +getCheckpointConfigInfo(clusterClient, continuousRefreshHandler.getJobId()); Review Comment: Currently, I have not found a good way to directly obtain the response of type String. A better way might be to add a get method to CheckpointConfigInfo class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core with,Package: util [flink]
GOODBOY008 commented on PR #24670: URL: https://github.com/apache/flink/pull/24670#issuecomment-2105648555 [workflow_dispatch](https://github.com/apache/flink/actions/runs/9042536161/job/24849180888#step:6:903)
Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core with,Package: util [flink]
GOODBOY008 commented on PR #24670: URL: https://github.com/apache/flink/pull/24670#issuecomment-2105648441 @Jiabao-Sun @1996fanrui I will open a hotfix to master.
Re: [PR] [FLINK-35197][table] Support the execution of supsend materialized table in continuous refresh mode [flink]
hackergin commented on code in PR #24765: URL: https://github.com/apache/flink/pull/24765#discussion_r1597406641 ## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ## @@ -236,6 +282,150 @@ void testCreateMaterializedTableInFullMode() { "Only support create materialized table in continuous refresh mode currently."); } +@Test +void testAlterMaterializedTableSuspendAndResume( +@TempDir Path temporaryPath, +@InjectClusterClient RestClusterClient restClusterClient) +throws Exception { +// initialize session handle, create test-filesystem catalog and register it to catalog +// store +SessionHandle sessionHandle = initializeSession(); + +String materializedTableDDL = +"CREATE MATERIALIZED TABLE users_shops" ++ " PARTITIONED BY (ds)\n" ++ " WITH(\n" ++ " 'format' = 'debezium-json'\n" ++ " )\n" ++ " FRESHNESS = INTERVAL '30' SECOND\n" ++ " AS SELECT \n" ++ " user_id,\n" ++ " shop_id,\n" ++ " ds,\n" ++ " SUM (payment_amount_cents) AS payed_buy_fee_sum,\n" ++ " SUM (1) AS pv\n" ++ " FROM (\n" ++ "SELECT user_id, shop_id, DATE_FORMAT(order_created_at, '-MM-dd') AS ds, payment_amount_cents FROM datagenSource" ++ " ) AS tmp\n" ++ " GROUP BY (user_id, shop_id, ds)"; + +OperationHandle materializedTableHandle = +service.executeStatement( +sessionHandle, materializedTableDDL, -1, new Configuration()); +awaitOperationTermination(service, sessionHandle, materializedTableHandle); + +// set up savepoint dir +String savepointDir = temporaryPath.toString(); +String alterMaterializedTableSavepointDDL = +String.format("SET 'state.savepoints.dir' = 'file://%s'", savepointDir); Review Comment: This must be set, otherwise the following error will be reported. ``` Caused by: java.lang.IllegalArgumentException: The scheme (hdfs://, file://, etc) is null. Please specify the file system scheme explicitly in the URI. ``` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
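The scheme error quoted in the review comment above arises because a bare directory path carries no URI scheme. A minimal sketch of the idea, using only `java.net.URI` rather than Flink's own path handling: `SavepointDirSchemeSketch`, `hasScheme`, `withFileScheme`, and the example directory are hypothetical, and the path is assumed to be an absolute POSIX-style path.

```java
import java.net.URI;

// Hypothetical illustration of why the test prefixes the temp dir with "file://".
final class SavepointDirSchemeSketch {

    // True if the string parses as a URI with an explicit scheme (file, hdfs, ...).
    static boolean hasScheme(String dir) {
        return URI.create(dir).getScheme() != null;
    }

    // Prefix a bare absolute path with the local file scheme; leave other URIs untouched.
    static String withFileScheme(String dir) {
        return hasScheme(dir) ? dir : "file://" + dir;
    }

    public static void main(String[] args) {
        String tempDir = "/tmp/junit-example/savepoints"; // hypothetical temp directory
        System.out.println("bare path has scheme: " + hasScheme(tempDir));
        System.out.println(
                String.format("SET 'state.savepoints.dir' = '%s'", withFileScheme(tempDir)));
    }
}
```

With an absolute path, the `file://` prefix yields a `file:///...` URI, which matches the `'file://%s'` format string used in the test.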
[jira] [Commented] (FLINK-25537) [JUnit5 Migration] Module: flink-core
[ https://issues.apache.org/jira/browse/FLINK-25537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845561#comment-17845561 ] Rui Fan commented on FLINK-25537: - Merged to master(1.20.0) via: 5af9acba5b9fbcdf9aadf62310cd337d508158c3 > [JUnit5 Migration] Module: flink-core > - > > Key: FLINK-25537 > URL: https://issues.apache.org/jira/browse/FLINK-25537 > Project: Flink > Issue Type: Sub-task > Components: API / Core >Reporter: Qingsheng Ren >Assignee: Aiden Gong >Priority: Minor > Labels: pull-request-available, stale-assigned > Fix For: 1.20.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core with,Package: util [flink]
1996fanrui merged PR #24670: URL: https://github.com/apache/flink/pull/24670
Re: [PR] [FLINK-35041][test] Fix the IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed [flink]
1996fanrui commented on PR #24770: URL: https://github.com/apache/flink/pull/24770#issuecomment-2105643430 @flinkbot run azure
Re: [PR] [FLINK-34615]Split `ExternalizedCheckpointCleanup` out of `Checkpoint… [flink]
spoon-lz commented on PR #24461: URL: https://github.com/apache/flink/pull/24461#issuecomment-2105643136 @flinkbot run azure
Re: [PR] [docs] Rectify names of CDC sources for Flink [flink-cdc]
ysmintor commented on PR #3310: URL: https://github.com/apache/flink-cdc/pull/3310#issuecomment-2105638144 Yes, I agree with this refactoring. Is there any plan to add support for Oracle, PostgreSQL, OceanBase, etc. to the pipeline connectors?
Re: [PR] [FLINK-33892][runtime] Support Job Recovery from JobMaster Failures for Batch Jobs. [flink]
flinkbot commented on PR #24771: URL: https://github.com/apache/flink/pull/24771#issuecomment-2105619645 ## CI report: * 77f15330f3056aaa256565f7fc800787598a6c6a UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-33892) FLIP-383: Support Job Recovery from JobMaster Failures for Batch Jobs
[ https://issues.apache.org/jira/browse/FLINK-33892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33892: --- Labels: pull-request-available (was: ) > FLIP-383: Support Job Recovery from JobMaster Failures for Batch Jobs > - > > Key: FLINK-33892 > URL: https://issues.apache.org/jira/browse/FLINK-33892 > Project: Flink > Issue Type: New Feature > Components: Runtime / Coordination >Reporter: Lijie Wang >Assignee: Junrui Li >Priority: Major > Labels: pull-request-available > > This is the umbrella ticket for > [FLIP-383|https://cwiki.apache.org/confluence/display/FLINK/FLIP-383%3A+Support+Job+Recovery+for+Batch+Jobs] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-33892][runtime] Support Job Recovery from JobMaster Failures for Batch Jobs. [flink]
JunRuiLee opened a new pull request, #24771: URL: https://github.com/apache/flink/pull/24771 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log Support Job Recovery from JobMaster Failures for Batch Jobs. ## Verifying this change This change added tests and can be verified by BatchJobRecoveryTest and JMFailoverITCase. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]
xuyangzhong commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1597382234

## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ##
@@ -236,6 +252,213 @@ void testCreateMaterializedTableInFullMode() {
                 "Only support create materialized table in continuous refresh mode currently.");
     }

+    @Test
+    void testAlterMaterializedTableRefresh() throws Exception {
+        long timeout = Duration.ofSeconds(20).toMillis();
+        long pause = Duration.ofSeconds(2).toMillis();
+        // initialize session handle, create test-filesystem catalog and register it to catalog
+        // store
+        SessionHandle sessionHandle = initializeSession();
+
+        List<Row> data = new ArrayList<>();
+        data.add(Row.of(1L, 1L, 1L, "2024-01-01"));
+        data.add(Row.of(2L, 2L, 2L, "2024-01-02"));
+        data.add(Row.of(3L, 3L, 3L, "2024-01-02"));
+        String dataId = TestValuesTableFactory.registerData(data);
+
+        String sourceDdl =
+                String.format(
+                        "CREATE TABLE my_source (\n"
+                                + " order_id BIGINT,\n"
+                                + " user_id BIGINT,\n"
+                                + " shop_id BIGINT,\n"
+                                + " order_created_at STRING\n"
+                                + ")\n"
+                                + "WITH (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'bounded' = 'true',\n"
+                                + " 'data-id' = '%s'\n"
+                                + ")",
+                        dataId);
+        service.executeStatement(sessionHandle, sourceDdl, -1, new Configuration());
+
+        String materializedTableDDL =
+                "CREATE MATERIALIZED TABLE my_materialized_table"
+                        + " PARTITIONED BY (ds)\n"
+                        + " WITH(\n"
+                        + " 'format' = 'debezium-json'\n"
+                        + " )\n"
+                        + " FRESHNESS = INTERVAL '2' SECOND\n"
+                        + " AS SELECT \n"
+                        + " user_id,\n"
+                        + " shop_id,\n"
+                        + " ds,\n"
+                        + " COUNT(order_id) AS order_cnt\n"
+                        + " FROM (\n"
+                        + "SELECT user_id, shop_id, order_created_at AS ds, order_id FROM my_source"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+
+        OperationHandle materializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, materializedTableDDL, -1, new Configuration());
+        awaitOperationTermination(service, sessionHandle, materializedTableHandle);
+
+        // verify data exists in materialized table
+        CommonTestUtils.waitUtil(
+                () ->
+                        fetchTableData(sessionHandle, "SELECT * FROM my_materialized_table").size()
+                                == data.size(),
+                Duration.ofMillis(timeout),
+                Duration.ofMillis(pause),
+                "Failed to verify the data in materialized table.");
+        assertThat(
+                        fetchTableData(
+                                        sessionHandle,
+                                        "SELECT * FROM my_materialized_table where ds = '2024-01-02'")
+                                .size())
+                .isEqualTo(2);
+
+        // remove the last element
+        data.remove(2);
+
+        long currentTime = System.currentTimeMillis();
+        String alterStatement =
+                "ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds = '2024-01-02')";
+        OperationHandle alterHandle =
+                service.executeStatement(sessionHandle, alterStatement, -1, new Configuration());
+        awaitOperationTermination(service, sessionHandle, alterHandle);
+        List<RowData> result = fetchAllResults(service, sessionHandle, alterHandle);
+        assertThat(result.size()).isEqualTo(1);
+        String jobId = result.get(0).getString(0).toString();
+
+        MiniCluster miniCluster = MINI_CLUSTER.getMiniCluster();

Review Comment:
   Got it!
Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]
xuyangzhong commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1597376170

## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ##
@@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode(
                     "Submit continuous refresh job for materialized table {} occur exception.",
                     materializedTableIdentifier,
                     e);
-            throw new TableException(
+            throw new SqlExecutionException(
                     String.format(
                             "Submit continuous refresh job for materialized table %s occur exception.",
                             materializedTableIdentifier),
                     e);
         }
     }

+    private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) {
+        ObjectIdentifier materializedTableIdentifier =
+                alterMaterializedTableRefreshOperation.getTableIdentifier();
+        ResolvedCatalogBaseTable<?> table = operationExecutor.getTable(materializedTableIdentifier);
+        if (MATERIALIZED_TABLE != table.getTableKind()) {
+            throw new ValidationException(
+                    String.format(
+                            "The table '%s' is not a materialized table.",
+                            materializedTableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable materializedTable =
+                (ResolvedCatalogMaterializedTable) table;
+
+        Map<String, String> partitionSpec =
+                alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+        validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table);
+
+        // Set job name, runtime mode
+        Configuration customConfig = new Configuration();
+        String jobName =
+                String.format(
+                        "Materialized_table_%s_one_time_refresh_job",
+                        materializedTableIdentifier.asSerializableString());
+        customConfig.set(NAME, jobName);
+        customConfig.set(RUNTIME_MODE, BATCH);
+
+        StringBuilder insertStatement =
+                new StringBuilder(
+                        String.format(
+                                "INSERT OVERWRITE %s SELECT * FROM (%s)",
+                                materializedTableIdentifier,
+                                materializedTable.getDefinitionQuery()));
+
+        if (!partitionSpec.isEmpty()) {
+            insertStatement.append(" WHERE ");
+            insertStatement.append(
+                    partitionSpec.entrySet().stream()
+                            .map(
+                                    entry ->
+                                            String.format(
+                                                    "%s = '%s'", entry.getKey(), entry.getValue()))
+                            .reduce((s1, s2) -> s1 + " AND " + s2)
+                            .get());
+        }
+
+        try {
+            LOG.debug(
+                    "Begin to manually refreshing the materialization table {}, statement: {}",
+                    materializedTableIdentifier,
+                    insertStatement);
+            return operationExecutor.executeStatement(
+                    handle, customConfig, insertStatement.toString());
+        } catch (Exception e) {
+            // log and throw exception
+            LOG.error(
+                    "Manually refreshing the materialization table {} occur exception.",
+                    materializedTableIdentifier,
+                    e);
+            throw new SqlExecutionException(
+                    String.format(
+                            "Manually refreshing the materialization table %s occur exception.",
+                            materializedTableIdentifier),
+                    e);
+        }
+    }
+
+    private static void validatePartitionSpec(
+            Map<String, String> partitionSpec, ResolvedCatalogMaterializedTable table) {
+        ResolvedSchema schema = table.getResolvedSchema();
+        Set<String> allPartitionKeys = new HashSet<>(table.getPartitionKeys());
+
+        Set<String> unknownPartitionKeys = new HashSet<>();
+        Set<String> nonStringPartitionKeys = new HashSet<>();
+
+        for (String partitionKey : partitionSpec.keySet()) {
+            if (!schema.getColumn(partitionKey).isPresent()) {
+                unknownPartitionKeys.add(partitionKey);
+                continue;
+            }
+
+            if (!schema.getColumn(partitionKey)
+                    .get()
+                    .getDataType()
+                    .getLogicalType()
+                    .getTypeRoot()
+                    .getFamilies()
+
Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]
xuyangzhong commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1597372269

## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ##
@@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode(
                     "Submit continuous refresh job for materialized table {} occur exception.",
                     materializedTableIdentifier,
                     e);
-            throw new TableException(
+            throw new SqlExecutionException(
                     String.format(
                             "Submit continuous refresh job for materialized table %s occur exception.",
                             materializedTableIdentifier),
                     e);
         }
     }

+    private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) {
+        ObjectIdentifier materializedTableIdentifier =
+                alterMaterializedTableRefreshOperation.getTableIdentifier();
+        ResolvedCatalogBaseTable<?> table = operationExecutor.getTable(materializedTableIdentifier);
+        if (MATERIALIZED_TABLE != table.getTableKind()) {
+            throw new ValidationException(
+                    String.format(
+                            "The table '%s' is not a materialized table.",
+                            materializedTableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable materializedTable =
+                (ResolvedCatalogMaterializedTable) table;
+
+        Map<String, String> partitionSpec =
+                alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+        validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table);
+
+        // Set job name, runtime mode
+        Configuration customConfig = new Configuration();
+        String jobName =
+                String.format(
+                        "Materialized_table_%s_one_time_refresh_job",
+                        materializedTableIdentifier.asSerializableString());
+        customConfig.set(NAME, jobName);
+        customConfig.set(RUNTIME_MODE, BATCH);
+
+        StringBuilder insertStatement =
+                new StringBuilder(
+                        String.format(
+                                "INSERT OVERWRITE %s SELECT * FROM (%s)",
+                                materializedTableIdentifier,
+                                materializedTable.getDefinitionQuery()));
+
+        if (!partitionSpec.isEmpty()) {
+            insertStatement.append(" WHERE ");
+            insertStatement.append(
+                    partitionSpec.entrySet().stream()
+                            .map(
+                                    entry ->
+                                            String.format(
+                                                    "%s = '%s'", entry.getKey(), entry.getValue()))
+                            .reduce((s1, s2) -> s1 + " AND " + s2)
+                            .get());
+        }
+
+        try {
+            LOG.debug(
+                    "Begin to manually refreshing the materialization table {}, statement: {}",
+                    materializedTableIdentifier,
+                    insertStatement);
+            return operationExecutor.executeStatement(
+                    handle, customConfig, insertStatement.toString());
+        } catch (Exception e) {
+            // log and throw exception
+            LOG.error(
+                    "Manually refreshing the materialization table {} occur exception.",
+                    materializedTableIdentifier,
+                    e);
+            throw new SqlExecutionException(
+                    String.format(
+                            "Manually refreshing the materialization table %s occur exception.",
+                            materializedTableIdentifier),
+                    e);
+        }
+    }
+
+    private static void validatePartitionSpec(
+            Map<String, String> partitionSpec, ResolvedCatalogMaterializedTable table) {
+        ResolvedSchema schema = table.getResolvedSchema();
+        Set<String> allPartitionKeys = new HashSet<>(table.getPartitionKeys());
+
+        Set<String> unknownPartitionKeys = new HashSet<>();
+        Set<String> nonStringPartitionKeys = new HashSet<>();
+
+        for (String partitionKey : partitionSpec.keySet()) {
+            if (!schema.getColumn(partitionKey).isPresent()) {
+                unknownPartitionKeys.add(partitionKey);
+                continue;
+            }
+
+            if (!schema.getColumn(partitionKey)
+                    .get()
+                    .getDataType()
+                    .getLogicalType()
+                    .getTypeRoot()
+                    .getFamilies()
+
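The statement construction quoted in the diff above can be tried in isolation. The following is a minimal sketch under stated assumptions, not the PR's code: the class `RefreshStatementSketch` and the method `buildRefreshStatement` are hypothetical names, plain strings stand in for `ObjectIdentifier` and for the result of `getDefinitionQuery()`, and `Collectors.joining` is swapped in for the diff's `reduce(...).get()`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class RefreshStatementSketch {

    /**
     * Builds a one-time refresh statement. An empty partition spec yields a
     * statement without a WHERE clause, i.e. the whole table is overwritten.
     * All parameters are plain strings here, which is an assumption of this sketch.
     */
    public static String buildRefreshStatement(
            String tableIdentifier, String definitionQuery, Map<String, String> partitionSpec) {
        StringBuilder statement =
                new StringBuilder(
                        String.format(
                                "INSERT OVERWRITE %s SELECT * FROM (%s)",
                                tableIdentifier, definitionQuery));
        if (!partitionSpec.isEmpty()) {
            statement.append(" WHERE ");
            // One "key = 'value'" predicate per partition column, joined with AND.
            statement.append(
                    partitionSpec.entrySet().stream()
                            .map(e -> String.format("%s = '%s'", e.getKey(), e.getValue()))
                            .collect(Collectors.joining(" AND ")));
        }
        return statement.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the predicates in insertion order.
        Map<String, String> spec = new LinkedHashMap<>();
        spec.put("ds", "2024-01-02");
        System.out.println(
                buildRefreshStatement(
                        "my_materialized_table", "SELECT user_id, ds FROM my_source", spec));
    }
}
```

Like the quoted code, this sketch interpolates partition values into the statement as they are, so a value containing a single quote would produce an invalid statement. That is presumably one reason the spec is validated before the statement is built.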
Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]
yuxiqian commented on PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#issuecomment-2105596048 Thanks @aiwenmo for reviewing, addressed review comments above.
Re: [PR] [hotfix][docs] Fix dead links in documentations [flink-cdc]
yuxiqian commented on PR #3314: URL: https://github.com/apache/flink-cdc/pull/3314#issuecomment-2105593186 @leonardBang PTAL
[PR] [hotfix][docs] Fix dead links in documentations [flink-cdc]
yuxiqian opened a new pull request, #3314: URL: https://github.com/apache/flink-cdc/pull/3314 This PR fixes dead links introduced by #3310.
Re: [PR] [FLINK-35313][mysql-cdc] Add upsert changelog mode to avoid UPDATE_BEFORE records … [flink-cdc]
yeezychao commented on PR #1907:
URL: https://github.com/apache/flink-cdc/pull/1907#issuecomment-2105588522

> > @yuxiqian Turning on upsert mode still fails to filter -u data. I am very confused as to why the same PR application failed to test in cdc 3.2 (flink 1.18) version, but it still works in version 2.2 (flink 1.15). Unfortunately, I have not found the reason yet. ![image](https://private-user-images.githubusercontent.com/18387619/329751040-32609ad6-9730-4bf5-83c6-91f1343dc617.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MTU0MDA1MjAsIm5iZiI6MTcxNTQwMDIyMCwicGF0aCI6Ii8xODM4NzYxOS8zMjk3NTEwNDAtMzI2MDlhZDYtOTczMC00YmY1LTgzYzYtOTFmMTM0M2RjNjE3LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA1MTElMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNTExVDA0MDM0MFomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTI1NjQzNjYzN2I2YTU3NzAwYjA0ZDkzODIxMTU0ZTc2ZjQ1YmFiYmZmOTViNzk1MDNmNzBmNDkzYjIwMmIwNGUmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.kfPitV5gbKlL5u6DiGPoxjmPCZUsBpg62S5dTlOigrs)
>
> @yeezychao Maybe check the output log and confirm if MySQL source actually sends any `-U` events to downstream? IIRC Flink will automatically append a `ChangelogNormalize` node to backfill missing update before events if source doesn't provide it.

You are right! The `ChangelogNormalize` node is indeed added under Flink 1.18, but not under Flink 1.15.

![image](https://github.com/apache/flink-cdc/assets/18387619/f25a44fe-df3a-43d6-b48e-98fc1be487d8)
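The backfilling described in the quoted reply, where a normalization step re-creates the missing `-U` (update-before) records from an upsert stream, can be illustrated with a small sketch. This is a simplified model under stated assumptions and not Flink's `ChangelogNormalize` implementation: the class `ChangelogNormalizeSketch`, the `normalize` method, and the string encoding of events are all hypothetical. Each input event is a `{key, value}` pair, and a `null` value stands for a delete.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangelogNormalizeSketch {

    /**
     * Turns upsert events into a changelog with explicit update-before records
     * by remembering the last value seen for every key.
     */
    public static List<String> normalize(List<String[]> upserts) {
        Map<String, String> lastValueByKey = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String[] event : upserts) {
            String key = event[0];
            String value = event[1];
            String previous = lastValueByKey.get(key);
            if (value == null) {
                // Delete: only meaningful if the key is currently present.
                if (previous != null) {
                    out.add("-D(" + key + ", " + previous + ")");
                    lastValueByKey.remove(key);
                }
            } else if (previous == null) {
                // First value for this key: plain insert.
                out.add("+I(" + key + ", " + value + ")");
                lastValueByKey.put(key, value);
            } else {
                // Update: backfill the update-before record from state.
                out.add("-U(" + key + ", " + previous + ")");
                out.add("+U(" + key + ", " + value + ")");
                lastValueByKey.put(key, value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> upserts =
                Arrays.asList(
                        new String[] {"1", "a"},
                        new String[] {"1", "b"},
                        new String[] {"1", null});
        // Prints [+I(1, a), -U(1, a), +U(1, b), -D(1, b)]
        System.out.println(normalize(upserts));
    }
}
```

The sketch shows why `-U` records can still appear downstream even when the source itself emits none: they are derived from per-key state in the normalization step rather than read from the source.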
[jira] [Commented] (FLINK-28867) Parquet reader support nested type in array/map type
[ https://issues.apache.org/jira/browse/FLINK-28867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845539#comment-17845539 ]

Xingcan Cui commented on FLINK-28867:
-------------------------------------

Hey [~jark], any plan to improve this in the near future? I feel that this is a blocker for Flink OLAP despite the data lake projects having their data readers/writers. Sometimes users would like to use Flink to process some raw parquet files.

> Parquet reader support nested type in array/map type
> ----------------------------------------------------
>
>                 Key: FLINK-28867
>                 URL: https://issues.apache.org/jira/browse/FLINK-28867
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.16.0
>            Reporter: dalongliu
>            Priority: Major
>         Attachments: ReadParquetArray1.java, part-00121.parquet
Re: [PR] [docs] Rectify names of CDC sources for Flink [flink-cdc]
yuxiqian commented on PR #3310: URL: https://github.com/apache/flink-cdc/pull/3310#issuecomment-2105586766 @leonardBang Seems I missed some `{{ref}}` hyperlinks. Will fix them first.
Re: [PR] [docs] Rectify names of CDC sources for Flink [flink-cdc]
leonardBang commented on PR #3310: URL: https://github.com/apache/flink-cdc/pull/3310#issuecomment-2105584829 @yuxiqian I would appreciate it if you could also open PRs for the release-3.0 and release-3.1 branches.
Re: [PR] [docs] Rectify names of CDC sources for Flink [flink-cdc]
leonardBang merged PR #3310: URL: https://github.com/apache/flink-cdc/pull/3310