You can set the SparkPipelineOptions (org.apache.beam.runners.spark.SparkPipelineOptions - https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java); most of these options are explained at https://beam.apache.org/documentation/runners/spark/. You can also add custom PipelineOptions if you want to use them from your transforms: https://beam.apache.org/documentation/programming-guide/#setting-pipelineoptions-from-command-line-arguments .
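For example, a minimal sketch of such a custom options interface (the MyOptions class and the myField1/myField2 options are placeholders for illustration, not an existing API); Beam derives the --myField1=XXX command-line flag from the getter/setter names automatically:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class MyOptionsExample {

        /** Hypothetical custom options; register them before parsing the arguments. */
        public interface MyOptions extends PipelineOptions {
            @Description("First custom value, passed as --myField1=XXX")
            String getMyField1();
            void setMyField1(String value);

            @Description("Second custom value, passed as --myField2=YYY")
            String getMyField2();
            void setMyField2(String value);
        }

        public static void main(final String[] args) {
            // e.g. args: --runner=SparkRunner --sparkMaster=local --myField1=XXX --myField2=YYY
            PipelineOptionsFactory.register(MyOptions.class);
            final MyOptions options =
                PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
            final Pipeline pipeline = Pipeline.create(options);

            // ... apply transforms here; inside a DoFn the values are available again via
            // context.getPipelineOptions().as(MyOptions.class).getMyField1() ...

            pipeline.run().waitUntilFinish();
        }
    }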
Hope it helps.

Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> | Blog <https://blog-rmannibucau.rhcloud.com> | Old Blog <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau> | JavaEE Factory <https://javaeefactory-rmannibucau.rhcloud.com>

2017-09-27 12:52 GMT+02:00 tal m <[email protected]>:

> My last email wasn't clear, please ignore.
> Thanks, it looks better (no errors or exceptions).
> My problem now is how to set the Spark conf for my pipeline. This is what I have:
>
>     conf = SparkConf()
>     conf.setAppName("InsightEdge Python Example")
>     conf.set("my.field1", "XXX")
>     conf.set("my.field2", "YYY")
>
> How can I pass it to my pipeline? I guess on the command line?
>
> Thanks, Tal
>
> On Wed, Sep 27, 2017 at 1:45 PM, tal m <[email protected]> wrote:
>
>> Thanks, it looks better (no errors or exceptions).
>> My problem now is how to set the Spark conf for my pipeline. This is what I have:
>>
>>     conf = SparkConf()
>>     conf.setAppName("InsightEdge Python Example")
>>     conf.set("spark.insightedge.space.name", "insightedge-space")
>>     conf.set("spark.insightedge.space.lookup.group", "insightedge")
>>     conf.set("spark.insightedge.space.lookup.locator", "127.0.0.1:4174")
>>
>> On Tue, Sep 26, 2017 at 3:46 PM, Romain Manni-Bucau <[email protected]> wrote:
>>
>>> Yes, you need a coder for your Product if it is passed to the output for the next "apply". You can register it on the pipeline or through the Beam SPI. Here is a sample using Java serialization:
>>>
>>>     pipeline.getCoderRegistry().registerCoderForClass(Product.class,
>>>         SerializableCoder.of(Product.class));
>>>
>>> 2017-09-26 14:37 GMT+02:00 tal m <[email protected]>:
>>>
>>>> Hi,
>>>> I tried what you wrote; the arguments I use are:
>>>>
>>>>     --sparkMaster=local --runner=SparkRunner
>>>>
>>>> I already have Spark running.
>>>> Now I'm getting the following error:
>>>>
>>>>     IllegalStateException: Unable to return a default Coder for
>>>>     ParDo(Anonymous)/ParMultiDo(Anonymous).out0 [PCollectio
>>>>
>>>> Thanks, Tal
>>>>
>>>> On Tue, Sep 26, 2017 at 12:32 PM, Romain Manni-Bucau <[email protected]> wrote:
>>>>
>>>>> Hi Tal,
>>>>>
>>>>> Did you try something like that:
>>>>>
>>>>>     public static void main(final String[] args) {
>>>>>         final Pipeline pipeline =
>>>>>             Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
>>>>>
>>>>>         pipeline.apply(GenerateSequence.from(0).to(100000L))
>>>>>                 .apply(ParDo.of(new DoFn<Long, Product>() {
>>>>>                     @ProcessElement
>>>>>                     public void onElement(final ProcessContext context) {
>>>>>                         final long i = context.element();
>>>>>                         context.output(new Product(i, "Product #" + i));
>>>>>                     }
>>>>>                 }));
>>>>>
>>>>>         pipeline.run();
>>>>>     }
>>>>>
>>>>> Then it is just a matter of having the Beam dependency matching your runner (target environment). For testing, the direct runner is enough, but to run on Spark you will need to import the Spark runner as a dependency.
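For completeness: the sample above assumes a Product class that Beam can encode. A minimal sketch of such a class (hypothetical; the real class in this thread has more fields), matching the SerializableCoder registration suggested earlier:

    import java.io.Serializable;

    public class Product implements Serializable {
        private final long id;
        private final String description;

        public Product(final long id, final String description) {
            this.id = id;
            this.description = description;
        }

        public long getId() { return id; }
        public String getDescription() { return description; }
    }

    // Registered on the pipeline as suggested earlier in the thread:
    // pipeline.getCoderRegistry().registerCoderForClass(Product.class,
    //     SerializableCoder.of(Product.class));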
>>>>>
>>>>> 2017-09-26 11:02 GMT+02:00 tal m <[email protected]>:
>>>>>
>>>>>> Hi,
>>>>>> I looked at the links you sent me, and I haven't found any clue how to adapt it to my current code.
>>>>>> My code is very simple:
>>>>>>
>>>>>>     val sc = spark.sparkContext
>>>>>>
>>>>>>     val productsNum = 100000
>>>>>>     println(s"Saving $productsNum products RDD to the space")
>>>>>>     val rdd = sc.parallelize(1 to productsNum).map { i =>
>>>>>>       Product(i, "Description of product " + i, Random.nextInt(10), Random.nextBoolean())
>>>>>>     }
>>>>>>
>>>>>> Is it that simple to use Beam instead of SparkContext? I'm not familiar with Spark at all, so I have no idea what the Spark runner is or how I can use it in my case; I just need to make it work :).
>>>>>>
>>>>>> Thanks, Tal
>>>>>>
>>>>>> On Tue, Sep 26, 2017 at 11:57 AM, Aviem Zur <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Tal,
>>>>>>>
>>>>>>> Thanks for reaching out!
>>>>>>>
>>>>>>> Please take a look at our documentation:
>>>>>>>
>>>>>>> Quickstart guide (Java): https://beam.apache.org/get-started/quickstart-java/
>>>>>>> This guide will show you how to run our wordcount example using any of the runners (for example, the direct runner or the Spark runner in your case).
>>>>>>>
>>>>>>> More reading:
>>>>>>> Programming guide: https://beam.apache.org/documentation/programming-guide/
>>>>>>> Spark runner: https://beam.apache.org/documentation/runners/spark/
>>>>>>>
>>>>>>> Please let us know if you have further questions, and good luck with your first try of Beam!
>>>>>>>
>>>>>>> Aviem.
>>>>>>>
>>>>>>> On Tue, Sep 26, 2017 at 11:47 AM tal m <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> I'm new to Spark and also to Beam.
>>>>>>>> Currently I have Java code that uses Spark to read some data from a DB.
>>>>>>>> My Spark code uses SparkSession.builder(.....) and also sparkContext.
>>>>>>>> How can I make Beam work similarly to my current code? I just want to make it work for now.
>>>>>>>> Thanks, Tal
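One more option that is not discussed in the thread: since the existing application already builds its own SparkContext from a custom SparkConf, the Spark runner's SparkContextOptions can be handed a pre-built context instead of letting the runner create one. The sketch below assumes the classic Spark runner and reuses the InsightEdge settings from the thread; verify the exact setters against your Beam version:

    import org.apache.beam.runners.spark.SparkContextOptions;
    import org.apache.beam.runners.spark.SparkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class ProvidedSparkContextExample {
        public static void main(final String[] args) {
            // Build the Spark configuration the same way the existing application does.
            final SparkConf conf = new SparkConf()
                .setAppName("InsightEdge Python Example")
                .set("spark.insightedge.space.name", "insightedge-space")
                .set("spark.insightedge.space.lookup.group", "insightedge")
                .set("spark.insightedge.space.lookup.locator", "127.0.0.1:4174");
            final JavaSparkContext jsc = new JavaSparkContext(conf);

            // Hand the existing context to the Beam Spark runner instead of letting it create one.
            final SparkContextOptions options =
                PipelineOptionsFactory.fromArgs(args).as(SparkContextOptions.class);
            options.setRunner(SparkRunner.class);
            options.setUsesProvidedSparkContext(true);
            options.setProvidedSparkContext(jsc);

            final Pipeline pipeline = Pipeline.create(options);
            // ... apply transforms here ...
            pipeline.run().waitUntilFinish();
        }
    }

With this approach the Spark-specific settings stay on the SparkConf exactly as in the original code, and only the pipeline itself moves to Beam.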
