Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/21477#discussion_r193286932
--- Diff: python/pyspark/sql/tests.py ---
@@ -1884,7 +1885,164 @@ def test_query_manager_await_termination(self):
         finally:
             q.stop()
             shutil.rmtree(tmpPath)
    +    '''
    +    class ForeachWriterTester:
    +
    +        def __init__(self, spark):
    +            self.spark = spark
    +            self.input_dir = tempfile.mkdtemp()
    +            self.open_events_dir = tempfile.mkdtemp()
    +            self.process_events_dir = tempfile.mkdtemp()
    +            self.close_events_dir = tempfile.mkdtemp()
    +
    +        def write_open_event(self, partitionId, epochId):
    +            self._write_event(
    +                self.open_events_dir,
    +                {'partition': partitionId, 'epoch': epochId})
    +
    +        def write_process_event(self, row):
    +            self._write_event(self.process_events_dir, {'value': 'text'})
    +
    +        def write_close_event(self, error):
    +            self._write_event(self.close_events_dir, {'error': str(error)})
    +
    +        def write_input_file(self):
    +            self._write_event(self.input_dir, "text")
    +
    +        def open_events(self):
    +            return self._read_events(self.open_events_dir, 'partition INT, epoch INT')
    +
    +        def process_events(self):
    +            return self._read_events(self.process_events_dir, 'value STRING')
    +
    +        def close_events(self):
    +            return self._read_events(self.close_events_dir, 'error STRING')
    +
    +        def run_streaming_query_on_writer(self, writer, num_files):
    +            try:
    +                sdf = self.spark.readStream.format('text').load(self.input_dir)
    +                sq = sdf.writeStream.foreach(writer).start()
    +                for i in range(num_files):
    +                    self.write_input_file()
    +                    sq.processAllAvailable()
    +                sq.stop()
    +            finally:
    +                self.stop_all()
    +
    +        def _read_events(self, dir, json):
    +            rows = self.spark.read.schema(json).json(dir).collect()
    +            dicts = [row.asDict() for row in rows]
    +            return dicts
    +
    +        def _write_event(self, dir, event):
    +            import random
    +            file = open(os.path.join(dir, str(random.randint(0, 100000))), 'w')
--- End diff ---
Using a `with` statement here might be more convenient, along with renaming `file` to `f` or `fw` (in Python 2, `file` shadows a builtin). Please ignore this if there's a specific reason not to use `with`.
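
For illustration, a minimal sketch of that suggestion. The body of the write is an assumption on my side, since the diff is truncated right after the `open(...)` call; writing one JSON document per file is inferred from `_read_events` reading these directories back with `spark.read.schema(...).json(dir)`:

```python
    def _write_event(self, dir, event):
        import json
        import random
        # `with` guarantees the file is closed even if the write raises,
        # and `fw` avoids shadowing the Python 2 builtin `file`.
        with open(os.path.join(dir, str(random.randint(0, 100000))), 'w') as fw:
            # Assumption: each event is persisted as a single JSON document,
            # matching the spark.read.json call in _read_events.
            fw.write(json.dumps(event))
```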
---