Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-11-22 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1401846952


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java:
##
@@ -1185,8 +1295,10 @@ public DataStreamSource<Long> fromSequence(long from, long to) {
  * @param data The array of elements to create the data stream from.
  * @param <OUT> The type of the returned data stream
  * @return The data stream representing the given array of elements
+ * @deprecated Use {@link #fromData(OUT...)} instead

Review Comment:
   Sure, but the formulation has to be tentative. While it remains my goal, you probably remember this long thread where the item was dropped as a must-have for 2.0: https://lists.apache.org/thread/r0y9syc6k5nmcxvnd0hj33htdpdj9k6m
   
   How about this:
   ```
   This method will be removed in a future release, possibly as early as version 2.0.
   Use {@link #fromData(OUT...)} instead.
   ```
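
For illustration, the proposed wording could sit on a deprecated method roughly as follows. This is a minimal sketch: `DeprecationSketch` is a hypothetical stand-in, not Flink's `StreamExecutionEnvironment`, and both methods return a plain `List` instead of a `DataStreamSource`.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for an environment class; not Flink's StreamExecutionEnvironment. */
class DeprecationSketch {

    /** Preferred entry point: returns the given elements in order. */
    @SafeVarargs
    static <OUT> List<OUT> fromData(OUT... data) {
        return Arrays.asList(data);
    }

    /**
     * Legacy entry point, kept only for backwards compatibility.
     *
     * @deprecated This method will be removed in a future release, possibly as early as
     *     version 2.0. Use {@link #fromData(Object[])} instead.
     */
    @Deprecated
    @SafeVarargs
    static <OUT> List<OUT> fromElements(OUT... data) {
        // Delegating keeps the behaviour identical while callers migrate.
        return fromData(data);
    }

    public static void main(String[] args) {
        System.out.println(fromData("a", "b", "c"));
    }
}
```

Pairing the `@Deprecated` annotation with the `@deprecated` Javadoc tag means callers get a compiler warning as well as the migration hint.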



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-11-22 Thread via GitHub


zentol commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1401768536


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java:
##
@@ -1185,8 +1295,10 @@ public DataStreamSource<Long> fromSequence(long from, long to) {
  * @param data The array of elements to create the data stream from.
  * @param <OUT> The type of the returned data stream
  * @return The data stream representing the given array of elements
+ * @deprecated Use {@link #fromData(OUT...)} instead

Review Comment:
   Maybe be explicit about a scheduled removal in 2.0






Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-11-20 Thread via GitHub


afedulov commented on PR #23553:
URL: https://github.com/apache/flink/pull/23553#issuecomment-1820017623

   @zentol CI is green again after the discussed API changes.





Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-11-18 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1397712701


##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java:
##
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+/**
+ * A stream generator function that returns a sequence of elements.
+ *
+ * This generator function serializes the elements using Flink's type information. That way, any
+ * object transport using Java serialization will not be affected by the serializability of the
+ * elements.
+ *
+ * @param <OUT> The type of elements returned by this function.
+ */
+@Internal
+public class FromElementsGeneratorFunction<OUT>
+        implements GeneratorFunction<Long, OUT>, OutputTypeConfigurable<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(FromElementsGeneratorFunction.class);
+
+    /** The (de)serializer to be used for the data elements. */
+    private @Nullable TypeSerializer<OUT> serializer;
+
+    /** The actual data elements, in serialized form. */
+    private byte[] elementsSerialized;
+
+    /** The number of elements emitted already. */
+    private int numElementsEmitted;
+
+    private transient Iterable<OUT> elements;
+    private transient DataInputView input;
+
+    @SafeVarargs
+    public FromElementsGeneratorFunction(TypeInformation<OUT> typeInfo, OUT... elements) {
+        this(typeInfo, new ExecutionConfig(), Arrays.asList(elements));
+    }
+
+    public FromElementsGeneratorFunction(
+            TypeInformation<OUT> typeInfo, ExecutionConfig config, Iterable<OUT> elements) {
+        // must not have null elements and mixed elements
+        checkIterable(elements, typeInfo.getTypeClass());
+        this.serializer = typeInfo.createSerializer(config);

Review Comment:
   Hmm, I don't know. I went through all type classes and the only two that actually look into this config are `PojoTypeInfo`:
   ```
   public TypeSerializer<T> createSerializer(ExecutionConfig config) {
       if (config.isForceKryoEnabled()) {
           return new KryoSerializer<>(getTypeClass(), config);
       }
   
       if (config.isForceAvroEnabled()) {
           return AvroUtils.getAvroUtils().createAvroSerializer(getTypeClass());
       }
   
       return createPojoSerializer(config);
   }
   ```
   and `GenericTypeInfo`:
   ```
   public TypeSerializer<T> createSerializer(ExecutionConfig config) {
       if (config.hasGenericTypesDisabled()) {
           throw new UnsupportedOperationException(
                   "Generic types have been disabled in the ExecutionConfig and type "
                           + this.typeClass.getName()
                           + " is treated as a generic type.");
       }
   
       return new KryoSerializer<T>(this.typeClass, config);
   }
   ```
   The rest just plainly ignore it. So it is only needed when enforcing Kryo and Avro. For Avro - there is a clear way now to just submit the
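
To make the branching in the two quoted methods easier to compare, here is a small self-contained model of it. This is only a sketch under stated assumptions: `Config` and the serializer names returned as strings are hypothetical stand-ins, not Flink's `ExecutionConfig` or `TypeSerializer` classes.

```java
/** Simplified stand-ins; none of these are Flink classes. */
class SerializerChoiceSketch {

    /** Hypothetical model of the three config flags read by the quoted methods. */
    static final class Config {
        boolean forceKryo;
        boolean forceAvro;
        boolean genericTypesDisabled;
    }

    /** Mirrors the branch order quoted for PojoTypeInfo: Kryo first, then Avro, then POJO. */
    static String pojoSerializer(Config config) {
        if (config.forceKryo) {
            return "kryo";
        }
        if (config.forceAvro) {
            return "avro";
        }
        return "pojo";
    }

    /** Mirrors the branch quoted for GenericTypeInfo: fail if generic types are disabled. */
    static String genericSerializer(Config config) {
        if (config.genericTypesDisabled) {
            throw new UnsupportedOperationException("Generic types have been disabled");
        }
        return "kryo";
    }

    /** Models a type info that never reads the config. */
    static String basicSerializer(Config ignored) {
        return "basic";
    }

    public static void main(String[] args) {
        Config config = new Config();
        config.forceAvro = true;
        System.out.println(pojoSerializer(config));
    }
}
```

In this model, a config that changes after the serializer has been chosen would only alter the outcome for the first two methods, which is the case the comment is concerned with.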

Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-11-17 Thread via GitHub


afedulov commented on PR #23553:
URL: https://github.com/apache/flink/pull/23553#issuecomment-1816655632

   @zentol some thoughts:
   - I like the idea of exposing `fromData`. There is no good reason for both `fromElements` and `fromCollection` to exist: they do the same thing and are described in their entirety by the method parameters.
   - Keeping `fromData` on the environment has some benefits:
     - Exposing factory methods on the `DataGenerator` for this purpose could look semantically a bit odd when all users want is to send some static data. It gives the wrong impression that this data is expected to be the basis for some generated dataset, maybe just looped over or something like that.
     - The fact that we use the `DataGenerator` under the hood could be seen as an implementation detail. Should the module layout change in 2.0, we'll be free to change the underlying implementation if needed.
   
   I am in favor of exposing the `fromData` method and deprecating `fromElements` and `fromCollection`. What do you think?
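
As a rough illustration of the point that the parameters alone describe the method, one name can cover both the varargs and the collection case through overloading. The sketch below is hypothetical: `FromDataSketch` is not a Flink class, and it returns a plain `List` where the real API would return a data stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/** Hypothetical sketch of a single entry point with two overloads; not Flink API. */
class FromDataSketch {

    /** Collection variant: copies the input so later changes to it are not observed. */
    static <T> List<T> fromData(Collection<T> data) {
        return new ArrayList<>(data);
    }

    /** Varargs variant: delegates to the collection variant. */
    @SafeVarargs
    static <T> List<T> fromData(T... data) {
        return fromData(Arrays.asList(data));
    }

    public static void main(String[] args) {
        System.out.println(fromData(1, 2, 3));
        System.out.println(fromData(Arrays.asList("a", "b")));
    }
}
```

Java prefers the fixed-arity overload when a single `Collection` argument is passed, so the two variants do not conflict for the usual call shapes.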





Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-11-15 Thread via GitHub


zentol commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1394174857


##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java:
##
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+/**
+ * A stream generator function that returns a sequence of elements.
+ *
+ * This generator function serializes the elements using Flink's type information. That way, any
+ * object transport using Java serialization will not be affected by the serializability of the
+ * elements.
+ *
+ * @param <OUT> The type of elements returned by this function.
+ */
+@Internal
+public class FromElementsGeneratorFunction<OUT>
+        implements GeneratorFunction<Long, OUT>, OutputTypeConfigurable<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(FromElementsGeneratorFunction.class);
+
+    /** The (de)serializer to be used for the data elements. */
+    private @Nullable TypeSerializer<OUT> serializer;
+
+    /** The actual data elements, in serialized form. */
+    private byte[] elementsSerialized;
+
+    /** The number of elements emitted already. */
+    private int numElementsEmitted;
+
+    private transient Iterable<OUT> elements;
+    private transient DataInputView input;
+
+    @SafeVarargs
+    public FromElementsGeneratorFunction(TypeInformation<OUT> typeInfo, OUT... elements) {
+        this(typeInfo, new ExecutionConfig(), Arrays.asList(elements));
+    }
+
+    public FromElementsGeneratorFunction(
+            TypeInformation<OUT> typeInfo, ExecutionConfig config, Iterable<OUT> elements) {
+        // must not have null elements and mixed elements
+        checkIterable(elements, typeInfo.getTypeClass());
+        this.serializer = typeInfo.createSerializer(config);

Review Comment:
   hmm, wondering if we should fully rely on `setOutputType`, because it has 
the added benefit of ensuring that we see the final ExecutionConfig. 🤔
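
   To make the suggestion concrete, here is a minimal, self-contained sketch of 
the "serialize late" idea. It is not the code in this PR: `ElementSerializer`, 
`IntSerializer` and `LazyElements` are hypothetical stand-ins, and 
`setSerializer` only plays the role that `setOutputType` would play, assuming 
it is invoked once the final configuration is known.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LazyElementsDemo {
    public static void main(String[] args) {
        LazyElements<Integer> source = new LazyElements<>(1, 2, 3);
        // Nothing has been serialized yet; the serializer arrives later.
        source.setSerializer(new IntSerializer());
        System.out.println(source.readBack()); // prints [1, 2, 3]
    }
}

// Hypothetical stand-in for a type serializer; not a Flink interface.
interface ElementSerializer<T> {
    void write(T value, DataOutputStream out) throws IOException;

    T read(DataInputStream in) throws IOException;
}

final class IntSerializer implements ElementSerializer<Integer> {
    @Override
    public void write(Integer value, DataOutputStream out) throws IOException {
        out.writeInt(value);
    }

    @Override
    public Integer read(DataInputStream in) throws IOException {
        return in.readInt();
    }
}

// Hypothetical holder that keeps the elements as objects until a serializer is supplied.
final class LazyElements<T> {
    private final List<T> elements;
    private ElementSerializer<T> serializer;
    private byte[] serialized; // stays null until setSerializer() is called

    @SafeVarargs
    LazyElements(T... elements) {
        this.elements = new ArrayList<>(Arrays.asList(elements));
    }

    // Serializes all elements with the serializer that is supplied late.
    void setSerializer(ElementSerializer<T> serializer) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (T element : elements) {
                serializer.write(element, out);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        this.serializer = serializer;
        this.serialized = bytes.toByteArray();
    }

    // Deserializes the elements again, failing fast if no serializer was set.
    List<T> readBack() {
        if (serialized == null) {
            throw new IllegalStateException("serializer not set");
        }
        List<T> result = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            for (int i = 0; i < elements.size(); i++) {
                result.add(serializer.read(in));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }
}
```

   The trade-off in this sketch is that the elements have to stay around as 
plain objects until the serializer shows up, and any use before that point 
fails with an explicit state check.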



##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java:
##
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.functions;
+
+import 

Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-11-15 Thread via GitHub


afedulov commented on PR #23553:
URL: https://github.com/apache/flink/pull/23553#issuecomment-1812451001

   > Do you want to cleanup the git history a bit or shall I just squash 
everything?
   
   I'll do it, thanks!
   
   One thing to clarify before we merge - I just noticed yesterday that in a 
somewhat similar situation the decision was made to go via the deprecation 
route instead of the in-place implementation substitution:
   
https://issues.apache.org/jira/browse/FLINK-19494?focusedCommentId=17206787=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17206787
   I guess we do need to consider cases when `fromElements()` might be used in 
production environments to stream in some small bootstrap data sets. 
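
   For reference, the deprecation route could look roughly like the sketch 
below. `MiniEnvironment`, `legacySource` and `newSource` are hypothetical 
names used only for illustration; the point is that the old method keeps its 
old implementation while the new method carries the new one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DeprecationRouteDemo {
    public static void main(String[] args) {
        MiniEnvironment env = new MiniEnvironment();
        System.out.println(env.fromElements("a", "b")); // old path, compiles with a deprecation warning
        System.out.println(env.fromData("a", "b"));     // new path
    }
}

// Hypothetical miniature of an environment class; not Flink's StreamExecutionEnvironment.
class MiniEnvironment {

    /**
     * Old entry point, left on its legacy implementation.
     *
     * @deprecated Use {@link #fromData(Object[])} instead.
     */
    @Deprecated
    @SafeVarargs
    final <T> List<T> fromElements(T... data) {
        return legacySource(data);
    }

    /** New entry point backed by the new implementation. */
    @SafeVarargs
    final <T> List<T> fromData(T... data) {
        return newSource(data);
    }

    // Stand-in for the existing implementation that current jobs rely on.
    private <T> List<T> legacySource(T[] data) {
        return new ArrayList<>(Arrays.asList(data));
    }

    // Stand-in for the new implementation.
    private <T> List<T> newSource(T[] data) {
        return new ArrayList<>(Arrays.asList(data));
    }
}
```

   With this shape, existing callers of the old method see no behavioural 
change until they opt in to the new one.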


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-11-14 Thread via GitHub


afedulov commented on PR #23553:
URL: https://github.com/apache/flink/pull/23553#issuecomment-1810122266

   @zentol CI is green


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-11-14 Thread via GitHub


afedulov commented on PR #23553:
URL: https://github.com/apache/flink/pull/23553#issuecomment-1810121992

   @snuyanzin thanks for the pointer. It seems to be related, albeit a somewhat 
different issue. The constructor for which ArchUnit previously added a 
violation was removed, but running it with Java 11 did not remove the violation 
itself:
   
https://github.com/apache/flink/pull/23553/commits/e0b25aed71328dadd3ca6ac868704e8189d1098f#diff-dda3e2858bbd4222b54379ad6d998011d90a8aa8536ecc845284a98001960d7dL67


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-11-13 Thread via GitHub


snuyanzin commented on PR #23553:
URL: https://github.com/apache/flink/pull/23553#issuecomment-1809183659

   >Using Java 11 the tests pass, using Java 8 they fail (as in CI).
   >Is this a known issue?
   
   Not sure whether this is your issue or not, but some time ago we faced this:
   https://github.com/TNG/ArchUnit/issues/1124


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-11-13 Thread via GitHub


afedulov commented on PR #23553:
URL: https://github.com/apache/flink/pull/23553#issuecomment-1809168702

   @zentol I can confirm that the discrepancy between the local and the remote 
architecture test runs is caused by the Java version. Using Java 11 the 
tests pass, using Java 8 they fail (as in CI). Probably something to do with 
classloading/modules. Is this a known issue? I could not find a matching ticket 
in JIRA.
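
   A quick way to double-check which Java version a local run actually uses is 
to print the standard `java.specification.version` system property. This is 
only a small sketch; the class name `JavaVersionCheck` is made up for 
illustration.

```java
class JavaVersionCheck {

    // Returns the specification version of the running JVM, e.g. "1.8" or "11".
    static String specVersion() {
        return System.getProperty("java.specification.version");
    }

    // Java 8 reports its specification version as "1.8".
    static boolean isJava8() {
        return "1.8".equals(specVersion());
    }

    public static void main(String[] args) {
        System.out.println("java.specification.version = " + specVersion());
        System.out.println("running on Java 8: " + isJava8());
    }
}
```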
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-11-06 Thread via GitHub


afedulov commented on PR #23553:
URL: https://github.com/apache/flink/pull/23553#issuecomment-1794701474

   @zentol thanks for the feedback. The only remaining item currently is the 
[architecture tests 
failures](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=54352=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=23304).
 The problem is that running the same command that is executed in Azure (below) 
locally on the same commit does not produce any errors. Enabling store updates 
does not modify any files either. Running `ArchitectureTest` in IntelliJ passes 
(including the CONNECTOR_CLASSES_ONLY_DEPEND_ON_PUBLIC_API rule).
   ```
   mvn --no-snapshot-updates -Dscala-2.12 -Pskip-webui-build 
-Dflink.tests.with-openssl -Dflink.tests.check-segment-multiple-free 
-Darchunit.freeze.store.default.allowStoreUpdate=false 
-Dpekko.rpc.force-invocation-serialization -Dflink.hadoop.version=2.10.2 
-Dscala-2.12 -pl 
!flink-annotations,!flink-test-utils-parent/flink-test-utils,!flink-state-backends,!flink-state-backends/flink-statebackend-changelog,!flink-state-backends/flink-statebackend-heap-spillable,!flink-state-backends/flink-statebackend-rocksdb,!flink-clients,!flink-core,!flink-java,!flink-optimizer,!flink-rpc,!flink-rpc/flink-rpc-core,!flink-rpc/flink-rpc-akka,!flink-rpc/flink-rpc-akka-loader,!flink-runtime,!flink-runtime-web,!flink-scala,!flink-streaming-java,!flink-streaming-scala,!flink-metrics,!flink-metrics/flink-metrics-core,!flink-external-resources,!flink-external-resources/flink-external-resource-gpu,!flink-libraries,!flink-libraries/flink-cep,!flink-libraries/flink-cep-scala,!flink-libraries/flink-state-processing-api,!flink-queryable-state,!flink-queryable-state/flink-queryable-state-runtime,!flink-queryable-state/flink-queryable-state-client-java,!flink-container,!flink-dstl,!flink-dstl/flink-dstl-dfs,!,!flink-table,!flink-table/flink-sql-parser,!flink-table/flink-table-common,!flink-table/flink-table-api-java,!flink-table/flink-table-api-scala,!flink-table/flink-table-api-bridge-base,!flink-table/flink-table-api-java-bridge,!flink-table/flink-table-api-scala-bridge,!flink-table/flink-table-api-java-uber,!flink-table/flink-sql-client,!flink-table/flink-sql-gateway-api,!flink-table/flink-sql-gateway,!flink-table/flink-table-planner,!flink-table/flink-table-planner-loader,!flink-table/flink-table-planner-loader-bundle,!flink-table/flink-table-runtime,!flink-table/flink-table-code-splitter,!flink-table/flink-table-test-utils,!,!flink-contrib/flink-connector-wikiedits,!flink-filesystems,!flink-filesystems/flink-azure-fs-hadoop,!flink-filesystems/flink-fs-hadoop-shaded,!flink-filesystems/flink-gs-fs-hadoop,!flink-filesystems/flink-hadoop-fs,!flink-filesystems/flink-oss-fs-hadoop,!flink-filesystems/flink-s3-fs-base,!flink-filesystems/flink-s3-fs-hadoop,!flink-filesystems/flink-s3-fs-presto,!flink-fs-tests,!flink-formats,!flink-formats/flink-format-common,!flink-formats/flink-avro-confluent-registry,!flink-formats/flink-sql-avro-confluent-registry,!flink-formats/flink-avro,!flink-formats/flink-sql-avro,!flink-formats/flink-compress,!flink-formats/flink-hadoop-bulk,!flink-formats/flink-parquet,!flink-formats/flink-sql-parquet,!flink-formats/flink-sequence-file,!flink-formats/flink-json,!flink-formats/flink-csv,!flink-formats/flink-orc,!flink-formats/flink-sql-orc,!flink-formats/flink-orc-nohive,!flink-connectors/flink-file-sink-common,!flink-connectors/flink-hadoop-compatibility,!flink-connectors,!flink-connectors/flink-connector-files,!flink-metrics/flink-metrics-dropwizard,!flink-metrics/flink-metrics-graphite,!flink-metrics/flink-metrics-jmx,!flink-metrics/flink-metrics-influxdb,!flink-metrics/flink-metrics-prometheus,!flink-metrics/flink-metrics-statsd,!flink-metrics/flink-metrics-datadog,!flink-metrics/flink-metrics-slf4j,!,!flink-connectors/flink-connector-base,!,!flink-tests,!flink-yarn,!flink-yarn-tests,!
 verify
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-11-05 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1382648870


##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.typeutils.OutputTypeConfigurable;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * A stream generator function that returns a sequence of elements.
+ *
+ * This generator function serializes the elements using Flink's type information. That way, any
+ * object transport using Java serialization will not be affected by the serializability of the
+ * elements.
+ *
+ * NOTE: This source has a parallelism of 1.
+ *
+ * @param <OUT> The type of elements returned by this function.
+ */
+@Internal
+public class FromElementsGeneratorFunction<OUT>
+implements GeneratorFunction<Long, OUT>, OutputTypeConfigurable<OUT> {
+
+private static final long serialVersionUID = 1L;
+
+private static final Logger LOG = 
LoggerFactory.getLogger(FromElementsGeneratorFunction.class);
+
+/** The (de)serializer to be used for the data elements. */
+private @Nullable TypeSerializer serializer;
+
+/** The actual data elements, in serialized form. */
+private byte[] elementsSerialized;
+
+/** The number of elements emitted already. */
+private int numElementsEmitted;
+
+private final transient Iterable elements;
+private transient DataInputView input;
+
+public FromElementsGeneratorFunction(TypeSerializer serializer, 
OUT... elements)
+throws IOException {
+this(serializer, Arrays.asList(elements));
+}
+
+public FromElementsGeneratorFunction(TypeSerializer serializer, 
Iterable elements)
+throws IOException {
+this.serializer = Preconditions.checkNotNull(serializer);
+this.elements = elements;
+serializeElements();
+}
+
+@SafeVarargs
+public FromElementsGeneratorFunction(OUT... elements) {
+this(Arrays.asList(elements));
+}
+
+public FromElementsGeneratorFunction(Iterable elements) {
+this.serializer = null;
+this.elements = elements;
+checkIterable(elements, Object.class);
+}
+
+@VisibleForTesting
+@Nullable
+public TypeSerializer getSerializer() {
+return serializer;
+}
+
+private void serializeElements() throws IOException {
+Preconditions.checkState(serializer != null, "serializer not set");
+LOG.info("Serializing elements using  " + serializer);
+ByteArrayOutputStream baos = new ByteArrayOutputStream();
+DataOutputViewStreamWrapper wrapper = new 
DataOutputViewStreamWrapper(baos);
+
+try {
+for (OUT element : elements) {
+serializer.serialize(element, wrapper);
+}
+} catch (Exception e) {
+throw new IOException("Serializing the source elements failed: " + 
e.getMessage(), e);
+}
+this.elementsSerialized = baos.toByteArray();
+}
+
+@Override
+public void open(SourceReaderContext readerContext) throws Exception {
+

Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-11-05 Thread via GitHub


afedulov commented on PR #23553:
URL: https://github.com/apache/flink/pull/23553#issuecomment-1793845354

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-11-05 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1382645917


##
flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java:
##
@@ -240,7 +240,7 @@ public void testAvroToAvro() {
 }
 
 private DataStream testData(StreamExecutionEnvironment env) {
-return env.fromElements(USER_1, USER_2, USER_3);
+return env.fromElements(USER_1, USER_2, USER_3).setParallelism(1);

Review Comment:
   ✅ 
https://github.com/apache/flink/pull/23553/commits/c04d0dd540c6c0e1635fcf59eb3c10a46cd1e0e5
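
   One plausible reason for pinning the parallelism in tests like this one (an 
assumption, the thread does not spell it out) is that the global order of 
records is only guaranteed with a single subtask. The sketch below models 
this with a hypothetical round-robin split; it does not claim to reproduce how 
the actual source assigns elements to subtasks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ParallelOrderSketch {

    // Assigns elements round-robin to `parallelism` hypothetical subtasks.
    static <T> List<List<T>> split(List<T> elements, int parallelism) {
        List<List<T>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        for (int i = 0; i < elements.size(); i++) {
            subtasks.get(i % parallelism).add(elements.get(i));
        }
        return subtasks;
    }

    // One possible downstream arrival order: each subtask drains fully before the next one.
    static <T> List<T> drainSequentially(List<List<T>> subtasks) {
        List<T> arrived = new ArrayList<>();
        for (List<T> subtask : subtasks) {
            arrived.addAll(subtask);
        }
        return arrived;
    }

    public static void main(String[] args) {
        List<String> users = Arrays.asList("USER_1", "USER_2", "USER_3", "USER_4");
        System.out.println(drainSequentially(split(users, 1))); // [USER_1, USER_2, USER_3, USER_4]
        System.out.println(drainSequentially(split(users, 2))); // [USER_1, USER_3, USER_2, USER_4]
    }
}
```

   Each subtask keeps its own order, but a test that asserts on the exact 
output sequence can only rely on that with a parallelism of 1.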



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-11-05 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1382643573


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java:
##
@@ -123,7 +124,8 @@ private void createChainableStream(TableTestUtil util) {
 }
 
 private void createNonChainableStream(TableTestUtil util) {
-DataStreamSource dataStream = 
util.getStreamEnv().fromElements(1, 2, 3);
+DataStreamSource dataStream =
+util.getStreamEnv().fromCollection(Arrays.asList(1, 2, 3));

Review Comment:
   I believe it is still an opt-in restriction 
https://github.com/apache/flink/pull/22520/files#diff-b9d05c6f5d61d228c20d034270440e8e648f788ceb7ea00622ab7f1947d8908fR108



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-11-05 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1382643298


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##
@@ -1188,14 +1191,14 @@ void testChainingOfOperatorsWithDifferentMaxParallelism(
 configuration.set(
 
PipelineOptions.OPERATOR_CHAINING_CHAIN_OPERATORS_WITH_DIFFERENT_MAX_PARALLELISM,
 chainingOfOperatorsWithDifferentMaxParallelismEnabled);
-configuration.set(PipelineOptions.MAX_PARALLELISM, 10);
+configuration.set(PipelineOptions.MAX_PARALLELISM, 1);
 try (StreamExecutionEnvironment chainEnv =
 StreamExecutionEnvironment.createLocalEnvironment(1, 
configuration)) {
 chainEnv.fromElements(1)
 .map(x -> x)
 // should automatically break chain here
 .map(x -> x)
-.setMaxParallelism(1)
+.setMaxParallelism(10)

Review Comment:
   Yep, done 
https://github.com/apache/flink/pull/23553/commits/982bc20522f0a63670e6747f7ede1ae219fb172f
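
   As a reading aid for the test above: the condition it exercises can be 
modelled as below. This is a deliberately simplified, hypothetical model 
(`ChainingSketch.canChain` is not a Flink method), and the real job graph 
generator checks many more conditions before chaining two operators.

```java
class ChainingSketch {

    // Simplified model: operators with different max parallelism are chained
    // only when the corresponding option allows it.
    static boolean canChain(
            int upstreamMaxParallelism,
            int downstreamMaxParallelism,
            boolean chainOperatorsWithDifferentMaxParallelism) {
        return chainOperatorsWithDifferentMaxParallelism
                || upstreamMaxParallelism == downstreamMaxParallelism;
    }

    public static void main(String[] args) {
        // Mirrors the values in the test: environment max parallelism 1, second map 10.
        System.out.println(canChain(1, 10, false)); // false
        System.out.println(canChain(1, 10, true));  // true
        System.out.println(canChain(1, 1, false));  // true
    }
}
```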



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-11-02 Thread via GitHub


zentol commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1380507456


##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.typeutils.OutputTypeConfigurable;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * A stream generator function that returns a sequence of elements.
+ *
+ * This generator function serializes the elements using Flink's type information. That way, any
+ * object transport using Java serialization will not be affected by the serializability of the
+ * elements.
+ *
+ * NOTE: This source has a parallelism of 1.
+ *
+ * @param <OUT> The type of elements returned by this function.
+ */
+@Internal
+public class FromElementsGeneratorFunction<OUT>
+implements GeneratorFunction<Long, OUT>, OutputTypeConfigurable<OUT> {
+
+private static final long serialVersionUID = 1L;
+
+private static final Logger LOG = 
LoggerFactory.getLogger(FromElementsGeneratorFunction.class);
+
+/** The (de)serializer to be used for the data elements. */
+private @Nullable TypeSerializer serializer;
+
+/** The actual data elements, in serialized form. */
+private byte[] elementsSerialized;
+
+/** The number of elements emitted already. */
+private int numElementsEmitted;
+
+private final transient Iterable elements;
+private transient DataInputView input;
+
+public FromElementsGeneratorFunction(TypeSerializer serializer, 
OUT... elements)
+throws IOException {
+this(serializer, Arrays.asList(elements));
+}
+
+public FromElementsGeneratorFunction(TypeSerializer serializer, 
Iterable elements)
+throws IOException {
+this.serializer = Preconditions.checkNotNull(serializer);
+this.elements = elements;
+serializeElements();
+}
+
+@SafeVarargs
+public FromElementsGeneratorFunction(OUT... elements) {
+this(Arrays.asList(elements));
+}
+
+public FromElementsGeneratorFunction(Iterable elements) {
+this.serializer = null;
+this.elements = elements;
+checkIterable(elements, Object.class);
+}
+
+@VisibleForTesting
+@Nullable
+public TypeSerializer getSerializer() {
+return serializer;
+}
+
+private void serializeElements() throws IOException {
+Preconditions.checkState(serializer != null, "serializer not set");
+LOG.info("Serializing elements using  " + serializer);
+ByteArrayOutputStream baos = new ByteArrayOutputStream();
+DataOutputViewStreamWrapper wrapper = new 
DataOutputViewStreamWrapper(baos);
+
+try {
+for (OUT element : elements) {
+serializer.serialize(element, wrapper);
+}
+} catch (Exception e) {
+throw new IOException("Serializing the source elements failed: " + 
e.getMessage(), e);
+}
+this.elementsSerialized = baos.toByteArray();
+}
+
+@Override
+public void open(SourceReaderContext readerContext) throws Exception {
+

Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-11-02 Thread via GitHub


zentol commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1380547823


##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.typeutils.OutputTypeConfigurable;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * A stream generator function that returns a sequence of elements.
+ *
+ * This generator function serializes the elements using Flink's type information. That way, any
+ * object transport using Java serialization will not be affected by the serializability of the
+ * elements.
+ *
+ * NOTE: This source has a parallelism of 1.
+ *
+ * @param <OUT> The type of elements returned by this function.
+ */
+@Internal
+public class FromElementsGeneratorFunction<OUT>
+implements GeneratorFunction<Long, OUT>, OutputTypeConfigurable<OUT> {
+
+private static final long serialVersionUID = 1L;
+
+private static final Logger LOG = 
LoggerFactory.getLogger(FromElementsGeneratorFunction.class);
+
+/** The (de)serializer to be used for the data elements. */
+private @Nullable TypeSerializer serializer;
+
+/** The actual data elements, in serialized form. */
+private byte[] elementsSerialized;
+
+/** The number of elements emitted already. */
+private int numElementsEmitted;
+
+private final transient Iterable elements;
+private transient DataInputView input;
+
+public FromElementsGeneratorFunction(TypeSerializer serializer, 
OUT... elements)
+throws IOException {
+this(serializer, Arrays.asList(elements));
+}
+
+public FromElementsGeneratorFunction(TypeSerializer serializer, 
Iterable elements)
+throws IOException {
+this.serializer = Preconditions.checkNotNull(serializer);
+this.elements = elements;
+serializeElements();
+}
+
+@SafeVarargs
+public FromElementsGeneratorFunction(OUT... elements) {
+this(Arrays.asList(elements));
+}
+
+public FromElementsGeneratorFunction(Iterable elements) {
+this.serializer = null;
+this.elements = elements;
+checkIterable(elements, Object.class);
+}
+
+@VisibleForTesting
+@Nullable
+public TypeSerializer getSerializer() {
+return serializer;
+}
+
+private void serializeElements() throws IOException {
+Preconditions.checkState(serializer != null, "serializer not set");
+LOG.info("Serializing elements using  " + serializer);
+ByteArrayOutputStream baos = new ByteArrayOutputStream();
+DataOutputViewStreamWrapper wrapper = new 
DataOutputViewStreamWrapper(baos);
+
+try {
+for (OUT element : elements) {
+serializer.serialize(element, wrapper);
+}
+} catch (Exception e) {
+throw new IOException("Serializing the source elements failed: " + 
e.getMessage(), e);
+}
+this.elementsSerialized = baos.toByteArray();
+}
+
+@Override
+public void open(SourceReaderContext readerContext) throws Exception {
+

Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-11-02 Thread via GitHub


zentol commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1380524335


##
flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java:
##
@@ -240,7 +240,7 @@ public void testAvroToAvro() {
 }
 
 private DataStream testData(StreamExecutionEnvironment env) {
-return env.fromElements(USER_1, USER_2, USER_3);
+return env.fromElements(USER_1, USER_2, USER_3).setParallelism(1);

Review Comment:
   huh. These changes do make one wonder whether we shouldn't set the 
parallelism to 1 by default, but allow the user to override it 🤔
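
   A minimal sketch of what "1 by default, but overridable" could mean, using a 
made-up `ElementsSourceHandle` class rather than any existing Flink type:

```java
class ElementsSourceHandleDemo {
    public static void main(String[] args) {
        ElementsSourceHandle source = new ElementsSourceHandle();
        System.out.println(source.getParallelism());                   // 1
        System.out.println(source.setParallelism(4).getParallelism()); // 4
    }
}

// Hypothetical handle for a source; the names are illustrative, not Flink API.
class ElementsSourceHandle {

    // Default chosen so that callers get a single subtask unless they ask for more.
    private int parallelism = 1;

    // Explicit override by the user; rejects non-positive values.
    ElementsSourceHandle setParallelism(int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        this.parallelism = parallelism;
        return this;
    }

    int getParallelism() {
        return parallelism;
    }
}
```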



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-11-02 Thread via GitHub


zentol commented on PR #23553:
URL: https://github.com/apache/flink/pull/23553#issuecomment-1791157635

   > What is the issue with adding the new method?
   
   By default you may not add new methods to classes because it can break 
downstream classes. Personally I'd say it's fine to do it here though.
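   One way such a break can happen is an accidental override. The sketch below 
uses hypothetical stand-in classes (`LibraryBase`, `Downstream`, `greet`, `run`), 
not Flink code: a downstream subclass already has a method whose signature 
matches the one the library adds later, so the library's own call starts 
dispatching to the downstream method. If the added method is `final`, the same 
clash fails at compile time instead.

```java
public class AccidentalOverrideDemo {

    // Hypothetical library class as of "version 2": greet() is the newly added method.
    static class LibraryBase {
        String greet() {
            return "library";
        }

        // Library code that relies on its own greet().
        String run() {
            return "run:" + greet();
        }
    }

    // Hypothetical downstream class written against "version 1", which had no greet().
    // Its unrelated helper now silently overrides the library's new method.
    static class Downstream extends LibraryBase {
        String greet() {
            return "downstream";
        }
    }

    // Returns what the library's run() produces when invoked on the downstream subclass.
    static String demo() {
        return new Downstream().run();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

   Here `run()` ends up calling the downstream `greet()`, which the library 
author never intended.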





Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-11-02 Thread via GitHub


afedulov commented on PR #23553:
URL: https://github.com/apache/flink/pull/23553#issuecomment-1790570036

   @flinkbot run azure





Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-31 Thread via GitHub


afedulov commented on PR #23553:
URL: https://github.com/apache/flink/pull/23553#issuecomment-1787923597

   @flinkbot run azure





Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-26 Thread via GitHub


afedulov commented on PR #23553:
URL: https://github.com/apache/flink/pull/23553#issuecomment-1781183357

   @zentol I made two major changes as per our discussions above:
   - https://github.com/apache/flink/commit/2712c1813ca6420905e06b9e417de0eb61d586d9 
     direct type passing without the requirement to use returns() (please see my 
     [comment](https://github.com/apache/flink/pull/23553#discussion_r1372403767) 
     above)
   - https://github.com/apache/flink/pull/23553/commits/78cb92bc86e9dded9bf2458de119d549be7ad281 
     allow parallel execution of fromElements Sources
   
   The second one might need some additional test fixes, but I cannot get to 
them at the moment because of the `japicmp` failures:
   ```
   Failed to execute goal 
io.github.zentol.japicmp:japicmp-maven-plugin:0.17.1.1_m325:cmp (default) on 
project flink-streaming-java: There is at least one incompatibility: 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(org.apache.flink.api.common.typeinfo.TypeInformation,java.lang.Object[]):CLASS_GENERIC_TEMPLATE_CHANGED
 -> [Help 1]
   ```
   This is the diff:
   ```
+++* NEW METHOD: PUBLIC(+) FINAL(+) 
org.apache.flink.streaming.api.datastream.DataStreamSource 
fromElements(org.apache.flink.api.common.typeinfo.TypeInformation, 
java.lang.Object[])
+++  NEW ANNOTATION: java.lang.SafeVarargs
GENERIC TEMPLATES: +++ OUT:java.lang.Object
   ```
   What is the issue with adding the new method?
   
   





Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-25 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1372403767


##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.typeutils.OutputTypeConfigurable;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * A stream generator function that returns a sequence of elements.
+ *
+ * This generator function serializes the elements using Flink's type information. That way, any
+ * object transport using Java serialization will not be affected by the serializability of the
+ * elements.
+ *
+ * NOTE: This source has a parallelism of 1.
+ *
+ * @param <OUT> The type of elements returned by this function.
+ */
+@Internal
+public class FromElementsGeneratorFunction
+implements GeneratorFunction, OutputTypeConfigurable {
+
+private static final long serialVersionUID = 1L;
+
+private static final Logger LOG = 
LoggerFactory.getLogger(FromElementsGeneratorFunction.class);
+
+/** The (de)serializer to be used for the data elements. */
+private @Nullable TypeSerializer serializer;
+
+/** The actual data elements, in serialized form. */
+private byte[] elementsSerialized;
+
+/** The number of elements emitted already. */
+private int numElementsEmitted;
+
+private final transient Iterable elements;
+private transient DataInputView input;
+
+public FromElementsGeneratorFunction(TypeSerializer serializer, 
OUT... elements)
+throws IOException {
+this(serializer, Arrays.asList(elements));
+}
+
+public FromElementsGeneratorFunction(TypeSerializer serializer, 
Iterable elements)
+throws IOException {
+this.serializer = Preconditions.checkNotNull(serializer);
+this.elements = elements;
+serializeElements();
+}
+
+@SafeVarargs
+public FromElementsGeneratorFunction(OUT... elements) {
+this(Arrays.asList(elements));
+}
+
+public FromElementsGeneratorFunction(Iterable elements) {
+this.serializer = null;
+this.elements = elements;
+checkIterable(elements, Object.class);
+}
+
+@VisibleForTesting
+@Nullable
+public TypeSerializer getSerializer() {
+return serializer;
+}
+
+private void serializeElements() throws IOException {
+Preconditions.checkState(serializer != null, "serializer not set");
+LOG.info("Serializing elements using  " + serializer);
+ByteArrayOutputStream baos = new ByteArrayOutputStream();
+DataOutputViewStreamWrapper wrapper = new 
DataOutputViewStreamWrapper(baos);
+
+try {
+for (OUT element : elements) {
+serializer.serialize(element, wrapper);
+}
+} catch (Exception e) {
+throw new IOException("Serializing the source elements failed: " + 
e.getMessage(), e);
+}
+this.elementsSerialized = baos.toByteArray();
+}
+
+@Override
+public void open(SourceReaderContext readerContext) throws Exception {
+

Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-25 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370757585


##
flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5:
##
@@ -19,8 +19,8 @@ Method 

 calls method 

 in (RecreateOnResetOperatorCoordinator.java:361)
 Method 
 calls method 
 in (TaskManagerConfiguration.java:244)
 Method 
 calls method 

 in (TaskManagerConfiguration.java:246)
-Method 
 calls method 
 in (TaskManagerServices.java:433)
-Method 
 calls method 

 in (TaskManagerServices.java:431)

Review Comment:
   Indeed. I had to enable refreeze to add missing datagen source violations, 
but how exactly it is supposed to work in archunit is still a bit of a mystery 
to me to be honest. 






Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-25 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1371778188


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##
@@ -1188,14 +1191,14 @@ void testChainingOfOperatorsWithDifferentMaxParallelism(
 configuration.set(
 
PipelineOptions.OPERATOR_CHAINING_CHAIN_OPERATORS_WITH_DIFFERENT_MAX_PARALLELISM,
 chainingOfOperatorsWithDifferentMaxParallelismEnabled);
-configuration.set(PipelineOptions.MAX_PARALLELISM, 10);
+configuration.set(PipelineOptions.MAX_PARALLELISM, 1);
 try (StreamExecutionEnvironment chainEnv =
 StreamExecutionEnvironment.createLocalEnvironment(1, 
configuration)) {
 chainEnv.fromElements(1)
 .map(x -> x)
 // should automatically break chain here
 .map(x -> x)
-.setMaxParallelism(1)
+.setMaxParallelism(10)

Review Comment:
   I'll unresolve it for now because if our attempt at not restricting 
parallelism to 1 works, this change won't be needed at all.






Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-25 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1371775741


##
flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java:
##
@@ -67,7 +79,62 @@ public void executeThrowsProgramInvocationException() throws 
Exception {
 })
 .print();
 
-thrown.expect(ProgramInvocationException.class);
-env.execute();
+
assertThatThrownBy(env::execute).isInstanceOf(ProgramInvocationException.class);
+}
+
+@Test
+@SuppressWarnings("unchecked")
+void testAvroSpecificRecordsInFromElements() throws Exception {
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+User user1 = new User("Foo", 1);
+User user2 = new User("Bar", 2);
+User[] data = {user1, user2};
+DataStreamSource stream = env.fromElements(User.class, user1, 
user2);
+DataGeneratorSource source = getSourceFromStream(stream);
+FromElementsGeneratorFunction generatorFunction =
+(FromElementsGeneratorFunction) 
source.getGeneratorFunction();
+
+List result = stream.executeAndCollect(data.length + 1);
+TypeSerializer serializer = generatorFunction.getSerializer();
+
+assertThat(serializer).isInstanceOf(AvroSerializer.class);
+assertThat(result).containsExactly(data);
+}
+
+@Test
+@SuppressWarnings("unchecked")
+void testAvroGenericRecordsInFromElements() throws Exception {
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+Schema schema = getSchemaFromResources("/avro/user.avsc");
+GenericRecord user1 =
+new GenericRecordBuilder(schema).set("name", "Foo").set("age", 
40).build();
+GenericRecord user2 =
+new GenericRecordBuilder(schema).set("name", "Bar").set("age", 
45).build();
+GenericRecord[] data = {user1, user2};
+DataStream stream =
+env.fromElements(data).returns(new 
GenericRecordAvroTypeInfo(schema));
+DataGeneratorSource source = 
getSourceFromStream(stream);
+FromElementsGeneratorFunction generatorFunction =
+(FromElementsGeneratorFunction) 
source.getGeneratorFunction();
+
+List result = stream.executeAndCollect(data.length + 1);
+TypeSerializer serializer = 
generatorFunction.getSerializer();
+
+assertThat(serializer).isInstanceOf(AvroSerializer.class);
+assertThat(result).containsExactly(data);
+}
+
+private Schema getSchemaFromResources(String path) throws Exception {
+try (InputStream schemaStream = getClass().getResourceAsStream(path)) {
+if (schemaStream == null) {
+throw new IllegalStateException("Could not find " + path + " 
in classpath");
+}
+return new Schema.Parser().parse(schemaStream);
+}
+}
+
+@SuppressWarnings("unchecked")
+private static > S 
getSourceFromStream(DataStream stream) {
+return (S) ((SourceTransformation) 
stream.getTransformation()).getSource();

Review Comment:
   
   https://github.com/apache/flink/pull/23553/commits/2962647e494b569d12bcaf3b4abc314ef0bbc623
   I also removed the specific record test because without the serializer check 
it is a bit pointless.






Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-25 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1371769875


##
flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java:
##
@@ -67,7 +79,62 @@ public void executeThrowsProgramInvocationException() throws 
Exception {
 })
 .print();
 
-thrown.expect(ProgramInvocationException.class);
-env.execute();
+
assertThatThrownBy(env::execute).isInstanceOf(ProgramInvocationException.class);
+}
+
+@Test
+@SuppressWarnings("unchecked")
+void testAvroSpecificRecordsInFromElements() throws Exception {
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+User user1 = new User("Foo", 1);
+User user2 = new User("Bar", 2);
+User[] data = {user1, user2};
+DataStreamSource stream = env.fromElements(User.class, user1, 
user2);
+DataGeneratorSource source = getSourceFromStream(stream);
+FromElementsGeneratorFunction generatorFunction =
+(FromElementsGeneratorFunction) 
source.getGeneratorFunction();
+
+List result = stream.executeAndCollect(data.length + 1);
+TypeSerializer serializer = generatorFunction.getSerializer();
+
+assertThat(serializer).isInstanceOf(AvroSerializer.class);
+assertThat(result).containsExactly(data);
+}
+
+@Test
+@SuppressWarnings("unchecked")
+void testAvroGenericRecordsInFromElements() throws Exception {
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+Schema schema = getSchemaFromResources("/avro/user.avsc");
+GenericRecord user1 =
+new GenericRecordBuilder(schema).set("name", "Foo").set("age", 
40).build();
+GenericRecord user2 =
+new GenericRecordBuilder(schema).set("name", "Bar").set("age", 
45).build();
+GenericRecord[] data = {user1, user2};
+DataStream stream =
+env.fromElements(data).returns(new 
GenericRecordAvroTypeInfo(schema));
+DataGeneratorSource source = 
getSourceFromStream(stream);
+FromElementsGeneratorFunction generatorFunction =
+(FromElementsGeneratorFunction) 
source.getGeneratorFunction();
+
+List result = stream.executeAndCollect(data.length + 1);
+TypeSerializer serializer = 
generatorFunction.getSerializer();
+
+assertThat(serializer).isInstanceOf(AvroSerializer.class);
+assertThat(result).containsExactly(data);
+}
+
+private Schema getSchemaFromResources(String path) throws Exception {
+try (InputStream schemaStream = getClass().getResourceAsStream(path)) {
+if (schemaStream == null) {
+throw new IllegalStateException("Could not find " + path + " 
in classpath");
+}
+return new Schema.Parser().parse(schemaStream);
+}
+}
+
+@SuppressWarnings("unchecked")
+private static > S 
getSourceFromStream(DataStream stream) {
+return (S) ((SourceTransformation) 
stream.getTransformation()).getSource();

Review Comment:
   Sounds reasonable. Without explicitly setting `GenericRecordAvroTypeInfo`, 
Kryo would fail, so we can just rely on checking the happy path. The downside 
is that this won't let us check that Avro is detected and the respective 
serializer is used for specific records.
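
   A happy-path check of this kind can be sketched as a black-box round trip: 
write the elements, read them back, and compare, without inspecting which 
serializer did the work. The sketch below is only an illustration under stated 
assumptions: it uses `java.io` data streams as a stand-in for Flink's 
`TypeSerializer`, and `RoundTripSketch` and its methods are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RoundTripSketch {

    // Serializes all elements up front into one byte array
    // (java.io stand-in for calling a TypeSerializer per element).
    static byte[] serializeAll(List<String> elements) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            for (String element : elements) {
                out.writeUTF(element);
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Serializing the elements failed", e);
        }
        return baos.toByteArray();
    }

    // Reads back exactly `count` elements from the serialized form.
    static List<String> readBack(byte[] bytes, int count) {
        List<String> result = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            for (int i = 0; i < count; i++) {
                result.add(in.readUTF());
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Deserializing the elements failed", e);
        }
        return result;
    }

    // Happy-path check: the output equals the input, in order.
    static boolean roundTripsExactly(List<String> elements) {
        return readBack(serializeAll(elements), elements.size()).equals(elements);
    }

    public static void main(String[] args) {
        System.out.println(roundTripsExactly(Arrays.asList("Foo", "Bar")));
    }
}
```

   The trade-off is the one noted above: the check passes for any serializer 
that round-trips correctly, so it cannot tell which one was picked.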






Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-25 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1371721067


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java:
##
@@ -123,7 +124,8 @@ private void createChainableStream(TableTestUtil util) {
 }
 
 private void createNonChainableStream(TableTestUtil util) {
-DataStreamSource dataStream = 
util.getStreamEnv().fromElements(1, 2, 3);
+DataStreamSource dataStream =
+util.getStreamEnv().fromCollection(Arrays.asList(1, 2, 3));

Review Comment:
   OK, the max parallelism setting won't work. The 
`MultiInputNodeCreationProcessor` just makes its decision based on whether the 
source is a FLIP-27 SourceTransformation
   
https://github.com/afedulov/flink/blob/53649ef98ef3da237c1c84714d53aedd6725d39f/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessor.java#L479
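
   The behaviour described above can be sketched roughly as follows. This is a 
simplified illustration, not the planner's code: the nested classes are 
hypothetical stand-ins for Flink's transformation hierarchy, and 
`isChainableSource` here only mirrors the idea that the type of the 
transformation decides, while max parallelism is never consulted.

```java
public class ChainableSourceSketch {

    // Hypothetical stand-in for a transformation carrying a max parallelism setting.
    static class Transformation {
        final int maxParallelism;

        Transformation(int maxParallelism) {
            this.maxParallelism = maxParallelism;
        }
    }

    // Stand-in for a FLIP-27 source transformation.
    static class SourceTransformation extends Transformation {
        SourceTransformation(int maxParallelism) {
            super(maxParallelism);
        }
    }

    // Stand-in for a pre-FLIP-27 (legacy) source transformation.
    static class LegacySourceTransformation extends Transformation {
        LegacySourceTransformation(int maxParallelism) {
            super(maxParallelism);
        }
    }

    // Decision based purely on the transformation type; maxParallelism is ignored.
    static boolean isChainableSource(Transformation transformation) {
        return transformation instanceof SourceTransformation;
    }

    public static void main(String[] args) {
        System.out.println(isChainableSource(new SourceTransformation(1)));
        System.out.println(isChainableSource(new LegacySourceTransformation(1)));
    }
}
```

   Under this assumption, changing max parallelism on a FLIP-27 source cannot 
turn it into a non-chainable one, which is why the test switched to a 
different kind of source.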






Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-25 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1371721067


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java:
##
@@ -123,7 +124,8 @@ private void createChainableStream(TableTestUtil util) {
 }
 
 private void createNonChainableStream(TableTestUtil util) {
-DataStreamSource dataStream = 
util.getStreamEnv().fromElements(1, 2, 3);
+DataStreamSource dataStream =
+util.getStreamEnv().fromCollection(Arrays.asList(1, 2, 3));

Review Comment:
   OK, that won't work: the `MultiInputNodeCreationProcessor` just makes its 
decision based on whether the source is a FLIP-27 SourceTransformation
   
https://github.com/afedulov/flink/blob/53649ef98ef3da237c1c84714d53aedd6725d39f/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessor.java#L479






Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-25 Thread via GitHub


zentol commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1371718910


##
flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java:
##
@@ -67,7 +79,62 @@ public void executeThrowsProgramInvocationException() throws 
Exception {
 })
 .print();
 
-thrown.expect(ProgramInvocationException.class);
-env.execute();
+
assertThatThrownBy(env::execute).isInstanceOf(ProgramInvocationException.class);
+}
+
+@Test
+@SuppressWarnings("unchecked")
+void testAvroSpecificRecordsInFromElements() throws Exception {
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+User user1 = new User("Foo", 1);
+User user2 = new User("Bar", 2);
+User[] data = {user1, user2};
+DataStreamSource stream = env.fromElements(User.class, user1, 
user2);
+DataGeneratorSource source = getSourceFromStream(stream);
+FromElementsGeneratorFunction generatorFunction =
+(FromElementsGeneratorFunction) 
source.getGeneratorFunction();
+
+List result = stream.executeAndCollect(data.length + 1);
+TypeSerializer serializer = generatorFunction.getSerializer();
+
+assertThat(serializer).isInstanceOf(AvroSerializer.class);
+assertThat(result).containsExactly(data);
+}
+
+@Test
+@SuppressWarnings("unchecked")
+void testAvroGenericRecordsInFromElements() throws Exception {
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+Schema schema = getSchemaFromResources("/avro/user.avsc");
+GenericRecord user1 =
+new GenericRecordBuilder(schema).set("name", "Foo").set("age", 
40).build();
+GenericRecord user2 =
+new GenericRecordBuilder(schema).set("name", "Bar").set("age", 
45).build();
+GenericRecord[] data = {user1, user2};
+DataStream stream =
+env.fromElements(data).returns(new 
GenericRecordAvroTypeInfo(schema));
+DataGeneratorSource source = 
getSourceFromStream(stream);
+FromElementsGeneratorFunction generatorFunction =
+(FromElementsGeneratorFunction) 
source.getGeneratorFunction();
+
+List result = stream.executeAndCollect(data.length + 1);
+TypeSerializer serializer = 
generatorFunction.getSerializer();
+
+assertThat(serializer).isInstanceOf(AvroSerializer.class);
+assertThat(result).containsExactly(data);
+}
+
+private Schema getSchemaFromResources(String path) throws Exception {
+try (InputStream schemaStream = getClass().getResourceAsStream(path)) {
+if (schemaStream == null) {
+throw new IllegalStateException("Could not find " + path + " 
in classpath");
+}
+return new Schema.Parser().parse(schemaStream);
+}
+}
+
+@SuppressWarnings("unchecked")
+private static > S 
getSourceFromStream(DataStream stream) {
+return (S) ((SourceTransformation) 
stream.getTransformation()).getSource();

Review Comment:
   ah I see. It feels a bit weird to break the abstraction layers like this; 
there are 3 casts being done here.
   
   A higher level test that just uses avro + fromElements and verifies that 
things _work_ seems more appropriate?






Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-25 Thread via GitHub


zentol commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1371715012


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##
@@ -1188,14 +1191,14 @@ void testChainingOfOperatorsWithDifferentMaxParallelism(
 configuration.set(
 
PipelineOptions.OPERATOR_CHAINING_CHAIN_OPERATORS_WITH_DIFFERENT_MAX_PARALLELISM,
 chainingOfOperatorsWithDifferentMaxParallelismEnabled);
-configuration.set(PipelineOptions.MAX_PARALLELISM, 10);
+configuration.set(PipelineOptions.MAX_PARALLELISM, 1);
 try (StreamExecutionEnvironment chainEnv =
 StreamExecutionEnvironment.createLocalEnvironment(1, 
configuration)) {
 chainEnv.fromElements(1)
 .map(x -> x)
 // should automatically break chain here
 .map(x -> x)
-.setMaxParallelism(1)
+.setMaxParallelism(10)

Review Comment:
   sounds good  






Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-25 Thread via GitHub


zentol commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1371714068


##
flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5:
##
@@ -19,8 +19,8 @@ Method 

 calls method 

 in (RecreateOnResetOperatorCoordinator.java:361)
 Method 
 calls method 
 in (TaskManagerConfiguration.java:244)
 Method 
 calls method 

 in (TaskManagerConfiguration.java:246)
-Method 
 calls method 
 in (TaskManagerServices.java:433)
-Method 
 calls method 

 in (TaskManagerServices.java:431)

Review Comment:
   The line numbers have tripped people up a few times; maybe we should 
start failing CI if the diff contains such changes.
   Anyhow, for the PR it's fine I suppose. _Maybe_ move these lines into a 
separate "regenerate archunit store" commit or something.






Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-25 Thread via GitHub


zentol commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1371711944


##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.typeutils.OutputTypeConfigurable;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * A stream generator function that returns a sequence of elements.
+ *
+ * This generator function serializes the elements using Flink's type information. That way, any
+ * object transport using Java serialization will not be affected by the serializability of the
+ * elements.
+ *
+ * NOTE: This source has a parallelism of 1.
+ *
+ * @param <OUT> The type of elements returned by this function.
+ */
+@Internal
+public class FromElementsGeneratorFunction<OUT>
+        implements GeneratorFunction<Long, OUT>, OutputTypeConfigurable<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(FromElementsGeneratorFunction.class);
+
+    /** The (de)serializer to be used for the data elements. */
+    private @Nullable TypeSerializer<OUT> serializer;
+
+    /** The actual data elements, in serialized form. */
+    private byte[] elementsSerialized;
+
+    /** The number of elements emitted already. */
+    private int numElementsEmitted;
+
+    private final transient Iterable<OUT> elements;
+    private transient DataInputView input;
+
+    public FromElementsGeneratorFunction(TypeSerializer<OUT> serializer, OUT... elements)
+            throws IOException {
+        this(serializer, Arrays.asList(elements));
+    }
+
+    public FromElementsGeneratorFunction(TypeSerializer<OUT> serializer, Iterable<OUT> elements)
+            throws IOException {
+        this.serializer = Preconditions.checkNotNull(serializer);
+        this.elements = elements;
+        serializeElements();
+    }
+
+    @SafeVarargs
+    public FromElementsGeneratorFunction(OUT... elements) {
+        this(Arrays.asList(elements));
+    }
+
+    public FromElementsGeneratorFunction(Iterable<OUT> elements) {
+        this.serializer = null;
+        this.elements = elements;
+        checkIterable(elements, Object.class);
+    }
+
+    @VisibleForTesting
+    @Nullable
+    public TypeSerializer<OUT> getSerializer() {
+        return serializer;
+    }
+
+    private void serializeElements() throws IOException {
+        Preconditions.checkState(serializer != null, "serializer not set");
+        LOG.info("Serializing elements using  " + serializer);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
+
+        try {
+            for (OUT element : elements) {
+                serializer.serialize(element, wrapper);
+            }
+        } catch (Exception e) {
+            throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
+        }
+        this.elementsSerialized = baos.toByteArray();
+    }
+
+    @Override
+    public void open(SourceReaderContext readerContext) throws Exception {
+

Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-25 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1371708873


##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java:
##
@@ -0,0 +1,211 @@

Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-25 Thread via GitHub


zentol commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1371577209


##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java:
##
@@ -0,0 +1,211 @@

Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-25 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1371571177


##
flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java:
##
@@ -67,7 +79,62 @@ public void executeThrowsProgramInvocationException() throws Exception {
 })
 .print();
 
-        thrown.expect(ProgramInvocationException.class);
-        env.execute();
+        assertThatThrownBy(env::execute).isInstanceOf(ProgramInvocationException.class);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testAvroSpecificRecordsInFromElements() throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        User user1 = new User("Foo", 1);
+        User user2 = new User("Bar", 2);
+        User[] data = {user1, user2};
+        DataStreamSource<User> stream = env.fromElements(User.class, user1, user2);
+        DataGeneratorSource<User> source = getSourceFromStream(stream);
+        FromElementsGeneratorFunction<User> generatorFunction =
+                (FromElementsGeneratorFunction<User>) source.getGeneratorFunction();
+
+        List<User> result = stream.executeAndCollect(data.length + 1);
+        TypeSerializer<User> serializer = generatorFunction.getSerializer();
+
+        assertThat(serializer).isInstanceOf(AvroSerializer.class);
+        assertThat(result).containsExactly(data);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testAvroGenericRecordsInFromElements() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        Schema schema = getSchemaFromResources("/avro/user.avsc");
+        GenericRecord user1 =
+                new GenericRecordBuilder(schema).set("name", "Foo").set("age", 40).build();
+        GenericRecord user2 =
+                new GenericRecordBuilder(schema).set("name", "Bar").set("age", 45).build();
+        GenericRecord[] data = {user1, user2};
+        DataStream<GenericRecord> stream =
+                env.fromElements(data).returns(new GenericRecordAvroTypeInfo(schema));
+        DataGeneratorSource<GenericRecord> source = getSourceFromStream(stream);
+        FromElementsGeneratorFunction<GenericRecord> generatorFunction =
+                (FromElementsGeneratorFunction<GenericRecord>) source.getGeneratorFunction();
+
+        List<GenericRecord> result = stream.executeAndCollect(data.length + 1);
+        TypeSerializer<GenericRecord> serializer = generatorFunction.getSerializer();
+
+        assertThat(serializer).isInstanceOf(AvroSerializer.class);
+        assertThat(result).containsExactly(data);
+    }
+
+    private Schema getSchemaFromResources(String path) throws Exception {
+        try (InputStream schemaStream = getClass().getResourceAsStream(path)) {
+            if (schemaStream == null) {
+                throw new IllegalStateException("Could not find " + path + " in classpath");
+            }
+            return new Schema.Parser().parse(schemaStream);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T, S extends Source<T, ?, ?>> S getSourceFromStream(DataStream<T> stream) {
+        return (S) ((SourceTransformation<T, ?, ?>) stream.getTransformation()).getSource();

Review Comment:
   Missed this one. 
   `fromElements` returns `DataStreamSource` (`SingleOutputStreamOperator`), 
not the FLIP-27 Source that I can cast to `DataGeneratorSource` to get a hold 
of the `FromElementsGeneratorFunction`. Or do you have something completely 
different in mind?
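
   To make the point above concrete: the helper in the quoted diff reaches the underlying source through the stream's transformation rather than through the stream object itself. Below is a minimal, dependency-free sketch of that unwrap-and-cast pattern. Every type in it (`Stream`, `Transformation`, `SourceTransformation`, `Source`, `ElementsSource`) is a hypothetical stand-in defined only for this illustration; none of them is the Flink class of a similar name, and the real signatures may differ.

```java
import java.util.Arrays;
import java.util.List;

final class SourceUnwrapSketch {

    /** Hypothetical stand-in for a source; not the Flink interface. */
    interface Source<T> {}

    /** Hypothetical source that simply holds its elements. */
    static final class ElementsSource<T> implements Source<T> {
        final List<T> elements;

        ElementsSource(List<T> elements) {
            this.elements = elements;
        }
    }

    /** Hypothetical base type for whatever produced a stream. */
    static class Transformation<T> {}

    /** Hypothetical transformation that wraps a source. */
    static final class SourceTransformation<T> extends Transformation<T> {
        private final Source<T> source;

        SourceTransformation(Source<T> source) {
            this.source = source;
        }

        Source<T> getSource() {
            return source;
        }
    }

    /** Hypothetical stream handle: it exposes only its transformation. */
    static final class Stream<T> {
        private final Transformation<T> transformation;

        Stream(Transformation<T> transformation) {
            this.transformation = transformation;
        }

        Transformation<T> getTransformation() {
            return transformation;
        }
    }

    /** Unwraps the source; the caller picks the concrete type via the assignment target. */
    @SuppressWarnings("unchecked")
    static <T, S extends Source<T>> S getSourceFromStream(Stream<T> stream) {
        return (S) ((SourceTransformation<T>) stream.getTransformation()).getSource();
    }

    public static void main(String[] args) {
        ElementsSource<Integer> original = new ElementsSource<>(Arrays.asList(1, 2, 3));
        Stream<Integer> stream = new Stream<>(new SourceTransformation<>(original));

        // The stream itself is not the source, so the test has to unwrap it first.
        ElementsSource<Integer> source = getSourceFromStream(stream);
        System.out.println(source.elements);
    }
}
```

   Both casts are unchecked from the compiler's point of view, which is why the helper carries `@SuppressWarnings("unchecked")`; a stream built from a different kind of transformation would fail with a `ClassCastException` at runtime.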



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-25 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1371506767


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java:
##
@@ -123,7 +124,8 @@ private void createChainableStream(TableTestUtil util) {
     }
 
     private void createNonChainableStream(TableTestUtil util) {
-        DataStreamSource<Integer> dataStream = util.getStreamEnv().fromElements(1, 2, 3);
+        DataStreamSource<Integer> dataStream =
+                util.getStreamEnv().fromCollection(Arrays.asList(1, 2, 3));

Review Comment:
   Hmm, maybe we should just set a different max parallelism, as in the test above 🤔. I'll give it a try.






Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-25 Thread via GitHub


zentol commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1371398388


##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.typeutils.OutputTypeConfigurable;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * A stream generator function that returns a sequence of elements.
+ *
+ * This generator function serializes the elements using Flink's type information. That way, any
+ * object transport using Java serialization will not be affected by the serializability of the
+ * elements.
+ *
+ * NOTE: This source has a parallelism of 1.

Review Comment:
   > were any specific additional considerations that required to limit it to 
the parallelism of 1.
   
   I can't think of any beyond "hey this would need more work". 🤔
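
   The javadoc quoted above describes serializing the elements up front and replaying them from their serialized form. A minimal sketch of that idea follows, using only `java.io` and `String` elements instead of Flink's `TypeSerializer`. The class and method names (`SerializedElementsSketch`, `open`, `hasNext`, `next`) are assumptions made for this illustration and are not the PR's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;

final class SerializedElementsSketch {

    /** The elements in serialized form; only this array has to be shipped. */
    private final byte[] elementsSerialized;

    private final int numElements;
    private int numElementsEmitted;
    private DataInputStream input;

    SerializedElementsSketch(List<String> elements) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            for (String element : elements) {
                out.writeUTF(element); // eager serialization at construction time
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Serializing the elements failed", e);
        }
        this.elementsSerialized = baos.toByteArray();
        this.numElements = elements.size();
    }

    /** Prepares a fresh reader over the serialized bytes. */
    void open() {
        input = new DataInputStream(new ByteArrayInputStream(elementsSerialized));
        numElementsEmitted = 0;
    }

    boolean hasNext() {
        return numElementsEmitted < numElements;
    }

    /** Deserializes the next element; elements come back in insertion order. */
    String next() {
        if (input == null) {
            throw new IllegalStateException("open() must be called before next()");
        }
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        try {
            String element = input.readUTF();
            numElementsEmitted++;
            return element;
        } catch (IOException e) {
            throw new UncheckedIOException("Deserializing an element failed", e);
        }
    }

    public static void main(String[] args) {
        SerializedElementsSketch sketch = new SerializedElementsSketch(Arrays.asList("a", "b", "c"));
        sketch.open();
        while (sketch.hasNext()) {
            System.out.println(sketch.next());
        }
    }
}
```

   In this sketch the elements sit in one sequential byte stream, so a single reader consumes them in order. Splitting such a stream across several readers would need extra bookkeeping (for example per-element offsets), which is one plausible reading of the "more work" mentioned above, not a statement about the actual implementation.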




Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-25 Thread via GitHub


zentol commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1371395994


##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java:
##
@@ -0,0 +1,211 @@

Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on PR #23553:
URL: https://github.com/apache/flink/pull/23553#issuecomment-1778137755

   @zentol thanks a lot for the review!! 
   I addressed all comments from your first pass, PTAL.





Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370875073


##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java:
##
@@ -0,0 +1,211 @@

Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370869459


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java:
##
@@ -27,7 +27,10 @@
  * at {@link org.apache.flink.streaming.api.graph.StreamGraph} generation. 
This can be useful for
  * cases where the output type is specified by the returns method and, thus, 
after the stream
  * operator has been created.
+ *
+ * @deprecated Use {@link 
org.apache.flink.api.java.typeutils.OutputTypeConfigurable} instead
  */
+@Deprecated

Review Comment:
   Nice trick :) Done 
https://github.com/apache/flink/pull/23553/commits/e0f20410d65ce9ddf5c674e54d89947d5c5ceca3






Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370757489


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java:
##
@@ -566,15 +567,15 @@ public void testMaxParallelismWithConnectedKeyedStream() {
 int maxParallelism = 42;
 
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-DataStream<Integer> input1 = env.fromElements(1, 2, 3, 4).setMaxParallelism(128);
-DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4).setMaxParallelism(129);
+DataStream<Long> input1 = env.fromSequence(1, 4).setMaxParallelism(128);
+DataStream<Long> input2 = env.fromSequence(1, 4).setMaxParallelism(129);

Review Comment:
   Yes, this fixes test failures that arise because of this: 
https://github.com/apache/flink/pull/23553/files#diff-4a5eb9032bed78bb9f18e6523d4f7b3dc86ed10e3a3689757c1c4fa2335e7255R1307
   The `SingleOutputStreamOperator` caps the max parallelism at 1. 
   The current implementation of `fromSequence`, somewhat inconsistently, 
allows parallel execution.
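   
   A minimal plain-Java sketch of the difference, assuming the test failures come from a validation that rejects any max parallelism other than 1 for a non-parallel source. `MaxParallelismSketch` and its methods are hypothetical stand-ins for illustration, not Flink classes, and the exact check in Flink may differ.

```java
/** Toy model of an operator that may or may not run in parallel (hypothetical, not a Flink class). */
final class MaxParallelismSketch {

    private final boolean canBeParallel;
    private int maxParallelism = -1; // -1 means "not set"

    MaxParallelismSketch(boolean canBeParallel) {
        this.canBeParallel = canBeParallel;
    }

    /** Accepts the value only if it is positive and, for a non-parallel operator, exactly 1. */
    MaxParallelismSketch setMaxParallelism(int maxParallelism) {
        if (maxParallelism <= 0) {
            throw new IllegalArgumentException("The maximum parallelism must be greater than 0.");
        }
        if (!canBeParallel && maxParallelism != 1) {
            throw new IllegalArgumentException(
                    "The maximum parallelism of a non-parallel operator must be 1.");
        }
        this.maxParallelism = maxParallelism;
        return this;
    }

    int getMaxParallelism() {
        return maxParallelism;
    }

    public static void main(String[] args) {
        // Stand-in for fromSequence: may run in parallel, so 128 is accepted.
        MaxParallelismSketch sequence = new MaxParallelismSketch(true).setMaxParallelism(128);
        System.out.println("accepted: " + sequence.getMaxParallelism());

        // Stand-in for the migrated fromElements: non-parallel, so 128 is rejected.
        try {
            new MaxParallelismSketch(false).setMaxParallelism(128);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

   Under this reading, the tests only need a source that is allowed to run in parallel, which is why `fromSequence` works as a replacement.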






Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370757728


##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.typeutils.OutputTypeConfigurable;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * A stream generator function that returns a sequence of elements.
+ *
+ * This generator function serializes the elements using Flink's type 
information. That way, any
+ * object transport using Java serialization will not be affected by the 
serializability of the
+ * elements.
+ *
+ * NOTE: This source has a parallelism of 1.

Review Comment:
   >Theoretically we could also fix this (in a follow-up!) but it doesn't seem 
worth the overhead given the number of elements. Oh I think this already works; 
see below comment.
   
   Good point. I mainly decided to restrict it because this is what the current 
underlying `SourceFunction` (`FromElementsFunction`) was delivering. I am not sure 
whether any specific additional considerations required limiting it to a 
parallelism of 1.
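   
   For context, the quoted Javadoc describes serializing the elements up front and reading them back one at a time. A minimal plain-Java sketch of that pattern follows. It assumes `String` elements and uses `DataOutputStream#writeUTF` in place of Flink's `TypeSerializer`; `SerializedElementsSketch` is a hypothetical name, not the class under review.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in: serializes String elements eagerly, deserializes them one by one. */
final class SerializedElementsSketch {

    /** The elements in serialized form. */
    private final byte[] elementsSerialized;

    private final int numElements;
    private int numElementsEmitted;

    /** Created in open(), mirroring the transient input view in the quoted class. */
    private DataInputStream input;

    SerializedElementsSketch(List<String> elements) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            for (String element : elements) {
                // Assumption: writeUTF plays the role of the type serializer here.
                out.writeUTF(element);
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Serializing the source elements failed", e);
        }
        this.elementsSerialized = baos.toByteArray();
        this.numElements = elements.size();
    }

    /** Positions a fresh reader at the start of the serialized bytes. */
    void open() {
        this.input = new DataInputStream(new ByteArrayInputStream(elementsSerialized));
        this.numElementsEmitted = 0;
    }

    boolean hasNext() {
        return numElementsEmitted < numElements;
    }

    /** Deserializes the next element; open() must have been called first. */
    String next() {
        if (input == null) {
            throw new IllegalStateException("open() was not called");
        }
        try {
            String element = input.readUTF();
            numElementsEmitted++;
            return element;
        } catch (IOException e) {
            throw new UncheckedIOException("Deserializing an element failed", e);
        }
    }

    /** Convenience round trip: serialize, open, and read everything back in order. */
    static List<String> roundTrip(List<String> elements) {
        SerializedElementsSketch sketch = new SerializedElementsSketch(elements);
        sketch.open();
        List<String> result = new ArrayList<>();
        while (sketch.hasNext()) {
            result.add(sketch.next());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(Arrays.asList("a", "b", "c")));
    }
}
```

   The sketch reads from a single sequential stream; it is only meant to show the serialize-then-replay idea, not to explain the parallelism restriction.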






Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370845998


##
flink-tests/pom.xml:
##
@@ -284,6 +284,13 @@ under the License.
 <scope>test</scope>
 </dependency>
 
+   <dependency>
+   <groupId>org.apache.flink</groupId>
+   <artifactId>flink-avro</artifactId>

Review Comment:
   > I cant really tell how this (and some of the flink-tests changes) related 
to fromElements.
   
   That's actually an easy one. You might remember this - 
https://issues.apache.org/jira/browse/FLINK-21386 . I added the test to 
explicitly verify that the issue you ran into with Avro and `fromElements` is 
handled correctly after the addition of the `OutputTypeConfigurable` 
functionality to `DataGeneratorSource`.






Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370841305


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java:
##
@@ -123,7 +124,8 @@ private void createChainableStream(TableTestUtil util) {
 }
 
 private void createNonChainableStream(TableTestUtil util) {
-DataStreamSource<Integer> dataStream = util.getStreamEnv().fromElements(1, 2, 3);
+DataStreamSource<Integer> dataStream =
+util.getStreamEnv().fromCollection(Arrays.asList(1, 2, 3));

Review Comment:
   The FLIP-27 source gets chained, while the test requires a non-chainable 
stream (compare with `createChainableStream`). In this PR I dealt with it by 
switching to `fromCollection`, which is still based on the `SourceFunction`. In 
the follow-up PR for the `fromCollection` migration I had to add a legacy 
source:
   
https://github.com/apache/flink/pull/23558/files#diff-0e02bf442f990b526e7a5fe5203eff9e0d19924419b63d0bb0aa573f2b55R119
   Not sure if we can get a `nonChainableStream` with a FLIP-27 source.






Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370832543


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##
@@ -1188,14 +1191,14 @@ void testChainingOfOperatorsWithDifferentMaxParallelism(
 configuration.set(
 
PipelineOptions.OPERATOR_CHAINING_CHAIN_OPERATORS_WITH_DIFFERENT_MAX_PARALLELISM,
 chainingOfOperatorsWithDifferentMaxParallelismEnabled);
-configuration.set(PipelineOptions.MAX_PARALLELISM, 10);
+configuration.set(PipelineOptions.MAX_PARALLELISM, 1);
 try (StreamExecutionEnvironment chainEnv =
 StreamExecutionEnvironment.createLocalEnvironment(1, 
configuration)) {
 chainEnv.fromElements(1)
 .map(x -> x)
 // should automatically break chain here
 .map(x -> x)
-.setMaxParallelism(1)
+.setMaxParallelism(10)

Review Comment:
   The test verifies that the chain gets broken. The legacy source was not 
enforcing a max parallelism of 1, something that we do now by propagating the 
call to super 
(https://github.com/afedulov/flink/blob/cf1a29d47a5bb4fb92e98a36934e525d74bae17b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L208).
   Notice that the default max parallelism in the config also got changed above 
from 10 to 1. So now we start with a source with a max parallelism of 1 and 
break the chain because the second map has a max parallelism of 10. Previously 
it was doing the same, but in "reverse".
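   
   The reasoning can be sketched with a deliberately simplified model of the chaining decision. This is an assumption for illustration only: `ChainingSketch#isChainable` is hypothetical and covers just the max-parallelism condition, whereas the real job graph generator checks considerably more.

```java
/** Simplified, hypothetical model of the chaining decision discussed above (not Flink code). */
final class ChainingSketch {

    /**
     * Two operators stay chained if chaining across different max parallelism is enabled,
     * or if both sides have the same max parallelism.
     */
    static boolean isChainable(
            int upstreamMaxParallelism,
            int downstreamMaxParallelism,
            boolean chainOperatorsWithDifferentMaxParallelism) {
        return chainOperatorsWithDifferentMaxParallelism
                || upstreamMaxParallelism == downstreamMaxParallelism;
    }

    public static void main(String[] args) {
        // Before the change: config default of 10 upstream, second map explicitly set to 1.
        System.out.println(isChainable(10, 1, false));
        // After the change: source capped at 1, second map explicitly set to 10.
        System.out.println(isChainable(1, 10, false));
        // With the option enabled, differing values no longer break the chain.
        System.out.println(isChainable(1, 10, true));
    }
}
```

   In this model both the old pair (10, 1) and the new pair (1, 10) break the chain when the option is disabled, which is the sense in which the test checks the same thing in "reverse".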






Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370824409


##
flink-formats/flink-parquet/src/test/resources/avro/user.avsc:
##
@@ -1,9 +0,0 @@
-{
-  "namespace": "org.apache.flink.connector.datagen.source.generated",
-  "type": "record",
-  "name": "User",

Review Comment:
   I added it accidentally to the wrong package in one of the earlier commits 
and this [tmp] commit just cleans it up. I got rid of both changes.
   






Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370765358


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##
@@ -1161,9 +1163,10 @@ void 
testYieldingOperatorChainableToTaskNotChainedToLegacySource() {
  */
 @Test
 void testYieldingOperatorProperlyChainedOnLegacySources() {
+// TODO: this test can be removed when the legacy SourceFunction API 
gets removed
 StreamExecutionEnvironment chainEnv = 
StreamExecutionEnvironment.createLocalEnvironment(1);
 
-chainEnv.fromElements(1)
+chainEnv.addSource(new LegacySource())

Review Comment:
   Seems so. Has something to do with threading:
   ```
   [FLINK-16219][runtime] Disallow chaining of legacy source and yielding 
operator.
   
   This change allows yielding operators to be eagerly chained whenever 
possible, except after legacy sources.
   Yielding operators do not properly work when processInput is called from 
another thread, but are usually fine in any other chain.
   ```






Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370762519


##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.typeutils.OutputTypeConfigurable;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * A stream generator function that returns a sequence of elements.
+ *
+ * This generator function serializes the elements using Flink's type 
information. That way, any
+ * object transport using Java serialization will not be affected by the 
serializability of the
+ * elements.
+ *
+ * NOTE: This source has a parallelism of 1.
+ *
+ * @param <OUT> The type of elements returned by this function.
+ */
+@Internal
+public class FromElementsGeneratorFunction<OUT>
+implements GeneratorFunction<Long, OUT>, OutputTypeConfigurable<OUT> {
+
+private static final long serialVersionUID = 1L;
+
+private static final Logger LOG = 
LoggerFactory.getLogger(FromElementsGeneratorFunction.class);
+
+/** The (de)serializer to be used for the data elements. */
+private @Nullable TypeSerializer<OUT> serializer;
+
+/** The actual data elements, in serialized form. */
+private byte[] elementsSerialized;
+
+/** The number of elements emitted already. */
+private int numElementsEmitted;
+
+private final transient Iterable<OUT> elements;
+private transient DataInputView input;
+
+public FromElementsGeneratorFunction(TypeSerializer<OUT> serializer, OUT... elements)
+throws IOException {
+this(serializer, Arrays.asList(elements));
+}
+
+public FromElementsGeneratorFunction(TypeSerializer<OUT> serializer, Iterable<OUT> elements)
+throws IOException {
+this.serializer = Preconditions.checkNotNull(serializer);
+this.elements = elements;
+serializeElements();
+}
+
+@SafeVarargs
+public FromElementsGeneratorFunction(OUT... elements) {
+this(Arrays.asList(elements));
+}
+
+public FromElementsGeneratorFunction(Iterable<OUT> elements) {
+this.serializer = null;
+this.elements = elements;
+checkIterable(elements, Object.class);
+}
+
+@VisibleForTesting
+@Nullable
+public TypeSerializer<OUT> getSerializer() {
+return serializer;
+}
+
+private void serializeElements() throws IOException {
+Preconditions.checkState(serializer != null, "serializer not set");
+LOG.info("Serializing elements using  " + serializer);
+ByteArrayOutputStream baos = new ByteArrayOutputStream();
+DataOutputViewStreamWrapper wrapper = new 
DataOutputViewStreamWrapper(baos);
+
+try {
+for (OUT element : elements) {
+serializer.serialize(element, wrapper);
+}
+} catch (Exception e) {
+throw new IOException("Serializing the source elements failed: " + 
e.getMessage(), e);
+}
+this.elementsSerialized = baos.toByteArray();
+}
+
+@Override
+public void open(SourceReaderContext readerContext) throws Exception {
+

Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370757864


##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.typeutils.OutputTypeConfigurable;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * A stream generator function that returns a sequence of elements.
+ *
+ * This generator function serializes the elements using Flink's type 
information. That way, any
+ * object transport using Java serialization will not be affected by the 
serializability of the
+ * elements.
+ *
+ * NOTE: This source has a parallelism of 1.
+ *
+ * @param <OUT> The type of elements returned by this function.
+ */
+@Internal
+public class FromElementsGeneratorFunction<OUT>
+implements GeneratorFunction<Long, OUT>, OutputTypeConfigurable<OUT> {
+
+private static final long serialVersionUID = 1L;
+
+private static final Logger LOG = 
LoggerFactory.getLogger(FromElementsGeneratorFunction.class);
+
+/** The (de)serializer to be used for the data elements. */
+private @Nullable TypeSerializer<OUT> serializer;
+
+/** The actual data elements, in serialized form. */
+private byte[] elementsSerialized;
+
+/** The number of elements emitted already. */
+private int numElementsEmitted;
+
+private final transient Iterable<OUT> elements;
+private transient DataInputView input;
+
+public FromElementsGeneratorFunction(TypeSerializer<OUT> serializer, OUT... elements)
+throws IOException {
+this(serializer, Arrays.asList(elements));
+}
+
+public FromElementsGeneratorFunction(TypeSerializer<OUT> serializer, Iterable<OUT> elements)
+throws IOException {
+this.serializer = Preconditions.checkNotNull(serializer);
+this.elements = elements;
+serializeElements();
+}
+
+@SafeVarargs
+public FromElementsGeneratorFunction(OUT... elements) {
+this(Arrays.asList(elements));
+}
+
+public FromElementsGeneratorFunction(Iterable<OUT> elements) {
+this.serializer = null;
+this.elements = elements;
+checkIterable(elements, Object.class);
+}
+
+@VisibleForTesting
+@Nullable
+public TypeSerializer<OUT> getSerializer() {
+return serializer;
+}
+
+private void serializeElements() throws IOException {
+Preconditions.checkState(serializer != null, "serializer not set");
+LOG.info("Serializing elements using  " + serializer);
+ByteArrayOutputStream baos = new ByteArrayOutputStream();
+DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
+
+try {
+for (OUT element : elements) {
+serializer.serialize(element, wrapper);
+}
+} catch (Exception e) {
+throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
+}
+this.elementsSerialized = baos.toByteArray();
+}
+
+@Override
+public void open(SourceReaderContext readerContext) throws Exception {
+

Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370757728


##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.typeutils.OutputTypeConfigurable;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * A stream generator function that returns a sequence of elements.
+ *
+ * This generator function serializes the elements using Flink's type information. That way, any
+ * object transport using Java serialization will not be affected by the serializability of the
+ * elements.
+ *
+ * NOTE: This source has a parallelism of 1.

Review Comment:
   >Theoretically we could also fix this (in a follow-up!) but it doesn't seem 
worth the overhead given the number of elements. Oh I think this already works; 
see below comment.
   
   Good point. I mainly decided to restrict it because this is what the current 
underlying SourceFunction (FromElementsFunction) was delivering. I am not sure 
what the original intention was behind limiting it to a parallelism of 1.
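
For readers following the thread: the Javadoc quoted above describes writing all elements into a byte array up front and reading them back later, so that shipping the function does not depend on the elements being Java-serializable. A minimal sketch of that idea, assuming plain `java.io` data streams and `String` elements in place of Flink's `TypeSerializer` and `DataOutputViewStreamWrapper`. The class and method names are hypothetical and not part of the PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the eager-serialization idea; not Flink code. */
public class SerializedElementsSketch {

    /** Writes every element into a single byte array up front. */
    public static byte[] serializeAll(List<String> elements) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            for (String element : elements) {
                out.writeUTF(element);
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Serializing the elements failed", e);
        }
        return baos.toByteArray();
    }

    /** Reads back exactly {@code count} elements in the order they were written. */
    public static List<String> deserializeAll(byte[] bytes, int count) {
        List<String> result = new ArrayList<>(count);
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            for (int i = 0; i < count; i++) {
                result.add(in.readUTF());
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Deserializing the elements failed", e);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> elements = Arrays.asList("Foo", "Bar");
        byte[] bytes = serializeAll(elements);
        System.out.println(deserializeAll(bytes, elements.size()));
    }
}
```

Only the byte array and the element count need to travel with the function in this sketch, which is the property the Javadoc is pointing at.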




Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370757489


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java:
##
@@ -566,15 +567,15 @@ public void testMaxParallelismWithConnectedKeyedStream() {
 int maxParallelism = 42;
 
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-DataStream<Integer> input1 = env.fromElements(1, 2, 3, 4).setMaxParallelism(128);
-DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4).setMaxParallelism(129);
+DataStream<Long> input1 = env.fromSequence(1, 4).setMaxParallelism(128);
+DataStream<Long> input2 = env.fromSequence(1, 4).setMaxParallelism(129);

Review Comment:
   Yes, this fixes test failures that arise because of this: 
https://github.com/apache/flink/pull/23553/files#diff-4a5eb9032bed78bb9f18e6523d4f7b3dc86ed10e3a3689757c1c4fa2335e7255R1307
   The SingleOutputStreamOperator caps max parallelism to 1. 
   Since this PR is already pretty sizable, it seemed appropriate to postpone 
dealing with this until we work on `fromSequence`.
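
A toy model of the failure mode described above, under the assumption that a non-parallel operator rejects any max parallelism other than 1 instead of silently clamping it. `ToyOperator` and its methods are hypothetical names used for illustration only; they are not Flink classes.

```java
/** Toy model of the max-parallelism restriction; hypothetical, not Flink code. */
public class MaxParallelismSketch {

    /** Minimal operator stand-in that remembers whether it may run in parallel. */
    public static final class ToyOperator {
        private final boolean canBeParallel;
        private int maxParallelism = 1;

        public ToyOperator(boolean canBeParallel) {
            this.canBeParallel = canBeParallel;
        }

        /** Assumed behaviour: a non-parallel operator only accepts a max parallelism of 1. */
        public ToyOperator setMaxParallelism(int maxParallelism) {
            if (maxParallelism <= 0) {
                throw new IllegalArgumentException("max parallelism must be positive");
            }
            if (!canBeParallel && maxParallelism != 1) {
                throw new IllegalArgumentException(
                        "max parallelism of a non-parallel operator must be 1");
            }
            this.maxParallelism = maxParallelism;
            return this;
        }

        public int getMaxParallelism() {
            return maxParallelism;
        }
    }

    public static void main(String[] args) {
        // A parallel source (like the sequence source in the changed test) accepts 128.
        System.out.println(new ToyOperator(true).setMaxParallelism(128).getMaxParallelism());

        // A source pinned to parallelism 1 refuses the same call in this model.
        try {
            new ToyOperator(false).setMaxParallelism(128);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Under that assumption, a test that calls `setMaxParallelism(128)` only works with a source that is allowed to run in parallel, which matches the switch to `fromSequence` in the hunk.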



##
flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5:
##
@@ -19,8 +19,8 @@ Method 

 calls method 

 in (RecreateOnResetOperatorCoordinator.java:361)
 Method 
 calls method 
 in (TaskManagerConfiguration.java:244)
 Method 
 calls method 

 in (TaskManagerConfiguration.java:246)
-Method 
 calls method 
 in (TaskManagerServices.java:433)
-Method 
 calls method 

 in (TaskManagerServices.java:431)

Review Comment:
   Indeed. I had to enable refreeze to add the missing datagen source violations, 
but how exactly this is supposed to work in archunit is still a bit of a mystery 
to me, to be honest. 



##
flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e:
##
@@ -64,6 +64,19 @@ Constructor 
(int, 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue,
 org.apache.flink.connector.base.source.reader.splitreader.SplitReader, 
java.util.function.Consumer, java.lang.Runnable, java.util.function.Consumer, 
boolean)> calls method 
 in 
(SplitFetcher.java:97)
 Constructor 
(org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue,
 java.util.function.Supplier, org.apache.flink.configuration.Configuration, 
java.util.function.Consumer)> is annotated with 
 in (SplitFetcherManager.java:0)
 Constructor 
(int)>
 calls method  in (FutureCompletingBlockingQueue.java:114)
+Constructor 
(org.apache.flink.api.common.typeutils.TypeSerializer,
 java.lang.Iterable)> calls method 
 in 
(FromElementsGeneratorFunction.java:85)

Review Comment:
   That's what I thought too. As also tracked in the wiki, the DataGen will not 
be externalized:
   
https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development






Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-24 Thread via GitHub


zentol commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1370102899


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java:
##
@@ -566,15 +567,15 @@ public void testMaxParallelismWithConnectedKeyedStream() {
 int maxParallelism = 42;
 
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-DataStream<Integer> input1 = env.fromElements(1, 2, 3, 4).setMaxParallelism(128);
-DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4).setMaxParallelism(129);
+DataStream<Long> input1 = env.fromSequence(1, 4).setMaxParallelism(128);
+DataStream<Long> input2 = env.fromSequence(1, 4).setMaxParallelism(129);

Review Comment:
   Why are these being changed? Not necessarily in this instance (since 
sequence seems more appropriate here); I'm just curious about all the other 
changed streaming-java tests.



##
flink-formats/flink-parquet/src/test/resources/avro/user.avsc:
##
@@ -1,9 +0,0 @@
-{
-  "namespace": "org.apache.flink.connector.datagen.source.generated",
-  "type": "record",
-  "name": "User",

Review Comment:
   was this unused?



##
flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5:
##
@@ -19,8 +19,8 @@ Method 

 calls method 

 in (RecreateOnResetOperatorCoordinator.java:361)
 Method 
 calls method 
 in (TaskManagerConfiguration.java:244)
 Method 
 calls method 

 in (TaskManagerConfiguration.java:246)
-Method 
 calls method 
 in (TaskManagerServices.java:433)
-Method 
 calls method 

 in (TaskManagerServices.java:431)

Review Comment:
   These seem to be unrelated?



##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java:
##
@@ -123,7 +124,8 @@ private void createChainableStream(TableTestUtil util) {
 }
 
 private void createNonChainableStream(TableTestUtil util) {
-DataStreamSource<Integer> dataStream = util.getStreamEnv().fromElements(1, 2, 3);
+DataStreamSource<Integer> dataStream =
+util.getStreamEnv().fromCollection(Arrays.asList(1, 2, 3));

Review Comment:
   Here I wonder why we're changing things.



##
flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java:
##
@@ -67,7 +79,62 @@ public void executeThrowsProgramInvocationException() throws Exception {
 })
 .print();
 
-thrown.expect(ProgramInvocationException.class);
-env.execute();
+assertThatThrownBy(env::execute).isInstanceOf(ProgramInvocationException.class);
+}
+
+@Test
+@SuppressWarnings("unchecked")
+void testAvroSpecificRecordsInFromElements() throws Exception {
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+User user1 = new User("Foo", 1);
+User user2 = new User("Bar", 2);
+User[] data = {user1, user2};
+DataStreamSource<User> stream = env.fromElements(User.class, user1, user2);
+DataGeneratorSource<User> source = getSourceFromStream(stream);
+FromElementsGeneratorFunction<User> generatorFunction =
+(FromElementsGeneratorFunction<User>) source.getGeneratorFunction();
+
+List<User> result = stream.executeAndCollect(data.length + 1);
+TypeSerializer<User> serializer = generatorFunction.getSerializer();
+
+assertThat(serializer).isInstanceOf(AvroSerializer.class);
+assertThat(result).containsExactly(data);
+}
+
+@Test
+@SuppressWarnings("unchecked")
+void testAvroGenericRecordsInFromElements() throws Exception {
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+Schema schema = getSchemaFromResources("/avro/user.avsc");
+GenericRecord user1 =
+new GenericRecordBuilder(schema).set("name", "Foo").set("age", 40).build();
+GenericRecord user2 =
+new GenericRecordBuilder(schema).set("name", "Bar").set("age", 45).build();
+GenericRecord[] data = {user1, user2};
+DataStream<GenericRecord> stream =
+env.fromElements(data).returns(new GenericRecordAvroTypeInfo(schema));
+DataGeneratorSource<GenericRecord> source = getSourceFromStream(stream);
+FromElementsGeneratorFunction<GenericRecord> generatorFunction =
+(FromElementsGeneratorFunction<GenericRecord>) source.getGeneratorFunction();
+
+List<GenericRecord> result = stream.executeAndCollect(data.length + 1);
+TypeSerializer<GenericRecord> serializer = generatorFunction.getSerializer();
+
+assertThat(serializer).isInstanceOf(AvroSerializer.class);
+assertThat(result).containsExactly(data);
+}
+
+private Schema getSchemaFromResources(String path) throws Exception {
+try (InputStream schemaStream = 

Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-20 Thread via GitHub


afedulov commented on PR #23553:
URL: https://github.com/apache/flink/pull/23553#issuecomment-1772756912

   @flinkbot run azure
   





Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-19 Thread via GitHub


afedulov commented on PR #23553:
URL: https://github.com/apache/flink/pull/23553#issuecomment-1771760579

   @flinkbot run azure





Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-19 Thread via GitHub


afedulov commented on PR #23553:
URL: https://github.com/apache/flink/pull/23553#issuecomment-1771495374

   @flinkbot run azure





Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-19 Thread via GitHub


afedulov commented on PR #23553:
URL: https://github.com/apache/flink/pull/23553#issuecomment-1771489496

   The addition of the `OutputTypeConfigurable` was required to achieve feature 
parity and compatibility with the existing fromElements API 
(https://issues.apache.org/jira/browse/FLINK-21386). Alternatively, we could 
provide a `fromElements` method that accepts `TypeInformation` directly, rather 
than having to "fix" it with `returns()`.
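
The trade-off can be illustrated with a small sketch of deferred configuration: the function is created without a serializer, and the elements are only serialized once a later callback supplies one. This is an analogy under stated assumptions, using a plain `Function<T, byte[]>` in place of a Flink `TypeSerializer`; `DeferredSerializerSketch` and `configureSerializer` are hypothetical names, not the `OutputTypeConfigurable` API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical illustration of late serializer configuration; not Flink code. */
public class DeferredSerializerSketch<T> {

    private final List<T> elements;
    private Function<T, byte[]> serializer; // stays null until configured
    private List<byte[]> serialized;

    public DeferredSerializerSketch(List<T> elements) {
        this.elements = Objects.requireNonNull(elements);
    }

    /** Plays the role of the late callback: the output type becomes known after construction. */
    public void configureSerializer(Function<T, byte[]> serializer) {
        this.serializer = Objects.requireNonNull(serializer);
        this.serialized = elements.stream().map(serializer).collect(Collectors.toList());
    }

    public boolean isConfigured() {
        return serializer != null;
    }

    /** Fails fast when used before a serializer has been supplied. */
    public int serializedCount() {
        if (serialized == null) {
            throw new IllegalStateException("serializer not set");
        }
        return serialized.size();
    }

    public static void main(String[] args) {
        DeferredSerializerSketch<String> function =
                new DeferredSerializerSketch<>(Arrays.asList("Foo", "Bar"));
        System.out.println(function.isConfigured());

        // The serializer arrives later, comparable to fixing the type after the fact.
        function.configureSerializer(s -> s.getBytes(StandardCharsets.UTF_8));
        System.out.println(function.serializedCount());
    }
}
```

Passing the type information at construction time, as suggested as the alternative, would remove the unconfigured intermediate state altogether.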
   





Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]

2023-10-19 Thread via GitHub


flinkbot commented on PR #23553:
URL: https://github.com/apache/flink/pull/23553#issuecomment-1771102556

   
   ## CI report:
   
   * 227741d560869d8610b9a80e65b9181ffd25bdb6 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   

