Yes, you need a coder for your Product if it is passed to the output for the next "apply". You can register it on the pipeline or through the Beam SPI. Here is a sample using Java serialization:
pipeline.getCoderRegistry().registerCoderForClass(Product.class, SerializableCoder.of(Product.class));

Note that Product has to be serializable for SerializableCoder to work; a self-contained sketch is appended after the quoted thread below.

Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> | Blog <https://blog-rmannibucau.rhcloud.com> | Old Blog <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau> | JavaEE Factory <https://javaeefactory-rmannibucau.rhcloud.com>

2017-09-26 14:37 GMT+02:00 tal m <[email protected]>:

> hi
> I tried what you wrote. The arguments that I use are:
> --sparkMaster=local --runner=SparkRunner
> I already have Spark running.
> Now I'm getting the following error:
>
> IllegalStateException: Unable to return a default Coder for
> ParDo(Anonymous)/ParMultiDo(Anonymous).out0 [PCollectio
>
> Thanks Tal
>
> On Tue, Sep 26, 2017 at 12:32 PM, Romain Manni-Bucau <[email protected]> wrote:
>
>> Hi Tal,
>>
>> Did you try something like that:
>>
>> public static void main(final String[] args) {
>>     final Pipeline pipeline =
>>         Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
>>
>>     // GenerateSequence emits Longs, so the DoFn input type is Long
>>     pipeline.apply(GenerateSequence.from(0).to(100000L))
>>             .apply(ParDo.of(new DoFn<Long, Product>() {
>>                 @ProcessElement
>>                 public void onElement(final ProcessContext context) {
>>                     final long i = context.element();
>>                     context.output(new Product((int) i, "Product #" + i));
>>                 }
>>             }));
>>
>>     pipeline.run();
>> }
>>
>> Then it is just a matter of having the Beam dependency matching your runner
>> (target environment). For testing the direct runner is enough, but to run on
>> Spark you will need to import the Spark one as a dependency (a sample Maven
>> snippet is appended at the end of this thread).
>>
>> 2017-09-26 11:02 GMT+02:00 tal m <[email protected]>:
>>
>>> Hi
>>> I looked at the links you sent me, and I haven't found any clue about how
>>> to adapt it to my current code.
>>> My code is very simple:
>>>
>>> val sc = spark.sparkContext
>>>
>>> val productsNum = 100000
>>> println(s"Saving $productsNum products RDD to the space")
>>> val rdd = sc.parallelize(1 to productsNum).map { i =>
>>>   Product(i, "Description of product " + i, Random.nextInt(10), Random.nextBoolean())
>>> }
>>>
>>> Is it that simple to use Beam instead of SparkContext? I'm not familiar with
>>> Spark at all, so I have no idea what the Spark runner is or how I can use it
>>> in my case; I just need to make it work :).
>>>
>>> Thanks Tal
>>>
>>> On Tue, Sep 26, 2017 at 11:57 AM, Aviem Zur <[email protected]> wrote:
>>>
>>>> Hi Tal,
>>>>
>>>> Thanks for reaching out!
>>>>
>>>> Please take a look at our documentation:
>>>>
>>>> Quickstart guide (Java): https://beam.apache.org/get-started/quickstart-java/
>>>> This guide will show you how to run our wordcount example using any of
>>>> the runners (for example, the direct runner or the Spark runner in your
>>>> case).
>>>>
>>>> More reading:
>>>> Programming guide: https://beam.apache.org/documentation/programming-guide/
>>>> Spark runner: https://beam.apache.org/documentation/runners/spark/
>>>>
>>>> Please let us know if you have further questions, and good luck with
>>>> your first try of Beam!
>>>>
>>>> Aviem.
>>>>
>>>> On Tue, Sep 26, 2017 at 11:47 AM tal m <[email protected]> wrote:
>>>>
>>>>> Hi
>>>>> I'm new at Spark and also at Beam.
>>>>> Currently I have Java code that uses Spark for reading some data from a DB.
>>>>> My Spark code uses SparkSession.builder(.....) and also sparkContext.
>>>>> How can I make Beam work similarly to my current code? I just want to make
>>>>> it work for now.
>>>>> Thanks Tal
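For reference, here is a minimal, self-contained sketch of the approach discussed at the top of this thread: a Product class that implements java.io.Serializable (which SerializableCoder requires), the coder registration on the pipeline, and the generate-and-map pipeline from Romain's reply. The Product fields, the ProductPipeline class name, and the example options are illustrative assumptions, not Tal's actual code.

import java.io.Serializable;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class ProductPipeline {

    // Hypothetical Product POJO; implementing Serializable is what lets
    // SerializableCoder encode it.
    public static class Product implements Serializable {
        private final int id;
        private final String description;

        public Product(final int id, final String description) {
            this.id = id;
            this.description = description;
        }
    }

    public static void main(final String[] args) {
        // e.g. --runner=SparkRunner --sparkMaster=local, or no args for the direct runner
        final Pipeline pipeline =
            Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Register a coder for Product so the ParDo output can be encoded;
        // this is what the "Unable to return a default Coder" error is about.
        pipeline.getCoderRegistry()
                .registerCoderForClass(Product.class, SerializableCoder.of(Product.class));

        pipeline.apply(GenerateSequence.from(0).to(100000L))
                .apply(ParDo.of(new DoFn<Long, Product>() {
                    @ProcessElement
                    public void onElement(final ProcessContext context) {
                        final long i = context.element();
                        context.output(new Product((int) i, "Product #" + i));
                    }
                }));

        pipeline.run().waitUntilFinish();
    }
}

An alternative to the explicit registration is annotating the class with @DefaultCoder(SerializableCoder.class) from org.apache.beam.sdk.coders.DefaultCoder, which achieves the same effect without touching the pipeline.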

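On the dependency point in Romain's quoted reply: below is a rough Maven sketch of the artifacts typically involved. The artifact IDs are the standard Beam ones; the version shown is only a placeholder from around the time of this thread, so check the Spark runner page linked above for the current version and for the Spark dependencies it expects at runtime.

<!-- Placeholder version from the 2017 time frame; check the Beam site for the current one. -->
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.1.0</version>
</dependency>

<!-- Direct runner: enough for local testing. -->
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-direct-java</artifactId>
  <version>2.1.0</version>
  <scope>runtime</scope>
</dependency>

<!-- Spark runner: needed to run the same pipeline with the SparkRunner option. -->
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark</artifactId>
  <version>2.1.0</version>
</dependency>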