Sorry for the late reply.

I cannot drop the ValueProvider. However, when I run it locally with a
local DataflowRunner, it works. I just need to escape the double quotes
in the JSON with backslashes.
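
For illustration, the escaping described above amounts to something like
the following. This is only a sketch: the object and method names are
made up for the example, and the JSON is the timeWindow value from the
original command.

```scala
object EscapeQuotes {
  // Hypothetical helper (not part of Beam): put a backslash before every
  // double quote in the JSON text.
  def escapeQuotes(json: String): String =
    json.replace("\"", "\\\"")

  def main(args: Array[String]): Unit = {
    val raw =
      """{"dateRangeStart":"start","dateRangeEnd":"end","slidingWindowDuration":"window"}"""
    // Prints the same JSON with each double quote preceded by a backslash.
    println(escapeQuotes(raw))
  }
}
```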

However, it doesn't work with Dataflow on Google Cloud, since Dataflow
strips the backslashes. I tried triggering the job both from the UI and
from the Cloud API using the Java client.

I guess this is a Dataflow bug.

So to summarize: you cannot use POJOs in the JobOptions on Google Cloud
Dataflow (although you generally can in Apache Beam).
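
One possible workaround, sketched here and not verified on Dataflow, is
to replace the POJO-typed option with one String option per field, so
that no JSON has to pass through the template parameters. The three new
getter/setter pairs below are assumptions derived from the TimeWindow
field names; the Beam types are the ones the original trait already
uses.

```scala
import org.apache.beam.sdk.options.{PipelineOptions, ValueProvider}

// Sketch only: the TimeWindow fields are flattened into plain String options.
trait JobOptions extends PipelineOptions {
  def getReportPath: ValueProvider[String]
  def setReportPath(v: ValueProvider[String]): Unit

  // Hypothetical options, one per TimeWindow field.
  def getDateRangeStart: ValueProvider[String]
  def setDateRangeStart(v: ValueProvider[String]): Unit

  def getDateRangeEnd: ValueProvider[String]
  def setDateRangeEnd(v: ValueProvider[String]): Unit

  def getSlidingWindowDuration: ValueProvider[String]
  def setSlidingWindowDuration(v: ValueProvider[String]): Unit
}
```

The TimeWindow could then be built from the three strings with its
3-arg constructor at the point where the values are read.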

On Fri, Feb 4, 2022 at 11:28 PM Luke Cwik <[email protected]> wrote:

> Does it work if you don't use ValueProvider?
>   def getTimeWindow: TimeWindow
>   def setTimeWindow(v: TimeWindow)
>
> On Wed, Feb 2, 2022 at 4:44 AM Ori Popowski <[email protected]> wrote:
>
>> I have the following PipelineOptions class:
>>
>> trait JobOptions extends PipelineOptions {
>>   def getReportPath: ValueProvider[String]
>>   def setReportPath(v: ValueProvider[String])
>>
>>   def getTimeWindow: ValueProvider[TimeWindow]
>>   def setTimeWindow(v: ValueProvider[TimeWindow])
>> }
>>
>> The TimeWindow POJO is a classic POJO with getters, setters, 3-arg
>> constructor and 0-arg constructor, auto-generated by IntelliJ. It has 3
>> strings: dateRangeStart, dateRangeEnd, slidingWindowDuration.
>>
>> This is how I run the job:
>> gcloud dataflow jobs run test --gcs-location gs://…/template --region
>> europe-west3 --num-workers 2 --staging-location gs://…/template
>> --subnetwork https://…/subnetworks/dataproc-1 --network data-qa
>> --parameters
>> reportPath=gs://…/test-input/**,timeWindow={"dateRangeStart":"start","dateRangeEnd":"end","slidingWindowDuration":"window"}
>>
>>
>> However, when running the job I am getting the following error:
>> Error message from worker: java.lang.RuntimeException:
>> org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException:
>> Unable to load runtime value.
>>
>> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:197)
>>
>> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:168)
>>
>> org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:66)
>>
>> org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:53)
>>
>> org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:90)
>>
>> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:128)
>>
>> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:361)
>>
>> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:314)
>>
>> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
>>
>> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
>>
>> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
>> java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.beam.sdk.util.UserCodeException:
>> java.lang.RuntimeException: Unable to load runtime value.
>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
>> walkme.UpdateAerospike$DoFnInvoker.invokeSetup(Unknown Source)
>>
>> org.apache.beam.sdk.transforms.reflect.DoFnInvokers.tryInvokeSetupFor(DoFnInvokers.java:53)
>>
>> org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:86)
>>
>> org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:68)
>>
>> org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:100)
>>
>> org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75)
>>
>> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:267)
>>
>> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:89)
>>
>> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:186)
>> ... 14 more
>> Caused by: java.lang.RuntimeException: Unable to load runtime value.
>>
>> org.apache.beam.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:272)
>> walkme.UpdateAerospike.setup(UpdateAerospike.scala:15)
>> Caused by: java.lang.RuntimeException: Unable to parse representation
>>
>> org.apache.beam.sdk.options.ProxyInvocationHandler.getValueFromJson(ProxyInvocationHandler.java:519)
>>
>> org.apache.beam.sdk.options.ProxyInvocationHandler.getValueFromJson(ProxyInvocationHandler.java:512)
>>
>> org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:168)
>>
>> org.apache.beam.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:262)
>> walkme.UpdateAerospike.setup(UpdateAerospike.scala:15)
>> walkme.UpdateAerospike$DoFnInvoker.invokeSetup(Unknown Source)
>>
>> org.apache.beam.sdk.transforms.reflect.DoFnInvokers.tryInvokeSetupFor(DoFnInvokers.java:53)
>>
>> org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:86)
>>
>> org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:68)
>>
>> org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:100)
>>
>> org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75)
>>
>> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:267)
>>
>> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:89)
>>
>> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:186)
>>
>> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:168)
>>
>> org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:66)
>>
>> org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:53)
>>
>> org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:90)
>>
>> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:128)
>>
>> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:361)
>>
>> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:314)
>>
>> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
>>
>> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
>>
>> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
>> java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> java.lang.Thread.run(Thread.java:748)
>> Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException:
>> Cannot construct instance of `walkme.model.TimeWindow` (although at least
>> one Creator exists): no String-argument constructor/factory method to
>> deserialize from String value
>> ('{"dateRangeStart":"start","dateRangeEnd":"end","slidingWindowDuration":"window"}')
>>  at [Source: UNKNOWN; byte offset: #UNKNOWN]
>>
>> com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63)
>>
>> com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1728)
>>
>> com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1353)
>>
>> com.fasterxml.jackson.databind.deser.std.StdDeserializer._deserializeFromString(StdDeserializer.java:311)
>>
>> com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromString(BeanDeserializerBase.java:1495)
>>
>> com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:196)
>>
>> com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:186)
>>
>> org.apache.beam.sdk.options.ValueProvider$Deserializer.deserialize(ValueProvider.java:362)
>>
>> org.apache.beam.sdk.options.ValueProvider$Deserializer.deserialize(ValueProvider.java:328)
>>
>> org.apache.beam.sdk.options.PipelineOptionsFactory.deserializeNode(PipelineOptionsFactory.java:1796)
>>
>> org.apache.beam.sdk.options.ProxyInvocationHandler.getValueFromJson(ProxyInvocationHandler.java:517)
>> ... 27 more
>>
>>
