> I had a look at the util_test.py, and i see that in those tests pipelines
are being created as part of tests., and  in these tests what are being
tested are beam functions - eg beam.Map  etc.

assert_that checks the results of an entire pipeline, not individual
transforms. You should be able to apply assert_that to your example:

p = beam.Pipeline(options=pipeline_options)
lines = (p
         | 'Get List of Tickers' >> ReadFromText(input_file)
         | 'Split fields'  >> beam.Map(split_fields)
         | 'Map to String' >> beam.Map(add_year)

assert_that(lines, equal_to(["expected_value1", "expected_value2", ...]))



On Fri, Jul 17, 2020 at 2:53 PM Sofia’s World <mmistr...@gmail.com> wrote:

> Hello Robert
>  thanks but i think i am either missing the point or not expressing
> clearly my goal.
> I had a look at the util_test.py, and i see that in those tests pipelines
> are being created as part of tests., and  in these tests what are being
> tested are beam functions - eg beam.Map  etc.
> I am after testing a pipeline as a whole. Taking this example,
>
> p = beam.Pipeline(options=pipeline_options)
> lines = (p
>          | 'Get List of Tickers' >> ReadFromText(input_file)
>          | 'Split fields'  >> beam.Map(split_fields)
>          | 'Map to String' >> beam.Map(add_year)
>
> what i am trying to do is to test a full pipeline run, like in the test 
> example below
>
> from mypackage.email_pipeline import run
>
> @patch('testing.email_pipeline.ReadFromText')
> def test_create_pipelne(self, mock_read_from_text):
>     mock_read_from_text.return_value = ['One',
>                                         'Two',
>                                         'Three']
>
>     test_pipeline = TestPipeline(is_integration_test=True)
>     pipeline_verifiers = [
>         PipelineStateMatcher(),
>     ]
>     extra_opts = {
>         'input_table': 'testtable',
>         'num_records': 1,
>         'beam_bq_source': 'source',
>         'on_success_matcher': all_of(*pipeline_verifiers)
>     }
>     result = run(
>                     test_pipeline.get_full_options_as_args(**extra_opts))
>
>     print(result)
>
> Basically, i would expect a PCollection as result of the pipeline, and i 
> would be testing the content of the PCollection
>
> Running this results in this messsage
>
> IT is skipped because --test-pipeline-options is not specified
>
> Would you be able to advise on this?
>
> kind regards
>
>  Marco
>
>
>
>
>
>
>
>
>
>
>
>
> On Mon, Jul 13, 2020 at 10:43 PM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> You can use apache_beam.testing.util.assert_that to write tests of
>> Beam pipelines. This is what Beam uses for its tests, e.g.
>>
>>
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util_test.py#L80
>>
>> On Mon, Jul 13, 2020 at 2:36 PM Sofia’s World <mmistr...@gmail.com>
>> wrote:
>> >
>> > Hi all
>> >  i was wondering if anyone could provide pointers on how  to test a
>> pipeline in python.
>> > I have the following pipeline
>> >
>> > lines = (p
>> >          | 'Get List of Tickers' >> beam.Map(get_tickers)
>> >          | 'Split fields'  >> beam.Map(split_fields)
>> >          | 'Map to String' >> beam.Map(add_year)
>> >          )
>> > result = p.run()
>> >
>> > Now i can easily test each individual function for each step
>> (get_tickers, split_fields, add_year)
>> >
>> > but is there a way to test the pipeline 'as a whole' ?#
>> >
>> > Could anyone point me to some examples?
>> >
>> > kind regards
>> >
>> >
>> >
>> >
>> >
>>
>

Reply via email to