Thanks Eugene! Now the job can be launched against Dataflow. Cheers
On Fri, Mar 2, 2018 at 2:23 PM, Eugene Kirpichov <[email protected]> wrote:
> Hi!
>
> It is supported by the Dataflow runner in streaming mode. Ideally we should
> force streaming mode when this is used, but for now you can specify
> options.setStreaming(true) or --streaming on the command line.
>
> On Fri, Mar 2, 2018 at 4:43 AM Jose Ignacio Honrado <[email protected]> wrote:
>
>> Hi,
>>
>> I am trying to read files from GCS using the Watch transform
>> (org.apache.beam.sdk.transforms.Watch) to ultimately insert them into BQ.
>> I am using the Scio API, and here is a test pipeline:
>>
>> object GCS2BQ {
>>   private val Logger = LoggerFactory.getLogger(getClass)
>>
>>   def main(cmdLineArgs: Array[String]): Unit = {
>>     val opts = PipelineOptionsFactory
>>       .fromArgs(cmdLineArgs: _*)
>>       .withValidation()
>>       .as(classOf[GCS2BQOptions])
>>     val sc = ScioContext(opts)
>>
>>     val pollFilesFromSource = FileIO.`match`()
>>       .continuously(Duration.standardMinutes(opts.getPollInterval), Watch.Growth.never())
>>       .filepattern(opts.getSourcePath)
>>
>>     sc.customInput("GCS Input", pollFilesFromSource)
>>       .applyTransform(FileIO.readMatches().withCompression(Compression.GZIP))
>>       .applyTransform(TextIO.readFiles())
>>       .debug()
>>
>>     sc.close().waitUntilDone()
>>   }
>> }
>>
>> It works properly using the DirectRunner, but I get the following when
>> trying to run it on Dataflow:
>>
>> ...
>> [error] [main] INFO org.apache.beam.runners.dataflow.util.PackageUtil - Staging files complete: 137 files cached, 1 files newly uploaded
>> [error] [main] INFO org.apache.beam.runners.dataflow.DataflowPipelineTranslator - Adding GCS Input/Create filepattern/Create.Values/Read(CreateSource) as step s1
>> [error] [main] INFO org.apache.beam.runners.dataflow.DataflowPipelineTranslator - Adding GCS Input/Via MatchAll/Continuously match filepatterns/ParDo(WatchGrowth) as step s2
>> [error] Exception in thread "main" java.lang.UnsupportedOperationException: DataflowRunner does not currently support splittable DoFn: org.apache.beam.sdk.transforms.Watch$WatchGrowthFn@724c5cbe
>> [error]   at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translateFn(DataflowPipelineTranslator.java:1024)
>> [error]   at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.access$1500(DataflowPipelineTranslator.java:113)
>> [error]   at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$8.translateSingleHelper(DataflowPipelineTranslator.java:910)
>> [error]   at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$8.translate(DataflowPipelineTranslator.java:897)
>> [error]   at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$8.translate(DataflowPipelineTranslator.java:894)
>> [error]   at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:453)
>> [error]   at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
>> [error]   at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
>> [error]   at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
>> [error]   at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
>> [error]   at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
>> [error]   at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
>> [error]   at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
>> [error]   at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
>> [error]   at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:392)
>> [error]   at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:170)
>> [error]   at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:680)
>> [error]   at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:174)
>> [error]   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
>> [error]   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>> [error]   at com.spotify.scio.ScioContext$$anonfun$close$1.apply(ScioContext.scala:343)
>> [error]   at com.spotify.scio.ScioContext$$anonfun$close$1.apply(ScioContext.scala:335)
>> [error]   at com.spotify.scio.ScioContext.requireNotClosed(ScioContext.scala:391)
>> [error]   at com.spotify.scio.ScioContext.close(ScioContext.scala:335)
>> ...
>>
>> It requires "Splittable DoFn" support and, according to the capability
>> matrix, the DataflowRunner supports it:
>> https://beam.apache.org/documentation/runners/capability-matrix/#cap-full-where
>>
>> Could you please shed some light on this?
>>
>> Thanks!
>> Cheers
>
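For reference, Eugene's fix can also be applied programmatically in the pipeline above by forcing streaming mode on the options before the ScioContext is created. This is only a sketch of that configuration change: it assumes GCS2BQOptions extends Beam's StreamingOptions (or DataflowPipelineOptions), so that setStreaming is available on it.

```scala
import org.apache.beam.sdk.options.PipelineOptionsFactory
import com.spotify.scio.ScioContext

// Sketch: force streaming mode programmatically, as suggested in the reply.
// Assumes GCS2BQOptions extends Beam's StreamingOptions, which declares setStreaming.
val opts = PipelineOptionsFactory
  .fromArgs(cmdLineArgs: _*)
  .withValidation()
  .as(classOf[GCS2BQOptions])
opts.setStreaming(true) // same effect as passing --streaming on the command line
val sc = ScioContext(opts)
```

Alternatively, leave the code unchanged and append --streaming to the job's command-line arguments, as in the reply above.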
