We used to run legacy sources using the old-style Read translation. Changing it to SDF might have broken ReadFromPubSub. Could you check in the Flink jobs whether it uses the SDF code or the Read translation? For Read you should be seeing the UnboundedSourceWrapper.

Looking at the code, there is no integration like there is for the other external IOs: https://github.com/apache/beam/tree/master/sdks/python/apache_beam/io/external

Might be worth adding one.

-Max

On 28.10.20 19:33, Sam Bourne wrote:
Yeah, I’m able to run that. |apache_beam.io.ReadFromPubSub| transform works just fine but only for DirectRunner in python. In flink we’re using the java implementation via an external transform |apache_beam.io.external.gcp.pubsub.ReadFromPubSub|.

Is there a different way to do this?


On Wed, Oct 28, 2020 at 10:47 AM Kyle Weaver <[email protected] <mailto:[email protected]>> wrote:

    Are you able to run streaming word count on the same setup?

    On Tue, Oct 27, 2020 at 5:39 PM Sam Bourne <[email protected]
    <mailto:[email protected]>> wrote:

        We updated from beam 2.18.0 to 2.24.0 and have been having
        issues using the python |ReadFromPubSub| external transform in
        flink 1.10. It seems like it starts up just fine, but it doesn’t
        consume any messages.

        I tried to reduce it to a simple example and tested back to beam
        2.22.0 but have gotten the same results (of no messages being read).

        I have a hard time believing that it’s been broken for so many
        versions but I can’t seem to identify what I’ve done wrong.

        Steps to test:

        1) Spin up the expansion service

        |docker run -d --rm --network host
        apache/beam_flink1.10_job_server:2.24.0 |

        2) Create a simple pipeline using
        |apache_beam.io.external.gcp.pubsub.ReadFromPubSub|
        Here’s mine:
        https://gist.github.com/sambvfx/a8582f4805e468a97331b0eb13911ebf
        <https://gist.github.com/sambvfx/a8582f4805e468a97331b0eb13911ebf>

        3) Run the pipeline

        |python -m pubsub_example --runner=FlinkRunner
        --save_main_session --flink_submit_uber_jar
        --environment_type=DOCKER
        --environment_config=apache/beam_python3.7_sdk:2.24.0
        --checkpointing_interval=10000 --streaming |

        4) Emit a few pubsub messages

        |python -m pubsub_example --msg hello python -m pubsub_example
        --msg world |

        What am I missing here?
        -Sam

        ------------------------------------------------------------------------

        Some debugging things I’ve tried:

          * I can run |ReadFromPubSub| (the DirectRunner version just fine).
          * I confirmed that the gcloud credentials make it into the
            java sdk container that spins up. Without these you get
            credential type errors.
          * I modified the java |DockerEnvironmentFactory| to instead
            mount |GOOGLE_APPLICATION_CREDENTIALS| service account .json
            and set the env var.
          * I’ve tried a variety of different flink flags.

Reply via email to