Hello,
I’m trying to run a cross-language pipeline (Beam 2.21, a Java pipeline with an
external Python transform) using a PROCESS SDK harness and the Spark Portable
Runner, but it fails.
To do that, I have a running Spark Runner Job Server (Spark local) and a
standalone Expansion Service (Python) that contains the code of the Python
transform called from the main Java pipeline.
Once the job has been submitted to the Job Server and starts running, it fails
with this error:
20/05/28 18:55:12 INFO org.apache.beam.runners.spark.SparkJobInvoker: Invoking
job
classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
20/05/28 18:55:12 INFO
org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Starting job
invocation
classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
20/05/28 18:55:12 ERROR
org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error during
job invocation
classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719.
java.lang.IllegalArgumentException: GreedyPipelineFuser requires all root nodes
to be runner-implemented beam:transform:impulse:v1 or beam:transform:read:v1
primitives, but transform Create.Values/Read(CreateSource) executes in
environment Optional[urn: "beam:env:docker:v1"
payload: "\n\033apache/beam_java_sdk:2.21.0"
capabilities: "beam:coder:bytes:v1"
….
Below are some snippets from my pipeline that may be helpful.
Java transform:

private static final String URN = "ml:genreclassifier:python:v1";

@Override
public PCollection<KV<String, String>> expand(PCollection<String> input) {
  PCollection<KV<String, String>> output =
      input.apply(
          "ExternalGenreClassifier",
          External.of(URN, new byte[] {}, options.getExpansionServiceURL())
              .<KV<String, String>>withOutputType());
  return output;
}
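For completeness, the Java pipeline is submitted to the local job server with portable-runner options along these lines (the port numbers are placeholders for my local setup, and expansionServiceURL is my own custom pipeline option, not a built-in Beam flag):

```shell
--runner=PortableRunner \
--jobEndpoint=localhost:8099 \
--expansionServiceURL=localhost:8097
```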
expansion_service.py:

@ptransform.PTransform.register_urn('ml:genreclassifier:python:v1', None)
class GenreClassifier(ptransform.PTransform):
  def __init__(self):
    super(GenreClassifier, self).__init__()

  def expand(self, pcoll):
    return pcoll | "GenreClassifier" >> beam.ParDo(_GenreClassifierFn())

  def to_runner_api_parameter(self, unused_context):
    return 'ml:genreclassifier:python:v1', None

  @staticmethod
  def from_runner_api_parameter(unused_ptransform, unused_parameter,
                                unused_context):
    return GenreClassifier()
def main(unused_argv):
  ...
  server = grpc.server(UnboundedThreadPoolExecutor())
  beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
      expansion_service.ExpansionServiceServicer(
          PipelineOptions.from_dictionary({
              'environment_type': 'PROCESS',
              'environment_config':
                  '{"command": "/dev/beam/sdks/python/container/build/target/launcher/darwin_amd64/boot"}',
              'sdk_location': 'container',
          })
      ), server
  )
  server.add_insecure_port('localhost:{}'.format(options.port))
  server.start()
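For reference, I start the two services roughly like this (the expansion service port and the script invocation are placeholders for my setup):

```shell
# Spark job server with a local master, started from the Beam source tree
./gradlew :runners:spark:job-server:runShadow

# Standalone Python expansion service on its own port
python expansion_service.py --port 8097
```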
Does anyone have an idea what’s wrong with my setup/pipeline and how to fix it?