Hi! It is supported by the Dataflow runner in streaming mode. Ideally we should force streaming mode when this transform is used, but for now you can specify options.setStreaming(true) or pass --streaming on the command line.
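For reference, a minimal sketch of the programmatic workaround, reusing the option setup from the quoted pipeline (`GCS2BQOptions` comes from that pipeline; the cast to `StreamingOptions`, the interface that declares `setStreaming`, is one way to do it if your options interface does not already extend it):

```scala
import org.apache.beam.sdk.options.{PipelineOptionsFactory, StreamingOptions}
import com.spotify.scio.ScioContext

val opts = PipelineOptionsFactory
  .fromArgs(cmdLineArgs: _*)
  .withValidation()
  .as(classOf[GCS2BQOptions])

// Force streaming mode before constructing the ScioContext, so the
// DataflowRunner translates the Watch transform on the streaming path.
opts.as(classOf[StreamingOptions]).setStreaming(true)

val sc = ScioContext(opts)
```

Equivalently, leave the code unchanged and add `--streaming=true` to the job's command-line arguments.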
On Fri, Mar 2, 2018 at 4:43 AM Jose Ignacio Honrado <[email protected]> wrote:
> Hi,
>
> I am trying to read files from GCS using the Watch transform
> (org.apache.beam.sdk.transforms.Watch) to ultimately insert them into BQ.
> I am using the Scio API and here is a test pipeline:
>
> object GCS2BQ {
>   private val Logger = LoggerFactory.getLogger(getClass)
>
>   def main(cmdLineArgs: Array[String]): Unit = {
>     val opts = PipelineOptionsFactory
>       .fromArgs(cmdLineArgs: _*)
>       .withValidation()
>       .as(classOf[GCS2BQOptions])
>     val sc = ScioContext(opts)
>
>     val pollFilesFromSource = FileIO.`match`()
>       .continuously(Duration.standardMinutes(opts.getPollInterval),
>         Watch.Growth.never())
>       .filepattern(opts.getSourcePath)
>
>     sc.customInput("GCS Input", pollFilesFromSource)
>       .applyTransform(FileIO.readMatches().withCompression(Compression.GZIP))
>       .applyTransform(TextIO.readFiles())
>       .debug()
>
>     sc.close().waitUntilDone()
>   }
> }
>
> It works properly using DirectRunner but I obtain the following when
> trying to run it on Dataflow:
>
> ...
> [error] [main] INFO org.apache.beam.runners.dataflow.util.PackageUtil -
> Staging files complete: 137 files cached, 1 files newly uploaded
> [error] [main] INFO org.apache.beam.runners.dataflow.DataflowPipelineTranslator -
> Adding GCS Input/Create filepattern/Create.Values/Read(CreateSource) as step s1
> [error] [main] INFO org.apache.beam.runners.dataflow.DataflowPipelineTranslator -
> Adding GCS Input/Via MatchAll/Continuously match filepatterns/ParDo(WatchGrowth) as step s2
> [error] Exception in thread "main" java.lang.UnsupportedOperationException:
> DataflowRunner does not currently support splittable DoFn:
> org.apache.beam.sdk.transforms.Watch$WatchGrowthFn@724c5cbe
> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translateFn(DataflowPipelineTranslator.java:1024)
> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.access$1500(DataflowPipelineTranslator.java:113)
> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$8.translateSingleHelper(DataflowPipelineTranslator.java:910)
> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$8.translate(DataflowPipelineTranslator.java:897)
> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$8.translate(DataflowPipelineTranslator.java:894)
> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:453)
> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
> [error] at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
> [error] at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:392)
> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:170)
> [error] at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:680)
> [error] at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:174)
> [error] at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
> [error] at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
> [error] at com.spotify.scio.ScioContext$$anonfun$close$1.apply(ScioContext.scala:343)
> [error] at com.spotify.scio.ScioContext$$anonfun$close$1.apply(ScioContext.scala:335)
> [error] at com.spotify.scio.ScioContext.requireNotClosed(ScioContext.scala:391)
> [error] at com.spotify.scio.ScioContext.close(ScioContext.scala:335)
> ...
>
> It requires "Splittable DoFn" support and, according to the compatibility
> matrix, DataflowRunner supports it:
> https://beam.apache.org/documentation/runners/capability-matrix/#cap-full-where
>
> Could you please shed some light on this?
>
> Thanks!
> Cheers
