Tried that. It doesn't work. Unfortunately, Dataflow happily ignores
everything that happens outside the pipeline itself. I even threw an
exception, and it was ignored. The job name is indeed "test", as you can
see in the screenshot. It doesn't matter what I do; it just ignores it,
even logger.error and System.err.println. Here is the code:
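import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.TextIO
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.ParDo

def main(args: Array[String]): Unit = {
  PipelineOptionsFactory.register(classOf[JobOptions])
  val options = PipelineOptionsFactory
    .fromArgs(args: _*)
    .withValidation()
    .create()
    .as(classOf[JobOptions])

  // Check 1: right after the options are parsed.
  if (options.getJobName == "test") {
    throw new RuntimeException("oops")
  }

  val pipeline = Pipeline.create(options)

  // Check 2: after the pipeline object is created.
  if (options.getJobName == "test") {
    throw new RuntimeException("oops")
  }

  // JobOptions, AerospikeUpdater and UpdateAerospike are defined elsewhere.
  val updater = new AerospikeUpdater
  pipeline
    .apply("Read report", TextIO.read().from(options.getReportPath))
    .apply("Update Aerospike", ParDo.of(new UpdateAerospike(updater)))

  // Check 3: after the graph is built, just before run().
  if (options.getJobName == "test") {
    throw new RuntimeException("oops")
  }

  pipeline.run()
}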
[image: image.png]
On Tue, Feb 1, 2022 at 7:05 PM Luke Cwik wrote:
> Validating at pipeline construction time makes sense unless you are
> building a template. In this case, you're validating that user is set and
> you're getting an exception. If user is necessary, then you can do
> something like:
> if (!options.getUser().isAccessible()) {
>   throw new IllegalArgumentException("Required property user is unset.");
> } else if (!options.getUser().get().equals(X)) {
>   throw new IllegalArgumentException("User property does not satisfy X");
> }
> If user is optional, then:
> if (options.getUser().isAccessible() && !options.getUser().get().equals(X)) {
>   throw new IllegalArgumentException("User property does not satisfy X");
> }
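The same two checks can be sketched in Scala against Beam's ValueProvider interface. This is a minimal sketch that assumes the Beam Java SDK is on the classpath; the object name UserValidation and the expected parameter (standing in for the placeholder X) are hypothetical names for illustration.

```scala
import org.apache.beam.sdk.options.ValueProvider

// Hypothetical helper mirroring the two checks quoted above. `expected` stands
// in for the placeholder X; the rule itself is illustrative only.
object UserValidation {

  // Required option: reject it if the value is not accessible at construction
  // time (for example, left unset while staging a template) or does not match.
  def validateRequired(user: ValueProvider[String], expected: String): Unit = {
    if (!user.isAccessible()) {
      throw new IllegalArgumentException("Required property user is unset.")
    } else if (user.get() != expected) {
      throw new IllegalArgumentException("User property does not satisfy X")
    }
  }

  // Optional option: check the value only when it is accessible right now.
  def validateOptional(user: ValueProvider[String], expected: String): Unit = {
    if (user.isAccessible() && user.get() != expected) {
      throw new IllegalArgumentException("User property does not satisfy X")
    }
  }
}
```

Both helpers would be called from main() before Pipeline.create(options), passing the option's ValueProvider. Scala's != on strings compares values, so no explicit equals call is needed.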
>
>
> On Tue, Feb 1, 2022 at 6:17 AM Ori Popowski wrote:
>
>> Thanks.
>>
>> I am trying a fail-fast approach here, so I thought the best thing would be
>> to validate the options before starting the pipeline.
>>
>> I think validating the options in processElement on each element is
>> wasteful, since the validation code will run many times instead of just
>> once. The options are not available in a DoFn's @Setup method, only in
>> @StartBundle. Is there a better place to validate the options other than
>> @StartBundle?
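A minimal Scala sketch of the @StartBundle approach, assuming the Beam Java SDK. The class ValidatingFn and its non-empty rule are hypothetical stand-ins for the real validation. The option's ValueProvider is passed to the constructor while the pipeline is built, and .get() is only called in @StartBundle on the workers; a transient flag limits the check to once per DoFn instance rather than once per element.

```scala
import org.apache.beam.sdk.options.ValueProvider
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.{ProcessElement, StartBundle}

// Hypothetical DoFn: the ValueProvider is captured at construction time and
// only read at execution time. The validation rule is illustrative only.
class ValidatingFn(userId: ValueProvider[String]) extends DoFn[String, String] {

  // Transient, so each deserialized instance starts out unvalidated.
  @transient private var validated: Boolean = false

  @StartBundle
  def startBundle(): Unit = {
    if (!validated) {
      val id = userId.get()
      if (id == null || id.isEmpty) {
        throw new IllegalArgumentException(s"Invalid userId: '$id'")
      }
      validated = true
    }
  }

  // Passes elements through unchanged; the real per-element work goes here.
  @ProcessElement
  def processElement(c: DoFn[String, String]#ProcessContext): Unit = {
    c.output(c.element())
  }
}
```

It would be wired in as ParDo.of(new ValidatingFn(options.getUserId)). Throwing from @StartBundle fails the bundle, so the error surfaces when the job runs rather than when the template is launched, and the runner may retry the bundle before failing the job.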
>>
>> Thanks
>>
>> On Tue, Feb 1, 2022 at 4:10 PM Chris Soujon wrote:
>>
>>> Hi Ori,
>>>
>>> The error message hints that you are trying to use a ValueProvider
>>> during pipeline construction. You should only call the .get() method
>>> from code that runs during pipeline execution, e.g. in a lambda for
>>> MapElements or in a DoFn's processElement method.
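A minimal Scala sketch of that pattern with MapElements, assuming the Beam Java SDK; the class TagWithUser and its output format are hypothetical. The ValueProvider is captured when the graph is built, and .get() is only called inside apply, which runs during execution.

```scala
import org.apache.beam.sdk.options.ValueProvider
import org.apache.beam.sdk.transforms.SimpleFunction

// Hypothetical function that prefixes each line with the runtime userId.
// Holding the ValueProvider is safe at construction time; calling get() is not.
class TagWithUser(userId: ValueProvider[String]) extends SimpleFunction[String, String] {
  override def apply(line: String): String = s"${userId.get()}:$line"
}
```

In the pipeline it would be applied as lines.apply("Tag", MapElements.via(new TagWithUser(options.getUserId))), with MapElements imported from org.apache.beam.sdk.transforms.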
>>>
>>> Check the Dataflow docs on this topic [1]; they also spell out your
>>> error message.
>>>
>>> Best,
>>> Chris
>>>
>>>
>>> [1]
>>> https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#about-runtime-parameters-and-the-valueprovider-interface
>>>
>>> On Tue, Feb 1, 2022 at 2:54 PM Ori Popowski wrote:
>>>
>>>> Hi all!
>>>>
>>>> I am extending PipelineOptions and I have getters/setters for several
>>>> properties which are passed to my job from Dataflow. The parameters are
>>>> indeed passed correctly and I can access them from my pipeline.
>>>>
>>>> However, I'd like to add some custom validation logic for my parameters
>>>> that cannot be expressed with regexes in a metadata file or with the
>>>> @Validation.Required annotation.
>>>>
>>>> I tried to add the following code:
>>>>
>>>> val options = PipelineOptionsFactory.fromArgs(args: _*)
>>>>   .withValidation().create().as(classOf[JobOptions])
>>>>
>>>> if (options.getUserId().get() == "…" && …) {
>>>> throw new IllegalArgumentException("oops")
>>>> }
>>>>
>>>> Unfortunately, I am getting this exception when I am staging the
>>>> pipeline to Dataflow with mvn -Pdataflow-runner compile exec:java …:
>>>>
>>>> [info] running (fork) walkme.Main --runner=DataflowRunner --project=… --stagingLocation=gs://…/staging --templateLocation=gs://…/template --region=europe-west3
>>>> [error] Exception in thread "main" java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider{propertyName=userId, default=null}
>>>> [error]     at org.apache.beam.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:254)
>>>> [error]     at walkme.Main$.main(Main.scala:13)
>>>> [error]     at walkme.Main.main(Main.scala)
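The exception above is thrown by RuntimeValueProvider.get(). A minimal sketch, assuming the Beam Java SDK and a hypothetical options interface UserIdOptions with a single userId property, shows when such an option can be read during construction: if --userId is passed on the command line the provider is accessible, and if it is omitted (as when staging a template) it remains a RuntimeValueProvider whose value only exists at run time.

```scala
import org.apache.beam.sdk.options.{PipelineOptions, PipelineOptionsFactory, ValueProvider}

// Hypothetical options interface; Beam pairs getUserId/setUserId into --userId.
trait UserIdOptions extends PipelineOptions {
  def getUserId: ValueProvider[String]
  def setUserId(value: ValueProvider[String]): Unit
}

object UserIdOptionsDemo {
  // Parses command-line style arguments into UserIdOptions.
  def parse(args: String*): UserIdOptions =
    PipelineOptionsFactory.fromArgs(args: _*).as(classOf[UserIdOptions])

  // Returns userId only if it can be read now. When --userId is omitted, the
  // provider is not accessible and calling get() would throw, as in the trace.
  def userIdIfAccessible(options: UserIdOptions): Option[String] = {
    val provider = options.getUserId
    if (provider.isAccessible()) Some(provider.get()) else None
  }
}
```

Guarding construction-time checks with isAccessible() in this way lets the same main() both run the pipeline directly and stage a template without tripping over runtime-only values.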
>>>>
>>>> What is the recommended way to achieve what I described?
>>>>
>>>> Thanks!
>>>>
>>>
>>>
>>> --
>>> Chris Soujon
>>> DoiT International <https://www.doit-intl.com/>
>>> Staff Cloud Architect
>>> EMEA North
>>>
>>