Thanks Eugene! Now the job can be launched against Dataflow.

Cheers

On Fri, Mar 2, 2018 at 2:23 PM, Eugene Kirpichov <kirpic...@google.com>
wrote:

> Hi!
>
> It is supported by the Dataflow runner in streaming mode. Ideally we
> should force streaming mode whenever this transform is used, but for now
> you can specify options.setStreaming(true) or pass --streaming on the
> command line.
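>
> A minimal sketch of the programmatic route (assuming a custom options
> class like yours; the cast to Beam's StreamingOptions is the standard
> way to reach setStreaming):
>
>     // Force streaming mode before constructing the ScioContext.
>     // Equivalent to passing --streaming on the command line.
>     val opts = PipelineOptionsFactory
>       .fromArgs(cmdLineArgs: _*)
>       .withValidation()
>       .as(classOf[GCS2BQOptions])
>     opts.as(classOf[StreamingOptions]).setStreaming(true)
>     val sc = ScioContext(opts)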
>
> On Fri, Mar 2, 2018 at 4:43 AM Jose Ignacio Honrado <jihonra...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am trying to read files from GCS using the Watch transform
>> (org.apache.beam.sdk.transforms.Watch) in order to ultimately insert
>> them into BQ. I am using the Scio API; here is a test pipeline:
>>
>> import com.spotify.scio.ScioContext
>> import org.apache.beam.sdk.io.{Compression, FileIO, TextIO}
>> import org.apache.beam.sdk.options.PipelineOptionsFactory
>> import org.apache.beam.sdk.transforms.Watch
>> import org.joda.time.Duration
>> import org.slf4j.LoggerFactory
>>
>> object GCS2BQ {
>>   private val Logger = LoggerFactory.getLogger(getClass)
>>
>>   def main(cmdLineArgs: Array[String]): Unit = {
>>     val opts = PipelineOptionsFactory
>>       .fromArgs(cmdLineArgs: _*)
>>       .withValidation()
>>       .as(classOf[GCS2BQOptions])
>>     val sc = ScioContext(opts)
>>
>>     // Poll the file pattern periodically, forever.
>>     val pollFilesFromSource = FileIO.`match`()
>>       .continuously(Duration.standardMinutes(opts.getPollInterval), Watch.Growth.never())
>>       .filepattern(opts.getSourcePath)
>>
>>     sc.customInput("GCS Input", pollFilesFromSource)
>>       .applyTransform(FileIO.readMatches().withCompression(Compression.GZIP))
>>       .applyTransform(TextIO.readFiles())
>>       .debug()
>>
>>     sc.close().waitUntilDone()
>>   }
>> }
>>
>> It works properly using DirectRunner but I obtain the following when
>> trying to run it on Dataflow:
>>
>> ...
>> [error] [main] INFO org.apache.beam.runners.dataflow.util.PackageUtil - Staging files complete: 137 files cached, 1 files newly uploaded
>> [error] [main] INFO org.apache.beam.runners.dataflow.DataflowPipelineTranslator - Adding GCS Input/Create filepattern/Create.Values/Read(CreateSource) as step s1
>> [error] [main] INFO org.apache.beam.runners.dataflow.DataflowPipelineTranslator - Adding GCS Input/Via MatchAll/Continuously match filepatterns/ParDo(WatchGrowth) as step s2
>> [error] Exception in thread "main" java.lang.UnsupportedOperationException: DataflowRunner does not currently support splittable DoFn: org.apache.beam.sdk.transforms.Watch$WatchGrowthFn@724c5cbe
>> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translateFn(DataflowPipelineTranslator.java:1024)
>> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.access$1500(DataflowPipelineTranslator.java:113)
>> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$8.translateSingleHelper(DataflowPipelineTranslator.java:910)
>> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$8.translate(DataflowPipelineTranslator.java:897)
>> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$8.translate(DataflowPipelineTranslator.java:894)
>> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:453)
>> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
>> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
>> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
>> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
>> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
>> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
>> [error] at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
>> [error] at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
>> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:392)
>> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:170)
>> [error] at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:680)
>> [error] at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:174)
>> [error] at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
>> [error] at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>> [error] at com.spotify.scio.ScioContext$$anonfun$close$1.apply(ScioContext.scala:343)
>> [error] at com.spotify.scio.ScioContext$$anonfun$close$1.apply(ScioContext.scala:335)
>> [error] at com.spotify.scio.ScioContext.requireNotClosed(ScioContext.scala:391)
>> [error] at com.spotify.scio.ScioContext.close(ScioContext.scala:335)
>> ...
>>
>> It requires "Splittable DoFn" support and, according to the capability
>> matrix, the DataflowRunner supports it:
>> https://beam.apache.org/documentation/runners/capability-matrix/#cap-full-where
>>
>> Could you please shed some light on this?
>>
>> Thanks!
>> Cheers
>>
>
