Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1401846952

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java: ##
@@ -1185,8 +1295,10 @@ public DataStreamSource<Long> fromSequence(long from, long to) {
 * @param data The array of elements to create the data stream from.
 * @param <OUT> The type of the returned data stream
 * @return The data stream representing the given array of elements
+ * @deprecated Use {@link #fromData(OUT...)} instead

Review Comment:
Sure, but the formulation has to be tentative. While it remains my goal, you probably remember this long thread where the item was dropped as a must-have for 2.0: https://lists.apache.org/thread/r0y9syc6k5nmcxvnd0hj33htdpdj9k6m

How about this:
```
This method will be removed in a future release, possibly as early as version 2.0. Use {@link #fromData(OUT...)} instead.
```

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
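The proposal above only changes Javadoc wording. As a minimal, self-contained sketch of how such tentative wording can sit next to Java's `@Deprecated` annotation, consider the class below. `MiniEnvironment` is a hypothetical stand-in, not Flink's `StreamExecutionEnvironment`, and it returns a plain `List` instead of a `DataStreamSource`; the only point is that the deprecated entry point delegates to `fromData`, so both share one implementation.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for an environment class; not Flink's StreamExecutionEnvironment. */
class MiniEnvironment {

    /** Replacement entry point (hypothetical): wraps the given elements in an immutable list. */
    @SafeVarargs
    final <OUT> List<OUT> fromData(OUT... data) {
        return Collections.unmodifiableList(Arrays.asList(data));
    }

    /**
     * Creates a data stream from the given elements.
     *
     * @param data The array of elements to create the data stream from.
     * @param <OUT> The type of the returned data stream
     * @return The data stream representing the given array of elements
     * @deprecated This method will be removed in a future release, possibly as early as version
     *     2.0. Use {@link #fromData(Object...)} instead.
     */
    @Deprecated
    @SafeVarargs
    final <OUT> List<OUT> fromElements(OUT... data) {
        // Delegate, so the deprecated and the new entry point cannot drift apart.
        return fromData(data);
    }
}
```

Callers in other classes keep compiling but get a deprecation warning, which is what allows the removal date to stay open rather than being tied to a specific release.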
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
zentol commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1401768536

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java: ##
@@ -1185,8 +1295,10 @@ public DataStreamSource<Long> fromSequence(long from, long to) {
 * @param data The array of elements to create the data stream from.
 * @param <OUT> The type of the returned data stream
 * @return The data stream representing the given array of elements
+ * @deprecated Use {@link #fromData(OUT...)} instead

Review Comment:
Maybe be explicit about a scheduled removal in 2.0.
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on PR #23553:
URL: https://github.com/apache/flink/pull/23553#issuecomment-1820017623

@zentol CI is green again after the discussed API changes.
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1397712701

## flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java: ##
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+/**
+ * A stream generator function that returns a sequence of elements.
+ *
+ * This generator function serializes the elements using Flink's type information. That way, any
+ * object transport using Java serialization will not be affected by the serializability of the
+ * elements.
+ *
+ * @param <OUT> The type of elements returned by this function.
+ */
+@Internal
+public class FromElementsGeneratorFunction<OUT>
+        implements GeneratorFunction<Long, OUT>, OutputTypeConfigurable<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(FromElementsGeneratorFunction.class);
+
+    /** The (de)serializer to be used for the data elements. */
+    private @Nullable TypeSerializer<OUT> serializer;
+
+    /** The actual data elements, in serialized form. */
+    private byte[] elementsSerialized;
+
+    /** The number of elements emitted already. */
+    private int numElementsEmitted;
+
+    private transient Iterable<OUT> elements;
+    private transient DataInputView input;
+
+    @SafeVarargs
+    public FromElementsGeneratorFunction(TypeInformation<OUT> typeInfo, OUT... elements) {
+        this(typeInfo, new ExecutionConfig(), Arrays.asList(elements));
+    }
+
+    public FromElementsGeneratorFunction(
+            TypeInformation<OUT> typeInfo, ExecutionConfig config, Iterable<OUT> elements) {
+        // must not have null elements and mixed elements
+        checkIterable(elements, typeInfo.getTypeClass());
+        this.serializer = typeInfo.createSerializer(config);

Review Comment:
Hmm, I don't know. I went through all the type information classes, and the only two that actually look into this config are `PojoTypeInfo`:
```
public TypeSerializer<T> createSerializer(ExecutionConfig config) {
    if (config.isForceKryoEnabled()) {
        return new KryoSerializer<>(getTypeClass(), config);
    }

    if (config.isForceAvroEnabled()) {
        return AvroUtils.getAvroUtils().createAvroSerializer(getTypeClass());
    }

    return createPojoSerializer(config);
}
```
and `GenericTypeInfo`:
```
public TypeSerializer<T> createSerializer(ExecutionConfig config) {
    if (config.hasGenericTypesDisabled()) {
        throw new UnsupportedOperationException(
                "Generic types have been disabled in the ExecutionConfig and type "
                        + this.typeClass.getName()
                        + " is treated as a generic type.");
    }

    return new KryoSerializer<T>(this.typeClass, config);
}
```
The rest simply ignore it, so the config only matters when Kryo or Avro is being enforced. For Avro, there is now a clear way to just submit the
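The class Javadoc quoted in the diff describes the core idea: serialize the elements once with a type-specific serializer into a `byte[]`, so that shipping the function via Java serialization does not depend on whether the element type is `java.io.Serializable`, then deserialize them lazily as they are emitted. Below is a minimal pure-Java sketch of that idea, not the PR's implementation. `ElementCodec`, `StringCodec`, and `SerializedElements` are hypothetical names: `ElementCodec` stands in for Flink's `TypeSerializer`, `DataOutputStream`/`DataInputStream` stand in for the `DataOutputViewStreamWrapper`/`DataInputViewStreamWrapper` imported above, and the null/type check only mirrors the "must not have null elements and mixed elements" comment.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.List;

/** Hypothetical stand-in for a type-specific serializer such as Flink's TypeSerializer. */
interface ElementCodec<T> extends Serializable {
    void write(T value, DataOutputStream out) throws IOException;

    T read(DataInputStream in) throws IOException;
}

/** Example codec for strings, using DataOutputStream's modified UTF-8 encoding. */
class StringCodec implements ElementCodec<String> {
    private static final long serialVersionUID = 1L;

    @Override
    public void write(String value, DataOutputStream out) throws IOException {
        out.writeUTF(value);
    }

    @Override
    public String read(DataInputStream in) throws IOException {
        return in.readUTF();
    }
}

/** Serializes the elements once up front, then emits them one at a time by index. */
class SerializedElements<T> implements Serializable {
    private static final long serialVersionUID = 1L;

    private final ElementCodec<T> codec;
    private final byte[] elementsSerialized; // only these bytes go through Java serialization
    private final int numElements;
    private int numElementsEmitted;

    private transient DataInputStream input; // recreated lazily, e.g. after deserialization

    SerializedElements(ElementCodec<T> codec, List<T> elements, Class<?> expectedType) {
        this.codec = codec;
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        try {
            for (T element : elements) {
                // No null elements and no elements of an unexpected ("mixed") type.
                if (element == null || !expectedType.isInstance(element)) {
                    throw new IllegalArgumentException("Invalid element: " + element);
                }
                codec.write(element, out);
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        this.elementsSerialized = bytes.toByteArray();
        this.numElements = elements.size();
    }

    /** Returns the element at {@code index}; indices must be requested in order. */
    T map(long index) {
        if (index != numElementsEmitted || index >= numElements) {
            throw new IllegalStateException("Unexpected index: " + index);
        }
        try {
            if (input == null) {
                input = new DataInputStream(new ByteArrayInputStream(elementsSerialized));
                // Skip elements already emitted before this instance was (re)created.
                for (int i = 0; i < numElementsEmitted; i++) {
                    codec.read(input);
                }
            }
            T next = codec.read(input);
            numElementsEmitted++;
            return next;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because `input` is transient while `numElementsEmitted` is not, a copy restored through Java serialization rebuilds its stream on first use and resumes after the last emitted element.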
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on PR #23553:
URL: https://github.com/apache/flink/pull/23553#issuecomment-1816655632

@zentol some thoughts:
- I like the idea of exposing `fromData`. There is no good reason for both the `fromElements` and `fromCollection` methods to exist: they do the same thing and differ only in their parameters.
- Keeping `fromData` on the environment has some benefits:
  - Exposing factory methods on the `DataGenerator` for this purpose could look semantically a bit weird when all users want is to send some static data. It gives the wrong impression that this data is expected to be the basis for some generated dataset, for example one that is looped over.
  - The fact that we use the DataGenerator under the hood can be seen as an implementation detail. Should the module layout change in 2.0, we will be free to change the underlying implementation if needed.

I am in favor of exposing the `fromData` method and deprecating `fromElements` and `fromCollection`. What do you think?
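The argument that `fromElements` and `fromCollection` do the same thing and differ only in their parameters maps naturally onto Java overloading under a single name. A minimal sketch, assuming a hypothetical `DataEnv` class that returns plain lists instead of streams; the empty and null checks are illustrative assumptions, not necessarily what the PR's `fromData` does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch: one method name, overloads distinguished only by parameter type. */
class DataEnv {

    /** Accepts individual elements, e.g. fromData(1, 2, 3). */
    @SafeVarargs
    final <OUT> List<OUT> fromData(OUT... data) {
        // Funnel into the collection overload so there is a single implementation.
        return fromData(Arrays.asList(data));
    }

    /** Accepts an existing collection, e.g. fromData(myList). */
    <OUT> List<OUT> fromData(Collection<OUT> data) {
        if (data.isEmpty()) {
            throw new IllegalArgumentException("fromData requires at least one element");
        }
        for (OUT element : data) {
            if (element == null) {
                throw new IllegalArgumentException("Null elements are not supported");
            }
        }
        // Defensive copy, exposed read-only.
        return Collections.unmodifiableList(new ArrayList<>(data));
    }
}
```

One consequence of this design: under Java's overload resolution, a single argument that is itself a `Collection` binds to the collection overload, so a stream whose only element is a list would have to be created with something like `fromData(Collections.singletonList(list))`.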
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
zentol commented on code in PR #23553:
URL: https://github.com/apache/flink/pull/23553#discussion_r1394174857

## flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java: ##
@@ -0,0 +1,200 @@
[...]
+    public FromElementsGeneratorFunction(
+            TypeInformation<OUT> typeInfo, ExecutionConfig config, Iterable<OUT> elements) {
+        // must not have null elements and mixed elements
+        checkIterable(elements, typeInfo.getTypeClass());
+        this.serializer = typeInfo.createSerializer(config);

Review Comment:
hmm, wondering if we should fully rely on `setOutputType`, because it has the added benefit of ensuring that we see the final ExecutionConfig.
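zentol's point can be illustrated with a small pure-Java sketch: instead of choosing the serializer in the constructor, the function waits for a `setOutputType`-style callback and therefore sees whatever configuration is in effect at that point, not a snapshot from construction time. `MiniConfig`, `MiniTypeInfo`, and `DeferredSerializerFunction` are hypothetical names; the real hook discussed here is Flink's `OutputTypeConfigurable#setOutputType`, which this sketch only imitates.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical mutable configuration, standing in for an ExecutionConfig. */
class MiniConfig {
    boolean forceKryo;
}

/** Hypothetical type information that picks a serializer based on the config it receives. */
class MiniTypeInfo<T> {
    String createSerializerName(MiniConfig config) {
        return config.forceKryo ? "kryo" : "pojo";
    }
}

/** Holds the elements but defers the serializer choice until the output type is configured. */
class DeferredSerializerFunction<T> {
    private final List<T> elements;
    private String serializerName; // stays null until setOutputType is called

    DeferredSerializerFunction(List<T> elements) {
        this.elements = new ArrayList<>(elements);
    }

    /** Plays the role of an OutputTypeConfigurable-style callback. */
    void setOutputType(MiniTypeInfo<T> typeInfo, MiniConfig config) {
        this.serializerName = typeInfo.createSerializerName(config);
    }

    String serializerName() {
        if (serializerName == null) {
            throw new IllegalStateException("setOutputType() has not been called yet");
        }
        return serializerName;
    }

    int size() {
        return elements.size();
    }
}
```

Had the serializer been chosen in the constructor, setting `forceKryo` after construction would have had no effect on it.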
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on PR #23553: URL: https://github.com/apache/flink/pull/23553#issuecomment-1812451001 > Do you want to clean up the git history a bit or shall I just squash everything? I'll do it, thanks! One thing to clarify before we merge: I just noticed yesterday that, in a somewhat similar situation, the decision was made to go the deprecation route instead of substituting the implementation in place: https://issues.apache.org/jira/browse/FLINK-19494?focusedCommentId=17206787=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17206787 I guess we do need to consider cases where `fromElements()` might be used in production environments to stream in small bootstrap data sets.
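The "deprecation route" keeps the old entry point and its implementation untouched, marks it `@Deprecated`, and exposes the new implementation under a new name, instead of swapping the implementation behind the existing method. A minimal sketch under that assumption; `SketchEnvironment`, `SketchStream` and the implementation labels are hypothetical, and only the method names mirror the `fromElements`/`fromData` pair discussed in this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical result type: records which code path produced the stream.
final class SketchStream<T> {
    final String implementation;
    final List<T> elements;

    SketchStream(String implementation, List<T> elements) {
        this.implementation = implementation;
        this.elements = Collections.unmodifiableList(new ArrayList<>(elements));
    }
}

// Hypothetical environment illustrating the deprecation route.
class SketchEnvironment {

    /**
     * Legacy entry point, kept with its original behavior.
     *
     * @deprecated Use {@link #fromData(Object[])} instead.
     */
    @Deprecated
    @SafeVarargs
    final <T> SketchStream<T> fromElements(T... data) {
        // Existing jobs (e.g. ones streaming small bootstrap data sets) keep the old code path.
        return new SketchStream<>("legacy-function", Arrays.asList(data));
    }

    /** New entry point backed by the new implementation. */
    @SafeVarargs
    final <T> SketchStream<T> fromData(T... data) {
        return new SketchStream<>("new-source", Arrays.asList(data));
    }
}
```

Both methods accept the same input; only the code path behind them differs, so users can migrate at their own pace.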
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on PR #23553: URL: https://github.com/apache/flink/pull/23553#issuecomment-1810122266 @zentol CI is green
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on PR #23553: URL: https://github.com/apache/flink/pull/23553#issuecomment-1810121992 @snuyanzin thanks for the pointer. It seems to be related, albeit a somewhat different issue. The constructor for which ArchUnit had previously recorded a violation was removed, but running ArchUnit with Java 11 did not remove the violation itself: https://github.com/apache/flink/pull/23553/commits/e0b25aed71328dadd3ca6ac868704e8189d1098f#diff-dda3e2858bbd4222b54379ad6d998011d90a8aa8536ecc845284a98001960d7dL67
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
snuyanzin commented on PR #23553: URL: https://github.com/apache/flink/pull/23553#issuecomment-1809183659 > Using Java 11 the tests pass, using Java 8 they fail (as in CI). > Is this a known issue? Not sure whether this is your issue or not, but some time ago we faced this: https://github.com/TNG/ArchUnit/issues/1124
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on PR #23553: URL: https://github.com/apache/flink/pull/23553#issuecomment-1809168702 @zentol I can confirm that the reason for the discrepancy between the local and the remote Architecture Tests executions is the Java version. Using Java 11 the tests pass, using Java 8 they fail (as in CI). Probably something to do with classloading/modules. Is this a known issue? I could not find a matching ticket in JIRA.
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on PR #23553: URL: https://github.com/apache/flink/pull/23553#issuecomment-1794701474 @zentol thanks for the feedback. The only remaining item is the [architecture test failures](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=54352=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=23304). The problem is that running the same command that Azure executes (below) locally, on the same commit, does not produce any errors. Enabling store updates does not modify any files either. Running `ArchitectureTest` in IntelliJ also passes (including the `CONNECTOR_CLASSES_ONLY_DEPEND_ON_PUBLIC_API` rule).
```
mvn --no-snapshot-updates -Dscala-2.12 -Pskip-webui-build -Dflink.tests.with-openssl -Dflink.tests.check-segment-multiple-free -Darchunit.freeze.store.default.allowStoreUpdate=false -Dpekko.rpc.force-invocation-serialization -Dflink.hadoop.version=2.10.2 -Dscala-2.12 -pl !flink-annotations,!flink-test-utils-parent/flink-test-utils,!flink-state-backends,!flink-state-backends/flink-statebackend-changelog,!flink-state-backends/flink-statebackend-heap-spillable,!flink-state-backends/flink-statebackend-rocksdb,!flink-clients,!flink-core,!flink-java,!flink-optimizer,!flink-rpc,!flink-rpc/flink-rpc-core,!flink-rpc/flink-rpc-akka,!flink-rpc/flink-rpc-akka-loader,!flink-runtime,!flink-runtime-web,!flink-scala,!flink-streaming-java,!flink-streaming-scala,!flink-metrics,!flink-metrics/flink-metrics-core,!flink-external-resources,!flink-external-resources/flink-external-resource-gpu,!flink-libraries,!flink-libraries/flink-cep,!flink-libraries/flink-cep-scala,!flink-libraries/flink-state-processing-api,!flink-queryable-state,!flink-queryable-state/flink-queryable-state-runtime,!flink-queryable-state/flink-queryable-state-client-java,!flink-container,!flink-dstl,!flink-dstl/flink-dstl-dfs,!,!flink-table,!flink-table/flink-sql-parser,!flink-table/flink-table-common,!flink-table/flink-table-api-java,!flink-table/flink-table-api-scala,!flink-table/flink-table-api-bridge-base,!flink-table/flink-table-api-java-bridge,!flink-table/flink-table-api-scala-bridge,!flink-table/flink-table-api-java-uber,!flink-table/flink-sql-client,!flink-table/flink-sql-gateway-api,!flink-table/flink-sql-gateway,!flink-table/flink-table-planner,!flink-table/flink-table-planner-loader,!flink-table/flink-table-planner-loader-bundle,!flink-table/flink-table-runtime,!flink-table/flink-table-code-splitter,!flink-table/flink-table-test-utils,!,!flink-contrib/flink-connector-wikiedits,!flink-filesystems,!flink-filesystems/flink-azure-fs-hadoop,!flink-filesystems/flink-fs-hadoop-shaded,!flink-filesystems/flink-gs-fs-hadoop,!flink-filesystems/flink-hadoop-fs,!flink-filesystems/flink-oss-fs-hadoop,!flink-filesystems/flink-s3-fs-base,!flink-filesystems/flink-s3-fs-hadoop,!flink-filesystems/flink-s3-fs-presto,!flink-fs-tests,!flink-formats,!flink-formats/flink-format-common,!flink-formats/flink-avro-confluent-registry,!flink-formats/flink-sql-avro-confluent-registry,!flink-formats/flink-avro,!flink-formats/flink-sql-avro,!flink-formats/flink-compress,!flink-formats/flink-hadoop-bulk,!flink-formats/flink-parquet,!flink-formats/flink-sql-parquet,!flink-formats/flink-sequence-file,!flink-formats/flink-json,!flink-formats/flink-csv,!flink-formats/flink-orc,!flink-formats/flink-sql-orc,!flink-formats/flink-orc-nohive,!flink-connectors/flink-file-sink-common,!flink-connectors/flink-hadoop-compatibility,!flink-connectors,!flink-connectors/flink-connector-files,!flink-metrics/flink-metrics-dropwizard,!flink-metrics/flink-metrics-graphite,!flink-metrics/flink-metrics-jmx,!flink-metrics/flink-metrics-influxdb,!flink-metrics/flink-metrics-prometheus,!flink-metrics/flink-metrics-statsd,!flink-metrics/flink-metrics-datadog,!flink-metrics/flink-metrics-slf4j,!,!flink-connectors/flink-connector-base,!,!flink-tests,!flink-yarn,!flink-yarn-tests,! verify
```
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1382648870 ## flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java: ## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.datagen.functions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.java.typeutils.OutputTypeConfigurable; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; + +/** + * A stream generator function that returns a sequence of elements. + * + * This generator function serializes the elements using Flink's type information. That way, any + * object transport using Java serialization will not be affected by the serializability of the + * elements. + * + * NOTE: This source has a parallelism of 1. + * + * @param The type of elements returned by this function. + */ +@Internal +public class FromElementsGeneratorFunction +implements GeneratorFunction, OutputTypeConfigurable { + +private static final long serialVersionUID = 1L; + +private static final Logger LOG = LoggerFactory.getLogger(FromElementsGeneratorFunction.class); + +/** The (de)serializer to be used for the data elements. */ +private @Nullable TypeSerializer serializer; + +/** The actual data elements, in serialized form. 
*/ +private byte[] elementsSerialized; + +/** The number of elements emitted already. */ +private int numElementsEmitted; + +private final transient Iterable elements; +private transient DataInputView input; + +public FromElementsGeneratorFunction(TypeSerializer serializer, OUT... elements) +throws IOException { +this(serializer, Arrays.asList(elements)); +} + +public FromElementsGeneratorFunction(TypeSerializer serializer, Iterable elements) +throws IOException { +this.serializer = Preconditions.checkNotNull(serializer); +this.elements = elements; +serializeElements(); +} + +@SafeVarargs +public FromElementsGeneratorFunction(OUT... elements) { +this(Arrays.asList(elements)); +} + +public FromElementsGeneratorFunction(Iterable elements) { +this.serializer = null; +this.elements = elements; +checkIterable(elements, Object.class); +} + +@VisibleForTesting +@Nullable +public TypeSerializer getSerializer() { +return serializer; +} + +private void serializeElements() throws IOException { +Preconditions.checkState(serializer != null, "serializer not set"); +LOG.info("Serializing elements using " + serializer); +ByteArrayOutputStream baos = new ByteArrayOutputStream(); +DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos); + +try { +for (OUT element : elements) { +serializer.serialize(element, wrapper); +} +} catch (Exception e) { +throw new IOException("Serializing the source elements failed: " + e.getMessage(), e); +} +this.elementsSerialized = baos.toByteArray(); +} + +@Override +public void open(SourceReaderContext readerContext) throws Exception { +
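The class quoted above keeps the elements as a `byte[]` produced by a `TypeSerializer`, plus a `numElementsEmitted` counter. The sketch below models that idea in plain Java, assuming that on (re)opening, the reader skips already-emitted elements by deserializing and discarding them; the counter suggests something along these lines, but the `open` body is cut off here. `Codec`, `Codecs` and `SerializedElements` are hypothetical stand-ins, not Flink types, and `DataOutputStream`/`DataInputStream` replace Flink's `DataOutputView`/`DataInputView`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

// Hypothetical stand-in for TypeSerializer#serialize / #deserialize.
interface Codec<T> {
    void write(T value, DataOutputStream out) throws IOException;

    T read(DataInputStream in) throws IOException;
}

final class Codecs {
    // Example codec for strings, using modified UTF-8.
    static final Codec<String> UTF = new Codec<String>() {
        @Override
        public void write(String value, DataOutputStream out) throws IOException {
            out.writeUTF(value);
        }

        @Override
        public String read(DataInputStream in) throws IOException {
            return in.readUTF();
        }
    };

    private Codecs() {}
}

final class SerializedElements<T> {
    private final Codec<T> codec;
    // Plain bytes: shipping this field never depends on whether T is Serializable.
    private final byte[] elementsSerialized;
    // The only piece of progress that would need to survive a failure.
    private int numElementsEmitted;
    private DataInputStream input;

    SerializedElements(Codec<T> codec, List<T> elements) {
        this.codec = codec;
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            for (T element : elements) {
                codec.write(element, out);
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Serializing the source elements failed", e);
        }
        this.elementsSerialized = baos.toByteArray();
    }

    // (Re)creates the input and skips elements emitted before, e.g. a restored count.
    void open(int alreadyEmitted) {
        input = new DataInputStream(new ByteArrayInputStream(elementsSerialized));
        try {
            for (int i = 0; i < alreadyEmitted; i++) {
                codec.read(input); // deserialize and discard
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        numElementsEmitted = alreadyEmitted;
    }

    T next() {
        try {
            T value = codec.read(input);
            numElementsEmitted++;
            return value;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    int numElementsEmitted() {
        return numElementsEmitted;
    }
}
```

For example, after `open(1)` on the elements `"a", "b", "c"`, the next two calls to `next()` return `"b"` and `"c"`.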
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on PR #23553: URL: https://github.com/apache/flink/pull/23553#issuecomment-1793845354 @flinkbot run azure
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1382645917 ## flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java: ## @@ -240,7 +240,7 @@ public void testAvroToAvro() { } private DataStream testData(StreamExecutionEnvironment env) { -return env.fromElements(USER_1, USER_2, USER_3); +return env.fromElements(USER_1, USER_2, USER_3).setParallelism(1); Review Comment: ✅ https://github.com/apache/flink/pull/23553/commits/c04d0dd540c6c0e1635fcf59eb3c10a46cd1e0e5
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1382643573 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java: ## @@ -123,7 +124,8 @@ private void createChainableStream(TableTestUtil util) { } private void createNonChainableStream(TableTestUtil util) { -DataStreamSource dataStream = util.getStreamEnv().fromElements(1, 2, 3); +DataStreamSource dataStream = +util.getStreamEnv().fromCollection(Arrays.asList(1, 2, 3)); Review Comment: I believe it is still an opt-in restriction https://github.com/apache/flink/pull/22520/files#diff-b9d05c6f5d61d228c20d034270440e8e648f788ceb7ea00622ab7f1947d8908fR108
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1382643298 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java: ## @@ -1188,14 +1191,14 @@ void testChainingOfOperatorsWithDifferentMaxParallelism( configuration.set( PipelineOptions.OPERATOR_CHAINING_CHAIN_OPERATORS_WITH_DIFFERENT_MAX_PARALLELISM, chainingOfOperatorsWithDifferentMaxParallelismEnabled); -configuration.set(PipelineOptions.MAX_PARALLELISM, 10); +configuration.set(PipelineOptions.MAX_PARALLELISM, 1); try (StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment(1, configuration)) { chainEnv.fromElements(1) .map(x -> x) // should automatically break chain here .map(x -> x) -.setMaxParallelism(1) +.setMaxParallelism(10) Review Comment: Yep, done https://github.com/apache/flink/pull/23553/commits/982bc20522f0a63670e6747f7ede1ae219fb172f
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
zentol commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1380507456 ## flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java: ## @@ -0,0 +1,211 @@
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
zentol commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1380547823 ## flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java: ## @@ -0,0 +1,211 @@
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
zentol commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1380524335 ## flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java: ## @@ -240,7 +240,7 @@ public void testAvroToAvro() { } private DataStream testData(StreamExecutionEnvironment env) { -return env.fromElements(USER_1, USER_2, USER_3); +return env.fromElements(USER_1, USER_2, USER_3).setParallelism(1); Review Comment: huh. These changes do make one wonder whether we shouldn't set the parallelism to 1 by default, but allow the user to override it 🤔
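Why parallelism matters for tests like the one above: if a parallel source splits the element indices across subtasks, each subtask emits its own slice in order, but the slices can interleave nondeterministically downstream, so a test that relies on a particular emission order needs parallelism 1. A minimal sketch of such a split, assuming contiguous index ranges per subtask; `IndexRangeSplitter` is a hypothetical helper, not the datagen source's actual split logic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: divides the index range [0, count) into `parallelism`
// contiguous sub-ranges, one per subtask.
final class IndexRangeSplitter {

    /** Returns {start, endExclusive} pairs, one per subtask. */
    static List<long[]> split(long count, int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        List<long[]> ranges = new ArrayList<>();
        long base = count / parallelism;
        long remainder = count % parallelism;
        long start = 0;
        for (int i = 0; i < parallelism; i++) {
            // The first `remainder` subtasks take one extra element.
            long size = base + (i < remainder ? 1 : 0);
            ranges.add(new long[] {start, start + size});
            start += size;
        }
        return ranges;
    }

    /** Elements a given subtask would emit, in index order. */
    static <T> List<T> elementsFor(List<T> elements, long[] range) {
        return new ArrayList<>(elements.subList((int) range[0], (int) range[1]));
    }
}
```

With parallelism 1 there is a single range and the original order is preserved; with parallelism 2, subtask 0 gets `USER_1, USER_2` and subtask 1 gets `USER_3`, and their outputs may interleave.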
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
zentol commented on PR #23553: URL: https://github.com/apache/flink/pull/23553#issuecomment-1791157635 > What is the issue with adding the new method? By default, you may not add new methods to classes, because that can break downstream classes. Personally, I'd say it's fine to do it here, though.
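One concrete way a new method on a non-final public class can break downstream code: a user subclass that already declares a method with the same signature silently overrides the new one, so the library's own calls get dispatched to user code. (If the parameters match but the return types are incompatible, the subclass stops compiling instead.) A minimal illustration with hypothetical classes, not Flink's:

```java
// Hypothetical library class as shipped in "version 2", which added fromData(...).
class LibraryEnvV2 {
    // New method; the library now also uses it internally.
    String fromData(String element) {
        return "library:" + element;
    }

    String createSource(String element) {
        // Virtual call: resolved against the runtime class, which may be a user subclass.
        return fromData(element);
    }
}

// Hypothetical downstream subclass written against "version 1", when the base
// class had no fromData(...). After upgrading, this method overrides the new one.
class UserEnv extends LibraryEnvV2 {
    String fromData(String element) {
        return "user:" + element.toUpperCase();
    }
}
```

Here `new UserEnv().createSource("x")` ends up in the user's method rather than the library's, which is the kind of downstream breakage a blanket "no new methods" rule guards against.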
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on PR #23553: URL: https://github.com/apache/flink/pull/23553#issuecomment-1790570036 @flinkbot run azure
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on PR #23553: URL: https://github.com/apache/flink/pull/23553#issuecomment-1787923597 @flinkbot run azure
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on PR #23553: URL: https://github.com/apache/flink/pull/23553#issuecomment-1781183357 @zentol I made two major changes as per our discussions above:

- Direct type passing without the requirement to use `returns()` (please see my [comment](https://github.com/apache/flink/pull/23553#discussion_r1372403767) above): https://github.com/apache/flink/commit/2712c1813ca6420905e06b9e417de0eb61d586d9
- Allow parallel execution of `fromElements` sources: https://github.com/apache/flink/pull/23553/commits/78cb92bc86e9dded9bf2458de119d549be7ad281

The second one might need some additional test fixes, but I cannot get to them at the moment because of the `japicmp` failures:

```
Failed to execute goal io.github.zentol.japicmp:japicmp-maven-plugin:0.17.1.1_m325:cmp (default) on project flink-streaming-java: There is at least one incompatibility: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(org.apache.flink.api.common.typeinfo.TypeInformation,java.lang.Object[]):CLASS_GENERIC_TEMPLATE_CHANGED -> [Help 1]
```

This is the diff:

```
+++* NEW METHOD: PUBLIC(+) FINAL(+) org.apache.flink.streaming.api.datastream.DataStreamSource fromElements(org.apache.flink.api.common.typeinfo.TypeInformation, java.lang.Object[])
    +++ NEW ANNOTATION: java.lang.SafeVarargs
    GENERIC TEMPLATES: +++ OUT:java.lang.Object
```

What is the issue with adding the new method?
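The first change (direct type passing without `returns()`) addresses a general limitation of deriving a stream's type from the elements themselves: an extractor that looks only at the first element can settle on a narrower type than the caller intended. A simplified plain-Java model, not Flink's `TypeExtractor`; `TypedElements`, `inferred` and `explicit` are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical container whose element type is either inferred or given explicitly.
final class TypedElements<T> {
    final Class<?> type;
    final List<T> elements;

    private TypedElements(Class<?> type, List<T> elements) {
        this.type = type;
        this.elements = Collections.unmodifiableList(elements);
    }

    // Inference: derives the type from the first element only.
    @SafeVarargs
    static <T> TypedElements<T> inferred(T... data) {
        if (data.length == 0) {
            throw new IllegalArgumentException("at least one element is required to infer a type");
        }
        return new TypedElements<>(data[0].getClass(), Arrays.asList(data));
    }

    // Explicit: the caller states the type up front; every element is checked against it.
    @SafeVarargs
    static <T> TypedElements<T> explicit(Class<T> type, T... data) {
        for (T element : data) {
            if (!type.isInstance(element)) {
                throw new IllegalArgumentException(element + " is not a " + type.getName());
            }
        }
        return new TypedElements<>(type, Arrays.asList(data));
    }
}
```

In this model, `inferred(1, 2.5)` reports `Integer` even though the second element is a `Double`. Passing the type up front avoids having to correct it after the fact, which is the role `returns()` otherwise plays.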
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1372403767 ## flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java: ## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.datagen.functions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.java.typeutils.OutputTypeConfigurable; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; + +/** + * A stream generator function that returns a sequence of elements. + * + * This generator function serializes the elements using Flink's type information. That way, any + * object transport using Java serialization will not be affected by the serializability of the + * elements. + * + * NOTE: This source has a parallelism of 1. + * + * @param The type of elements returned by this function. + */ +@Internal +public class FromElementsGeneratorFunction +implements GeneratorFunction, OutputTypeConfigurable { + +private static final long serialVersionUID = 1L; + +private static final Logger LOG = LoggerFactory.getLogger(FromElementsGeneratorFunction.class); + +/** The (de)serializer to be used for the data elements. */ +private @Nullable TypeSerializer serializer; + +/** The actual data elements, in serialized form. 
*/ +private byte[] elementsSerialized; + +/** The number of elements emitted already. */ +private int numElementsEmitted; + +private final transient Iterable elements; +private transient DataInputView input; + +public FromElementsGeneratorFunction(TypeSerializer serializer, OUT... elements) +throws IOException { +this(serializer, Arrays.asList(elements)); +} + +public FromElementsGeneratorFunction(TypeSerializer serializer, Iterable elements) +throws IOException { +this.serializer = Preconditions.checkNotNull(serializer); +this.elements = elements; +serializeElements(); +} + +@SafeVarargs +public FromElementsGeneratorFunction(OUT... elements) { +this(Arrays.asList(elements)); +} + +public FromElementsGeneratorFunction(Iterable elements) { +this.serializer = null; +this.elements = elements; +checkIterable(elements, Object.class); +} + +@VisibleForTesting +@Nullable +public TypeSerializer getSerializer() { +return serializer; +} + +private void serializeElements() throws IOException { +Preconditions.checkState(serializer != null, "serializer not set"); +LOG.info("Serializing elements using " + serializer); +ByteArrayOutputStream baos = new ByteArrayOutputStream(); +DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos); + +try { +for (OUT element : elements) { +serializer.serialize(element, wrapper); +} +} catch (Exception e) { +throw new IOException("Serializing the source elements failed: " + e.getMessage(), e); +} +this.elementsSerialized = baos.toByteArray(); +} + +@Override +public void open(SourceReaderContext readerContext) throws Exception { +
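The quoted Javadoc says that the function serializes the elements with Flink's type information. As a result, shipping the function via Java serialization does not depend on whether the elements themselves are `Serializable`. The plain-Java sketch below shows the same idea under stated assumptions. `SerializedElements` and its nested `ElementCodec` are hypothetical: `ElementCodec` stands in for Flink's `TypeSerializer`, and `DataOutputStream`/`DataInputStream` stand in for Flink's `DataOutputView`/`DataInputView`. The elements are written once into a `byte[]` and read back one at a time.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.List;

/**
 * Sketch: holds the elements only as a pre-serialized byte[], so Java-serializing this object
 * does not require the element type itself to be Serializable.
 */
public class SerializedElements<T> implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Hypothetical stand-in for Flink's TypeSerializer. */
    public interface ElementCodec<E> extends Serializable {
        void write(E element, DataOutputStream out) throws IOException;

        E read(DataInputStream in) throws IOException;
    }

    private final ElementCodec<T> codec;
    private final byte[] elementsSerialized;
    private final int count;

    /** Number of elements handed out so far. */
    private int numElementsEmitted;

    /** Read cursor over elementsSerialized; not serialized, rebuilt lazily. */
    private transient DataInputStream input;

    public SerializedElements(ElementCodec<T> codec, List<T> elements) {
        this.codec = codec;
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            for (T element : elements) {
                codec.write(element, out);
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Serializing the elements failed", e);
        }
        this.elementsSerialized = baos.toByteArray();
        this.count = elements.size();
    }

    /** Returns the next element, deserializing it from the byte array on demand. */
    public T next() {
        if (numElementsEmitted >= count) {
            throw new IllegalStateException("All " + count + " elements were already emitted");
        }
        try {
            if (input == null) {
                input = new DataInputStream(new ByteArrayInputStream(elementsSerialized));
                // Skip elements emitted before the cursor was rebuilt (e.g. after deserialization).
                for (int i = 0; i < numElementsEmitted; i++) {
                    codec.read(input);
                }
            }
            T element = codec.read(input);
            numElementsEmitted++;
            return element;
        } catch (IOException e) {
            throw new UncheckedIOException("Deserializing an element failed", e);
        }
    }
}
```

Reading the bytes sequentially fits a single reader. Because serialized elements can have variable lengths, reaching an arbitrary index requires deserializing all elements before it. This matters once the source is allowed to run with a parallelism greater than 1.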
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370757585 ## flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5: ## @@ -19,8 +19,8 @@ Method calls method in (RecreateOnResetOperatorCoordinator.java:361) Method calls method in (TaskManagerConfiguration.java:244) Method calls method in (TaskManagerConfiguration.java:246) -Method calls method in (TaskManagerServices.java:433) -Method calls method in (TaskManagerServices.java:431) Review Comment: Indeed. I had to enable refreeze to add the missing datagen source violations, but, to be honest, how exactly this is supposed to work in archunit is still a bit of a mystery to me.
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1371778188 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java: ## @@ -1188,14 +1191,14 @@ void testChainingOfOperatorsWithDifferentMaxParallelism( configuration.set( PipelineOptions.OPERATOR_CHAINING_CHAIN_OPERATORS_WITH_DIFFERENT_MAX_PARALLELISM, chainingOfOperatorsWithDifferentMaxParallelismEnabled); -configuration.set(PipelineOptions.MAX_PARALLELISM, 10); +configuration.set(PipelineOptions.MAX_PARALLELISM, 1); try (StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment(1, configuration)) { chainEnv.fromElements(1) .map(x -> x) // should automatically break chain here .map(x -> x) -.setMaxParallelism(1) +.setMaxParallelism(10) Review Comment: I'll unresolve it for now because if our attempt at not restricting parallelism to 1 works, this change won't be needed at all.
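The test relies on one chaining rule. When `PipelineOptions.OPERATOR_CHAINING_CHAIN_OPERATORS_WITH_DIFFERENT_MAX_PARALLELISM` is disabled, two operators with different max parallelism are not chained. The sketch below reduces that rule to a plain-Java predicate. `ChainingRule` and `canChain` are hypothetical names. Flink's real chaining check also considers other conditions, such as the partitioner, slot sharing group and chaining strategy, which this model ignores.

```java
/** Hypothetical, heavily simplified model of two chaining conditions. */
public final class ChainingRule {

    private ChainingRule() {}

    /**
     * Returns whether an upstream and a downstream operator may be chained, looking only at
     * parallelism and max parallelism.
     */
    public static boolean canChain(
            int upstreamParallelism,
            int downstreamParallelism,
            int upstreamMaxParallelism,
            int downstreamMaxParallelism,
            boolean chainWithDifferentMaxParallelism) {
        if (upstreamParallelism != downstreamParallelism) {
            return false;
        }
        // With the option disabled, a max-parallelism mismatch breaks the chain.
        return chainWithDifferentMaxParallelism
                || upstreamMaxParallelism == downstreamMaxParallelism;
    }

    public static void main(String[] args) {
        // As in the updated test: the config default is 1, and the second map sets 10.
        System.out.println(canChain(1, 1, 1, 10, false)); // false: chain is broken
        System.out.println(canChain(1, 1, 1, 10, true)); // true: chaining is allowed
    }
}
```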
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1371775741 ## flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java: ## @@ -67,7 +79,62 @@ public void executeThrowsProgramInvocationException() throws Exception { }) .print(); -thrown.expect(ProgramInvocationException.class); -env.execute(); + assertThatThrownBy(env::execute).isInstanceOf(ProgramInvocationException.class); +} + +@Test +@SuppressWarnings("unchecked") +void testAvroSpecificRecordsInFromElements() throws Exception { +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +User user1 = new User("Foo", 1); +User user2 = new User("Bar", 2); +User[] data = {user1, user2}; +DataStreamSource stream = env.fromElements(User.class, user1, user2); +DataGeneratorSource source = getSourceFromStream(stream); +FromElementsGeneratorFunction generatorFunction = +(FromElementsGeneratorFunction) source.getGeneratorFunction(); + +List result = stream.executeAndCollect(data.length + 1); +TypeSerializer serializer = generatorFunction.getSerializer(); + +assertThat(serializer).isInstanceOf(AvroSerializer.class); +assertThat(result).containsExactly(data); +} + +@Test +@SuppressWarnings("unchecked") +void testAvroGenericRecordsInFromElements() throws Exception { +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +Schema schema = getSchemaFromResources("/avro/user.avsc"); +GenericRecord user1 = +new GenericRecordBuilder(schema).set("name", "Foo").set("age", 40).build(); +GenericRecord user2 = +new GenericRecordBuilder(schema).set("name", "Bar").set("age", 45).build(); +GenericRecord[] data = {user1, user2}; +DataStream stream = +env.fromElements(data).returns(new GenericRecordAvroTypeInfo(schema)); +DataGeneratorSource source = getSourceFromStream(stream); +FromElementsGeneratorFunction generatorFunction = +(FromElementsGeneratorFunction) 
source.getGeneratorFunction(); + +List result = stream.executeAndCollect(data.length + 1); +TypeSerializer serializer = generatorFunction.getSerializer(); + +assertThat(serializer).isInstanceOf(AvroSerializer.class); +assertThat(result).containsExactly(data); +} + +private Schema getSchemaFromResources(String path) throws Exception { +try (InputStream schemaStream = getClass().getResourceAsStream(path)) { +if (schemaStream == null) { +throw new IllegalStateException("Could not find " + path + " in classpath"); +} +return new Schema.Parser().parse(schemaStream); +} +} + +@SuppressWarnings("unchecked") +private static > S getSourceFromStream(DataStream stream) { +return (S) ((SourceTransformation) stream.getTransformation()).getSource(); Review Comment: See https://github.com/apache/flink/pull/23553/commits/2962647e494b569d12bcaf3b4abc314ef0bbc623. I also removed the specific record test because, without the serializer check, it is a bit pointless.
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1371769875 ## flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java: ## @@ -67,7 +79,62 @@ public void executeThrowsProgramInvocationException() throws Exception { }) .print(); -thrown.expect(ProgramInvocationException.class); -env.execute(); + assertThatThrownBy(env::execute).isInstanceOf(ProgramInvocationException.class); +} + +@Test +@SuppressWarnings("unchecked") +void testAvroSpecificRecordsInFromElements() throws Exception { +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +User user1 = new User("Foo", 1); +User user2 = new User("Bar", 2); +User[] data = {user1, user2}; +DataStreamSource stream = env.fromElements(User.class, user1, user2); +DataGeneratorSource source = getSourceFromStream(stream); +FromElementsGeneratorFunction generatorFunction = +(FromElementsGeneratorFunction) source.getGeneratorFunction(); + +List result = stream.executeAndCollect(data.length + 1); +TypeSerializer serializer = generatorFunction.getSerializer(); + +assertThat(serializer).isInstanceOf(AvroSerializer.class); +assertThat(result).containsExactly(data); +} + +@Test +@SuppressWarnings("unchecked") +void testAvroGenericRecordsInFromElements() throws Exception { +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +Schema schema = getSchemaFromResources("/avro/user.avsc"); +GenericRecord user1 = +new GenericRecordBuilder(schema).set("name", "Foo").set("age", 40).build(); +GenericRecord user2 = +new GenericRecordBuilder(schema).set("name", "Bar").set("age", 45).build(); +GenericRecord[] data = {user1, user2}; +DataStream stream = +env.fromElements(data).returns(new GenericRecordAvroTypeInfo(schema)); +DataGeneratorSource source = getSourceFromStream(stream); +FromElementsGeneratorFunction generatorFunction = +(FromElementsGeneratorFunction) 
source.getGeneratorFunction(); + +List result = stream.executeAndCollect(data.length + 1); +TypeSerializer serializer = generatorFunction.getSerializer(); + +assertThat(serializer).isInstanceOf(AvroSerializer.class); +assertThat(result).containsExactly(data); +} + +private Schema getSchemaFromResources(String path) throws Exception { +try (InputStream schemaStream = getClass().getResourceAsStream(path)) { +if (schemaStream == null) { +throw new IllegalStateException("Could not find " + path + " in classpath"); +} +return new Schema.Parser().parse(schemaStream); +} +} + +@SuppressWarnings("unchecked") +private static > S getSourceFromStream(DataStream stream) { +return (S) ((SourceTransformation) stream.getTransformation()).getSource(); Review Comment: Sounds reasonable. Without explicitly setting `GenericRecordAvroTypeInfo`, Kryo would fail, so we can just rely on checking the happy path. The downside is that this won't allow us to check that Avro is detected and that the respective serializer is used for specific records.
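The remark about explicitly setting `GenericRecordAvroTypeInfo` reflects a more general point. When the element type is inferred from the data, for example from the first element, the inferred type can be narrower than intended or can lack information such as a schema. Passing the type explicitly avoids this. The plain-Java sketch below contrasts the two approaches using `Class` tokens. `TypeTokens`, `inferFromFirst` and `checkAll` are hypothetical names, and Flink's type extraction is far more involved than a `getClass()` call.

```java
import java.util.Arrays;

/** Hypothetical illustration: inferring a type from data vs. passing it explicitly. */
public final class TypeTokens {

    private TypeTokens() {}

    /** Infers the element type from the runtime class of the first element. */
    @SafeVarargs
    public static <T> Class<?> inferFromFirst(T... data) {
        if (data.length == 0) {
            throw new IllegalArgumentException("Cannot infer a type from zero elements");
        }
        return data[0].getClass();
    }

    /** Returns true if every element is an instance of the given type. */
    @SafeVarargs
    public static <T> boolean checkAll(Class<?> type, T... data) {
        return Arrays.stream(data).allMatch(type::isInstance);
    }

    public static void main(String[] args) {
        Number[] numbers = {1, 2L}; // an Integer followed by a Long
        Class<?> inferred = inferFromFirst(numbers);
        System.out.println(inferred.getSimpleName()); // Integer
        System.out.println(checkAll(inferred, numbers)); // false: a Long is not an Integer
        System.out.println(checkAll(Number.class, numbers)); // true with an explicit type
    }
}
```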
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1371721067 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java: ## @@ -123,7 +124,8 @@ private void createChainableStream(TableTestUtil util) { } private void createNonChainableStream(TableTestUtil util) { -DataStreamSource dataStream = util.getStreamEnv().fromElements(1, 2, 3); +DataStreamSource dataStream = +util.getStreamEnv().fromCollection(Arrays.asList(1, 2, 3)); Review Comment: OK, the max parallelism setting won't work. The `MultipleInputNodeCreationProcessor` makes its decision solely based on whether the source is a FLIP-27 SourceTransformation: https://github.com/afedulov/flink/blob/53649ef98ef3da237c1c84714d53aedd6725d39f/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessor.java#L479
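According to the comment, the processor decides only on whether the source is a FLIP-27 `SourceTransformation`. This is presumably why the test needs a legacy source, such as `fromCollection`, to obtain a non-chainable input. The sketch below is a minimal plain-Java model of that decision. It uses hypothetical stand-in classes rather than Flink's real `Transformation` hierarchy, and `isChainableSource` is an illustrative name.

```java
/** Hypothetical stand-ins for Flink's transformation classes (not the real API). */
public final class ChainableSourceCheck {

    public interface Transformation {}

    /** Stands in for a FLIP-27 source, e.g. what fromElements produces after this PR. */
    public static final class SourceTransformation implements Transformation {}

    /** Stands in for a legacy SourceFunction-based source. */
    public static final class LegacySourceTransformation implements Transformation {}

    private ChainableSourceCheck() {}

    /** The decision reduced to a type check: only FLIP-27 sources count as chainable. */
    public static boolean isChainableSource(Transformation transformation) {
        return transformation instanceof SourceTransformation;
    }

    public static void main(String[] args) {
        System.out.println(isChainableSource(new SourceTransformation())); // true
        System.out.println(isChainableSource(new LegacySourceTransformation())); // false
    }
}
```

Because the check looks only at the transformation type, no parallelism setting on a FLIP-27 source can make it non-chainable.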
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1371721067 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java: ## @@ -123,7 +124,8 @@ private void createChainableStream(TableTestUtil util) { } private void createNonChainableStream(TableTestUtil util) { -DataStreamSource dataStream = util.getStreamEnv().fromElements(1, 2, 3); +DataStreamSource dataStream = +util.getStreamEnv().fromCollection(Arrays.asList(1, 2, 3)); Review Comment: OK, that won't work: the `MultipleInputNodeCreationProcessor` just makes its decision based on whether the source is a FLIP-27 SourceTransformation: https://github.com/afedulov/flink/blob/53649ef98ef3da237c1c84714d53aedd6725d39f/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessor.java#L479
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
zentol commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1371718910 ## flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java: ## @@ -67,7 +79,62 @@ public void executeThrowsProgramInvocationException() throws Exception { }) .print(); -thrown.expect(ProgramInvocationException.class); -env.execute(); + assertThatThrownBy(env::execute).isInstanceOf(ProgramInvocationException.class); +} + +@Test +@SuppressWarnings("unchecked") +void testAvroSpecificRecordsInFromElements() throws Exception { +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +User user1 = new User("Foo", 1); +User user2 = new User("Bar", 2); +User[] data = {user1, user2}; +DataStreamSource stream = env.fromElements(User.class, user1, user2); +DataGeneratorSource source = getSourceFromStream(stream); +FromElementsGeneratorFunction generatorFunction = +(FromElementsGeneratorFunction) source.getGeneratorFunction(); + +List result = stream.executeAndCollect(data.length + 1); +TypeSerializer serializer = generatorFunction.getSerializer(); + +assertThat(serializer).isInstanceOf(AvroSerializer.class); +assertThat(result).containsExactly(data); +} + +@Test +@SuppressWarnings("unchecked") +void testAvroGenericRecordsInFromElements() throws Exception { +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +Schema schema = getSchemaFromResources("/avro/user.avsc"); +GenericRecord user1 = +new GenericRecordBuilder(schema).set("name", "Foo").set("age", 40).build(); +GenericRecord user2 = +new GenericRecordBuilder(schema).set("name", "Bar").set("age", 45).build(); +GenericRecord[] data = {user1, user2}; +DataStream stream = +env.fromElements(data).returns(new GenericRecordAvroTypeInfo(schema)); +DataGeneratorSource source = getSourceFromStream(stream); +FromElementsGeneratorFunction generatorFunction = +(FromElementsGeneratorFunction) 
source.getGeneratorFunction(); + +List result = stream.executeAndCollect(data.length + 1); +TypeSerializer serializer = generatorFunction.getSerializer(); + +assertThat(serializer).isInstanceOf(AvroSerializer.class); +assertThat(result).containsExactly(data); +} + +private Schema getSchemaFromResources(String path) throws Exception { +try (InputStream schemaStream = getClass().getResourceAsStream(path)) { +if (schemaStream == null) { +throw new IllegalStateException("Could not find " + path + " in classpath"); +} +return new Schema.Parser().parse(schemaStream); +} +} + +@SuppressWarnings("unchecked") +private static > S getSourceFromStream(DataStream stream) { +return (S) ((SourceTransformation) stream.getTransformation()).getSource(); Review Comment: Ah, I see. It feels a bit weird to break the abstraction layers like this; there are 3 casts being done here. A higher-level test that just uses Avro + fromElements and verifies that things _work_ seems more appropriate?
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
zentol commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1371715012 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java: ## @@ -1188,14 +1191,14 @@ void testChainingOfOperatorsWithDifferentMaxParallelism( configuration.set( PipelineOptions.OPERATOR_CHAINING_CHAIN_OPERATORS_WITH_DIFFERENT_MAX_PARALLELISM, chainingOfOperatorsWithDifferentMaxParallelismEnabled); -configuration.set(PipelineOptions.MAX_PARALLELISM, 10); +configuration.set(PipelineOptions.MAX_PARALLELISM, 1); try (StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment(1, configuration)) { chainEnv.fromElements(1) .map(x -> x) // should automatically break chain here .map(x -> x) -.setMaxParallelism(1) +.setMaxParallelism(10) Review Comment: sounds good
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
zentol commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1371714068 ## flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5: ## @@ -19,8 +19,8 @@ Method calls method in (RecreateOnResetOperatorCoordinator.java:361) Method calls method in (TaskManagerConfiguration.java:244) Method calls method in (TaskManagerConfiguration.java:246) -Method calls method in (TaskManagerServices.java:433) -Method calls method in (TaskManagerServices.java:431) Review Comment: The line numbers have tripped people up a few times; maybe we should start failing CI if the diff contains changes. Anyhow, for the PR it's fine, I suppose. _Maybe_ move these lines into a separate "regenerate archunit store" commit or something.
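One way to act on the line-number churn is to check whether a regenerated violation store differs from the old one only in its `(File.java:NNN)` locations. The plain-Java sketch below assumes that store entries end with such a location, as the quoted lines do. `ArchUnitStoreDiff` and its methods are hypothetical and are not an existing Flink or ArchUnit utility. How such a check would be wired into CI is left open.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical CI helper; not part of Flink or ArchUnit. */
public final class ArchUnitStoreDiff {

    // Matches a source location such as "(TaskManagerServices.java:433)".
    private static final Pattern LOCATION = Pattern.compile("\\(([\\w$]+\\.java):\\d+\\)");

    private ArchUnitStoreDiff() {}

    /** Rewrites "(Foo.java:123)" to "(Foo.java)". */
    public static String stripLineNumbers(String violation) {
        return LOCATION.matcher(violation).replaceAll("($1)");
    }

    /** True if the stores differ, but only in their line numbers (entry order is ignored). */
    public static boolean onlyLineNumbersChanged(List<String> before, List<String> after) {
        if (sorted(before, false).equals(sorted(after, false))) {
            return false; // nothing changed at all
        }
        return sorted(before, true).equals(sorted(after, true));
    }

    private static List<String> sorted(List<String> store, boolean stripLines) {
        return store.stream()
                .map(line -> stripLines ? stripLineNumbers(line) : line)
                .sorted()
                .collect(Collectors.toList());
    }
}
```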
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
zentol commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1371711944 ## flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java: ## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.datagen.functions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.java.typeutils.OutputTypeConfigurable; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; + +/** + * A stream generator function that returns a sequence of elements. + * + * This generator function serializes the elements using Flink's type information. That way, any + * object transport using Java serialization will not be affected by the serializability of the + * elements. + * + * NOTE: This source has a parallelism of 1. + * + * @param The type of elements returned by this function. + */ +@Internal +public class FromElementsGeneratorFunction +implements GeneratorFunction, OutputTypeConfigurable { + +private static final long serialVersionUID = 1L; + +private static final Logger LOG = LoggerFactory.getLogger(FromElementsGeneratorFunction.class); + +/** The (de)serializer to be used for the data elements. */ +private @Nullable TypeSerializer serializer; + +/** The actual data elements, in serialized form. 
*/ +private byte[] elementsSerialized; + +/** The number of elements emitted already. */ +private int numElementsEmitted; + +private final transient Iterable elements; +private transient DataInputView input; + +public FromElementsGeneratorFunction(TypeSerializer serializer, OUT... elements) +throws IOException { +this(serializer, Arrays.asList(elements)); +} + +public FromElementsGeneratorFunction(TypeSerializer serializer, Iterable elements) +throws IOException { +this.serializer = Preconditions.checkNotNull(serializer); +this.elements = elements; +serializeElements(); +} + +@SafeVarargs +public FromElementsGeneratorFunction(OUT... elements) { +this(Arrays.asList(elements)); +} + +public FromElementsGeneratorFunction(Iterable elements) { +this.serializer = null; +this.elements = elements; +checkIterable(elements, Object.class); +} + +@VisibleForTesting +@Nullable +public TypeSerializer getSerializer() { +return serializer; +} + +private void serializeElements() throws IOException { +Preconditions.checkState(serializer != null, "serializer not set"); +LOG.info("Serializing elements using " + serializer); +ByteArrayOutputStream baos = new ByteArrayOutputStream(); +DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos); + +try { +for (OUT element : elements) { +serializer.serialize(element, wrapper); +} +} catch (Exception e) { +throw new IOException("Serializing the source elements failed: " + e.getMessage(), e); +} +this.elementsSerialized = baos.toByteArray(); +} + +@Override +public void open(SourceReaderContext readerContext) throws Exception { +
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1371708873 ## flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java: ## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.connector.datagen.functions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.java.typeutils.OutputTypeConfigurable; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; + +/** + * A stream generator function that returns a sequence of elements. + * + * This generator function serializes the elements using Flink's type information. That way, any + * object transport using Java serialization will not be affected by the serializability of the + * elements. + * + * NOTE: This source has a parallelism of 1. + * + * @param <OUT> The type of elements returned by this function. + */ +@Internal +public class FromElementsGeneratorFunction<OUT> +implements GeneratorFunction<Long, OUT>, OutputTypeConfigurable<OUT> { + +private static final long serialVersionUID = 1L; + +private static final Logger LOG = LoggerFactory.getLogger(FromElementsGeneratorFunction.class); + +/** The (de)serializer to be used for the data elements. */ +private @Nullable TypeSerializer<OUT> serializer; + +/** The actual data elements, in serialized form. 
 */ +private byte[] elementsSerialized; + +/** The number of elements emitted already. */ +private int numElementsEmitted; + +private final transient Iterable<OUT> elements; +private transient DataInputView input; + +public FromElementsGeneratorFunction(TypeSerializer<OUT> serializer, OUT... elements) +throws IOException { +this(serializer, Arrays.asList(elements)); +} + +public FromElementsGeneratorFunction(TypeSerializer<OUT> serializer, Iterable<OUT> elements) +throws IOException { +this.serializer = Preconditions.checkNotNull(serializer); +this.elements = elements; +serializeElements(); +} + +@SafeVarargs +public FromElementsGeneratorFunction(OUT... elements) { +this(Arrays.asList(elements)); +} + +public FromElementsGeneratorFunction(Iterable<OUT> elements) { +this.serializer = null; +this.elements = elements; +checkIterable(elements, Object.class); +} + +@VisibleForTesting +@Nullable +public TypeSerializer<OUT> getSerializer() { +return serializer; +} + +private void serializeElements() throws IOException { +Preconditions.checkState(serializer != null, "serializer not set"); +LOG.info("Serializing elements using " + serializer); +ByteArrayOutputStream baos = new ByteArrayOutputStream(); +DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos); + +try { +for (OUT element : elements) { +serializer.serialize(element, wrapper); +} +} catch (Exception e) { +throw new IOException("Serializing the source elements failed: " + e.getMessage(), e); +} +this.elementsSerialized = baos.toByteArray(); +} + +@Override +public void open(SourceReaderContext readerContext) throws Exception { +
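The Javadoc above motivates keeping the elements only in serialized form, so that shipping the function with Java serialization does not depend on whether the element type itself is serializable. A minimal, Flink-free sketch of that storage pattern follows. `ElementSerializer`, `SerializedElements` and `StringElementSerializer` are hypothetical names, plain `java.io` streams stand in for Flink's `DataOutputViewStreamWrapper`/`DataInputViewStreamWrapper`, and the real generator function differs in detail.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for Flink's TypeSerializer: writes and reads one element. */
interface ElementSerializer<T> {
    void serialize(T element, DataOutput out) throws IOException;

    T deserialize(DataInput in) throws IOException;
}

/** Keeps the elements only as bytes and deserializes them one at a time on demand. */
final class SerializedElements<T> {
    private final ElementSerializer<T> serializer;
    private final byte[] elementsSerialized; // the serialized form that would be shipped
    private final int count;
    private DataInputStream input; // created lazily, similar in spirit to open()
    private int numElementsEmitted;

    SerializedElements(ElementSerializer<T> serializer, Iterable<T> elements) {
        this.serializer = serializer;
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        int n = 0;
        try {
            for (T element : elements) {
                serializer.serialize(element, out);
                n++;
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException("Serializing the source elements failed", e);
        }
        this.elementsSerialized = baos.toByteArray();
        this.count = n;
    }

    int size() {
        return count;
    }

    /** Deserializes and returns the next element; fails once all elements were emitted. */
    T next() {
        if (numElementsEmitted >= count) {
            throw new IllegalStateException("All " + count + " elements were already emitted");
        }
        try {
            if (input == null) {
                input = new DataInputStream(new ByteArrayInputStream(elementsSerialized));
            }
            T element = serializer.deserialize(input);
            numElementsEmitted++;
            return element;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

/** Example serializer for strings using DataOutput#writeUTF and DataInput#readUTF. */
final class StringElementSerializer implements ElementSerializer<String> {
    @Override
    public void serialize(String element, DataOutput out) throws IOException {
        out.writeUTF(element);
    }

    @Override
    public String deserialize(DataInput in) throws IOException {
        return in.readUTF();
    }
}
```

Only the byte array and the serializer need to travel with the function object; the original element objects are never required after construction.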
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
zentol commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1371577209 ## flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java: ## @@ -0,0 +1,211 @@
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1371571177 ## flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java: ## @@ -67,7 +79,62 @@ public void executeThrowsProgramInvocationException() throws Exception { }) .print(); -thrown.expect(ProgramInvocationException.class); -env.execute(); + assertThatThrownBy(env::execute).isInstanceOf(ProgramInvocationException.class); +} + +@Test +@SuppressWarnings("unchecked") +void testAvroSpecificRecordsInFromElements() throws Exception { +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +User user1 = new User("Foo", 1); +User user2 = new User("Bar", 2); +User[] data = {user1, user2}; +DataStreamSource<User> stream = env.fromElements(User.class, user1, user2); +DataGeneratorSource<User> source = getSourceFromStream(stream); +FromElementsGeneratorFunction<User> generatorFunction = +(FromElementsGeneratorFunction<User>) source.getGeneratorFunction(); + +List<User> result = stream.executeAndCollect(data.length + 1); +TypeSerializer<User> serializer = generatorFunction.getSerializer(); + +assertThat(serializer).isInstanceOf(AvroSerializer.class); +assertThat(result).containsExactly(data); +} + +@Test +@SuppressWarnings("unchecked") +void testAvroGenericRecordsInFromElements() throws Exception { +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +Schema schema = getSchemaFromResources("/avro/user.avsc"); +GenericRecord user1 = +new GenericRecordBuilder(schema).set("name", "Foo").set("age", 40).build(); +GenericRecord user2 = +new GenericRecordBuilder(schema).set("name", "Bar").set("age", 45).build(); +GenericRecord[] data = {user1, user2}; +DataStream<GenericRecord> stream = +env.fromElements(data).returns(new GenericRecordAvroTypeInfo(schema)); +DataGeneratorSource<GenericRecord> source = getSourceFromStream(stream); 
+FromElementsGeneratorFunction<GenericRecord> generatorFunction = +(FromElementsGeneratorFunction<GenericRecord>)
source.getGeneratorFunction(); + +List<GenericRecord> result = stream.executeAndCollect(data.length + 1); +TypeSerializer<GenericRecord> serializer = generatorFunction.getSerializer(); + +assertThat(serializer).isInstanceOf(AvroSerializer.class); +assertThat(result).containsExactly(data); +} + +private Schema getSchemaFromResources(String path) throws Exception { +try (InputStream schemaStream = getClass().getResourceAsStream(path)) { +if (schemaStream == null) { +throw new IllegalStateException("Could not find " + path + " in classpath"); +} +return new Schema.Parser().parse(schemaStream); +} +} + +@SuppressWarnings("unchecked") +private static <T, S extends Source<T, ?, ?>> S getSourceFromStream(DataStream<T> stream) { +return (S) ((SourceTransformation<T, ?, ?>) stream.getTransformation()).getSource(); Review Comment: Missed this one. `fromElements` returns a `DataStreamSource` (a `SingleOutputStreamOperator`), not the FLIP-27 Source, which I could cast to `DataGeneratorSource` to get hold of the `FromElementsGeneratorFunction`. Or do you have something completely different in mind? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1371506767 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java: ## @@ -123,7 +124,8 @@ private void createChainableStream(TableTestUtil util) { } private void createNonChainableStream(TableTestUtil util) { -DataStreamSource<Integer> dataStream = util.getStreamEnv().fromElements(1, 2, 3); +DataStreamSource<Integer> dataStream = +util.getStreamEnv().fromCollection(Arrays.asList(1, 2, 3)); Review Comment: Hmm, maybe we should just set a different max parallelism, as in the test above. I'll give it a try.
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
zentol commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1371398388 ## flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java: ## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.connector.datagen.functions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.java.typeutils.OutputTypeConfigurable; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; + +/** + * A stream generator function that returns a sequence of elements. + * + * This generator function serializes the elements using Flink's type information. That way, any + * object transport using Java serialization will not be affected by the serializability of the + * elements. + * + * NOTE: This source has a parallelism of 1. Review Comment: > were any specific additional considerations that required to limit it to the parallelism of 1. I can't think of any beyond "hey this would need more work".
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
zentol commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1371395994 ## flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java: ## @@ -0,0 +1,211 @@
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on PR #23553: URL: https://github.com/apache/flink/pull/23553#issuecomment-1778137755 @zentol thanks a lot for the review!! I addressed all comments from your first pass, PTAL.
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370875073 ## flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java: ## @@ -0,0 +1,211 @@
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370869459 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java: ## @@ -27,7 +27,10 @@ * at {@link org.apache.flink.streaming.api.graph.StreamGraph} generation. This can be useful for * cases where the output type is specified by the returns method and, thus, after the stream * operator has been created. + * + * @deprecated Use {@link org.apache.flink.api.java.typeutils.OutputTypeConfigurable} instead */ +@Deprecated Review Comment: Nice trick :) Done https://github.com/apache/flink/pull/23553/commits/e0f20410d65ce9ddf5c674e54d89947d5c5ceca3
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370757489 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java: ## @@ -566,15 +567,15 @@ public void testMaxParallelismWithConnectedKeyedStream() { int maxParallelism = 42; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); -DataStream<Integer> input1 = env.fromElements(1, 2, 3, 4).setMaxParallelism(128); -DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4).setMaxParallelism(129); +DataStream<Long> input1 = env.fromSequence(1, 4).setMaxParallelism(128); +DataStream<Long> input2 = env.fromSequence(1, 4).setMaxParallelism(129); Review Comment: Yes, this fixes test failures caused by this change: https://github.com/apache/flink/pull/23553/files#diff-4a5eb9032bed78bb9f18e6523d4f7b3dc86ed10e3a3689757c1c4fa2335e7255R1307. The SingleOutputStreamOperator caps the max parallelism of a non-parallel source at 1, whereas the current implementation of `fromSequence`, somewhat inconsistently, allows parallel execution.
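The reason the test had to switch sources can be shown with a small plain-Java model of the max-parallelism check described in the comment above. `MaxParallelismCheck` and its methods are hypothetical; the rule is modeled on the "non-parallel operator only accepts a max parallelism of 1" behavior discussed here, not copied from Flink.

```java
/** Hypothetical model of the max-parallelism validation applied to an operator. */
final class MaxParallelismCheck {
    private MaxParallelismCheck() {}

    /** Throws if maxParallelism is not allowed for an operator with the given capability. */
    static void validate(int maxParallelism, boolean canBeParallel) {
        if (maxParallelism <= 0) {
            throw new IllegalArgumentException("The maximum parallelism must be greater than 0.");
        }
        // A non-parallel operator, such as a parallelism-1 source, only accepts 1.
        if (!canBeParallel && maxParallelism != 1) {
            throw new IllegalArgumentException(
                    "The maximum parallelism of a non-parallel operator must be 1.");
        }
    }

    /** Convenience wrapper: true if validate(...) accepts the combination. */
    static boolean isAccepted(int maxParallelism, boolean canBeParallel) {
        try {
            validate(maxParallelism, canBeParallel);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }
}
```

Under this model, a non-parallel `fromElements` source rejects `setMaxParallelism(128)` and `setMaxParallelism(129)`, while the parallel `fromSequence` source accepts both, which matches the test change in this hunk.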
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370757728 ## flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java: ## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.datagen.functions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.java.typeutils.OutputTypeConfigurable; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; + +/** + * A stream generator function that returns a sequence of elements. + * + * This generator function serializes the elements using Flink's type information. That way, any + * object transport using Java serialization will not be affected by the serializability of the + * elements. + * + * NOTE: This source has a parallelism of 1. Review Comment: >Theoretically we could also fix this (in a follow-up!) but it doesn't seem worth the overhead given the number of elements. Oh I think this already works; see below comment. Good point, I mainly decided to restrict it because this is what the current underlying SourceFunction was delivering (FromElementsFunction). I am not sure if there were any specific additional considerations that required to limit it to the parallelism of 1. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
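To illustrate why a parallelism greater than 1 could already work: a minimal plain-Java sketch, assuming that each parallel reader receives a disjoint, increasing range of indices and calls the generator once per index (which, to my understanding, is how `DataGeneratorSource` drives its `GeneratorFunction`). The class and method names are hypothetical, and `DataOutputStream#writeUTF` stands in for Flink's `TypeSerializer`. Each reader deserializes from the same byte array and simply skips the elements that belong to other readers' ranges.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

/** Hypothetical sketch: elements serialized once, then read back by index from any reader. */
final class IndexedElementsGenerator {
    private final byte[] elementsSerialized;
    private final int numElements;
    private DataInputStream input;   // per-reader cursor over the serialized bytes
    private long numElementsEmitted; // how far this reader's cursor has advanced

    IndexedElementsGenerator(List<String> elements) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            for (String element : elements) {
                out.writeUTF(element); // stand-in for TypeSerializer#serialize
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Serializing the elements failed", e);
        }
        this.elementsSerialized = baos.toByteArray();
        this.numElements = elements.size();
    }

    /** Resets the cursor; called once per reader before the first map() call. */
    void open() {
        input = new DataInputStream(new ByteArrayInputStream(elementsSerialized));
        numElementsEmitted = 0;
    }

    /** Returns the element at {@code index}; indices must arrive in increasing order. */
    String map(long index) {
        if (index < numElementsEmitted || index >= numElements) {
            throw new IllegalArgumentException("Index out of order or out of range: " + index);
        }
        try {
            // Skip the elements that belong to other readers' index ranges.
            while (numElementsEmitted < index) {
                input.readUTF();
                numElementsEmitted++;
            }
            numElementsEmitted++;
            return input.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For example, with elements a to d split across two readers as [0, 2) and [2, 4), the second reader's first call `map(2)` skips a and b and returns c. The cost is re-reading the skipped prefix, which is negligible for the small element counts `fromElements` is meant for.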
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370845998 ## flink-tests/pom.xml: ## @@ -284,6 +284,13 @@ under the License. test + + org.apache.flink + flink-avro Review Comment: > I cant really tell how this (and some of the flink-tests changes) related to fromElements. That's actually an easy one. You might remember this - https://issues.apache.org/jira/browse/FLINK-21386 . I added the test to explicitly verify that the issue you ran into with Avro and `fromElements` is handled correctly after the addition of the `OutputTypeConfigurable` functionality to `DataGeneratorSource`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
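For context on the `OutputTypeConfigurable` part: the idea is that the generator can be created without a serializer and is handed the final output type later, before anything is serialized, so the serializer follows the declared type rather than whatever would be derived up front. A minimal plain-Java sketch of that deferred-serialization pattern; `Codec`, `OutputCodecConfigurable` and `DeferredElements` are hypothetical stand-ins for `TypeSerializer`, `OutputTypeConfigurable` and the generator function, not Flink APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a TypeSerializer: encodes and decodes single elements. */
interface Codec<T> {
    byte[] encode(T value);

    T decode(byte[] bytes);

    static Codec<String> utf8Strings() {
        return new Codec<String>() {
            @Override
            public byte[] encode(String value) {
                return value.getBytes(StandardCharsets.UTF_8);
            }

            @Override
            public String decode(byte[] bytes) {
                return new String(bytes, StandardCharsets.UTF_8);
            }
        };
    }
}

/** Hypothetical stand-in for OutputTypeConfigurable: the framework hands over the final type. */
interface OutputCodecConfigurable<T> {
    void setOutputCodec(Codec<T> codec);
}

/** Keeps the raw elements until the output codec is known, then serializes them once. */
final class DeferredElements<T> implements OutputCodecConfigurable<T> {
    private final List<T> elements;
    private final List<byte[]> serialized = new ArrayList<>();
    private Codec<T> codec; // null until configured

    DeferredElements(List<T> elements) {
        this.elements = new ArrayList<>(elements);
    }

    @Override
    public void setOutputCodec(Codec<T> codec) {
        this.codec = codec;
        serialized.clear();
        for (T element : elements) {
            serialized.add(codec.encode(element));
        }
    }

    boolean isConfigured() {
        return codec != null;
    }

    T get(int index) {
        if (codec == null) {
            throw new IllegalStateException("Output codec not set");
        }
        return codec.decode(serialized.get(index));
    }
}
```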
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370841305 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java: ## @@ -123,7 +124,8 @@ private void createChainableStream(TableTestUtil util) { } private void createNonChainableStream(TableTestUtil util) { -DataStreamSource dataStream = util.getStreamEnv().fromElements(1, 2, 3); +DataStreamSource dataStream = +util.getStreamEnv().fromCollection(Arrays.asList(1, 2, 3)); Review Comment: The FLIP-27 source gets chained, while the test requires a non-chainable stream (compare with `createChainableStream`). In this PR I dealt with it by switching to `fromCollection`, which is still based on the `SourceFunction`. In the follow-up PR for the `fromCollection` migration, I had to add a legacy source: https://github.com/apache/flink/pull/23558/files#diff-0e02bf442f990b526e7a5fe5203eff9e0d19924419b63d0bb0aa573f2b55R119 Not sure if we can get a `nonChainableStream` with a FLIP-27 source. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370832543 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java: ## @@ -1188,14 +1191,14 @@ void testChainingOfOperatorsWithDifferentMaxParallelism( configuration.set( PipelineOptions.OPERATOR_CHAINING_CHAIN_OPERATORS_WITH_DIFFERENT_MAX_PARALLELISM, chainingOfOperatorsWithDifferentMaxParallelismEnabled); -configuration.set(PipelineOptions.MAX_PARALLELISM, 10); +configuration.set(PipelineOptions.MAX_PARALLELISM, 1); try (StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment(1, configuration)) { chainEnv.fromElements(1) .map(x -> x) // should automatically break chain here .map(x -> x) -.setMaxParallelism(1) +.setMaxParallelism(10) Review Comment: The test verifies that the chain gets broken. The legacy source did not enforce a max parallelism of 1, something that we do now by propagating the call to super (https://github.com/afedulov/flink/blob/cf1a29d47a5bb4fb92e98a36934e525d74bae17b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L208). Notice that the default max parallelism in the config was also changed above from 10 to 1. So now we start with a source with a max parallelism of 1 and break the chain because the second map has a max parallelism of 10. Previously the test did the same, but in "reverse". -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
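A heavily simplified model of the chaining decision this test exercises, assuming the only criteria are equal parallelism and, unless `OPERATOR_CHAINING_CHAIN_OPERATORS_WITH_DIFFERENT_MAX_PARALLELISM` is enabled, equal max parallelism. The real logic in `StreamingJobGraphGenerator` checks considerably more; `Op` and `ChainingModel` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified description of one operator in a linear pipeline. */
final class Op {
    final String name;
    final int parallelism;
    final int maxParallelism;

    Op(String name, int parallelism, int maxParallelism) {
        this.name = name;
        this.parallelism = parallelism;
        this.maxParallelism = maxParallelism;
    }
}

final class ChainingModel {
    /** Simplified rule: same parallelism, and same max parallelism unless the option allows otherwise. */
    static boolean chainable(Op upstream, Op downstream, boolean allowDifferentMaxParallelism) {
        return upstream.parallelism == downstream.parallelism
                && (allowDifferentMaxParallelism
                        || upstream.maxParallelism == downstream.maxParallelism);
    }

    /** Splits a linear pipeline into chains of operator names. */
    static List<List<String>> chains(List<Op> pipeline, boolean allowDifferentMaxParallelism) {
        List<List<String>> result = new ArrayList<>();
        List<String> current = new ArrayList<>();
        Op previous = null;
        for (Op op : pipeline) {
            if (previous != null && !chainable(previous, op, allowDifferentMaxParallelism)) {
                result.add(current); // the chain breaks before this operator
                current = new ArrayList<>();
            }
            current.add(op.name);
            previous = op;
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }
}
```

With the updated test setup (source and first map at max parallelism 1 from the config, second map at 10), the model yields `[[source, map1], [map2]]` when the option is disabled and a single chain when it is enabled.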
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370824409 ## flink-formats/flink-parquet/src/test/resources/avro/user.avsc: ## @@ -1,9 +0,0 @@ -{ - "namespace": "org.apache.flink.connector.datagen.source.generated", - "type": "record", - "name": "User", Review Comment: I added it accidentally to the wrong package in one of the earlier commits and this [tmp] commit just cleans it up. I got rid of both changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370765358 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java: ## @@ -1161,9 +1163,10 @@ void testYieldingOperatorChainableToTaskNotChainedToLegacySource() { */ @Test void testYieldingOperatorProperlyChainedOnLegacySources() { +// TODO: this test can be removed when the legacy SourceFunction API gets removed StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment(1); -chainEnv.fromElements(1) +chainEnv.addSource(new LegacySource()) Review Comment: Seems so. Has something to do with threading: ``` [FLINK-16219][runtime] Disallow chaining of legacy source and yielding operator. This change allows yielding operators to be eagerly chained whenever possible, except after legacy sources. Yielding operators do not properly work when processInput is called from another thread, but are usually fine in any other chain. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
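A minimal model of the restriction described in that commit message, assuming (as the message says) that the problem is a yielding operator being invoked from a thread other than the task's mailbox thread, which is what happens downstream of a legacy `SourceFunction` that emits records from its own thread. `YieldingOperatorModel` is hypothetical, and the caller thread is passed explicitly only to keep the sketch easy to test.

```java
/** Hypothetical model: a yielding operator may only process input on the mailbox thread. */
final class YieldingOperatorModel {
    private final Thread mailboxThread;

    YieldingOperatorModel(Thread mailboxThread) {
        this.mailboxThread = mailboxThread;
    }

    /** Processes one record; {@code caller} simulates the thread that invokes the operator. */
    String processElement(String record, Thread caller) {
        if (caller != mailboxThread) {
            // A legacy SourceFunction emits from its own source thread, so a chained operator
            // would be invoked off the mailbox thread, where yielding to the mailbox is unsafe.
            throw new IllegalStateException("Yielding is only allowed on the mailbox thread");
        }
        return record.toUpperCase(); // placeholder for the operator's real work
    }

    /** Simplified rule from FLINK-16219: never chain a yielding operator after a legacy source. */
    static boolean mayChainYieldingOperatorAfter(boolean upstreamIsLegacySource) {
        return !upstreamIsLegacySource;
    }
}
```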
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370762519 ## flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java: ## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.datagen.functions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.java.typeutils.OutputTypeConfigurable; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; + +/** + * A stream generator function that returns a sequence of elements. + * + * This generator function serializes the elements using Flink's type information. That way, any + * object transport using Java serialization will not be affected by the serializability of the + * elements. + * + * NOTE: This source has a parallelism of 1. + * + * @param The type of elements returned by this function. + */ +@Internal +public class FromElementsGeneratorFunction +implements GeneratorFunction, OutputTypeConfigurable { + +private static final long serialVersionUID = 1L; + +private static final Logger LOG = LoggerFactory.getLogger(FromElementsGeneratorFunction.class); + +/** The (de)serializer to be used for the data elements. */ +private @Nullable TypeSerializer serializer; + +/** The actual data elements, in serialized form. 
*/ +private byte[] elementsSerialized; + +/** The number of elements emitted already. */ +private int numElementsEmitted; + +private final transient Iterable elements; +private transient DataInputView input; + +public FromElementsGeneratorFunction(TypeSerializer serializer, OUT... elements) +throws IOException { +this(serializer, Arrays.asList(elements)); +} + +public FromElementsGeneratorFunction(TypeSerializer serializer, Iterable elements) +throws IOException { +this.serializer = Preconditions.checkNotNull(serializer); +this.elements = elements; +serializeElements(); +} + +@SafeVarargs +public FromElementsGeneratorFunction(OUT... elements) { +this(Arrays.asList(elements)); +} + +public FromElementsGeneratorFunction(Iterable elements) { +this.serializer = null; +this.elements = elements; +checkIterable(elements, Object.class); +} + +@VisibleForTesting +@Nullable +public TypeSerializer getSerializer() { +return serializer; +} + +private void serializeElements() throws IOException { +Preconditions.checkState(serializer != null, "serializer not set"); +LOG.info("Serializing elements using " + serializer); +ByteArrayOutputStream baos = new ByteArrayOutputStream(); +DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos); + +try { +for (OUT element : elements) { +serializer.serialize(element, wrapper); +} +} catch (Exception e) { +throw new IOException("Serializing the source elements failed: " + e.getMessage(), e); +} +this.elementsSerialized = baos.toByteArray(); +} + +@Override +public void open(SourceReaderContext readerContext) throws Exception { +
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370757864 ## flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/FromElementsGeneratorFunction.java: ## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.datagen.functions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.java.typeutils.OutputTypeConfigurable; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; + +/** + * A stream generator function that returns a sequence of elements. + * + * This generator function serializes the elements using Flink's type information. That way, any + * object transport using Java serialization will not be affected by the serializability of the + * elements. + * + * NOTE: This source has a parallelism of 1. + * + * @param The type of elements returned by this function. + */ +@Internal +public class FromElementsGeneratorFunction +implements GeneratorFunction, OutputTypeConfigurable { + +private static final long serialVersionUID = 1L; + +private static final Logger LOG = LoggerFactory.getLogger(FromElementsGeneratorFunction.class); + +/** The (de)serializer to be used for the data elements. */ +private @Nullable TypeSerializer serializer; + +/** The actual data elements, in serialized form. 
*/ +private byte[] elementsSerialized; + +/** The number of elements emitted already. */ +private int numElementsEmitted; + +private final transient Iterable elements; +private transient DataInputView input; + +public FromElementsGeneratorFunction(TypeSerializer serializer, OUT... elements) +throws IOException { +this(serializer, Arrays.asList(elements)); +} + +public FromElementsGeneratorFunction(TypeSerializer serializer, Iterable elements) +throws IOException { +this.serializer = Preconditions.checkNotNull(serializer); +this.elements = elements; +serializeElements(); +} + +@SafeVarargs +public FromElementsGeneratorFunction(OUT... elements) { +this(Arrays.asList(elements)); +} + +public FromElementsGeneratorFunction(Iterable elements) { +this.serializer = null; +this.elements = elements; +checkIterable(elements, Object.class); +} + +@VisibleForTesting +@Nullable +public TypeSerializer getSerializer() { +return serializer; +} + +private void serializeElements() throws IOException { +Preconditions.checkState(serializer != null, "serializer not set"); +LOG.info("Serializing elements using " + serializer); +ByteArrayOutputStream baos = new ByteArrayOutputStream(); +DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos); + +try { +for (OUT element : elements) { +serializer.serialize(element, wrapper); +} +} catch (Exception e) { +throw new IOException("Serializing the source elements failed: " + e.getMessage(), e); +} +this.elementsSerialized = baos.toByteArray(); +} + +@Override +public void open(SourceReaderContext readerContext) throws Exception { +
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370757489 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java: ## @@ -566,15 +567,15 @@ public void testMaxParallelismWithConnectedKeyedStream() { int maxParallelism = 42; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); -DataStream input1 = env.fromElements(1, 2, 3, 4).setMaxParallelism(128); -DataStream input2 = env.fromElements(1, 2, 3, 4).setMaxParallelism(129); +DataStream input1 = env.fromSequence(1, 4).setMaxParallelism(128); +DataStream input2 = env.fromSequence(1, 4).setMaxParallelism(129); Review Comment: Yes, this fixes test failures that arise because of this: https://github.com/apache/flink/pull/23553/files#diff-4a5eb9032bed78bb9f18e6523d4f7b3dc86ed10e3a3689757c1c4fa2335e7255R1307 (the SingleOutputStreamOperator caps max parallelism at 1). Since this PR is already pretty sizable, it seemed appropriate to postpone dealing with this until we work on `fromSequence`. ## flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5: ## @@ -19,8 +19,8 @@ Method calls method in (RecreateOnResetOperatorCoordinator.java:361) Method calls method in (TaskManagerConfiguration.java:244) Method calls method in (TaskManagerConfiguration.java:246) -Method calls method in (TaskManagerServices.java:433) -Method calls method in (TaskManagerServices.java:431) Review Comment: Indeed. I had to enable refreeze to add the missing datagen source violations, but to be honest, how exactly it is supposed to work in ArchUnit is still a bit of a mystery to me.
## flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e: ## @@ -64,6 +64,19 @@ Constructor (int, org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue, org.apache.flink.connector.base.source.reader.splitreader.SplitReader, java.util.function.Consumer, java.lang.Runnable, java.util.function.Consumer, boolean)> calls method in (SplitFetcher.java:97) Constructor (org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue, java.util.function.Supplier, org.apache.flink.configuration.Configuration, java.util.function.Consumer)> is annotated with in (SplitFetcherManager.java:0) Constructor (int)> calls method in (FutureCompletingBlockingQueue.java:114) +Constructor (org.apache.flink.api.common.typeutils.TypeSerializer, java.lang.Iterable)> calls method in (FromElementsGeneratorFunction.java:85) Review Comment: That's what I thought too. As also tracked in the wiki, the DataGen will not be externalized: https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
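Regarding the `StreamGraphGeneratorTest` change above, a minimal model of why `fromElements(...).setMaxParallelism(128)` stops working while `fromSequence(...)` keeps working. It assumes that, once the call is propagated to the superclass, the cap for a non-parallel source is enforced by rejecting any max parallelism other than 1 (rather than silently clamping it); `SourceMaxParallelismModel` is a hypothetical name, not a Flink class.

```java
/** Hypothetical model of max-parallelism validation on a source. */
final class SourceMaxParallelismModel {
    private final boolean parallel;
    private int maxParallelism = -1; // -1 means "not set"

    SourceMaxParallelismModel(boolean parallel) {
        this.parallel = parallel;
    }

    SourceMaxParallelismModel setMaxParallelism(int maxParallelism) {
        if (maxParallelism <= 0) {
            throw new IllegalArgumentException("Max parallelism must be positive");
        }
        if (!parallel && maxParallelism != 1) {
            // e.g. fromElements: a non-parallel source cannot declare more than one subtask
            throw new IllegalArgumentException(
                    "The max parallelism of a non-parallel source must be 1, got " + maxParallelism);
        }
        this.maxParallelism = maxParallelism;
        return this;
    }

    int getMaxParallelism() {
        return maxParallelism;
    }
}
```

In this model, `new SourceMaxParallelismModel(true).setMaxParallelism(128)` (a parallel source such as `fromSequence`) succeeds, while the same call on a non-parallel source throws.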
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
zentol commented on code in PR #23553: URL: https://github.com/apache/flink/pull/23553#discussion_r1370102899

## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java: ##
@@ -566,15 +567,15 @@ public void testMaxParallelismWithConnectedKeyedStream() {
int maxParallelism = 42;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-DataStream<Integer> input1 = env.fromElements(1, 2, 3, 4).setMaxParallelism(128);
-DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4).setMaxParallelism(129);
+DataStream<Long> input1 = env.fromSequence(1, 4).setMaxParallelism(128);
+DataStream<Long> input2 = env.fromSequence(1, 4).setMaxParallelism(129);

Review Comment: Why are these being changed? Not necessarily in this instance (since a sequence seems more appropriate here); I'm just curious about all the other changed streaming-java tests.

## flink-formats/flink-parquet/src/test/resources/avro/user.avsc: ##
@@ -1,9 +0,0 @@
-{
- "namespace": "org.apache.flink.connector.datagen.source.generated",
- "type": "record",
- "name": "User",

Review Comment: Was this unused?

## flink-architecture-tests/flink-architecture-tests-production/archunit-violations/e5126cae-f3fe-48aa-b6fb-60ae6cc3fcd5: ##
@@ -19,8 +19,8 @@ Method calls method in (RecreateOnResetOperatorCoordinator.java:361)
Method calls method in (TaskManagerConfiguration.java:244)
Method calls method in (TaskManagerConfiguration.java:246)
-Method calls method in (TaskManagerServices.java:433)
-Method calls method in (TaskManagerServices.java:431)

Review Comment: These seem to be unrelated?
## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java: ##
@@ -123,7 +124,8 @@ private void createChainableStream(TableTestUtil util) {
}
private void createNonChainableStream(TableTestUtil util) {
-DataStreamSource<Integer> dataStream = util.getStreamEnv().fromElements(1, 2, 3);
+DataStreamSource<Integer> dataStream =
+util.getStreamEnv().fromCollection(Arrays.asList(1, 2, 3));

Review Comment: Here I wonder why we're changing things.

## flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamExecutionEnvironmentITCase.java: ##
@@ -67,7 +79,62 @@ public void executeThrowsProgramInvocationException() throws Exception {
})
.print();
-thrown.expect(ProgramInvocationException.class);
-env.execute();
+assertThatThrownBy(env::execute).isInstanceOf(ProgramInvocationException.class);
+}
+
+@Test
+@SuppressWarnings("unchecked")
+void testAvroSpecificRecordsInFromElements() throws Exception {
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+User user1 = new User("Foo", 1);
+User user2 = new User("Bar", 2);
+User[] data = {user1, user2};
+DataStreamSource stream = env.fromElements(User.class, user1, user2);
+DataGeneratorSource source = getSourceFromStream(stream);
+FromElementsGeneratorFunction generatorFunction =
+(FromElementsGeneratorFunction) source.getGeneratorFunction();
+
+List result = stream.executeAndCollect(data.length + 1);
+TypeSerializer serializer = generatorFunction.getSerializer();
+
+assertThat(serializer).isInstanceOf(AvroSerializer.class);
+assertThat(result).containsExactly(data);
+}
+
+@Test
+@SuppressWarnings("unchecked")
+void testAvroGenericRecordsInFromElements() throws Exception {
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+Schema schema = getSchemaFromResources("/avro/user.avsc");
+GenericRecord user1 =
+new GenericRecordBuilder(schema).set("name", "Foo").set("age", 40).build();
+GenericRecord user2 =
+new GenericRecordBuilder(schema).set("name", "Bar").set("age", 45).build();
+GenericRecord[] data = {user1, user2};
+DataStream stream =
+env.fromElements(data).returns(new GenericRecordAvroTypeInfo(schema));
+DataGeneratorSource source = getSourceFromStream(stream);
+FromElementsGeneratorFunction generatorFunction =
+(FromElementsGeneratorFunction) source.getGeneratorFunction();
+
+List result = stream.executeAndCollect(data.length + 1);
+TypeSerializer serializer = generatorFunction.getSerializer();
+
+assertThat(serializer).isInstanceOf(AvroSerializer.class);
+assertThat(result).containsExactly(data);
+}
+
+private Schema getSchemaFromResources(String path) throws Exception {
+try (InputStream schemaStream =
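Both ITCase tests above call `executeAndCollect(data.length + 1)`, i.e. they ask for one element more than the test data contains. A plausible reading (not stated in the PR) is that `containsExactly(data)` can then also catch a source that emits surplus elements, which a limit of exactly `data.length` would silently cut off. The following is a minimal, stdlib-only Java sketch of that check under this assumption; `CollectCheck`, `collectUpTo` and `yieldsExactly` are hypothetical names, and a plain `Iterator` stands in for the results Flink collects from a running job.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical sketch of the "collect one more than expected" check; not a Flink API. */
public class CollectCheck {

    /** Pulls at most {@code limit} elements from the source, stopping early if it runs dry. */
    static <T> List<T> collectUpTo(Iterator<T> source, int limit) {
        List<T> result = new ArrayList<>();
        while (result.size() < limit && source.hasNext()) {
            result.add(source.next());
        }
        return result;
    }

    /**
     * True only if the source yields exactly the expected elements. Asking for
     * expected.size() + 1 elements exposes a surplus element that a limit of
     * expected.size() would never fetch.
     */
    static <T> boolean yieldsExactly(Iterator<T> source, List<T> expected) {
        return collectUpTo(source, expected.size() + 1).equals(expected);
    }

    public static void main(String[] args) {
        List<String> expected = List.of("Foo", "Bar");
        // Exact match: prints true
        System.out.println(yieldsExactly(List.of("Foo", "Bar").iterator(), expected));
        // Source emits a duplicate, which the extra slot catches: prints false
        System.out.println(yieldsExactly(List.of("Foo", "Bar", "Bar").iterator(), expected));
        // With limit == expected.size() the duplicate goes unnoticed: prints true
        System.out.println(collectUpTo(List.of("Foo", "Bar", "Bar").iterator(), 2).equals(expected));
    }
}
```

The extra slot costs nothing when the source is correct, since collection simply stops once the bounded input is exhausted.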
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on PR #23553: URL: https://github.com/apache/flink/pull/23553#issuecomment-1772756912 @flinkbot run azure
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on PR #23553: URL: https://github.com/apache/flink/pull/23553#issuecomment-1771760579 @flinkbot run azure
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on PR #23553: URL: https://github.com/apache/flink/pull/23553#issuecomment-1771495374 @flinkbot run azure
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
afedulov commented on PR #23553: URL: https://github.com/apache/flink/pull/23553#issuecomment-1771489496 The addition of the `OutputTypeConfigurable` was required to achieve feature parity and compatibility with the existing fromElements API (https://issues.apache.org/jira/browse/FLINK-21386). Alternatively, we could provide a `fromElements` method that accepts `TypeInformation` directly, rather than having to "fix" it with `returns()`.
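To make the two options in this comment concrete, here is a stdlib-only Java sketch, loosely modeled on the `env.fromElements(data).returns(new GenericRecordAvroTypeInfo(schema))` call in the ITCase above. Option A: the generator implements a callback interface, so a later `returns()` can replace the output type it was created with (the role `OutputTypeConfigurable` plays in the PR). Option B: a `fromElements` overload receives the type up front. Every name here (`OutputTypeAware`, `ElementsGenerator`, `SketchStream`, the string type names and serializer names) is a hypothetical stand-in rather than a Flink API; real Flink passes `TypeInformation` and builds `TypeSerializer`s, which this sketch only mimics with strings.

```java
import java.util.List;

/** Hypothetical sketch of two ways to hand the final output type to a generator; not Flink code. */
public class TypeHandoffSketch {

    /** Hypothetical stand-in for OutputTypeConfigurable: receives the final output type. */
    interface OutputTypeAware {
        void setOutputType(String typeName);
    }

    /** Hypothetical generator that chooses its serializer based on its output type. */
    static final class ElementsGenerator<T> implements OutputTypeAware {
        private final List<T> elements;
        private String typeName;

        ElementsGenerator(List<T> elements, String typeName) {
            this.elements = elements;
            this.typeName = typeName;
        }

        @Override
        public void setOutputType(String typeName) {
            this.typeName = typeName; // overrides the type chosen at construction
        }

        String serializerName() {
            return typeName.startsWith("Avro") ? "AvroSerializer" : "FallbackSerializer";
        }
    }

    /** Hypothetical stream whose returns() pushes the type into the generator (option A). */
    static final class SketchStream<T> {
        final ElementsGenerator<T> generator;

        SketchStream(ElementsGenerator<T> generator) {
            this.generator = generator;
        }

        SketchStream<T> returns(String typeName) {
            generator.setOutputType(typeName); // the "fix it afterwards" callback
            return this;
        }
    }

    /** Option A entry point: starts from an assumed generic fallback type. */
    static <T> SketchStream<T> fromElements(List<T> elements) {
        return new SketchStream<>(new ElementsGenerator<>(elements, "GenericFallback"));
    }

    /** Option B entry point: the caller supplies the type directly; no callback needed. */
    static <T> SketchStream<T> fromElements(String typeName, List<T> elements) {
        return new SketchStream<>(new ElementsGenerator<>(elements, typeName));
    }

    public static void main(String[] args) {
        List<String> records = List.of("Foo", "Bar");
        // prints FallbackSerializer
        System.out.println(fromElements(records).generator.serializerName());
        // prints AvroSerializer (type fixed afterwards via returns())
        System.out.println(fromElements(records).returns("AvroGenericRecord").generator.serializerName());
        // prints AvroSerializer (type supplied up front)
        System.out.println(fromElements("AvroGenericRecord", records).generator.serializerName());
    }
}
```

Option A keeps existing `fromElements(...).returns(...)` call sites working, which matches the compatibility argument in the comment; option B avoids mutating the generator after construction but adds another overload to the public API.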
Re: [PR] [FLINK-28050][connectors] Migrate StreamExecutionEnvironment#fromElements() implementation to FLIP-27 Source API [flink]
flinkbot commented on PR #23553: URL: https://github.com/apache/flink/pull/23553#issuecomment-1771102556

## CI report:

* 227741d560869d8610b9a80e65b9181ffd25bdb6 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build