[GitHub] [flink] XComp commented on a diff in pull request #21971: [FLINK-31084][connectors/dataGen] Add default value for dataGen seque…

2023-05-11 Thread via GitHub


XComp commented on code in PR #21971:
URL: https://github.com/apache/flink/pull/21971#discussion_r1191934305


##
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java:
##
@@ -93,4 +93,9 @@ public ChangelogMode getChangelogMode() {
 public void applyLimit(long limit) {
 this.numberOfRows = limit;
 }
+
+@VisibleForTesting
+public DataGenerator[] getFieldGenerators() {

Review Comment:
   Yes, I would suggest that we do that in a separate hotfix commit within this 
PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on a diff in pull request #21971: [FLINK-31084][connectors/dataGen] Add default value for dataGen seque…

2023-05-11 Thread via GitHub


XComp commented on code in PR #21971:
URL: https://github.com/apache/flink/pull/21971#discussion_r1190867729


##
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/SequenceGeneratorVisitor.java:
##
@@ -82,25 +81,11 @@ public SequenceGeneratorVisitor(String name, ReadableConfig 
config) {
 ConfigOptions.OptionBuilder startKey = key(startKeyStr);
 ConfigOptions.OptionBuilder endKey = key(endKeyStr);
 
-config.getOptional(startKey.stringType().noDefaultValue())
-.orElseThrow(
-() ->
-new ValidationException(
-"Could not find required property '"
-+ startKeyStr
-+ "' for sequence 
generator."));
-config.getOptional(endKey.stringType().noDefaultValue())
-.orElseThrow(
-() ->
-new ValidationException(
-"Could not find required property '"
-+ endKeyStr
-+ "' for sequence 
generator."));
-
-this.intStart = startKey.intType().noDefaultValue();
-this.intEnd = endKey.intType().noDefaultValue();
-this.longStart = startKey.longType().noDefaultValue();
-this.longEnd = endKey.longType().noDefaultValue();
+// Under sequence, if end and start are not set, the default value is 
used
+this.intStart = startKey.intType().defaultValue(0);
+this.intEnd = endKey.intType().defaultValue(Integer.MAX_VALUE);
+this.longStart = startKey.longType().defaultValue(0L);
+this.longEnd = endKey.longType().defaultValue((long) 
Integer.MAX_VALUE);

Review Comment:
   With the shortcomings that are going to be fixed in FLINK-31192, I'm 
wondering whether we should lower the default max value for now to avoid 
generating to many elements in memory. WDYT? :thinking: We could update the 
default values as soon as the data generation is called incrementally.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on a diff in pull request #21971: [FLINK-31084][connectors/dataGen] Add default value for dataGen seque…

2023-05-11 Thread via GitHub


XComp commented on code in PR #21971:
URL: https://github.com/apache/flink/pull/21971#discussion_r1190780451


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##
@@ -49,10 +50,18 @@
 /**
  * Creates a DataGenerator that emits all numbers from the given interval 
exactly once.
  *
+ * He requires that the end must be greater than the start and that the 
total number cannot
+ * be greater than max-1.
+ *

Review Comment:
   ```suggestion
* The {@code SequenceGenerator} requires that the {@code end} must 
be greater than the {@code start} and that the total number cannot be greater 
than {@code Long.MAX_VALUE - 1}.
   ```



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##
@@ -111,11 +120,20 @@ public boolean hasNext() {
 return !this.valuesToEmit.isEmpty();
 }
 
-private static int safeDivide(long left, long right) {
-Preconditions.checkArgument(right > 0);
-Preconditions.checkArgument(left >= 0);
-Preconditions.checkArgument(left <= Integer.MAX_VALUE * right);
-return (int) (left / right);
+private static long safeDivide(long totalRows, long stepSize) {
+Preconditions.checkArgument(stepSize > 0, "cannot be equal to 0");
+Preconditions.checkArgument(totalRows >= 0, "Cannot be less than 0");
+return totalRows / stepSize;
+}

Review Comment:
   It feels like this method is not necessary anymore considering that we have 
the invariant of `MAX_VALUE-1` in this class. We could remove it and just use 
`totalNumberOfElements / stepSize` in the calling `open` method



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##
@@ -49,10 +50,18 @@
 /**
  * Creates a DataGenerator that emits all numbers from the given interval 
exactly once.
  *
+ * He requires that the end must be greater than the start and that the 
total number cannot
+ * be greater than max-1.
+ *
  * @param start Start of the range of numbers to emit.
  * @param end End of the range of numbers to emit.
  */
 public SequenceGenerator(long start, long end) {

Review Comment:
   ```suggestion
   protected SequenceGenerator(long start, long end) {
   ```



##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGeneratorTest.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source.datagen;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link SequenceGenerator}. */
+public class SequenceGeneratorTest {
+
+@Test
+public void testStartGreaterThanEnd() {
+assertThatThrownBy(
+() -> {
+final long start = 30;
+final long end = 20;
+SequenceGenerator.longGenerator(start, end);
+})
+.satisfies(
+anyCauseMatches(
+IllegalArgumentException.class,
+"The start value cannot be greater than the 
end value."));
+}
+
+@Test
+public void testTotalQuantity() {

Review Comment:
   This test method can be split into three different test methods 
`testTooLargeRange`, `testMaxRange` and `testSequenceCreation`.



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##
@@ -111,11 +120,20 @@ public boolean hasNext() {
 return !this.valuesToEmit.isEmpty();
 }
 
-private static int safeDivide(long left, long right) {
-Preconditions.checkArgument(right > 0);
-Preconditions.checkArgument(left >= 0);
-

[GitHub] [flink] XComp commented on a diff in pull request #21971: [FLINK-31084][connectors/dataGen] Add default value for dataGen seque…

2023-03-16 Thread via GitHub


XComp commented on code in PR #21971:
URL: https://github.com/apache/flink/pull/21971#discussion_r1118862415


##
flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java:
##
@@ -264,7 +267,68 @@ void testSequenceCheckpointRestore() throws Exception {
 }
 
 @Test
-void testLackStartForSequence() {
+void testDefaultValueForSequence() {
+DescriptorProperties descriptor = new DescriptorProperties();
+descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+descriptor.putString(
+DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.KIND,
+DataGenConnectorOptionsUtil.SEQUENCE);
+
+DataGenTableSource source =
+(DataGenTableSource)
+createTableSource(
+ResolvedSchema.of(Column.physical("f0", 
DataTypes.BIGINT())),
+descriptor.asMap());
+DataGenerator[] fieldGenerators = source.getFieldGenerators();
+SequenceGenerator fieldGenerator = (SequenceGenerator) 
fieldGenerators[0];
+long start = fieldGenerator.getStart();
+long end = fieldGenerator.getEnd();
+
+Assertions.assertThat(0)
+.describedAs("The default start value should be 0")
+.isEqualTo(start);
+Assertions.assertThat(Integer.MAX_VALUE)
+.describedAs("The default start value should be 
Integer.MAX_VALUE")
+.isEqualTo(end);
+}
+
+@Test
+void testStartEndForSequence() {
+DescriptorProperties descriptor = new DescriptorProperties();
+descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+descriptor.putString(
+DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.KIND,
+DataGenConnectorOptionsUtil.SEQUENCE);
+final int setupStart = 10;
+descriptor.putLong(
+DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.START,
+setupStart);
+final int setupEnd = 100;
+descriptor.putLong(
+DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.END,
+setupEnd);
+
+DataGenTableSource source =
+(DataGenTableSource)
+createTableSource(
+ResolvedSchema.of(Column.physical("f0", 
DataTypes.BIGINT())),
+descriptor.asMap());
+DataGenerator[] fieldGenerators = source.getFieldGenerators();
+SequenceGenerator fieldGenerator = (SequenceGenerator) 
fieldGenerators[0];
+long start = fieldGenerator.getStart();
+long end = fieldGenerator.getEnd();
+
+Assertions.assertThat(setupStart)
+.describedAs("The default start value should be " + setupStart)
+.isEqualTo(start);
+Assertions.assertThat(setupEnd)
+.describedAs("The default start value should be " + setupEnd)
+.isEqualTo(end);
+}
+
+@Test
+void testCornerCaseForSequence() {
+// An example of testing that the end value is greater than the start 
value

Review Comment:
   We could make two test methods out of this one. A Junit test should usually 
only test a single issue. On another note, usually, comments are not necessary 
but should be replaced by assert messages instead. That way, the comment also 
serves as a description for the assert in case the assert fails.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on a diff in pull request #21971: [FLINK-31084][connectors/dataGen] Add default value for dataGen seque…

2023-02-27 Thread via GitHub


XComp commented on code in PR #21971:
URL: https://github.com/apache/flink/pull/21971#discussion_r1118919072


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##
@@ -83,9 +86,11 @@ public void open(
 final int taskIdx = runtimeContext.getIndexOfThisSubtask();
 final long congruence = start + taskIdx;
 
+Preconditions.checkArgument(
+end - start < Long.MAX_VALUE - 1, "Limit exceeded 
Long.MAX_VALUE-1");

Review Comment:
   ```suggestion
   end - start < Long.MAX_VALUE - 1, "Limit exceeded " + 
Long.MAX_VALUE - 1);
   ```
   Don't we want to print the actual value here? I'm a bit torn here because I 
see the problem with just printing a really big number. Going for 
`Long.MAX_VALUE` might actually more readabile. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on a diff in pull request #21971: [FLINK-31084][connectors/dataGen] Add default value for dataGen seque…

2023-02-27 Thread via GitHub


XComp commented on code in PR #21971:
URL: https://github.com/apache/flink/pull/21971#discussion_r1118867104


##
flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java:
##
@@ -264,7 +267,68 @@ void testSequenceCheckpointRestore() throws Exception {
 }
 
 @Test
-void testLackStartForSequence() {
+void testDefaultValueForSequence() {
+DescriptorProperties descriptor = new DescriptorProperties();
+descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+descriptor.putString(
+DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.KIND,
+DataGenConnectorOptionsUtil.SEQUENCE);
+
+DataGenTableSource source =
+(DataGenTableSource)
+createTableSource(
+ResolvedSchema.of(Column.physical("f0", 
DataTypes.BIGINT())),
+descriptor.asMap());
+DataGenerator[] fieldGenerators = source.getFieldGenerators();
+SequenceGenerator fieldGenerator = (SequenceGenerator) 
fieldGenerators[0];
+long start = fieldGenerator.getStart();
+long end = fieldGenerator.getEnd();
+
+Assertions.assertThat(0)
+.describedAs("The default start value should be 0")
+.isEqualTo(start);
+Assertions.assertThat(Integer.MAX_VALUE)
+.describedAs("The default start value should be 
Integer.MAX_VALUE")
+.isEqualTo(end);
+}
+
+@Test
+void testStartEndForSequence() {
+DescriptorProperties descriptor = new DescriptorProperties();
+descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+descriptor.putString(
+DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.KIND,
+DataGenConnectorOptionsUtil.SEQUENCE);
+final int setupStart = 10;
+descriptor.putLong(
+DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.START,
+setupStart);
+final int setupEnd = 100;
+descriptor.putLong(
+DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.END,
+setupEnd);
+
+DataGenTableSource source =
+(DataGenTableSource)
+createTableSource(
+ResolvedSchema.of(Column.physical("f0", 
DataTypes.BIGINT())),
+descriptor.asMap());
+DataGenerator[] fieldGenerators = source.getFieldGenerators();
+SequenceGenerator fieldGenerator = (SequenceGenerator) 
fieldGenerators[0];
+long start = fieldGenerator.getStart();
+long end = fieldGenerator.getEnd();
+
+Assertions.assertThat(setupStart)
+.describedAs("The default start value should be " + setupStart)
+.isEqualTo(start);
+Assertions.assertThat(setupEnd)
+.describedAs("The default start value should be " + setupEnd)
+.isEqualTo(end);
+}
+
+@Test
+void testCornerCaseForSequence() {
+// An example of testing that the end value is greater than the start 
value
 assertThatThrownBy(

Review Comment:
   `assertThatThrownBy` should be used as specifically as possible: 
Theoretically, the `IllegalArgumentException` could have been thrown by any 
other code line in the passed block even though you're only interested in the 
`createTableSource` here.



##
flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java:
##
@@ -264,7 +267,68 @@ void testSequenceCheckpointRestore() throws Exception {
 }
 
 @Test
-void testLackStartForSequence() {
+void testDefaultValueForSequence() {
+DescriptorProperties descriptor = new DescriptorProperties();
+descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+descriptor.putString(
+DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.KIND,
+DataGenConnectorOptionsUtil.SEQUENCE);
+
+DataGenTableSource source =
+(DataGenTableSource)
+createTableSource(
+ResolvedSchema.of(Column.physical("f0", 
DataTypes.BIGINT())),
+descriptor.asMap());
+DataGenerator[] fieldGenerators = source.getFieldGenerators();
+SequenceGenerator fieldGenerator = (SequenceGenerator) 
fieldGenerators[0];
+long start = fieldGenerator.getStart();
+long end = fieldGenerator.getEnd();
+
+Assertions.assertThat(0)
+.describedAs("The default start value should be 0")
+.isEqualTo(start);
+

[GitHub] [flink] XComp commented on a diff in pull request #21971: [FLINK-31084][connectors/dataGen] Add default value for dataGen seque…

2023-02-27 Thread via GitHub


XComp commented on code in PR #21971:
URL: https://github.com/apache/flink/pull/21971#discussion_r1118856054


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##
@@ -83,10 +85,22 @@ public void open(
 final int taskIdx = runtimeContext.getIndexOfThisSubtask();
 final long congruence = start + taskIdx;
 
-long totalNoOfElements = Math.abs(end - start + 1);
-final int baseSize = safeDivide(totalNoOfElements, stepSize);
-final int toCollect =
-(totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : 
baseSize;
+
+Preconditions.checkArgument(
+start < end, "The start value cannot be greater than the 
end value.");
+
+// After preventing setting to Long.MAX_VALUE, the length of
+// Long type will be exceeded after +1
+final BigInteger totalNoOfElements =
+BigInteger.valueOf(end)
+.subtract(BigInteger.valueOf(start))
+.add(BigInteger.valueOf(1));
+
+final BigInteger baseSize = 
totalNoOfElements.divide(BigInteger.valueOf(stepSize));
+final long toCollect =
+
totalNoOfElements.remainder(BigInteger.valueOf(stepSize)).longValue() > taskIdx
+? baseSize.add(BigInteger.valueOf(1)).longValue()

Review Comment:
   yes, that's what I was referring to.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on a diff in pull request #21971: [FLINK-31084][connectors/dataGen] Add default value for dataGen seque…

2023-02-21 Thread via GitHub


XComp commented on code in PR #21971:
URL: https://github.com/apache/flink/pull/21971#discussion_r1112952301


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java:
##
@@ -83,10 +85,22 @@ public void open(
 final int taskIdx = runtimeContext.getIndexOfThisSubtask();
 final long congruence = start + taskIdx;
 
-long totalNoOfElements = Math.abs(end - start + 1);
-final int baseSize = safeDivide(totalNoOfElements, stepSize);
-final int toCollect =
-(totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : 
baseSize;
+
+Preconditions.checkArgument(
+start < end, "The start value cannot be greater than the 
end value.");
+
+// After preventing setting to Long.MAX_VALUE, the length of
+// Long type will be exceeded after +1
+final BigInteger totalNoOfElements =
+BigInteger.valueOf(end)
+.subtract(BigInteger.valueOf(start))
+.add(BigInteger.valueOf(1));
+
+final BigInteger baseSize = 
totalNoOfElements.divide(BigInteger.valueOf(stepSize));
+final long toCollect =
+
totalNoOfElements.remainder(BigInteger.valueOf(stepSize)).longValue() > taskIdx
+? baseSize.add(BigInteger.valueOf(1)).longValue()

Review Comment:
   IIUC, `baseSize` can be bigger than `Long.MAX_VALUE` which will make this 
line prone to errors. Wouldn't it be easier to just limit the upper threshold 
to `Integer.MAX_VALUE-1`? :thinking: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on a diff in pull request #21971: [FLINK-31084][connectors/dataGen] Add default value for dataGen seque…

2023-02-20 Thread via GitHub


XComp commented on code in PR #21971:
URL: https://github.com/apache/flink/pull/21971#discussion_r1112201535


##
flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java:
##
@@ -264,57 +267,63 @@ void testSequenceCheckpointRestore() throws Exception {
 }
 
 @Test
-void testLackStartForSequence() {
-assertThatThrownBy(
-() -> {
-DescriptorProperties descriptor = new 
DescriptorProperties();
-descriptor.putString(FactoryUtil.CONNECTOR.key(), 
"datagen");
-descriptor.putString(
-DataGenConnectorOptionsUtil.FIELDS
-+ ".f0."
-+ DataGenConnectorOptionsUtil.KIND,
-DataGenConnectorOptionsUtil.SEQUENCE);
-descriptor.putLong(
-DataGenConnectorOptionsUtil.FIELDS
-+ ".f0."
-+ DataGenConnectorOptionsUtil.END,
-100);
+void testDefaultValueForSequence() {
+DescriptorProperties descriptor = new DescriptorProperties();
+descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+descriptor.putString(
+DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.KIND,
+DataGenConnectorOptionsUtil.SEQUENCE);
 
-createTableSource(
-ResolvedSchema.of(Column.physical("f0", 
DataTypes.BIGINT())),
-descriptor.asMap());
-})
-.satisfies(
-anyCauseMatches(
-ValidationException.class,
-"Could not find required property 
'fields.f0.start' for sequence generator."));
+DataGenTableSource source =
+(DataGenTableSource)
+createTableSource(
+ResolvedSchema.of(Column.physical("f0", 
DataTypes.BIGINT())),
+descriptor.asMap());
+DataGenerator[] fieldGenerators = source.getFieldGenerators();
+SequenceGenerator fieldGenerator = (SequenceGenerator) 
fieldGenerators[0];
+long start = fieldGenerator.getStart();
+long end = fieldGenerator.getEnd();
+
+Assertions.assertThat(0)
+.describedAs("The default start value should be 0")
+.isEqualTo(start);
+Assertions.assertThat(Integer.MAX_VALUE)
+.describedAs("The default start value should be 
Integer.MAX_VALUE")
+.isEqualTo(end);
 }
 
 @Test
-void testLackEndForSequence() {
-assertThatThrownBy(
-() -> {
-DescriptorProperties descriptor = new 
DescriptorProperties();
-descriptor.putString(FactoryUtil.CONNECTOR.key(), 
"datagen");
-descriptor.putString(
-DataGenConnectorOptionsUtil.FIELDS
-+ ".f0."
-+ DataGenConnectorOptionsUtil.KIND,
-DataGenConnectorOptionsUtil.SEQUENCE);
-descriptor.putLong(
-DataGenConnectorOptionsUtil.FIELDS
-+ ".f0."
-+ 
DataGenConnectorOptionsUtil.START,
-0);
-
-createTableSource(
-ResolvedSchema.of(Column.physical("f0", 
DataTypes.BIGINT())),
-descriptor.asMap());
-})
-.satisfies(
-anyCauseMatches(
-ValidationException.class,
-"Could not find required property 
'fields.f0.end' for sequence generator."));
+void testStartEndForSequence() {
+DescriptorProperties descriptor = new DescriptorProperties();
+descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+descriptor.putString(
+DataGenConnectorOptionsUtil.FIELDS + ".f0." + 
DataGenConnectorOptionsUtil.KIND,
+DataGenConnectorOptionsUtil.SEQUENCE);
+int setupStart = 0;

Review Comment:
   It's not helpful to use the default value here because you cannot check that 
the "set" value is used instead of the default value.



-- 
This is an automated message from the Apache Git Service.
To respond to the