Hi!

It is supported by the Dataflow runner in streaming mode. Ideally we should
force streaming mode whenever this transform is used, but for now you can
specify options.setStreaming(true) in code or pass --streaming on the command
line.
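
For example, here is a minimal sketch of forcing it from code, reusing the
options setup from your snippet below (setStreaming comes from
org.apache.beam.sdk.options.StreamingOptions, which any PipelineOptions object
can be viewed as via .as):

  val opts = PipelineOptionsFactory
    .fromArgs(cmdLineArgs: _*)
    .withValidation()
    .as(classOf[GCS2BQOptions])
  // Watch is a splittable DoFn, which Dataflow only runs in streaming mode,
  // so force streaming before constructing the pipeline.
  opts.as(classOf[StreamingOptions]).setStreaming(true)
  val sc = ScioContext(opts)

Equivalently, passing --streaming=true along with your other pipeline
arguments has the same effect.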

On Fri, Mar 2, 2018 at 4:43 AM Jose Ignacio Honrado <jihonra...@gmail.com>
wrote:

> Hi,
>
> I am trying to read files from GCS using the Watch transform
> (org.apache.beam.sdk.transforms.Watch) to ultimately insert them into BQ. I
> am using the Scio API; here is the code of a test pipeline:
>
> import com.spotify.scio._
> import org.apache.beam.sdk.io.{Compression, FileIO, TextIO}
> import org.apache.beam.sdk.options.PipelineOptionsFactory
> import org.apache.beam.sdk.transforms.Watch
> import org.joda.time.Duration
> import org.slf4j.LoggerFactory
>
> object GCS2BQ {
>   private val Logger = LoggerFactory.getLogger(getClass)
>
>   // GCS2BQOptions is a custom PipelineOptions interface (definition omitted).
>   def main(cmdLineArgs: Array[String]): Unit = {
>     val opts = PipelineOptionsFactory
>       .fromArgs(cmdLineArgs: _*)
>       .withValidation()
>       .as(classOf[GCS2BQOptions])
>     val sc = ScioContext(opts)
>
>     // Continuously poll the source file pattern for new files.
>     val pollFilesFromSource = FileIO.`match`()
>       .continuously(Duration.standardMinutes(opts.getPollInterval), Watch.Growth.never())
>       .filepattern(opts.getSourcePath)
>
>     // Read each matched gzipped file as lines of text.
>     sc.customInput("GCS Input", pollFilesFromSource)
>       .applyTransform(FileIO.readMatches().withCompression(Compression.GZIP))
>       .applyTransform(TextIO.readFiles())
>       .debug()
>
>     sc.close().waitUntilDone()
>   }
> }
>
> It works properly using the DirectRunner, but I get the following error when
> trying to run it on Dataflow:
>
> ...
> [error] [main] INFO org.apache.beam.runners.dataflow.util.PackageUtil - Staging files complete: 137 files cached, 1 files newly uploaded
> [error] [main] INFO org.apache.beam.runners.dataflow.DataflowPipelineTranslator - Adding GCS Input/Create filepattern/Create.Values/Read(CreateSource) as step s1
> [error] [main] INFO org.apache.beam.runners.dataflow.DataflowPipelineTranslator - Adding GCS Input/Via MatchAll/Continuously match filepatterns/ParDo(WatchGrowth) as step s2
> [error] Exception in thread "main" java.lang.UnsupportedOperationException: DataflowRunner does not currently support splittable DoFn: org.apache.beam.sdk.transforms.Watch$WatchGrowthFn@724c5cbe
> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translateFn(DataflowPipelineTranslator.java:1024)
> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.access$1500(DataflowPipelineTranslator.java:113)
> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$8.translateSingleHelper(DataflowPipelineTranslator.java:910)
> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$8.translate(DataflowPipelineTranslator.java:897)
> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$8.translate(DataflowPipelineTranslator.java:894)
> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:453)
> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
> [error] at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
> [error] at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:392)
> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:170)
> [error] at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:680)
> [error] at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:174)
> [error] at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
> [error] at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
> [error] at com.spotify.scio.ScioContext$$anonfun$close$1.apply(ScioContext.scala:343)
> [error] at com.spotify.scio.ScioContext$$anonfun$close$1.apply(ScioContext.scala:335)
> [error] at com.spotify.scio.ScioContext.requireNotClosed(ScioContext.scala:391)
> [error] at com.spotify.scio.ScioContext.close(ScioContext.scala:335)
> ...
>
> It requires "Splittable DoFn" support, and according to the capability
> matrix, the DataflowRunner supports it:
> https://beam.apache.org/documentation/runners/capability-matrix/#cap-full-where
>
> Could you please shed some light on this?
>
> Thanks!
> Cheers
>
