Thanks Eugene! Now the job can be launched against Dataflow.
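For the archives, this is roughly what Eugene's suggestion looks like in code (a sketch, not a drop-in patch: GCS2BQOptions is my own options interface from the pipeline below, and StreamingOptions is Beam's standard options interface that carries the streaming flag):

```scala
import org.apache.beam.sdk.options.{PipelineOptionsFactory, StreamingOptions}
import com.spotify.scio.ScioContext

// Parse options as before, then flip the streaming flag before building the
// pipeline, so the Watch-based (splittable DoFn) transform can be translated.
val opts = PipelineOptionsFactory
  .fromArgs(cmdLineArgs: _*)
  .withValidation()
  .as(classOf[GCS2BQOptions]) // user-defined options interface, see below
opts.as(classOf[StreamingOptions]).setStreaming(true)
val sc = ScioContext(opts)
```

Passing --streaming=true on the command line achieves the same thing without any code change.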
Cheers
On Fri, Mar 2, 2018 at 2:23 PM, Eugene Kirpichov
wrote:
> Hi!
>
> It is supported by the Dataflow runner in streaming mode. Ideally we should
> force streaming mode when this is used, but for now you can specify
> options.setStreaming(true) or pass --streaming on the command line.
>
> On Fri, Mar 2, 2018 at 4:43 AM Jose Ignacio Honrado
> wrote:
>
>> Hi,
>>
>> I am trying to read files from GCS using the Watch transform
>> (org.apache.beam.sdk.transforms.Watch) to ultimately insert them into BQ. I
>> am using the Scio API; here is a test pipeline:
>>
>> object GCS2BQ {
>>   private val Logger = LoggerFactory.getLogger(getClass)
>>
>>   def main(cmdLineArgs: Array[String]): Unit = {
>>     val opts = PipelineOptionsFactory
>>       .fromArgs(cmdLineArgs: _*)
>>       .withValidation()
>>       .as(classOf[GCS2BQOptions])
>>     val sc = ScioContext(opts)
>>
>>     val pollFilesFromSource = FileIO.`match`()
>>       .continuously(Duration.standardMinutes(opts.getPollInterval),
>>         Watch.Growth.never())
>>       .filepattern(opts.getSourcePath)
>>
>>     sc.customInput("GCS Input", pollFilesFromSource)
>>       .applyTransform(FileIO.readMatches().withCompression(Compression.GZIP))
>>       .applyTransform(TextIO.readFiles())
>>       .debug()
>>
>>     sc.close().waitUntilDone()
>>   }
>> }
>>
>> It works properly with the DirectRunner, but I get the following error when
>> trying to run it on Dataflow:
>>
>> ...
>> [error] [main] INFO org.apache.beam.runners.dataflow.util.PackageUtil - Staging files complete: 137 files cached, 1 files newly uploaded
>> [error] [main] INFO org.apache.beam.runners.dataflow.DataflowPipelineTranslator - Adding GCS Input/Create filepattern/Create.Values/Read(CreateSource) as step s1
>> [error] [main] INFO org.apache.beam.runners.dataflow.DataflowPipelineTranslator - Adding GCS Input/Via MatchAll/Continuously match filepatterns/ParDo(WatchGrowth) as step s2
>> [error] Exception in thread "main" java.lang.UnsupportedOperationException: DataflowRunner does not currently support splittable DoFn: org.apache.beam.sdk.transforms.Watch$WatchGrowthFn@724c5cbe
>> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translateFn(DataflowPipelineTranslator.java:1024)
>> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.access$1500(DataflowPipelineTranslator.java:113)
>> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$8.translateSingleHelper(DataflowPipelineTranslator.java:910)
>> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$8.translate(DataflowPipelineTranslator.java:897)
>> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$8.translate(DataflowPipelineTranslator.java:894)
>> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:453)
>> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
>> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
>> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
>> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
>> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
>> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
>> [error] at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
>> [error] at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
>> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:392)
>> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:170)
>> [error] at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:680)
>> [error] at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:174)
>> [error] at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
>> [error] at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>> [error] at com.spotify.scio.ScioContext$$anonfun$close$1.apply(ScioContext.scala:343)
>> [error] at com.spotify.scio.ScioContext$$anonfun$close$1.apply(ScioContext.scala:335)
>> [error] at com.spotify.scio.ScioContext.requireNotClosed(ScioContext.scala:391)
>> [error] at com.spotify.scio.ScioContext.close(ScioContext.scala:335)
>> ...
>>
>> It requires "Splittable DoFn" support and, according to the capability
>> matrix, DataflowRunner supports it:
>> https://beam.apache.org/documentation/runners/capability-matrix/#cap-full-where