Sorry for the late reply. I cannot drop the ValueProvider. However, when I run it locally with a local DataflowRunner, it works; I just need to add backslashes to the double quotes in the JSON.
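A minimal Scala sketch of what "adding backslashes to the double quotes" looks like, assuming the JSON is passed as the `--timeWindow` value on a command line (the thread doesn't say which layer consumes the backslashes locally). `EscapeParam` and `escapeQuotes` are hypothetical helpers, not part of Beam.

```scala
// Hypothetical helper: backslash-escape the double quotes in a JSON value
// before passing it as a pipeline option.
object EscapeParam {
  // Escape existing backslashes first so they are not confused with the
  // ones added in front of each double quote.
  def escapeQuotes(json: String): String =
    json.replace("\\", "\\\\").replace("\"", "\\\"")

  def main(args: Array[String]): Unit = {
    val raw = """{"dateRangeStart":"start","dateRangeEnd":"end","slidingWindowDuration":"window"}"""
    // Print the option as it would be typed, with every quote escaped.
    println("--timeWindow=" + escapeQuotes(raw))
  }
}
```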
However, it doesn't work with Dataflow on Google Cloud, since Dataflow strips the backslashes. I tried triggering the job both from the UI and from the Cloud API using the Java client. I guess this is a Dataflow bug. So, to summarize: you cannot use POJOs in the JobOptions in Google Cloud Dataflow (but you generally can in Apache Beam).

On Fri, Feb 4, 2022 at 11:28 PM Luke Cwik wrote:
> Does it work if you don't use ValueProvider?
> def getTimeWindow: TimeWindow
> def setTimeWindow(v: TimeWindow)
>
> On Wed, Feb 2, 2022 at 4:44 AM Ori Popowski wrote:
>
>> I have the following PipelineOptions class:
>>
>> trait JobOptions extends PipelineOptions {
>>   def getReportPath: ValueProvider[String]
>>   def setReportPath(v: ValueProvider[String])
>>
>>   def getTimeWindow: ValueProvider[TimeWindow]
>>   def setTimeWindow(v: ValueProvider[TimeWindow])
>> }
>>
>> The TimeWindow POJO is a classic POJO with getters, setters, a 3-arg
>> constructor and a 0-arg constructor, auto-generated by IntelliJ. It has 3
>> strings: dateRangeStart, dateRangeEnd, slidingWindowDuration.
>>
>> This is how I run the job:
>> gcloud dataflow jobs run test --gcs-location gs://…/template --region
>> europe-west3 --num-workers 2 --staging-location gs://…/template
>> --subnetwork https://…/subnetworks/dataproc-1 --network data-qa
>> --parameters
>> reportPath=gs://…/test-input/**,timeWindow={"dateRangeStart":"start","dateRangeEnd":"end","slidingWindowDuration":"window"}
>>
>> However, when running the job I am getting the following error:
>> Error message from worker: java.lang.RuntimeException:
>> org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException:
>> Unable to load runtime value.
>>     org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:197)
>>     org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:168)
>>     org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:66)
>>     org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:53)
>>     org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:90)
>>     org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:128)
>>     org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:361)
>>     org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:314)
>>     org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
>>     org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
>>     org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
>>     java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.beam.sdk.util.UserCodeException:
>> java.lang.RuntimeException: Unable to load runtime value.
>>     org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
>>     walkme.UpdateAerospike$DoFnInvoker.invokeSetup(Unknown Source)
>>     org.apache.beam.sdk.transforms.reflect.DoFnInvokers.tryInvokeSetupFor(DoFnInvokers.java:53)
>>     org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:86)
>>     org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:68)
>>     org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:100)
>>     org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75)
>>     org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:267)
>>     org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:89)
>>     org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:186)
>>     ... 14 more
>> Caused by: java.lang.RuntimeException: Unable to load runtime value.
>>     org.apache.beam.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:272)
>>     walkme.UpdateAerospike.setup(UpdateAerospike.scala:15)
>> Caused by: java.lang.RuntimeException: Unable to parse representation
>>     org.apache.beam.sdk.options.ProxyInvocationHandler.getValueFromJson(ProxyInvocationHandler.java:519)
>>     org.apache.beam.sdk.options.ProxyInvocationHandler.getValueFromJson(ProxyInvocationHandler.java:512)
>>     org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:168)
>>     org.apache.beam.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:262)
>>     walkme.UpdateAerospike.setup(UpdateAerospike.scala:15)
>>     walkme.UpdateAerospike$DoFnInvoker.invokeSetup(Unknown Source)
>>     org.apache.beam.sdk.transforms.reflect.DoFnInvokers.tryInvokeSetupFor(DoFnInvokers.java:53)
>>     org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:86)
>>     org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:68)
>>     org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:100)
>>     org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75)
>>     org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:267)
>>     org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:89)
>>     org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:186)
>>     org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:168)
>>     org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:66)
>>     org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:53)
>>     org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:90)
>>     org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:128)
>>     org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:361)
>>     org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:314)
>>     org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
>>     org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
>>     org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
>>     java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     java.lang.Thread.run(Thread.java:748)
>> Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException:
>> Cannot construct instance of `walkme.model.TimeWindow` (although at least
>> one Creator exists): no String-argument constructor/factory method to
>> deserialize from String value
>> ('{"dateRangeStart":"start","dateRangeEnd":"end","slidingWindowDuration":"window"}')
>> at [Source: UNKNOWN; byte offset: #UNKNOWN]
>>     com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63)
>>     com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1728)
>>     com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1353)
>>     com.fasterxml.jackson.databind.deser.std.StdDeserializer._deserializeFromString(StdDeserializer.java:311)
>>     com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromString(BeanDeserializerBase.java:1495)
>>     com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:196)
>>     com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:186)
>>     org.apache.beam.sdk.options.ValueProvider$Deserializer.deserialize(ValueProvider.java:362)
>>     org.apache.beam.sdk.options.ValueProvider$Deserializer.deserialize(ValueProvider.java:328)
>>     org.apache.beam.sdk.options.PipelineOptionsFactory.deserializeNode(PipelineOptionsFactory.java:1796)
>>     org.apache.beam.sdk.options.ProxyInvocationHandler.getValueFromJson(ProxyInvocationHandler.java:517)
>>     ... 27 more
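The last `Caused by` in the quoted trace shows Jackson receiving the TimeWindow value as a JSON *string* (`'{"dateRangeStart":...}'`) rather than as an object, and TimeWindow has no String-argument constructor or factory to build itself from that. One possible workaround that still keeps a ValueProvider, sketched here under assumptions this thread does not confirm: declare the option as `ValueProvider[String]` and parse the value yourself in `setup`. The option name `getTimeWindowJson`, the `TimeWindowParser` object, and the case class standing in for the IntelliJ-generated POJO are all hypothetical; the regex is a deliberately tiny stand-in for a real JSON library and assumes a flat object whose values are plain strings without quotes or backslashes.

```scala
// Hypothetical stand-in for the IntelliJ-generated TimeWindow POJO.
final case class TimeWindow(
    dateRangeStart: String,
    dateRangeEnd: String,
    slidingWindowDuration: String)

object TimeWindowParser {
  // Matches one "key":"value" pair. Assumes values contain no quotes or
  // backslashes; this is not a general JSON parser.
  private val FieldPattern = """["](\w+)["]\s*:\s*["]([^"\\]*)["]""".r

  def parse(raw: String): TimeWindow = {
    // Also accept the backslash-escaped form, e.g. {\"dateRangeStart\":...}.
    val json = raw.replace("\\\"", "\"")
    val fields: Map[String, String] =
      FieldPattern.findAllMatchIn(json).map(m => m.group(1) -> m.group(2)).toMap

    // Fail loudly if a required field is missing.
    def field(name: String): String =
      fields.getOrElse(name,
        throw new IllegalArgumentException(s"missing field '$name' in: $raw"))

    TimeWindow(field("dateRangeStart"), field("dateRangeEnd"), field("slidingWindowDuration"))
  }
}

// Hypothetical usage inside the DoFn's setup, assuming the option were
// declared in JobOptions as ValueProvider[String] (getTimeWindowJson is an assumed name):
//   val window = TimeWindowParser.parse(options.getTimeWindowJson.get)
```

The error message itself hints at another route, untested here: giving TimeWindow a String-argument constructor or factory method that Jackson can use when it is handed a string value.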
