Hello Trevor,

Kenn is correct that Beam creates no-op Flink sinks. So if a sink isn't
being created, it's possibly a bug in Beam.

Is this a batch or streaming pipeline? Which Flink version are you using?

Kyle

On Thu, Jun 10, 2021 at 3:19 PM Kenneth Knowles <[email protected]> wrote:

> Beam doesn't use Flink's sink API. I recall from a very long time ago that
> we attached a noop sink to each PCollection to avoid this error. +Kyle
> Weaver <[email protected]> might know something about how this applies
> to Python on Flink.
>
> Kenn
>
> On Wed, Jun 9, 2021 at 4:41 PM Trevor Kramer <[email protected]>
> wrote:
>
>> Hello Beam community,
>>
>> I am getting the following error running a Beam pipeline on Flink.
>>
>> RuntimeError: Pipeline BeamApp failed in state FAILED:
>> java.lang.RuntimeException: No data sinks have been created yet. A program
>> needs at least one sink that consumes data. Examples are writing the data
>> set or printing it.
>>
>> Here is my pipeline which I believe has a sink at the end of it. What am
>> I missing?
>>
>> with beam.Pipeline(options=options) as p:
>>     (p
>>      | 'Read SDF' >> ParseSDF('s3://some-path.sdf')
>>      | 'Sample' >> beam.combiners.Sample.FixedSizeGlobally(1000)
>>      | 'Flatten' >> beam.FlatMap(lambda x: x)
>>      | 'Standardize' >> beam.Map(standardize)
>>      | 'Make FPs' >> beam.Map(calculate_fps)
>>      | 'Make Dict' >> beam.Map(lambda x: {'fp': x})
>>      | 'Write Parquet' >> WriteToParquet('s3://some-path', pyarrow.schema(
>>                 [('fp', pyarrow.list_(pyarrow.int64(), 2048))]
>>             ))
>>      )
>>
>>
>> Thanks,
>>
>>
>> Trevor
>>
>>
