Hi all!
I am extending PipelineOptions and I have getters/setters for several
properties which are passed to my job from Dataflow. The parameters are
indeed passed correctly and I can access them from my pipeline.
However, I'd like to have some custom logic for my parameters which cannot
be expressed by a metadata file with regexes, or @Validation.Required
annotation.
I tried to add the following code:
val options = PipelineOptionsFactory.fromArgs(args:
_*).withValidation().create().as(classOf[JobOptions])
if (options.getUserId().get() == "…" && …) {
throw new IllegalArgumentException("oops")
}
Unfortunately, I am getting this exception when I am staging the pipeline
to Dataflow with mvn -Pdataflow-runner compile exec:java …:
[info] running (fork) walkme.Main --runner=DataflowRunner --project=…
--stagingLocation=gs://…/staging --templateLocation=gs://…/template
--region=europe-west3
[error] Exception in thread "main" java.lang.IllegalStateException: Value
only available at runtime, but accessed from a non-runtime context:
RuntimeValueProvider{propertyName=userId, default=null}
[error] at
org.apache.beam.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:254)
[error] at walkme.Main$.main(Main.scala:13)
[error] at walkme.Main.main(Main.scala)
What is the recommended way to achieve what I described?
Thanks!