asfgit closed pull request #23367: [SPARK-26428][SS][TEST] Minimize deprecated
`ProcessingTime` usage
URL: https://github.com/apache/spark/pull/23367
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 61cbb3285a4f0..d4eb526540053 100644
---
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.sql.sources.v2.DataSourceOptions
-import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.test.SharedSQLContext
@@ -236,7 +236,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends
KafkaSourceSuiteBase {
}
testStream(mapped)(
- StartStream(ProcessingTime(100), clock),
+ StartStream(Trigger.ProcessingTime(100), clock),
waitUntilBatchProcessed,
// 1 from smallest, 1 from middle, 8 from biggest
CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
@@ -247,7 +247,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends
KafkaSourceSuiteBase {
11, 108, 109, 110, 111, 112, 113, 114, 115, 116
),
StopStream,
- StartStream(ProcessingTime(100), clock),
+ StartStream(Trigger.ProcessingTime(100), clock),
waitUntilBatchProcessed,
// smallest now empty, 1 more from middle, 9 more from biggest
CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
@@ -282,7 +282,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends
KafkaSourceSuiteBase {
val mapped = kafka.map(kv => kv._2.toInt + 1)
testStream(mapped)(
- StartStream(trigger = ProcessingTime(1)),
+ StartStream(trigger = Trigger.ProcessingTime(1)),
makeSureGetOffsetCalled,
AddKafkaData(Set(topic), 1, 2, 3),
CheckAnswer(2, 3, 4),
@@ -605,7 +605,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends
KafkaSourceSuiteBase {
}
testStream(kafka)(
- StartStream(ProcessingTime(100), clock),
+ StartStream(Trigger.ProcessingTime(100), clock),
waitUntilBatchProcessed,
// 5 from smaller topic, 5 from bigger one
CheckLastBatch((0 to 4) ++ (100 to 104): _*),
@@ -618,7 +618,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends
KafkaSourceSuiteBase {
// smaller topic empty, 5 from bigger one
CheckLastBatch(110 to 114: _*),
StopStream,
- StartStream(ProcessingTime(100), clock),
+ StartStream(Trigger.ProcessingTime(100), clock),
waitUntilBatchProcessed,
// smallest now empty, 5 from bigger one
CheckLastBatch(115 to 119: _*),
@@ -727,7 +727,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends
KafkaSourceSuiteBase {
// The message values are the same as their offsets to make the test easy
to follow
testUtils.withTranscationalProducer { producer =>
testStream(mapped)(
- StartStream(ProcessingTime(100), clock),
+ StartStream(Trigger.ProcessingTime(100), clock),
waitUntilBatchProcessed,
CheckAnswer(),
WithOffsetSync(topicPartition, expectedOffset = 5) { () =>
@@ -850,7 +850,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends
KafkaSourceSuiteBase {
// The message values are the same as their offsets to make the test easy
to follow
testUtils.withTranscationalProducer { producer =>
testStream(mapped)(
- StartStream(ProcessingTime(100), clock),
+ StartStream(Trigger.ProcessingTime(100), clock),
waitUntilBatchProcessed,
CheckNewAnswer(),
WithOffsetSync(topicPartition, expectedOffset = 5) { () =>
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index d4bd9c7987f2d..de664cafed3b6 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1360,7 +1360,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
options = srcOptions)
val clock = new StreamManualClock()
testStream(fileStream)(
- StartStream(trigger = ProcessingTime(10), triggerClock = clock),
+ StartStream(trigger = Trigger.ProcessingTime(10), triggerClock = clock),
AssertOnQuery { _ =>
// Block until the first batch finishes.
eventually(timeout(streamingTimeout)) {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index f55ddb5419d20..55fdcee83f114 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -312,7 +312,7 @@ class StreamSuite extends StreamTest {
val inputData = MemoryStream[Int]
testStream(inputData.toDS())(
- StartStream(ProcessingTime("10 seconds"), new StreamManualClock),
+ StartStream(Trigger.ProcessingTime("10 seconds"), new StreamManualClock),
/* -- batch 0 ----------------------- */
// Add some data in batch 0
@@ -353,7 +353,7 @@ class StreamSuite extends StreamTest {
/* Stop then restart the Stream */
StopStream,
- StartStream(ProcessingTime("10 seconds"), new StreamManualClock(60 *
1000)),
+ StartStream(Trigger.ProcessingTime("10 seconds"), new
StreamManualClock(60 * 1000)),
/* -- batch 1 no rerun ----------------- */
// batch 1 would not re-run because the latest batch id logged in commit
log is 1
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index fe77a1b4469c5..d00f2e3bf4d1a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -82,7 +82,7 @@ class StreamingQueryListenerSuite extends StreamTest with
BeforeAndAfter {
testStream(df, OutputMode.Append)(
// Start event generated when query started
- StartStream(ProcessingTime(100), triggerClock = clock),
+ StartStream(Trigger.ProcessingTime(100), triggerClock = clock),
AssertOnQuery { query =>
assert(listener.startEvent !== null)
assert(listener.startEvent.id === query.id)
@@ -124,7 +124,7 @@ class StreamingQueryListenerSuite extends StreamTest with
BeforeAndAfter {
},
// Termination event generated with exception message when stopped
with error
- StartStream(ProcessingTime(100), triggerClock = clock),
+ StartStream(Trigger.ProcessingTime(100), triggerClock = clock),
AssertStreamExecThreadToWaitForClock(),
AddData(inputData, 0),
AdvanceManualClock(100), // process bad data
@@ -306,7 +306,7 @@ class StreamingQueryListenerSuite extends StreamTest with
BeforeAndAfter {
}
val clock = new StreamManualClock()
val actions = mutable.ArrayBuffer[StreamAction]()
- actions += StartStream(trigger = ProcessingTime(10), triggerClock =
clock)
+ actions += StartStream(trigger = Trigger.ProcessingTime(10),
triggerClock = clock)
for (_ <- 1 to 100) {
actions += AdvanceManualClock(10)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index c170641372d61..29b816486a1fe 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -257,7 +257,7 @@ class StreamingQuerySuite extends StreamTest with
BeforeAndAfter with Logging wi
var lastProgressBeforeStop: StreamingQueryProgress = null
testStream(mapped, OutputMode.Complete)(
- StartStream(ProcessingTime(1000), triggerClock = clock),
+ StartStream(Trigger.ProcessingTime(1000), triggerClock = clock),
AssertStreamExecThreadIsWaitingForTime(1000),
AssertOnQuery(_.status.isDataAvailable === false),
AssertOnQuery(_.status.isTriggerActive === false),
@@ -370,7 +370,7 @@ class StreamingQuerySuite extends StreamTest with
BeforeAndAfter with Logging wi
AssertOnQuery(_.status.message === "Stopped"),
// Test status and progress after query terminated with error
- StartStream(ProcessingTime(1000), triggerClock = clock),
+ StartStream(Trigger.ProcessingTime(1000), triggerClock = clock),
AdvanceManualClock(1000), // ensure initial trigger completes before
AddData
AddData(inputData, 0),
AdvanceManualClock(1000), // allow another trigger
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]