Hi,

I am trying to read files from GCS using the Watch transform
(org.apache.beam.sdk.transforms.Watch) and ultimately insert them into BQ.
I am using the Scio API; here is the test pipeline code:

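import com.spotify.scio.ScioContext
import org.apache.beam.sdk.io.{Compression, FileIO, TextIO}
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.Watch
import org.joda.time.Duration
import org.slf4j.LoggerFactory

object GCS2BQ {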
  private val Logger = LoggerFactory.getLogger(getClass)

  def main(cmdLineArgs: Array[String]): Unit = {
    val opts = PipelineOptionsFactory
      .fromArgs(cmdLineArgs: _*)
      .withValidation()
      .as(classOf[GCS2BQOptions])
    val sc = ScioContext(opts)

    val pollFilesFromSource = FileIO.`match`()
      .continuously(Duration.standardMinutes(opts.getPollInterval), Watch.Growth.never())
      .filepattern(opts.getSourcePath)

    sc.customInput("GCS Input", pollFilesFromSource)
      .applyTransform(FileIO.readMatches().withCompression(Compression.GZIP))
      .applyTransform(TextIO.readFiles())
      .debug()

    sc.close().waitUntilDone()
  }
}

It works with DirectRunner, but I get the following error when I try to
run it on Dataflow:

...
[error] [main] INFO org.apache.beam.runners.dataflow.util.PackageUtil - Staging files complete: 137 files cached, 1 files newly uploaded
[error] [main] INFO org.apache.beam.runners.dataflow.DataflowPipelineTranslator - Adding GCS Input/Create filepattern/Create.Values/Read(CreateSource) as step s1
[error] [main] INFO org.apache.beam.runners.dataflow.DataflowPipelineTranslator - Adding GCS Input/Via MatchAll/Continuously match filepatterns/ParDo(WatchGrowth) as step s2
[error] Exception in thread "main" java.lang.UnsupportedOperationException: DataflowRunner does not currently support splittable DoFn: org.apache.beam.sdk.transforms.Watch$WatchGrowthFn@724c5cbe
[error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translateFn(DataflowPipelineTranslator.java:1024)
[error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.access$1500(DataflowPipelineTranslator.java:113)
[error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$8.translateSingleHelper(DataflowPipelineTranslator.java:910)
[error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$8.translate(DataflowPipelineTranslator.java:897)
[error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$8.translate(DataflowPipelineTranslator.java:894)
[error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:453)
[error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
[error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
[error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
[error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
[error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
[error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
[error] at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
[error] at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
[error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:392)
[error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:170)
[error] at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:680)
[error] at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:174)
[error] at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
[error] at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
[error] at com.spotify.scio.ScioContext$$anonfun$close$1.apply(ScioContext.scala:343)
[error] at com.spotify.scio.ScioContext$$anonfun$close$1.apply(ScioContext.scala:335)
[error] at com.spotify.scio.ScioContext.requireNotClosed(ScioContext.scala:391)
[error] at com.spotify.scio.ScioContext.close(ScioContext.scala:335)
...

The pipeline requires "Splittable DoFn" support and, according to the
capability matrix, DataflowRunner supports it:
https://beam.apache.org/documentation/runners/capability-matrix/#cap-full-where

Could you please shed some light on this?

Thanks!
Cheers
