Re: Splittable DoFn on DataflowRunner

2018-03-02 Thread Jose Ignacio Honrado
Thanks Eugene! Now the job can be launched against Dataflow.
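For reference, the fix boils down to enabling streaming mode before building the context. A minimal sketch, assuming the same Scio setup as the pipeline quoted below (`StreamingOptions` is the standard Beam options interface; `GCS2BQOptions` is the user-defined options class from the pipeline):

```scala
import org.apache.beam.sdk.options.{PipelineOptionsFactory, StreamingOptions}
import com.spotify.scio.ScioContext

// Parse options as before, then force streaming mode: the
// FileIO.match().continuously(...) / Watch transform relies on a
// splittable DoFn, which DataflowRunner only supports when streaming.
val opts = PipelineOptionsFactory
  .fromArgs(cmdLineArgs: _*)
  .withValidation()
  .as(classOf[GCS2BQOptions])
opts.as(classOf[StreamingOptions]).setStreaming(true)

val sc = ScioContext(opts)
```

Equivalently, passing --streaming on the command line sets the same option.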

Cheers

On Fri, Mar 2, 2018 at 2:23 PM, Eugene Kirpichov wrote:

> Hi!
>
> It is supported by the Dataflow runner in streaming mode. Ideally we should
> force streaming mode when this is used, but for now you can specify
> options.setStreaming(true) or pass --streaming on the command line.
>
> On Fri, Mar 2, 2018 at 4:43 AM Jose Ignacio Honrado wrote:
>
>> Hi,
>>
>> I am trying to read files from GCS using the Watch transform
>> (org.apache.beam.sdk.transforms.Watch) to ultimately insert them into BQ.
>> I am using the Scio API; here is a test pipeline:
>>
>> object GCS2BQ {
>>   private val Logger = LoggerFactory.getLogger(getClass)
>>
>>   def main(cmdLineArgs: Array[String]): Unit = {
>>     val opts = PipelineOptionsFactory
>>       .fromArgs(cmdLineArgs: _*)
>>       .withValidation()
>>       .as(classOf[GCS2BQOptions])
>>     val sc = ScioContext(opts)
>>
>>     val pollFilesFromSource = FileIO.`match`()
>>       .continuously(Duration.standardMinutes(opts.getPollInterval),
>>         Watch.Growth.never())
>>       .filepattern(opts.getSourcePath)
>>
>>     sc.customInput("GCS Input", pollFilesFromSource)
>>       .applyTransform(FileIO.readMatches().withCompression(Compression.GZIP))
>>       .applyTransform(TextIO.readFiles())
>>       .debug()
>>
>>     sc.close().waitUntilDone()
>>   }
>> }
>>
>> It works properly with the DirectRunner, but I get the following error
>> when trying to run it on Dataflow:
>>
>> ...
>> [error] [main] INFO org.apache.beam.runners.dataflow.util.PackageUtil - Staging files complete: 137 files cached, 1 files newly uploaded
>> [error] [main] INFO org.apache.beam.runners.dataflow.DataflowPipelineTranslator - Adding GCS Input/Create filepattern/Create.Values/Read(CreateSource) as step s1
>> [error] [main] INFO org.apache.beam.runners.dataflow.DataflowPipelineTranslator - Adding GCS Input/Via MatchAll/Continuously match filepatterns/ParDo(WatchGrowth) as step s2
>> [error] Exception in thread "main" java.lang.UnsupportedOperationException: DataflowRunner does not currently support splittable DoFn: org.apache.beam.sdk.transforms.Watch$WatchGrowthFn@724c5cbe
>> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translateFn(DataflowPipelineTranslator.java:1024)
>> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.access$1500(DataflowPipelineTranslator.java:113)
>> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$8.translateSingleHelper(DataflowPipelineTranslator.java:910)
>> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$8.translate(DataflowPipelineTranslator.java:897)
>> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$8.translate(DataflowPipelineTranslator.java:894)
>> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:453)
>> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
>> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
>> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
>> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
>> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
>> [error] at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
>> [error] at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
>> [error] at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
>> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:392)
>> [error] at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:170)
>> [error] at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:680)
>> [error] at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:174)
>> [error] at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
>> [error] at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>> [error] at com.spotify.scio.ScioContext$$anonfun$close$1.apply(ScioContext.scala:343)
>> [error] at com.spotify.scio.ScioContext$$anonfun$close$1.apply(ScioContext.scala:335)
>> [error] at com.spotify.scio.ScioContext.requireNotClosed(ScioContext.scala:391)
>> [error] at com.spotify.scio.ScioContext.close(ScioContext.scala:335)
>> ...
>>
>> It requires "Splittable DoFn" support and, according to the capability
>> matrix, DataflowRunner supports it:
>> https://beam.apache.org/documentation/runners/capability-matrix/#cap-full-where
>>
>> Could you please shed some light on this?
>>
>> Thanks!
>> Cheers
