> I had a look at util_test.py, and I see that in those tests pipelines are created as part of the tests, and what is being tested are Beam functions, e.g. beam.Map.

assert_that checks the results of an entire pipeline, not individual transforms. You should be able to apply assert_that to your example:

    p = beam.Pipeline(options=pipeline_options)
    lines = (p
             | 'Get List of Tickers' >> ReadFromText(input_file)
             | 'Split fields' >> beam.Map(split_fields)
             | 'Map to String' >> beam.Map(add_year))

    assert_that(lines, equal_to(["expected_value1", "expected_value2", ...]))

On Fri, Jul 17, 2020 at 2:53 PM Sofia’s World <mmistr...@gmail.com> wrote:

> Hello Robert
> thanks, but I think I am either missing the point or not expressing
> my goal clearly.
> I had a look at util_test.py, and I see that in those tests pipelines
> are created as part of the tests, and what is being tested are Beam
> functions, e.g. beam.Map.
> I am after testing a pipeline as a whole. Taking this example,
>
> p = beam.Pipeline(options=pipeline_options)
> lines = (p
>          | 'Get List of Tickers' >> ReadFromText(input_file)
>          | 'Split fields' >> beam.Map(split_fields)
>          | 'Map to String' >> beam.Map(add_year)
>          )
>
> what I am trying to do is to test a full pipeline run, as in the test
> example below
>
> from mypackage.email_pipeline import run
>
> @patch('testing.email_pipeline.ReadFromText')
> def test_create_pipelne(self, mock_read_from_text):
>     mock_read_from_text.return_value = ['One',
>                                         'Two',
>                                         'Three']
>
>     test_pipeline = TestPipeline(is_integration_test=True)
>     pipeline_verifiers = [
>         PipelineStateMatcher(),
>     ]
>     extra_opts = {
>         'input_table': 'testtable',
>         'num_records': 1,
>         'beam_bq_source': 'source',
>         'on_success_matcher': all_of(*pipeline_verifiers)
>     }
>     result = run(
>         test_pipeline.get_full_options_as_args(**extra_opts))
>
>     print(result)
>
> Basically, I would expect a PCollection as the result of the pipeline,
> and I would be testing the content of the PCollection.
>
> Running this results in this message
>
> IT is skipped because --test-pipeline-options is not specified
>
> Would you be able to advise on this?
>
> kind regards
>
> Marco
>
> On Mon, Jul 13, 2020 at 10:43 PM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> You can use apache_beam.testing.util.assert_that to write tests of
>> Beam pipelines. This is what Beam uses for its tests, e.g.
>>
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util_test.py#L80
>>
>> On Mon, Jul 13, 2020 at 2:36 PM Sofia’s World <mmistr...@gmail.com>
>> wrote:
>> >
>> > Hi all
>> > I was wondering if anyone could provide pointers on how to test a
>> > pipeline in Python.
>> > I have the following pipeline
>> >
>> > lines = (p
>> >          | 'Get List of Tickers' >> beam.Map(get_tickers)
>> >          | 'Split fields' >> beam.Map(split_fields)
>> >          | 'Map to String' >> beam.Map(add_year)
>> >          )
>> > result = p.run()
>> >
>> > Now I can easily test each individual function for each step
>> > (get_tickers, split_fields, add_year),
>> >
>> > but is there a way to test the pipeline 'as a whole'?
>> >
>> > Could anyone point me to some examples?
>> >
>> > kind regards