Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/18029#discussion_r158627941
--- Diff:
external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisInputDStreamBuilderSuite.java
---
@@ -45,18 +44,90 @@ public void testJavaKinesisDStreamBuilder() {
.streamName(streamName)
.endpointUrl(endpointUrl)
.regionName(region)
- .initialPositionInStream(initialPosition)
+ .initialPosition(initialPosition)
.checkpointAppName(appName)
.checkpointInterval(checkpointInterval)
.storageLevel(storageLevel)
.build();
assert(kinesisDStream.streamName() == streamName);
assert(kinesisDStream.endpointUrl() == endpointUrl);
assert(kinesisDStream.regionName() == region);
- assert(kinesisDStream.initialPositionInStream() == initialPosition);
+ assert(kinesisDStream.initialPosition().getPosition() == initialPosition.getPosition());
+ assert(kinesisDStream.checkpointAppName() == appName);
+ assert(kinesisDStream.checkpointInterval() == checkpointInterval);
+ assert(kinesisDStream._storageLevel() == storageLevel);
+ ssc.stop();
+ }
+
+ /**
+ * Test to ensure that the old API for InitialPositionInStream
+ * is supported in KinesisDStream.Builder.
+ * This test would be removed when we deprecate the KinesisUtils.
+ */
+ @Test
+ public void testJavaKinesisDStreamBuilderOldApi() {
+ String streamName = "a-very-nice-stream-name";
+ String endpointUrl = "https://kinesis.us-west-2.amazonaws.com";
+ String region = "us-west-2";
+ String appName = "a-very-nice-kinesis-app";
+ Duration checkpointInterval = Seconds.apply(30);
+ StorageLevel storageLevel = StorageLevel.MEMORY_ONLY();
+
+ KinesisInputDStream<byte[]> kinesisDStream = KinesisInputDStream.builder()
+ .streamingContext(ssc)
--- End diff --
nit: the indentation of this chained builder call should be 2 spaces.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]