If you don't want to write to an actual file, the example with the Check transform should allow you to use Check(...) as you would a sink. (I realize this should have been
run_my_pipeline( beam.Create([...]), "Check1" >> Check(equal_to([...])), "Check2" >> Check(any_callable_that_validates_result2)) to give distinct labels. As for bigquery, yes, these are integration tests that write to real bigquery. You can alternatively check to see the PCollection has the right things you wanted to write. On Tue, Jul 21, 2020 at 1:32 PM Sofia’s World <mmistr...@gmail.com> wrote: > Hello Robert > could you point me to a test sample where a 'mock' sink is used? > do you guys have a testing package , which provide an in memory sink where > for example i can dump the result of > my pipeline (as opposed to writing to a file) ? > Additionally, what is the best way to test writing to BigQuery? > I have seen this file > > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/bigtableio_it_test.py > but it appears it writes to real big query? > > kind regards > Marco > > > > > > > > On Fri, Jul 17, 2020 at 11:05 PM Robert Bradshaw <rober...@google.com> > wrote: > >> If want a full end-to-end integration test of your pipeline, what you can >> do is: >> >> 1) Write your input data to temporary files. >> 2) Run your pipeline, which writes its output somewhere (ideally a >> temp location as well). >> 3) Open up the outputs and see if it was as expected. >> >> This is similar to the test at >> >> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_test.py >> , but a bit heavy weight. >> >> Another way to validate your pipeline is to refactor the code so the >> inputs and outputs are pluggable. For example, you could write >> >> def run_my_pipeline(input): >> [all your pipeline logic goes here] >> [this could also be wrapped up as a PTransform] >> return result1, result2 >> >> def main(...): >> with beam.Pipeline(...) as p: >> input = p | beam.io.ReadFromText(...) >> result1, result2 = run_my_pipeline(input) >> result1 | beam.io.WriteToSomewhere(...) >> result2 | beam.io.WriteToSomewhereElse(...) >> >> def test(): >> with beam.Pipeline(...) as p: >> input = p | beam.Create(...) >> result1, result2 = run_my_pipeline(input) >> assert_that(result1, equal_to([...])) >> assert_that(result2, any_callable_that_validates_result2, >> label="Check2") >> >> You could also parameterize things on your sinks and sources, e.g. >> >> def run_my_pipeline(source, sink1, sink2): >> with beam.Pipeline(...) as p: >> input = p | source >> ... >> result1 | sink1 >> result2 | sinkn2 >> >> def main(...): >> run_my_pipeline( >> beam.io.ReadFromText(...), >> beam.io.WriteToSomewhere(...), >> beam.io.WriteToSomewhereElse(...)) >> >> def test(): >> >> class Check(beam.PTransform): >> def __init__(checker): >> self._checker = checker >> def expand(pcoll): >> assert_that(pcoll, self._checker) >> >> run_my_pipeline( >> beam.Create([...]), >> Check1(equal_to([...])), >> Check2(any_callable_that_validates_result2)) >> >> or various permutations thereof. >> >> Is that more what you're looking for? >> >> >> >> On Fri, Jul 17, 2020 at 2:46 PM Sofia’s World <mmistr...@gmail.com> >> wrote: >> > >> > Hello Robert >> > thanks but i think i am either missing the point or not expressing >> clearly my goal. >> > I had a look at the util_test.py, and i see that in those tests >> pipelines are being created as part of tests., and in these tests what are >> being tested are beam functions - eg beam.Map etc. >> > I am after testing a pipeline as a whole. Taking this example, >> > >> > p = beam.Pipeline(options=pipeline_options) >> > lines = (p >> > | 'Get List of Tickers' >> ReadFromText(input_file) >> > | 'Split fields' >> beam.Map(split_fields) >> > | 'Map to String' >> beam.Map(add_year) >> > >> > what i am trying to do is to test a full pipeline run, like in the test >> example below >> > >> > from mypackage.email_pipeline import run >> > >> > @patch('testing.email_pipeline.ReadFromText') >> > def test_create_pipelne(self, mock_read_from_text): >> > mock_read_from_text.return_value = ['One', >> > 'Two', >> > 'Three'] >> > >> > test_pipeline = TestPipeline(is_integration_test=True) >> > pipeline_verifiers = [ >> > PipelineStateMatcher(), >> > ] >> > extra_opts = { >> > 'input_table': 'testtable', >> > 'num_records': 1, >> > 'beam_bq_source': 'source', >> > 'on_success_matcher': all_of(*pipeline_verifiers) >> > } >> > result = run( >> > >> test_pipeline.get_full_options_as_args(**extra_opts)) >> > >> > print(result) >> > >> > Basically, i would expect a PCollection as result of the pipeline, and >> i would be testing the content of the PCollection >> > >> > Running this results in this messsage >> > >> > IT is skipped because --test-pipeline-options is not specified >> > >> > Would you be able to advise on this? >> > >> > kind regards >> > >> > Marco >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > On Mon, Jul 13, 2020 at 10:43 PM Robert Bradshaw <rober...@google.com> >> wrote: >> >> >> >> You can use apache_beam.testing.util.assert_that to write tests of >> >> Beam pipelines. This is what Beam uses for its tests, e.g. >> >> >> >> >> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util_test.py#L80 >> >> >> >> On Mon, Jul 13, 2020 at 2:36 PM Sofia’s World <mmistr...@gmail.com> >> wrote: >> >> > >> >> > Hi all >> >> > i was wondering if anyone could provide pointers on how to test a >> pipeline in python. >> >> > I have the following pipeline >> >> > >> >> > lines = (p >> >> > | 'Get List of Tickers' >> beam.Map(get_tickers) >> >> > | 'Split fields' >> beam.Map(split_fields) >> >> > | 'Map to String' >> beam.Map(add_year) >> >> > ) >> >> > result = p.run() >> >> > >> >> > Now i can easily test each individual function for each step >> (get_tickers, split_fields, add_year) >> >> > >> >> > but is there a way to test the pipeline 'as a whole' ?# >> >> > >> >> > Could anyone point me to some examples? >> >> > >> >> > kind regards >> >> > >> >> > >> >> > >> >> > >> >> > >> >