Hello,

I’m trying to run a cross-language pipeline (Beam 2.21: a Java pipeline calling an 
external Python transform) with a PROCESS SDK harness and the Spark portable runner, 
but it fails.
For that I have a running Spark Runner Job Server (Spark local) and a 
standalone Python Expansion Service, which contains the code of the Python 
transform that should be called from the main Java pipeline.

Once the job has been submitted to the Job Server and starts running, it fails with 
the following error:

20/05/28 18:55:12 INFO org.apache.beam.runners.spark.SparkJobInvoker: Invoking 
job 
classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
20/05/28 18:55:12 INFO 
org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Starting job 
invocation 
classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
20/05/28 18:55:12 ERROR 
org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error during 
job invocation 
classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719.
java.lang.IllegalArgumentException: GreedyPipelineFuser requires all root nodes 
to be runner-implemented beam:transform:impulse:v1 or beam:transform:read:v1 
primitives, but transform Create.Values/Read(CreateSource) executes in 
environment Optional[urn: "beam:env:docker:v1"
payload: "\n\033apache/beam_java_sdk:2.21.0"
capabilities: "beam:coder:bytes:v1"
...
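If I read the error correctly, the root Create.Values/Read(CreateSource) transform is still assigned the Java SDK's default docker environment, while the fuser only accepts runner-implemented Impulse/Read roots. Would setting the default environment on the Java side to PROCESS as well be the right direction? Something like the following PortablePipelineOptions flags (the command path here is just a placeholder, and I may be misreading what the fuser expects):

```
--defaultEnvironmentType=PROCESS
--defaultEnvironmentConfig={"command": "/path/to/java/sdk/harness/boot"}
```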


Here are some code snippets from my pipeline that may be helpful.

Java transform:
private static final String URN = "ml:genreclassifier:python:v1";
@Override
public PCollection<KV<String, String>> expand(PCollection<String> input) {
  PCollection<KV<String, String>> output =
      input.apply(
          "ExternalGenreClassifier",
          External.of(URN, new byte[] {}, options.getExpansionServiceURL())
              .<KV<String, String>>withOutputType());
  return output;
}

expansion_service.py

@ptransform.PTransform.register_urn('ml:genreclassifier:python:v1', None)
class GenreClassifier(ptransform.PTransform):
    def __init__(self):
        super(GenreClassifier, self).__init__()

    def expand(self, pcoll):
        return pcoll | "GenreClassifier" >> beam.ParDo(_GenreClassifierFn())

    def to_runner_api_parameter(self, unused_context):
        return 'ml:genreclassifier:python:v1', None

    @staticmethod
    def from_runner_api_parameter(unused_ptransform, unused_parameter, 
unused_context):
        return GenreClassifier()

def main(unused_argv):
    ...
    server = grpc.server(UnboundedThreadPoolExecutor())
    beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
        expansion_service.ExpansionServiceServicer(
            PipelineOptions.from_dictionary({
                'environment_type': 'PROCESS',
                'environment_config': '{"command": "/dev/beam/sdks/python/container/build/target/launcher/darwin_amd64/boot"}',
                'sdk_location': 'container',
            })
        ), server
    )
    server.add_insecure_port('localhost:{}'.format(options.port))
    server.start()
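One thing worth noting about the snippet above: the environment_config value is passed on as a JSON string, so a stray smart quote introduced by a mail client or editor makes it unparseable. A minimal stdlib sanity check (the path here is just a placeholder, not my real config):

```python
import json

# environment_config must be valid JSON; a smart quote
# (e.g. “ instead of ") makes it unparseable.
good = '{"command": "/opt/beam/boot"}'   # placeholder path
bad = '{"command": “/opt/beam/boot"}'

# The well-formed string parses and yields the command.
assert json.loads(good)["command"] == "/opt/beam/boot"

# The string with a smart quote is rejected by the JSON parser.
try:
    json.loads(bad)
    parsed = True
except json.JSONDecodeError:
    parsed = False
assert not parsed
```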

Does anyone have an idea what’s wrong with my setup/pipeline and how to fix it?

