[jira] [Comment Edited] (FLINK-34108) Add URL_ENCODE and URL_DECODE function

2024-05-11 Thread chesterxu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845659#comment-17845659
 ] 

chesterxu edited comment on FLINK-34108 at 5/12/24 2:35 AM:


I would like to support it; please assign it to me. 


was (Author: JIRAUSER302535):
These two functions are indeed useful in some Flink cases. I can support it; 
please assign it to me. 

> Add URL_ENCODE and URL_DECODE function
> --
>
> Key: FLINK-34108
> URL: https://issues.apache.org/jira/browse/FLINK-34108
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Martijn Visser
>Priority: Major
>
> Add URL_ENCODE and URL_DECODE function
> URL_ENCODE(str) - Translates a string into 
> 'application/x-www-form-urlencoded' format using a specific encoding scheme. 
> URL_DECODE(str) - Decodes a string in 'application/x-www-form-urlencoded' 
> format using a specific encoding scheme. 
> Related ticket from Calcite: CALCITE-5825
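
For reference, the described behaviour corresponds to the standard JDK codecs. A minimal sketch (plain Java, not the proposed Flink built-in implementation):

{code:java}
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Minimal sketch of the described semantics using the JDK codecs.
public final class UrlCodecSketch {
    public static void main(String[] args) {
        // URL_ENCODE: translate into 'application/x-www-form-urlencoded' format
        String encoded = URLEncoder.encode("a b?c=d", StandardCharsets.UTF_8); // "a+b%3Fc%3Dd"
        // URL_DECODE: decode a string in 'application/x-www-form-urlencoded' format
        String decoded = URLDecoder.decode(encoded, StandardCharsets.UTF_8);   // "a b?c=d"
        System.out.println(encoded);
        System.out.println(decoded);
    }
}
{code}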



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-05-11 Thread via GitHub


nicusX commented on PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#issuecomment-2106039721

   I addressed all the comments, except those where I explained why I didn't.
   Also, the connector is now upgraded to Flink 1.18 and the newer AsyncSinkBase API.
   
   I cannot split the AMP signer and the connector into separate PRs at this point, because of the interdependency with the sample application. 
   Also, I strongly recommend keeping the sample application in the repository, because it serves both as documentation for the user and as a way to test the connector (and optionally the signer) in an actual environment. The sample application is not supposed to be released as an artifact, but only to stay in the repo as source.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-05-11 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597510944


##
amp-request-signer/pom.xml:
##
@@ -0,0 +1,58 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-prometheus-parent</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+    </parent>
+
+    <name>Flink : Connectors : Prometheus : AMP request signer</name>
+    <groupId>org.apache.flink.connector.prometheus</groupId>

Review Comment:
   Fixed.
   Now the `groupId` is `org.apache.flink` and the `artifactId` is `flink-connector-prometheus-amp-request-signer`.






Re: [PR] [FLINK-35305](https://issues.apache.org/jira/browse/FLINK-35305) Amazon SQS Sink Connector [flink-connector-aws]

2024-05-11 Thread via GitHub


vahmed-hamdy commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1597498488


##
flink-connector-aws/pom.xml:
##
@@ -18,8 +18,8 @@ specific language governing permissions and limitations
 under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

Review Comment:
   This change is not needed.



##
flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkBuilder.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static software.amazon.awssdk.http.Protocol.HTTP1_1;
+
+/**
+ * Builder to construct {@link SqsSink}.
+ *
+ * The following example shows the minimum setup to create a {@link 
SqsSink} that
+ * writes String values to a SQS named sqsUrl.
+ *
+ * {@code
+ * Properties sinkProperties = new Properties();
+ * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+ *
+ * SqsSink sqsSink =
+ * SqsSink.builder()
+ * .setElementConverter(elementConverter)

Review Comment:
   This setter shown in the Javadoc example doesn't exist.



##
flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsExceptionClassifiersTest.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.sqs.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.sqs.model.SqsException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/** Unit tests for {@link SqsExceptionClassifiers}. */
+public class SqsExceptionClassifiersTest {
+
+private final FatalExceptionClassifier classifier =
+FatalExceptionClassifier.createChain(
+
SqsExceptionClassifiers.getAccessDeniedExceptionClassifier(),
+
SqsExceptionClassifiers.getNotAuthorizedExceptionClassifier(),
+
SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier());
+
+@Test
+public void shouldClassifyNotAuthorizedAsFatal() {
+AwsServiceException sqsException =
+SqsException.builder()
+.awsErrorDetails(
+
AwsErrorDetails.builder().errorCode("NotAuthorized").build())
+.build();
+
+// isFatal returns `true` if an exception is non-fatal
+assertFalse(classifier.isFatal(sqsException, ex -> {}));
+}
+
+@Test
+public void 

Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-05-11 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597510275


##
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/http/RemoteWriteRetryStrategy.java:
##
@@ -0,0 +1,106 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.http;
+
+import org.apache.flink.connector.prometheus.sink.SinkMetrics;
+
+import org.apache.hc.client5.http.HttpRequestRetryStrategy;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.util.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLException;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.ConnectException;
+import java.net.NoRouteToHostException;
+import java.net.UnknownHostException;
+import java.util.List;
+
+/** Retry strategy for the http client. */
+public class RemoteWriteRetryStrategy implements HttpRequestRetryStrategy {
+private static final Logger LOG = 
LoggerFactory.getLogger(RemoteWriteRetryStrategy.class);
+
+private static final List<Class<? extends IOException>> NON_RETRIABLE_EXCEPTIONS =
+List.of(
+InterruptedIOException.class,
+UnknownHostException.class,
+ConnectException.class,
+NoRouteToHostException.class,
+SSLException.class);
+
+private final long initialRetryDelayMs;
+private final long maxRetryDelayMs;
+private final int maxRetryCount;
+
+private final SinkMetrics counters;
+
+public RemoteWriteRetryStrategy(RetryConfiguration retryConfiguration, 
SinkMetrics counters) {
+this.initialRetryDelayMs = retryConfiguration.getInitialRetryDelayMS();
+this.maxRetryDelayMs = retryConfiguration.getMaxRetryDelayMS();
+this.maxRetryCount = retryConfiguration.getMaxRetryCount();
+this.counters = counters;
+}
+
+@Override
+public boolean retryRequest(
+HttpRequest httpRequest, IOException e, int execCount, HttpContext 
httpContext) {
+// Retry on any IOException except those considered non-retriable
+var retry =
+(execCount <= maxRetryCount) && 
!(NON_RETRIABLE_EXCEPTIONS.contains(e.getClass()));
+LOG.debug(
+"{} retry on {}, at execution {}",
+(retry) ? "DO" : "DO NOT",
+e.getClass(),
+execCount);
+countRetry(retry);
+return retry;
+}
+
+@Override
+public boolean retryRequest(HttpResponse httpResponse, int execCount, 
HttpContext httpContext) {
+var retry =
+(execCount <= maxRetryCount)
+&& 
RemoteWriteResponseClassifier.isRetriableErrorResponse(httpResponse);
+LOG.debug(
+"{} retry on response {} {}, at execution {}",
+(retry) ? "DO" : "DO NOT",
+httpResponse.getCode(),
+httpResponse.getReasonPhrase(),
+execCount);
+countRetry(retry);
+return retry;
+}
+
+@Override
+public TimeValue getRetryInterval(
+HttpResponse httpResponse, int execCount, HttpContext httpContext) 
{
+long calculatedDelay = initialRetryDelayMs << (execCount - 1);

Review Comment:
   I don't see much value in making the backoff strategy more sophisticated. 
   The Prometheus Remote-Write specs just say it "MUST use a backoff algorithm to prevent overwhelming the server".
   Tweaking the `initialRetryIntervalMs` and `maxRetryDelayMs` will be enough to make the strategy more or less aggressive.
   Also, I don't see much value in adding jitter. There is one writer thread per subtask, and it's very unlikely they will be synchronized when writing to the endpoint.
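
   For reference, a minimal sketch of the capped exponential backoff discussed here (plain Java, not the connector's actual code; capping at `maxRetryDelayMs` is an assumption about how `getRetryInterval` completes the calculation shown in the diff):

```java
// Sketch: the delay doubles at every execution, starting from initialRetryDelayMs,
// and is capped at maxRetryDelayMs. No jitter, as discussed above. The values used
// in main() are made-up examples.
public final class BackoffSketch {
    static long retryDelayMs(long initialRetryDelayMs, long maxRetryDelayMs, int execCount) {
        long calculatedDelay = initialRetryDelayMs << (execCount - 1);
        return Math.min(calculatedDelay, maxRetryDelayMs);
    }

    public static void main(String[] args) {
        for (int execCount = 1; execCount <= 8; execCount++) {
            System.out.println("attempt " + execCount + " -> "
                    + retryDelayMs(30, 2000, execCount) + " ms");
        }
    }
}
```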




Re: [PR] [hotfix] Make weekly green again [flink-connector-opensearch]

2024-05-11 Thread via GitHub


snuyanzin merged PR #46:
URL: https://github.com/apache/flink-connector-opensearch/pull/46





Re: [PR] [hotfix] Make weekly green again [flink-connector-opensearch]

2024-05-11 Thread via GitHub


snuyanzin commented on PR #46:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/46#issuecomment-2106034504

   Proof: link to a green nightly run:
   https://github.com/apache/flink-connector-opensearch/actions/runs/9046382776





Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-05-11 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597508286


##
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkWriter.java:
##
@@ -0,0 +1,189 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import 
org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration;
+import org.apache.flink.connector.prometheus.sink.prometheus.Remote;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.core5.io.CloseMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xerial.snappy.Snappy;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+/** Writer, taking care of batching the {@link PrometheusTimeSeries} and 
handling retries. */
+public class PrometheusSinkWriter extends AsyncSinkWriter<PrometheusTimeSeries, Types.TimeSeries> {
+
+/**
+ * * Batching of this sink is in terms of Samples, not bytes. The goal is 
adaptively increase
+ * the number of Samples in each batch, a WriteRequest sent to Prometheus, 
to a configurable
+ * number. This is the parameter maxBatchSizeInBytes.
+ *
+ * getSizeInBytes(requestEntry) returns the number of Samples (not 
bytes) and
+ * maxBatchSizeInBytes is actually in terms of Samples (not bytes).
+ *
+ * In AsyncSinkWriter, maxBatchSize is in terms of requestEntries 
(TimeSeries). But because
+ * each TimeSeries contains 1+ Samples, we set maxBatchSize = 
maxBatchSizeInBytes.
+ *
+ * maxRecordSizeInBytes is also calculated in the same unit assumed by 
getSizeInBytes(...).
+ * In our case is the max number of Samples in a single TimeSeries sent to 
the Sink. We are
+ * limiting the number of Samples in each TimeSeries to the max batch 
size, setting
+ * maxRecordSizeInBytes = maxBatchSizeInBytes.
+ */
+private static final Logger LOG = 
LoggerFactory.getLogger(PrometheusSinkWriter.class);
+
+private final SinkMetrics metrics;
+private final CloseableHttpAsyncClient asyncHttpClient;
+private final PrometheusRemoteWriteHttpRequestBuilder requestBuilder;
+private final SinkWriterErrorHandlingBehaviorConfiguration 
errorHandlingBehaviorConfig;
+
+public PrometheusSinkWriter(
+ElementConverter<PrometheusTimeSeries, Types.TimeSeries> elementConverter,
+Sink.InitContext context,
+int maxInFlightRequests,
+int maxBufferedRequests,
+int maxBatchSizeInSamples,
+long maxTimeInBufferMS,
+String prometheusRemoteWriteUrl,
+CloseableHttpAsyncClient asyncHttpClient,
+SinkMetrics metrics,
+PrometheusRequestSigner requestSigner,
+String httpUserAgent,
+SinkWriterErrorHandlingBehaviorConfiguration 
errorHandlingBehaviorConfig) {
+this(
+elementConverter,
+context,
+maxInFlightRequests,
+maxBufferedRequests,
+maxBatchSizeInSamples,
+maxTimeInBufferMS,
+prometheusRemoteWriteUrl,
+asyncHttpClient,
+metrics,
+requestSigner,
+httpUserAgent,
+errorHandlingBehaviorConfig,
+Collections.emptyList());
+}
+
+public PrometheusSinkWriter(
+ElementConverter<PrometheusTimeSeries, Types.TimeSeries> elementConverter,
+Sink.InitContext context,
+int maxInFlightRequests,
+int maxBufferedRequests,
+int maxBatchSizeInSamples,
+long maxTimeInBufferMS,
+String 

Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-05-11 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597505926


##
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkWriter.java:
##
@@ -0,0 +1,189 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import 
org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration;
+import org.apache.flink.connector.prometheus.sink.prometheus.Remote;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.core5.io.CloseMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xerial.snappy.Snappy;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+/** Writer, taking care of batching the {@link PrometheusTimeSeries} and 
handling retries. */
+public class PrometheusSinkWriter extends AsyncSinkWriter<PrometheusTimeSeries, Types.TimeSeries> {

Review Comment:
   Not much to test here, except the initialization, which is coupled with the PrometheusSink initialization.
   I created a test similar to what is done in the Kinesis Sink:
   
   https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java






Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-05-11 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597488570


##
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusStateSerializer.java:
##
@@ -0,0 +1,154 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import 
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Serializes/deserializes the sink request-entry, the protobuf-generated 
{@link Types.TimeSeries},
+ * using protobuf.
+ */
+public class PrometheusStateSerializer extends AsyncSinkWriterStateSerializer<Types.TimeSeries> {
+private static final int VERSION = 1;
+
+// Copied from AsyncSinkWriterStateSerializer.DATA_IDENTIFIER
+private static final long DATA_IDENTIFIER = -1;
+
+@Override
+protected void serializeRequestToStream(Types.TimeSeries request, 
DataOutputStream out)
+throws IOException {
+byte[] serializedRequest = request.toByteArray();
+out.write(serializedRequest);
+}
+
+@Override
+protected Types.TimeSeries deserializeRequestFromStream(long requestSize, 
DataInputStream in)
+throws IOException {
+// The size written into the serialized stat is the size of the 
protobuf-serialized
+// time-series
+byte[] requestData = new byte[(int) requestSize];
+in.read(requestData);
+return Types.TimeSeries.parseFrom(requestData);
+}
+
+@Override
+public int getVersion() {
+return VERSION;
+}
+
+/**
+ * Overrides the original implementation that assumes the serialized size 
is the value returned
+ * by {@link PrometheusSinkWriter#getSizeInBytes(Types.TimeSeries)}
+ *
+ * Most of the code is copied from the original implementation of
+ * AsyncSinkWriterStateSerializer.serialize().
+ *
+ * The state is serialized in form of
+ * [DATA_IDENTIFIER,NUM_OF_ELEMENTS,SIZE1,REQUEST1,SIZE2,REQUEST2], 
where REQUESTn is the
+ * Protobuf-serialized representation of a {@link Types.TimeSeries 
TimeSeries}. In this
+ * implementation SIZEn is the size of the Protobuf serialization, in 
bytes, that does not match
+ * the "size" of a {@link RequestEntryWrapper}.
+ *
+ * @param bufferedRequestState The buffered request state to be serialized
+ * @return serialized buffered request state
+ * @throws IOException
+ */
+@Override
+public byte[] serialize(BufferedRequestState<Types.TimeSeries> bufferedRequestState)
+throws IOException {
+Collection<RequestEntryWrapper<Types.TimeSeries>> bufferState =
+bufferedRequestState.getBufferedRequestEntries();
+
+try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+final DataOutputStream out = new DataOutputStream(baos)) {
+
+out.writeLong(DATA_IDENTIFIER); // DATA_IDENTIFIER
+out.writeInt(bufferState.size()); // NUM_OF_ELEMENTS
+
+for (RequestEntryWrapper<Types.TimeSeries> wrapper : bufferState) {
+// In the serialized state we write the size of the serialized 
representation,
+// rather than the size
+// held in RequestEntryWrapper, that is the output of
+// AsyncSinkWriter.getSizeInBytes()
+long requestEntrySize =
+
RequestEntrySizeUtils.requestSerializedSize(wrapper.getRequestEntry());
+out.writeLong(requestEntrySize); // SIZEn
+serializeRequestToStream(wrapper.getRequestEntry(), out); // 
REQUESTn
+}
+
+return baos.toByteArray();
+}
+}
+
+/**
+ * Overrides the original 

Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-05-11 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597488524


##
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusStateSerializer.java:
##
@@ -0,0 +1,154 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import 
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Serializes/deserializes the sink request-entry, the protobuf-generated 
{@link Types.TimeSeries},
+ * using protobuf.
+ */
+public class PrometheusStateSerializer extends AsyncSinkWriterStateSerializer<Types.TimeSeries> {
+private static final int VERSION = 1;
+
+// Copied from AsyncSinkWriterStateSerializer.DATA_IDENTIFIER
+private static final long DATA_IDENTIFIER = -1;
+
+@Override
+protected void serializeRequestToStream(Types.TimeSeries request, 
DataOutputStream out)
+throws IOException {
+byte[] serializedRequest = request.toByteArray();
+out.write(serializedRequest);
+}
+
+@Override
+protected Types.TimeSeries deserializeRequestFromStream(long requestSize, 
DataInputStream in)
+throws IOException {
+// The size written into the serialized stat is the size of the 
protobuf-serialized
+// time-series
+byte[] requestData = new byte[(int) requestSize];
+in.read(requestData);
+return Types.TimeSeries.parseFrom(requestData);
+}
+
+@Override
+public int getVersion() {
+return VERSION;
+}
+
+/**
+ * Overrides the original implementation that assumes the serialized size 
is the value returned
+ * by {@link PrometheusSinkWriter#getSizeInBytes(Types.TimeSeries)}
+ *
+ * Most of the code is copied from the original implementation of
+ * AsyncSinkWriterStateSerializer.serialize().
+ *
+ * The state is serialized in form of
+ * [DATA_IDENTIFIER,NUM_OF_ELEMENTS,SIZE1,REQUEST1,SIZE2,REQUEST2], 
where REQUESTn is the
+ * Protobuf-serialized representation of a {@link Types.TimeSeries 
TimeSeries}. In this
+ * implementation SIZEn is the size of the Protobuf serialization, in 
bytes, that does not match

Review Comment:
   As above, it means "SIZE".
   Clarified the comment






Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-05-11 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597488453


##
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusStateSerializer.java:
##
@@ -0,0 +1,154 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import 
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Serializes/deserializes the sink request-entry, the protobuf-generated 
{@link Types.TimeSeries},
+ * using protobuf.
+ */
+public class PrometheusStateSerializer extends AsyncSinkWriterStateSerializer<Types.TimeSeries> {
+private static final int VERSION = 1;
+
+// Copied from AsyncSinkWriterStateSerializer.DATA_IDENTIFIER
+private static final long DATA_IDENTIFIER = -1;
+
+@Override
+protected void serializeRequestToStream(Types.TimeSeries request, 
DataOutputStream out)
+throws IOException {
+byte[] serializedRequest = request.toByteArray();
+out.write(serializedRequest);
+}
+
+@Override
+protected Types.TimeSeries deserializeRequestFromStream(long requestSize, 
DataInputStream in)
+throws IOException {
+// The size written into the serialized stat is the size of the 
protobuf-serialized
+// time-series
+byte[] requestData = new byte[(int) requestSize];
+in.read(requestData);
+return Types.TimeSeries.parseFrom(requestData);
+}
+
+@Override
+public int getVersion() {
+return VERSION;
+}
+
+/**
+ * Overrides the original implementation that assumes the serialized size 
is the value returned
+ * by {@link PrometheusSinkWriter#getSizeInBytes(Types.TimeSeries)}
+ *
+ * Most of the code is copied from the original implementation of
+ * AsyncSinkWriterStateSerializer.serialize().
+ *
+ * The state is serialized in form of
+ * [DATA_IDENTIFIER,NUM_OF_ELEMENTS,SIZE1,REQUEST1,SIZE2,REQUEST2], 
where REQUESTn is the

Review Comment:
   It actually means "REQUEST".
   Fixed the comment to make it clear.






Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-05-11 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597486440


##
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusRemoteWriteHttpRequestBuilder.java:
##
@@ -0,0 +1,76 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.HttpHeaders;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Builds the POST request to the Remote-Write endpoint for a given binary 
payload. */
+public class PrometheusRemoteWriteHttpRequestBuilder {
+
+private static final ContentType CONTENT_TYPE = 
ContentType.create("application/x-protobuf");
+
+public static final String DEFAULT_USER_AGENT = "Flink-Prometheus";
+private final String prometheusRemoteWriteUrl;
+private final PrometheusRequestSigner requestSigner;
+
+private final Map<String, String> fixedHeaders;
+
+public PrometheusRemoteWriteHttpRequestBuilder(
+String prometheusRemoteWriteUrl,
+PrometheusRequestSigner requestSigner,
+String httpUserAgent) {
+Preconditions.checkNotNull(httpUserAgent, "User-Agent not specified");
+
+this.prometheusRemoteWriteUrl = prometheusRemoteWriteUrl;
+this.requestSigner = requestSigner;
+this.fixedHeaders =
+Map.of(
+HttpHeaders.CONTENT_ENCODING,
+"snappy",
+"X-Prometheus-Remote-Write-Version",
+"0.1.0",
+HttpHeaders.USER_AGENT,
+httpUserAgent);
+}
+
+public SimpleHttpRequest buildHttpRequest(byte[] httpRequestBody) {
+Map<String, String> headers = new HashMap<>(fixedHeaders);
+if (requestSigner != null) {
+requestSigner.addSignatureHeaders(headers, httpRequestBody);
+}
+
+var builder =

Review Comment:
   Is that still the case for 1.18?
   Anyhow, I removed the Java 11 features.
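
   For illustration, a minimal sketch of a Java 8-compatible replacement for the Java 11 constructs in the quoted snippet (not the actual connector code; string literals stand in for the `HttpHeaders` constants used above):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch: the Map.of(...) initialization and the 'var' declarations rewritten
// with Java 8-compatible constructs.
public final class FixedHeadersSketch {
    static Map<String, String> fixedHeaders(String httpUserAgent) {
        Map<String, String> headers = new HashMap<>();
        headers.put("Content-Encoding", "snappy");
        headers.put("X-Prometheus-Remote-Write-Version", "0.1.0");
        headers.put("User-Agent", httpUserAgent);
        return Collections.unmodifiableMap(headers);
    }

    public static void main(String[] args) {
        System.out.println(fixedHeaders("Flink-Prometheus"));
    }
}
```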






Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-05-11 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597482298


##
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java:
##
@@ -0,0 +1,186 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.annotation.Public;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Pojo used as sink input, containing a single TimeSeries: a list of Labels 
and a list of Samples.
+ *
+ * metricName is mapped in Prometheus to the value of the mandatory label 
named '__name__'
+ * labels. The other labels, as key/value, are appended after the '__name__' 
label.
+ */
+@Public
+public class PrometheusTimeSeries implements Serializable {
+/** A single Label. */
+public static class Label implements Serializable {
+private final String name;
+private final String value;
+
+public Label(String name, String value) {

Review Comment:
   I added further explanations in the README about the behavior and the 
responsibility of sending well-formed data.
   In any case, with the default error-handling behavior, if the application 
sends malformed data, the sink throws an exception when Prometheus rejects the 
write.
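
   For illustration, a minimal sketch of well-formed input, based on the `PrometheusTimeSeries` builder usage quoted elsewhere in this thread (the connector is assumed to be on the classpath; the metric name, label, and sample values are made up):

```java
import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeries;

// Sketch: one metric name (mapped to the '__name__' label), one additional label,
// and samples with non-decreasing timestamps, since Prometheus rejects
// out-of-order writes.
public final class WellFormedSeriesSketch {
    public static PrometheusTimeSeries sample() {
        return PrometheusTimeSeries.builder()
                .withMetricName("request_latency_ms")
                .addLabel("dimension-a", "value-1")
                .addSample(42.0, 1715400000000L) // value, timestamp in millis
                .addSample(43.5, 1715400001000L)
                .build();
    }

    public static void main(String[] args) {
        System.out.println(sample());
    }
}
```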






Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-05-11 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597479531


##
prometheus-connector/src/test/java/org/apache/flink/connector/prometheus/sink/PrometheusStateSerializerTest.java:
##
@@ -0,0 +1,108 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class PrometheusStateSerializerTest {
+
+private static final ElementConverter<PrometheusTimeSeries, Types.TimeSeries>
+ELEMENT_CONVERTER = new PrometheusTimeSeriesConverter();
+
+private static PrometheusTimeSeries getTestTimeSeries(int i) {
+return PrometheusTimeSeries.builder()
+.withMetricName("metric-name")
+.addLabel("dimension-a", "value-" + i)
+.addSample(i + 42.0, i + 1L)
+.addSample(i + 3.14, i + 2L)
+.build();
+}
+
+// This method uses the same implementation as 
PrometheusSinkWriter.getSizeInBytes() to extract
+// the requestEntry "size"
+// (i.e. the number of Samples). This is the "size" used in 
RequestEntryWrapper
+// see
+// 
https://github.com/apache/flink/blob/69e812688b43be9a0c4f79e6af81bc2d1d8a873e/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterStateSerializer.java#L60

Review Comment:
   Reviewing this, I put back the link to the implementation in GitHub.
   What we want to show is the implementation, not the javadoc.






Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-05-11 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597479280


##
msf-amp-example/README.md:
##
@@ -0,0 +1,68 @@
+## Sample application: Amazon Managed Service for Apache Flink and Amazon 
Managed Prometheus

Review Comment:
   Updated to 1.18.1






Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-05-11 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597478707


##
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/prometheus/Remote.java:
##
@@ -0,0 +1,6661 @@
+/*

Review Comment:
   I think we discussed this.
   Generating the Java classes from the proto definitions cannot be done with a pure Java solution included in the project. 
   It requires the Protobuf binaries (e.g. see 
https://github.com/javiroman/jremotewrite/blob/e1b5d9865d42254b32f2da6ddfd6956569577601/pom.xml#L210).
   
   I am adding the *.proto file for reference, and a section to the README to explain this.






Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-05-11 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597475338


##
amp-request-signer/src/main/java/org/apache/flink/connector/prometheus/sink/aws/AWS4SignerForAuthorizationHeader.java:
##
@@ -0,0 +1,124 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink.aws;
+
+import com.amazonaws.util.BinaryUtils;
+
+import java.net.URL;
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * Sample AWS4 signer demonstrating how to sign requests to Amazon S3 using an 
'Authorization'
+ * header.
+ */
+public class AWS4SignerForAuthorizationHeader extends AWS4SignerBase {

Review Comment:
   It's silly, but this "just works". There are no specs. 
   This is a class copied into a gazillion OSS projects, including big ones.
   (Try searching "Sample AWS4 signer demonstrating how to sign requests to Amazon S3" on GitHub.)
   Any unit test would be written by reverse-engineering the implementation, so it would be useless.






Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-05-11 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597473538


##
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java:
##
@@ -0,0 +1,129 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import 
org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration;
+import 
org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+
+import java.util.Collection;
+
+/** Sink implementation accepting {@link PrometheusTimeSeries} as inputs. */
+@Public
+public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries, Types.TimeSeries> {
+private final String prometheusRemoteWriteUrl;
+private final PrometheusAsyncHttpClientBuilder clientBuilder;
+private final PrometheusRequestSigner requestSigner;
+private final int maxBatchSizeInSamples;
+private final String httpUserAgent;
+private final SinkWriterErrorHandlingBehaviorConfiguration 
errorHandlingBehaviorConfig;
+
+protected PrometheusSink(

Review Comment:
   Added a configurable metric group that gets appended after the `Sink__Writer` group of AsyncSinkBase. I can't see how I could add a prefix.
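
   A minimal sketch of the idea (not the actual implementation; the group and counter names are only examples), assuming the writer receives the sink's metric group from the `InitContext`:

```java
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

// Sketch: a configurable group name is appended as a child of the metric group
// that AsyncSinkBase has already registered; it cannot be prepended as a prefix.
public final class MetricGroupSketch {
    static Counter registerSamplesOut(MetricGroup sinkWriterGroup, String configuredGroupName) {
        MetricGroup connectorGroup = sinkWriterGroup.addGroup(configuredGroupName);
        return connectorGroup.counter("numSamplesOut");
    }
}
```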






Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-05-11 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597458387


##
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallback.java:
##
@@ -0,0 +1,192 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import 
org.apache.flink.connector.prometheus.sink.errorhandling.OnErrorBehavior;
+import 
org.apache.flink.connector.prometheus.sink.errorhandling.PrometheusSinkWriteException;
+import 
org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration;
+import 
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseClassifier;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static 
org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_DROPPED;
+import static 
org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_NON_RETRIABLE_DROPPED;
+import static 
org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_OUT;
+import static 
org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_RETRY_LIMIT_DROPPED;
+import static 
org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_WRITE_REQUESTS_OUT;
+import static 
org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_WRITE_REQUESTS_PERMANENTLY_FAILED;
+
+/**
+ * Callback handling the outcome of the http async request.
+ *
+ * This class implements the error handling behaviour, based on the 
configuration in {@link
+ * SinkWriterErrorHandlingBehaviorConfiguration}. Depending on the condition, 
the sink may throw an
+ * exception and cause the job to fail, or log the condition to WARN, 
increment the counters and
+ * continue with the next request.
+ *
+ * In any case, every write-request either entirely succeed or fail. 
Partial failures are not
+ * handled.
+ *
+ * In no condition a write-request is re-queued for the AsyncSink to 
reprocess: this would cause
+ * out of order writes that would be rejected by Prometheus.
+ *
+ * Note that the http async client retries, based on the configured retry 
policy. The callback is
+ * called with an outcome of *completed* either when the request has succeeded 
or the max retry
+ * limit has been exceeded. It is responsibility of the callback 
distinguishing between these
+ * conditions.
+ */
+class HttpResponseCallback implements FutureCallback<SimpleHttpResponse> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HttpResponseCallback.class);
+
+private final int timeSeriesCount;
+private final long sampleCount;
+private final Consumer<List<Types.TimeSeries>> reQueuedResult;
+private final SinkMetrics metrics;
+private final SinkWriterErrorHandlingBehaviorConfiguration 
errorHandlingBehaviorConfig;
+
+public HttpResponseCallback(
+int timeSeriesCount,
+long sampleCount,
+SinkMetrics metrics,
+SinkWriterErrorHandlingBehaviorConfiguration 
errorHandlingBehaviorConfig,
+Consumer<List<Types.TimeSeries>> reQueuedResult) {
+this.timeSeriesCount = timeSeriesCount;
+this.sampleCount = sampleCount;
+this.reQueuedResult = reQueuedResult;
+this.metrics = metrics;
+this.errorHandlingBehaviorConfig = errorHandlingBehaviorConfig;
+}
+
+/**
+ * The completed outcome is invoked every time the http client 
successfully receives a valid
+ * http response, regardless of the status code.
+ *
+ * This method classifies the responses and implements the behaviour 
expected by the
+ * Remote-Write specifications. In case of error, the behaviour is 
determined by the error
+ * handling configuration.
+ */
+@Override
+public void completed(SimpleHttpResponse response) {

Review Comment:
   Done. 
   One nested if (no 

Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-05-11 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597456357


##
prometheus-connector/src/main/java/org/apache/flink/connector/prometheus/sink/HttpResponseCallback.java:
##
@@ -0,0 +1,192 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import 
org.apache.flink.connector.prometheus.sink.errorhandling.OnErrorBehavior;
+import 
org.apache.flink.connector.prometheus.sink.errorhandling.PrometheusSinkWriteException;
+import 
org.apache.flink.connector.prometheus.sink.errorhandling.SinkWriterErrorHandlingBehaviorConfiguration;
+import 
org.apache.flink.connector.prometheus.sink.http.RemoteWriteResponseClassifier;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static 
org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_DROPPED;
+import static 
org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_NON_RETRIABLE_DROPPED;
+import static 
org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_OUT;
+import static 
org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_SAMPLES_RETRY_LIMIT_DROPPED;
+import static 
org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_WRITE_REQUESTS_OUT;
+import static 
org.apache.flink.connector.prometheus.sink.SinkMetrics.SinkCounter.NUM_WRITE_REQUESTS_PERMANENTLY_FAILED;
+
+/**
+ * Callback handling the outcome of the http async request.
+ *
+ * This class implements the error handling behaviour, based on the 
configuration in {@link
+ * SinkWriterErrorHandlingBehaviorConfiguration}. Depending on the condition, 
the sink may throw an
+ * exception and cause the job to fail, or log the condition to WARN, 
increment the counters and
+ * continue with the next request.
+ *
+ * In any case, every write-request either entirely succeeds or fails. 
Partial failures are not
+ * handled.
+ *
+ * Under no condition is a write-request re-queued for the AsyncSink to 
reprocess: this would cause
+ * out-of-order writes that would be rejected by Prometheus.
+ *
+ * Note that the http async client retries, based on the configured retry 
policy. The callback is
+ * called with an outcome of *completed* either when the request has succeeded 
or the max retry
+ * limit has been exceeded. It is the responsibility of the callback to 
distinguish between these
+ * conditions.
+ */
+class HttpResponseCallback implements FutureCallback {
+private static final Logger LOG = 
LoggerFactory.getLogger(HttpResponseCallback.class);
+
+private final int timeSeriesCount;
+private final long sampleCount;
+private final Consumer> reQueuedResult;
+private final SinkMetrics metrics;
+private final SinkWriterErrorHandlingBehaviorConfiguration 
errorHandlingBehaviorConfig;
+
+public HttpResponseCallback(
+int timeSeriesCount,
+long sampleCount,
+SinkMetrics metrics,

Review Comment:
   Not exactly a minor change, because it has an impact in many places. But 
definitely a good idea.
   Implemented.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-35198) Support the execution of refresh materialized table

2024-05-11 Thread dalongliu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dalongliu resolved FLINK-35198.
---
Resolution: Fixed

> Support the execution of refresh materialized table
> ---
>
> Key: FLINK-35198
> URL: https://issues.apache.org/jira/browse/FLINK-35198
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: dalongliu
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> {code:SQL}
> ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name REFRESH
>  [PARTITION (key1=val1, key2=val2, ...)]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35198) Support the execution of refresh materialized table

2024-05-11 Thread dalongliu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845610#comment-17845610
 ] 

dalongliu commented on FLINK-35198:
---

Merged in master: 9fe8d7bf870987bf43bad63078e2590a38e4faf6

> Support the execution of refresh materialized table
> ---
>
> Key: FLINK-35198
> URL: https://issues.apache.org/jira/browse/FLINK-35198
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: dalongliu
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> {code:SQL}
> ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name REFRESH
>  [PARTITION (key1=val1, key2=val2, ...)]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]

2024-05-11 Thread via GitHub


lsyldliu merged PR #24760:
URL: https://github.com/apache/flink/pull/24760


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35040] Revert `commons-io` to 2.11.0 [flink]

2024-05-11 Thread via GitHub


1996fanrui commented on PR #24652:
URL: https://github.com/apache/flink/pull/24652#issuecomment-2105732689

   It seems reverting it doesn't make sense, so I'll close this PR first.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35040] Revert `commons-io` to 2.11.0 [flink]

2024-05-11 Thread via GitHub


1996fanrui closed pull request #24652: [FLINK-35040] Revert `commons-io` to 
2.11.0
URL: https://github.com/apache/flink/pull/24652


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed

2024-05-11 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan resolved FLINK-35041.
-
Fix Version/s: 1.20.0
   Resolution: Fixed

IIUC, this issue has been fixed in the master branch, so I will close it first.

If it still happens, feel free to re-open it.

> IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
> --
>
> Key: FLINK-35041
> URL: https://issues.apache.org/jira/browse/FLINK-35041
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> {code:java}
> Apr 08 03:22:45 03:22:45.450 [ERROR] 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration
>  -- Time elapsed: 0.034 s <<< FAILURE!
> Apr 08 03:22:45 org.opentest4j.AssertionFailedError: 
> Apr 08 03:22:45 
> Apr 08 03:22:45 expected: false
> Apr 08 03:22:45  but was: true
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Apr 08 03:22:45   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(K.java:45)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211)
> Apr 08 03:22:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Apr 08 03:22:45   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=9238]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed

2024-05-11 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845606#comment-17845606
 ] 

Rui Fan commented on FLINK-35041:
-

Merged to master (1.20.0) via:  86c8304d735581518ce666be4896b7d5e48a1e42

> IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
> --
>
> Key: FLINK-35041
> URL: https://issues.apache.org/jira/browse/FLINK-35041
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
>
> {code:java}
> Apr 08 03:22:45 03:22:45.450 [ERROR] 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration
>  -- Time elapsed: 0.034 s <<< FAILURE!
> Apr 08 03:22:45 org.opentest4j.AssertionFailedError: 
> Apr 08 03:22:45 
> Apr 08 03:22:45 expected: false
> Apr 08 03:22:45  but was: true
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Apr 08 03:22:45   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(K.java:45)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211)
> Apr 08 03:22:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Apr 08 03:22:45   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=9238]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35041][test] Fix the IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed [flink]

2024-05-11 Thread via GitHub


1996fanrui merged PR #24770:
URL: https://github.com/apache/flink/pull/24770


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35041][test] Fix the IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed [flink]

2024-05-11 Thread via GitHub


1996fanrui commented on PR #24770:
URL: https://github.com/apache/flink/pull/24770#issuecomment-2105730845

   Thanks for the review!
   
   CI is green, merging~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][test] Fix InstantiationUtilTest cannot find assertFalse [flink]

2024-05-11 Thread via GitHub


1996fanrui merged PR #24772:
URL: https://github.com/apache/flink/pull/24772


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][test] Fix InstantiationUtilTest cannot find assertFalse [flink]

2024-05-11 Thread via GitHub


1996fanrui commented on PR #24772:
URL: https://github.com/apache/flink/pull/24772#issuecomment-2105730686

   Thanks for the quick review!
   
   CI is green, merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][docs] Fix dead links in documentations [flink-cdc]

2024-05-11 Thread via GitHub


leonardBang merged PR #3314:
URL: https://github.com/apache/flink-cdc/pull/3314


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35041][test] Fix the IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed [flink]

2024-05-11 Thread via GitHub


masteryhx commented on code in PR #24770:
URL: https://github.com/apache/flink/pull/24770#discussion_r1597415073


##
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java:
##
@@ -343,7 +343,11 @@ private static StreamStateHandle 
createDummySegmentFileStateHandle(Random rnd) {
 private static StreamStateHandle createDummySegmentFileStateHandle(
 Random rnd, boolean isEmpty) {
 return isEmpty
-? TestingSegmentFileStateHandle.EMPTY_INSTANCE
+? new TestingSegmentFileStateHandle(
+new Path(UUID.randomUUID().toString()),

Review Comment:
   Thanks for correcting this.
   There is no strong reason for it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-05-11 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597414330


##
prometheus-connector/README.md:
##
@@ -0,0 +1,189 @@
+## Flink Prometheus connector (sink)
+
+Implementation of the Prometheus sink connector for DataStream API.
+
+The sink writes to Prometheus using the Remote-Write interface, based on 
[Remote-Write specifications version 
1.0](https://prometheus.io/docs/concepts/remote_write_spec/)
+
+### Guarantees and input restrictions
+
+Due to the strict 
[ordering](https://prometheus.io/docs/concepts/remote_write_spec/#ordering) and 
[format](https://prometheus.io/docs/concepts/remote_write_spec/#labels) 
requirements
+of Prometheus Remote-Write, the sink guarantees that input data are written to 
Prometheus only if input data are in order and well-formed.
+
+For efficiency, the connector does not do any validation.
+If input is out of order or malformed, the write request is rejected by 
Prometheus and data is discarded by the sink.
+The connector will log a warning and count rejected data in custom metrics, 
but the data is discarded.
+
+The sink receives as input time-series, each containing one or more samples. 
+To optimise the write throughput, input time-series are batched, in the order 
they are received, and written with a single write-request.
+
+If a write-request contains any out-of-order or malformed data, **the entire 
request is rejected** and all time series are discarded.
+The reason is that the Remote-Write specification [explicitly forbids 
retrying](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff)
 rejected write requests (4xx responses),
+and the Prometheus response does not contain enough information to efficiently 
retry the write partially, discarding only the offending data.
+
+### Responsibilities of the application
+
+It is the responsibility of the application to send data to the sink in the 
correct order and format.
+
+1. Input time-series must be well-formed, e.g. only valid and non-duplicated 
labels, 
+samples in timestamp order (see [Labels and 
Ordering](https://prometheus.io/docs/concepts/remote_write_spec/#labels) in 
Prometheus Remote-Write specs).
+2. Input time-series with identical labels are sent to the sink in timestamp 
order.
+3. If sink parallelism > 1 is used, the input stream must be partitioned so 
that all time-series with identical labels go to the same sink subtask. A 
`KeySelector` is provided to partition input correctly (see 
[Partitioning](#partitioning), below). 
+
+
+ Sink input objects
+
+To help send well-formed data to the sink, the connector expects 
[`PrometheusTimeSeries`](./src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusTimeSeries.java)
 POJOs as input.
+
+Each `PrometheusTimeSeries` instance maps 1-to-1 to a [remote-write 
`TimeSeries`](https://prometheus.io/docs/concepts/remote_write_spec/#protocol). 
Each object contains:
+* exactly one `metricName`, mapped to the special `__name__` label
+* optionally, any number of additional labels { k: String, v:String }
+* one or more `Samples` { value: double, timestamp: long } - must be in 
timestamp order
+
+`PrometheusTimeSeries` provides a builder interface.
+
+```java
+
+// List> samples = ...
+
+PrometheusTimeSeries.Builder tsBuilder = PrometheusTimeSeries.builder()
+.withMetricName("CPU") // mapped to  `__name__` label
+.addLabel("InstanceID", instanceId)
+.addLabel("AccountID", accountId);
+
+for(Tuple2 sample : samples) {
+tsBuilder.addSample(sample.f0, sample.f1);
+}
+
+PrometheusTimeSeries ts = tsBuilder.build();
+```
+
+
+**Important**: for efficiency, the builder does not reorder the samples. It is 
the responsibility of the application to **add samples in timestamp order**.
+
+### Batching, blocking writes and retry
+
+The sink batches multiple time-series into a single write-request, retaining 
the order.
+
+Batching is based on the number of samples. Each write-request contains up to 
500 samples, with a max buffering time of 5 seconds 
+(both configurable). The number of time-series doesn't matter.
+
+As per [Prometheus Remote-Write 
specifications](https://prometheus.io/docs/concepts/remote_write_spec/#retries-backoff),
 
+the sink retries 5xx and 429 responses. Retrying is blocking, to retain sample 
ordering, and uses an exponential backoff.
+
+The exponential backoff starts with an initial delay (default 30 ms) and 
increases it exponentially up to a max retry 
+delay (default 5 sec). It continues retrying until the max number of retries 
is reached (by default, it retries forever).
+
+On a non-retriable error response (4xx except 429), a non-retriable exception, 
or on reaching the retry limit, 
+**the entire write-request**, containing the batch of time-series, **is 
dropped**.
+
+Every dropped request is logged at WARN level, including the reason provided 
by the remote-write endpoint.

Review Comment:
   Added explanation
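   
   As an editorial aside (not part of the original comment): the retry schedule described in the quoted README can be illustrated with a small sketch. The 30 ms initial delay and 5 s cap are the documented defaults; the doubling factor and the class and method names below are assumptions made only for the illustration and are not the connector's API.
   
   ```java
   /** Illustrative only: shows how the documented retry delays could grow; not the connector's code. */
   public class BackoffIllustration {
   
       // initialDelayMs = 30 and maxDelayMs = 5000 are the defaults quoted in the README above.
       static long retryDelayMillis(int attempt, long initialDelayMs, long maxDelayMs) {
           // Assumed exponential growth: the delay doubles on every attempt, saturating at the max retry delay.
           double delay = initialDelayMs * Math.pow(2, attempt);
           return (long) Math.min(delay, maxDelayMs);
       }
   
       public static void main(String[] args) {
           for (int attempt = 0; attempt <= 8; attempt++) {
               System.out.println("attempt " + attempt + " -> " + retryDelayMillis(attempt, 30, 5_000) + " ms");
           }
       }
   }
   ```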



-- 
This is an 

Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-05-11 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597412742


##
prometheus-connector/README.md:
##
@@ -0,0 +1,189 @@
+## Flink Prometheus connector (sink)
+
+Implementation of the Prometheus sink connector for DataStream API.
+
+The sink writes to Prometheus using the Remote-Write interface, based on 
[Remote-Write specifications version 
1.0](https://prometheus.io/docs/concepts/remote_write_spec/)
+
+### Guarantees and input restrictions
+
+Due to the strict 
[ordering](https://prometheus.io/docs/concepts/remote_write_spec/#ordering) and 
[format](https://prometheus.io/docs/concepts/remote_write_spec/#labels) 
requirements
+of Prometheus Remote-Write, the sink guarantees that input data are written to 
Prometheus only if input data are in order and well-formed.
+
+For efficiency, the connector does not do any validation.
+If input is out of order or malformed, the write request is rejected by 
Prometheus and data is discarded by the sink.

Review Comment:
   Sorry, understood the point. Adding an explanation



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-05-11 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597412465


##
prometheus-connector/README.md:
##
@@ -0,0 +1,189 @@
+## Flink Prometheus connector (sink)
+
+Implementation of the Prometheus sink connector for DataStream API.
+
+The sink writes to Prometheus using the Remote-Write interface, based on 
[Remote-Write specifications version 
1.0](https://prometheus.io/docs/concepts/remote_write_spec/)
+
+### Guarantees and input restrictions
+
+Due to the strict 
[ordering](https://prometheus.io/docs/concepts/remote_write_spec/#ordering) and 
[format](https://prometheus.io/docs/concepts/remote_write_spec/#labels) 
requirements
+of Prometheus Remote-Write, the sink guarantees that input data are written to 
Prometheus only if input data are in order and well-formed.
+
+For efficiency, the connector does not do any validation.
+If input is out of order or malformed, the write request is rejected by 
Prometheus and data is discarded by the sink.

Review Comment:
   Configuration is extensively documented below, in the README



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35197][table] Support the execution of supsend materialized table in continuous refresh mode [flink]

2024-05-11 Thread via GitHub


lsyldliu commented on code in PR #24765:
URL: https://github.com/apache/flink/pull/24765#discussion_r1597412228


##
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##
@@ -197,6 +220,29 @@ void testCreateMaterializedTableInContinuousMode() throws 
Exception {
 .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
 
assertThat(actualMaterializedTable.getRefreshHandlerDescription()).isNotEmpty();
 
assertThat(actualMaterializedTable.getSerializedRefreshHandler()).isNotEmpty();
+
+ContinuousRefreshHandler continuousRefreshHandler =
+ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
+actualMaterializedTable.getSerializedRefreshHandler(),
+getClass().getClassLoader());
+// check the background job is running
+String describeJobDDL =
+String.format("DESCRIBE JOB '%s'", 
continuousRefreshHandler.getJobId());
+OperationHandle describeJobHandle =
+service.executeStatement(sessionHandle, describeJobDDL, -1, 
new Configuration());
+awaitOperationTermination(service, sessionHandle, describeJobHandle);
+List jobResults = fetchAllResults(sessionHandle, 
describeJobHandle);
+
assertThat(jobResults.get(0).getString(2).toString()).isEqualTo("RUNNING");
+
+// get checkpoint config from the materialized table
+CheckpointConfigInfo checkpointConfigInfo =
+getCheckpointConfigInfo(clusterClient, 
continuousRefreshHandler.getJobId());

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-05-11 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597412205


##
pom.xml:
##
@@ -0,0 +1,120 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+4.0.0
+
+
+org.apache.flink
+flink-connector-parent
+1.0.0
+
+
+org.apache.flink
+flink-connector-prometheus-parent
+1.0.0-SNAPSHOT
+pom
+
+Flink Prometheus
+
+prometheus-connector
+amp-request-signer
+example-datastream-job
+
+
+
+
+The Apache Software License, Version 2.0
+https://www.apache.org/licenses/LICENSE-2.0.txt
+repo
+
+
+
+
+UTF-8
+11
+${target.java.version}
+${target.java.version}
+1.17.0
+3.22.2
+5.2.1
+1.12.570
+2.17.1
+
+
+
+
+
+com.amazonaws
+aws-java-sdk-bom
+${aws.sdkv1.version}

Review Comment:
   Correction: it is still required. But I am moving any reference to the AWS SDK 
dependency to the AMP Signer module.
   
   This is used to sign the AMP request only. There is no working example using 
SDK v2 in Java. We managed to make it work with SDK v1. I'd suggest keeping it 
as-is for the moment and considering an upgrade of the signer in a future version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-05-11 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1597410882


##
pom.xml:
##
@@ -0,0 +1,120 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+4.0.0
+
+
+org.apache.flink
+flink-connector-parent
+1.0.0
+
+
+org.apache.flink
+flink-connector-prometheus-parent
+1.0.0-SNAPSHOT
+pom
+
+Flink Prometheus
+
+prometheus-connector
+amp-request-signer
+example-datastream-job
+
+
+
+
+The Apache Software License, Version 2.0
+https://www.apache.org/licenses/LICENSE-2.0.txt
+repo
+
+
+
+
+UTF-8
+11
+${target.java.version}
+${target.java.version}
+1.17.0
+3.22.2
+5.2.1
+1.12.570
+2.17.1
+
+
+
+
+
+com.amazonaws
+aws-java-sdk-bom
+${aws.sdkv1.version}

Review Comment:
   This is actually no longer needed. The AMP request signer no longer uses any 
SDK.
   I am removing it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core with,Package: core [flink]

2024-05-11 Thread via GitHub


1996fanrui commented on PR #24685:
URL: https://github.com/apache/flink/pull/24685#issuecomment-2105650954

   > @1996fanrui PTAL
   
   Thanks @GOODBOY008 for the ping. This PR is huge, so I'll try to review it next 
week.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core with,Package: util [flink]

2024-05-11 Thread via GitHub


1996fanrui commented on PR #24670:
URL: https://github.com/apache/flink/pull/24670#issuecomment-2105650755

   > @Jiabao-Sun @1996fanrui I will open a hotfix to master.
   
   Sorry, I didn't notice it. I started fixing it after I received the CI alert. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core with,Package: core [flink]

2024-05-11 Thread via GitHub


GOODBOY008 commented on PR #24685:
URL: https://github.com/apache/flink/pull/24685#issuecomment-2105650442

   @1996fanrui PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][test] Fix InstantiationUtilTest cannot find assertFalse [flink]

2024-05-11 Thread via GitHub


flinkbot commented on PR #24772:
URL: https://github.com/apache/flink/pull/24772#issuecomment-2105650048

   
   ## CI report:
   
   * 5a8d66f2dfcff0d756fa9d77285f135dadc00ab2 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [hotfix][test] Fix InstantiationUtilTest cannot find assertFalse [flink]

2024-05-11 Thread via GitHub


1996fanrui opened a new pull request, #24772:
URL: https://github.com/apache/flink/pull/24772

   ## What is the purpose of the change
   
   The master branch currently fails to build.
   
   https://github.com/apache/flink/pull/24670 was not rebased on the master branch 
before merging, while another PR added 
`InstantiationUtilTest#testHasNullaryConstructorFalse`, which uses assertFalse.
   
   But https://github.com/apache/flink/pull/24670 removed the assertFalse 
import, so the master branch cannot compile.
   
   
   ## Brief change log
   
   Migrate the assertFalse to assertThat.
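   
   For readers unfamiliar with the migration, the change has roughly this shape (an illustrative snippet only, not the actual diff; the class name and value below are made up):
   
   ```java
   import static org.assertj.core.api.Assertions.assertThat;
   
   import org.junit.jupiter.api.Test;
   
   /** Illustrative shape of the assertFalse -> assertThat(...).isFalse() migration. */
   class AssertMigrationExample {
   
       @Test
       void valueIsFalse() {
           boolean hasNullaryConstructor = false; // stand-in for the real check in InstantiationUtilTest
   
           // Before (JUnit style, relying on the static assertFalse import that was removed):
           // assertFalse(hasNullaryConstructor);
   
           // After (AssertJ style used across the migrated flink-core tests):
           assertThat(hasNullaryConstructor).isFalse();
       }
   }
   ```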
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35197][table] Support the execution of supsend materialized table in continuous refresh mode [flink]

2024-05-11 Thread via GitHub


hackergin commented on code in PR #24765:
URL: https://github.com/apache/flink/pull/24765#discussion_r1597406989


##
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##
@@ -197,6 +220,29 @@ void testCreateMaterializedTableInContinuousMode() throws 
Exception {
 .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
 
assertThat(actualMaterializedTable.getRefreshHandlerDescription()).isNotEmpty();
 
assertThat(actualMaterializedTable.getSerializedRefreshHandler()).isNotEmpty();
+
+ContinuousRefreshHandler continuousRefreshHandler =
+ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
+actualMaterializedTable.getSerializedRefreshHandler(),
+getClass().getClassLoader());
+// check the background job is running
+String describeJobDDL =
+String.format("DESCRIBE JOB '%s'", 
continuousRefreshHandler.getJobId());
+OperationHandle describeJobHandle =
+service.executeStatement(sessionHandle, describeJobDDL, -1, 
new Configuration());
+awaitOperationTermination(service, sessionHandle, describeJobHandle);
+List jobResults = fetchAllResults(sessionHandle, 
describeJobHandle);
+
assertThat(jobResults.get(0).getString(2).toString()).isEqualTo("RUNNING");
+
+// get checkpoint config from the materialized table
+CheckpointConfigInfo checkpointConfigInfo =
+getCheckpointConfigInfo(clusterClient, 
continuousRefreshHandler.getJobId());

Review Comment:
   Currently, I have not found a good way to directly obtain the response as a 
String.
   
   A better way might be to add a getter method to the CheckpointConfigInfo class.
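   
   Editorial sketch of what that suggestion could look like (the field and method names are hypothetical; the real CheckpointConfigInfo class in flink-runtime may be shaped differently):
   
   ```java
   /** Hypothetical accessor added so tests can assert on the value directly instead of parsing a raw response. */
   public class CheckpointConfigInfoSketch {
   
       private final String processingMode; // e.g. "exactly_once" or "at_least_once"
   
       public CheckpointConfigInfoSketch(String processingMode) {
           this.processingMode = processingMode;
       }
   
       public String getProcessingMode() {
           return processingMode;
       }
   }
   ```
   
   With such a getter, the IT case could assert on the mode directly, e.g. `assertThat(info.getProcessingMode()).isEqualTo("exactly_once")`, instead of matching against a serialized String response.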



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core with,Package: util [flink]

2024-05-11 Thread via GitHub


GOODBOY008 commented on PR #24670:
URL: https://github.com/apache/flink/pull/24670#issuecomment-2105648555

   
[workflow_dispatch](https://github.com/apache/flink/actions/runs/9042536161/job/24849180888#step:6:903)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core with,Package: util [flink]

2024-05-11 Thread via GitHub


GOODBOY008 commented on PR #24670:
URL: https://github.com/apache/flink/pull/24670#issuecomment-2105648441

   @Jiabao-Sun @1996fanrui I will open a hotfix to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35197][table] Support the execution of supsend materialized table in continuous refresh mode [flink]

2024-05-11 Thread via GitHub


hackergin commented on code in PR #24765:
URL: https://github.com/apache/flink/pull/24765#discussion_r1597406641


##
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##
@@ -236,6 +282,150 @@ void testCreateMaterializedTableInFullMode() {
 "Only support create materialized table in continuous 
refresh mode currently.");
 }
 
+@Test
+void testAlterMaterializedTableSuspendAndResume(
+@TempDir Path temporaryPath,
+@InjectClusterClient RestClusterClient restClusterClient)
+throws Exception {
+// initialize session handle, create test-filesystem catalog and 
register it to catalog
+// store
+SessionHandle sessionHandle = initializeSession();
+
+String materializedTableDDL =
+"CREATE MATERIALIZED TABLE users_shops"
++ " PARTITIONED BY (ds)\n"
++ " WITH(\n"
++ "   'format' = 'debezium-json'\n"
++ " )\n"
++ " FRESHNESS = INTERVAL '30' SECOND\n"
++ " AS SELECT \n"
++ "  user_id,\n"
++ "  shop_id,\n"
++ "  ds,\n"
++ "  SUM (payment_amount_cents) AS 
payed_buy_fee_sum,\n"
++ "  SUM (1) AS pv\n"
++ " FROM (\n"
++ "SELECT user_id, shop_id, 
DATE_FORMAT(order_created_at, '-MM-dd') AS ds, payment_amount_cents FROM 
datagenSource"
++ " ) AS tmp\n"
++ " GROUP BY (user_id, shop_id, ds)";
+
+OperationHandle materializedTableHandle =
+service.executeStatement(
+sessionHandle, materializedTableDDL, -1, new 
Configuration());
+awaitOperationTermination(service, sessionHandle, 
materializedTableHandle);
+
+// set up savepoint dir
+String savepointDir = temporaryPath.toString();
+String alterMaterializedTableSavepointDDL =
+String.format("SET 'state.savepoints.dir' = 'file://%s'", 
savepointDir);

Review Comment:
   This must be set, otherwise the following error will be reported.
   ```
   Caused by: java.lang.IllegalArgumentException: The scheme (hdfs://, file://, 
etc) is null. Please specify the file system scheme explicitly in the URI.
   
   ```
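   
   (Editorial illustration of the same requirement when setting the option programmatically; only the option constant is standard Flink, the rest of the snippet is made up:)
   
   ```java
   import org.apache.flink.configuration.CheckpointingOptions;
   import org.apache.flink.configuration.Configuration;
   
   class SavepointDirSchemeExample {
       static Configuration savepointConfig(String savepointDir) {
           Configuration conf = new Configuration();
           // The scheme (e.g. file://) must be explicit; a bare path triggers the IllegalArgumentException above.
           conf.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, "file://" + savepointDir);
           return conf;
       }
   }
   ```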



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-25537) [JUnit5 Migration] Module: flink-core

2024-05-11 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845561#comment-17845561
 ] 

Rui Fan commented on FLINK-25537:
-

Merged to master(1.20.0) via: 5af9acba5b9fbcdf9aadf62310cd337d508158c3

> [JUnit5 Migration] Module: flink-core
> -
>
> Key: FLINK-25537
> URL: https://issues.apache.org/jira/browse/FLINK-25537
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Core
>Reporter: Qingsheng Ren
>Assignee: Aiden Gong
>Priority: Minor
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core with,Package: util [flink]

2024-05-11 Thread via GitHub


1996fanrui merged PR #24670:
URL: https://github.com/apache/flink/pull/24670


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35041][test] Fix the IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed [flink]

2024-05-11 Thread via GitHub


1996fanrui commented on PR #24770:
URL: https://github.com/apache/flink/pull/24770#issuecomment-2105643430

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34615]Split `ExternalizedCheckpointCleanup` out of `Checkpoint… [flink]

2024-05-11 Thread via GitHub


spoon-lz commented on PR #24461:
URL: https://github.com/apache/flink/pull/24461#issuecomment-2105643136

   @flinkbot  run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [docs] Rectify names of CDC sources for Flink [flink-cdc]

2024-05-11 Thread via GitHub


ysmintor commented on PR #3310:
URL: https://github.com/apache/flink-cdc/pull/3310#issuecomment-2105638144

   Yes, I agree with this refactoring. And is there any plan to add support for Oracle, 
PostgreSQL, OceanBase, etc. in the pipeline connectors? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33892][runtime] Support Job Recovery from JobMaster Failures for Batch Jobs. [flink]

2024-05-11 Thread via GitHub


flinkbot commented on PR #24771:
URL: https://github.com/apache/flink/pull/24771#issuecomment-2105619645

   
   ## CI report:
   
   * 77f15330f3056aaa256565f7fc800787598a6c6a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33892) FLIP-383: Support Job Recovery from JobMaster Failures for Batch Jobs

2024-05-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33892:
---
Labels: pull-request-available  (was: )

> FLIP-383: Support Job Recovery from JobMaster Failures for Batch Jobs
> -
>
> Key: FLINK-33892
> URL: https://issues.apache.org/jira/browse/FLINK-33892
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination
>Reporter: Lijie Wang
>Assignee: Junrui Li
>Priority: Major
>  Labels: pull-request-available
>
> This is the umbrella ticket for 
> [FLIP-383|https://cwiki.apache.org/confluence/display/FLINK/FLIP-383%3A+Support+Job+Recovery+for+Batch+Jobs]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33892][runtime] Support Job Recovery from JobMaster Failures for Batch Jobs. [flink]

2024-05-11 Thread via GitHub


JunRuiLee opened a new pull request, #24771:
URL: https://github.com/apache/flink/pull/24771

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   Support Job Recovery from JobMaster Failures for Batch Jobs.
   
   
   ## Verifying this change
   
   This change added tests and can be verified by BatchJobRecoveryTest and 
JMFailoverITCase.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]

2024-05-11 Thread via GitHub


xuyangzhong commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1597382234


##
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##
@@ -236,6 +252,213 @@ void testCreateMaterializedTableInFullMode() {
 "Only support create materialized table in continuous 
refresh mode currently.");
 }
 
+@Test
+void testAlterMaterializedTableRefresh() throws Exception {
+long timeout = Duration.ofSeconds(20).toMillis();
+long pause = Duration.ofSeconds(2).toMillis();
+// initialize session handle, create test-filesystem catalog and 
register it to catalog
+// store
+SessionHandle sessionHandle = initializeSession();
+
+List data = new ArrayList<>();
+data.add(Row.of(1L, 1L, 1L, "2024-01-01"));
+data.add(Row.of(2L, 2L, 2L, "2024-01-02"));
+data.add(Row.of(3L, 3L, 3L, "2024-01-02"));
+String dataId = TestValuesTableFactory.registerData(data);
+
+String sourceDdl =
+String.format(
+"CREATE TABLE my_source (\n"
++ "  order_id BIGINT,\n"
++ "  user_id BIGINT,\n"
++ "  shop_id BIGINT,\n"
++ "  order_created_at STRING\n"
++ ")\n"
++ "WITH (\n"
++ "  'connector' = 'values',\n"
++ "  'bounded' = 'true',\n"
++ "  'data-id' = '%s'\n"
++ ")",
+dataId);
+service.executeStatement(sessionHandle, sourceDdl, -1, new 
Configuration());
+
+String materializedTableDDL =
+"CREATE MATERIALIZED TABLE my_materialized_table"
++ " PARTITIONED BY (ds)\n"
++ " WITH(\n"
++ "   'format' = 'debezium-json'\n"
++ " )\n"
++ " FRESHNESS = INTERVAL '2' SECOND\n"
++ " AS SELECT \n"
++ "  user_id,\n"
++ "  shop_id,\n"
++ "  ds,\n"
++ "  COUNT(order_id) AS order_cnt\n"
++ " FROM (\n"
++ "SELECT user_id, shop_id, order_created_at AS 
ds, order_id FROM my_source"
++ " ) AS tmp\n"
++ " GROUP BY (user_id, shop_id, ds)";
+
+OperationHandle materializedTableHandle =
+service.executeStatement(
+sessionHandle, materializedTableDDL, -1, new 
Configuration());
+awaitOperationTermination(service, sessionHandle, 
materializedTableHandle);
+
+// verify data exists in materialized table
+CommonTestUtils.waitUtil(
+() ->
+fetchTableData(sessionHandle, "SELECT * FROM 
my_materialized_table").size()
+== data.size(),
+Duration.ofMillis(timeout),
+Duration.ofMillis(pause),
+"Failed to verify the data in materialized table.");
+assertThat(
+fetchTableData(
+sessionHandle,
+"SELECT * FROM my_materialized_table 
where ds = '2024-01-02'")
+.size())
+.isEqualTo(2);
+
+// remove the last element
+data.remove(2);
+
+long currentTime = System.currentTimeMillis();
+String alterStatement =
+"ALTER MATERIALIZED TABLE my_materialized_table REFRESH 
PARTITION (ds = '2024-01-02')";
+OperationHandle alterHandle =
+service.executeStatement(sessionHandle, alterStatement, -1, 
new Configuration());
+awaitOperationTermination(service, sessionHandle, alterHandle);
+List result = fetchAllResults(service, sessionHandle, 
alterHandle);
+assertThat(result.size()).isEqualTo(1);
+String jobId = result.get(0).getString(0).toString();
+
+MiniCluster miniCluster = MINI_CLUSTER.getMiniCluster();

Review Comment:
   Got it!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]

2024-05-11 Thread via GitHub


xuyangzhong commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1597376170


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode(
 "Submit continuous refresh job for materialized table {} 
occur exception.",
 materializedTableIdentifier,
 e);
-throw new TableException(
+throw new SqlExecutionException(
 String.format(
 "Submit continuous refresh job for materialized 
table %s occur exception.",
 materializedTableIdentifier),
 e);
 }
 }
 
+private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+OperationExecutor operationExecutor,
+OperationHandle handle,
+AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+ObjectIdentifier materializedTableIdentifier =
+alterMaterializedTableRefreshOperation.getTableIdentifier();
+ResolvedCatalogBaseTable table = 
operationExecutor.getTable(materializedTableIdentifier);
+if (MATERIALIZED_TABLE != table.getTableKind()) {
+throw new ValidationException(
+String.format(
+"The table '%s' is not a materialized table.",
+materializedTableIdentifier));
+}
+
+ResolvedCatalogMaterializedTable materializedTable =
+(ResolvedCatalogMaterializedTable) table;
+
+Map partitionSpec =
+alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+validatePartitionSpec(partitionSpec, 
(ResolvedCatalogMaterializedTable) table);
+
+// Set job name, runtime mode
+Configuration customConfig = new Configuration();
+String jobName =
+String.format(
+"Materialized_table_%s_one_time_refresh_job",
+materializedTableIdentifier.asSerializableString());
+customConfig.set(NAME, jobName);
+customConfig.set(RUNTIME_MODE, BATCH);
+
+StringBuilder insertStatement =
+new StringBuilder(
+String.format(
+"INSERT OVERWRITE %s SELECT * FROM (%s)",
+materializedTableIdentifier,
+materializedTable.getDefinitionQuery()));
+
+if (!partitionSpec.isEmpty()) {
+insertStatement.append(" WHERE ");
+insertStatement.append(
+partitionSpec.entrySet().stream()
+.map(
+entry ->
+String.format(
+"%s = '%s'", 
entry.getKey(), entry.getValue()))
+.reduce((s1, s2) -> s1 + " AND " + s2)
+.get());
+}
+
+try {
+LOG.debug(
+"Begin to manually refreshing the materialization table 
{}, statement: {}",
+materializedTableIdentifier,
+insertStatement);
+return operationExecutor.executeStatement(
+handle, customConfig, insertStatement.toString());
+} catch (Exception e) {
+// log and throw exception
+LOG.error(
+"Manually refreshing the materialization table {} occur 
exception.",
+materializedTableIdentifier,
+e);
+throw new SqlExecutionException(
+String.format(
+"Manually refreshing the materialization table %s 
occur exception.",
+materializedTableIdentifier),
+e);
+}
+}
+
+private static void validatePartitionSpec(
+Map partitionSpec, 
ResolvedCatalogMaterializedTable table) {
+ResolvedSchema schema = table.getResolvedSchema();
+Set allPartitionKeys = new HashSet<>(table.getPartitionKeys());
+
+Set unknownPartitionKeys = new HashSet<>();
+Set nonStringPartitionKeys = new HashSet<>();
+
+for (String partitionKey : partitionSpec.keySet()) {
+if (!schema.getColumn(partitionKey).isPresent()) {
+unknownPartitionKeys.add(partitionKey);
+continue;
+}
+
+if (!schema.getColumn(partitionKey)
+.get()
+.getDataType()
+.getLogicalType()
+.getTypeRoot()
+.getFamilies()
+

Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]

2024-05-11 Thread via GitHub


xuyangzhong commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1597372269


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode(
 "Submit continuous refresh job for materialized table {} 
occur exception.",
 materializedTableIdentifier,
 e);
-throw new TableException(
+throw new SqlExecutionException(
 String.format(
 "Submit continuous refresh job for materialized 
table %s occur exception.",
 materializedTableIdentifier),
 e);
 }
 }
 
+private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+OperationExecutor operationExecutor,
+OperationHandle handle,
+AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+ObjectIdentifier materializedTableIdentifier =
+alterMaterializedTableRefreshOperation.getTableIdentifier();
+ResolvedCatalogBaseTable table = 
operationExecutor.getTable(materializedTableIdentifier);
+if (MATERIALIZED_TABLE != table.getTableKind()) {
+throw new ValidationException(
+String.format(
+"The table '%s' is not a materialized table.",
+materializedTableIdentifier));
+}
+
+ResolvedCatalogMaterializedTable materializedTable =
+(ResolvedCatalogMaterializedTable) table;
+
+Map partitionSpec =
+alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+validatePartitionSpec(partitionSpec, 
(ResolvedCatalogMaterializedTable) table);
+
+// Set job name, runtime mode
+Configuration customConfig = new Configuration();
+String jobName =
+String.format(
+"Materialized_table_%s_one_time_refresh_job",
+materializedTableIdentifier.asSerializableString());
+customConfig.set(NAME, jobName);
+customConfig.set(RUNTIME_MODE, BATCH);
+
+StringBuilder insertStatement =
+new StringBuilder(
+String.format(
+"INSERT OVERWRITE %s SELECT * FROM (%s)",
+materializedTableIdentifier,
+materializedTable.getDefinitionQuery()));
+
+if (!partitionSpec.isEmpty()) {
+insertStatement.append(" WHERE ");
+insertStatement.append(
+partitionSpec.entrySet().stream()
+.map(
+entry ->
+String.format(
+"%s = '%s'", 
entry.getKey(), entry.getValue()))
+.reduce((s1, s2) -> s1 + " AND " + s2)
+.get());
+}
+
+try {
+LOG.debug(
+"Begin to manually refreshing the materialization table 
{}, statement: {}",
+materializedTableIdentifier,
+insertStatement);
+return operationExecutor.executeStatement(
+handle, customConfig, insertStatement.toString());
+} catch (Exception e) {
+// log and throw exception
+LOG.error(
+"Manually refreshing the materialization table {} occur 
exception.",
+materializedTableIdentifier,
+e);
+throw new SqlExecutionException(
+String.format(
+"Manually refreshing the materialization table %s 
occur exception.",
+materializedTableIdentifier),
+e);
+}
+}
+
+private static void validatePartitionSpec(
+Map partitionSpec, 
ResolvedCatalogMaterializedTable table) {
+ResolvedSchema schema = table.getResolvedSchema();
+Set allPartitionKeys = new HashSet<>(table.getPartitionKeys());
+
+Set unknownPartitionKeys = new HashSet<>();
+Set nonStringPartitionKeys = new HashSet<>();
+
+for (String partitionKey : partitionSpec.keySet()) {
+if (!schema.getColumn(partitionKey).isPresent()) {
+unknownPartitionKeys.add(partitionKey);
+continue;
+}
+
+if (!schema.getColumn(partitionKey)
+.get()
+.getDataType()
+.getLogicalType()
+.getTypeRoot()
+.getFamilies()
+

Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]

2024-05-11 Thread via GitHub


yuxiqian commented on PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#issuecomment-2105596048

   Thanks @aiwenmo for reviewing, I've addressed the review comments above.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][docs] Fix dead links in documentations [flink-cdc]

2024-05-11 Thread via GitHub


yuxiqian commented on PR #3314:
URL: https://github.com/apache/flink-cdc/pull/3314#issuecomment-2105593186

   @leonardBang PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [hotfix][docs] Fix dead links in documentations [flink-cdc]

2024-05-11 Thread via GitHub


yuxiqian opened a new pull request, #3314:
URL: https://github.com/apache/flink-cdc/pull/3314

   This PR fixes dead links introduced by #3310.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35313][mysql-cdc] Add upsert changelog mode to avoid UPDATE_BEFORE records … [flink-cdc]

2024-05-11 Thread via GitHub


yeezychao commented on PR #1907:
URL: https://github.com/apache/flink-cdc/pull/1907#issuecomment-2105588522

   > > @yuxiqian Turning on upsert mode still fails to filter out `-U` data. I am very 
confused as to why applying the same PR fails the test on CDC 3.2 (Flink 1.18) but 
still works on 2.2 (Flink 1.15). Unfortunately, I have not found the reason yet. 
![image](https://private-user-images.githubusercontent.com/18387619/329751040-32609ad6-9730-4bf5-83c6-91f1343dc617.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MTU0MDA1MjAsIm5iZiI6MTcxNTQwMDIyMCwicGF0aCI6Ii8xODM4NzYxOS8zMjk3NTEwNDAtMzI2MDlhZDYtOTczMC00YmY1LTgzYzYtOTFmMTM0M2RjNjE3LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA1MTElMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNTExVDA0MDM0MFomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTI1NjQzNjYzN2I2YTU3NzAwYjA0ZDkzODIxMTU0ZTc2ZjQ1YmFiYmZmOTViNzk1MDNmNzBmNDkzYjIwMmIwNGUmWC1BbXotU2lnbmVkSGVhZG
 
Vycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.kfPitV5gbKlL5u6DiGPoxjmPCZUsBpg62S5dTlOigrs)
   > 
   > @yeezychao Maybe check the output log and confirm whether the MySQL source actually 
sends any `-U` events downstream? IIRC Flink will automatically append a 
`ChangelogNormalize` node to backfill the missing UPDATE_BEFORE events if the source 
doesn't provide them.
   
   You are right! The `ChangelogNormalize` node is indeed added under Flink 1.18, but 
it is not under Flink 1.15.
   
![image](https://github.com/apache/flink-cdc/assets/18387619/f25a44fe-df3a-43d6-b48e-98fc1be487d8)
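   For readers unfamiliar with the planner behavior mentioned above, a minimal, self-contained sketch (an illustration only, not Flink's actual `ChangelogNormalize` operator) of what "backfilling the missing UPDATE_BEFORE" means for an upsert stream: the previous row per primary key is remembered and emitted as `-U` before the new `+U`.

```java
import java.util.HashMap;
import java.util.Map;

public class ChangelogNormalizeSketch {
    public static void main(String[] args) {
        // Upsert input: only the latest row per key arrives, no UPDATE_BEFORE events.
        long[][] upserts = {{1, 10}, {1, 20}, {2, 5}, {1, 30}};
        Map<Long, String> lastRowByKey = new HashMap<>();

        for (long[] r : upserts) {
            long key = r[0];
            String newRow = "id=" + key + ", amount=" + r[1];
            String previous = lastRowByKey.put(key, newRow);
            if (previous == null) {
                System.out.println("+I " + newRow);
            } else {
                System.out.println("-U " + previous); // backfilled UPDATE_BEFORE
                System.out.println("+U " + newRow);
            }
        }
    }
}
```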
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-28867) Parquet reader support nested type in array/map type

2024-05-11 Thread Xingcan Cui (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845539#comment-17845539
 ] 

Xingcan Cui commented on FLINK-28867:
-

Hey [~jark], any plan to improve this in the near future? I feel that this is a 
blocker for Flink OLAP, even though the data lake projects ship their own data 
readers/writers. Sometimes users would like to use Flink to process raw Parquet 
files directly.

> Parquet reader support nested type in array/map type
> 
>
> Key: FLINK-28867
> URL: https://issues.apache.org/jira/browse/FLINK-28867
> Project: Flink
>  Issue Type: Sub-task
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.16.0
>Reporter: dalongliu
>Priority: Major
> Attachments: ReadParquetArray1.java, part-00121.parquet
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [docs] Rectify names of CDC sources for Flink [flink-cdc]

2024-05-11 Thread via GitHub


yuxiqian commented on PR #3310:
URL: https://github.com/apache/flink-cdc/pull/3310#issuecomment-2105586766

   @leonardBang Seems I missed some `{{ref}}` hyperlinks. Will fix them first.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [docs] Rectify names of CDC sources for Flink [flink-cdc]

2024-05-11 Thread via GitHub


leonardBang commented on PR #3310:
URL: https://github.com/apache/flink-cdc/pull/3310#issuecomment-2105584829

   @yuxiqian Would appreciate it if you could also open PRs for the release-3.0 and 
release-3.1 branches.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [docs] Rectify names of CDC sources for Flink [flink-cdc]

2024-05-11 Thread via GitHub


leonardBang merged PR #3310:
URL: https://github.com/apache/flink-cdc/pull/3310


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org