What type of Spark configuration are you trying to augment? As Romain mentioned, you can supply a custom Spark conf to your Beam pipeline, but depending on your use case this may not be necessary and is best avoided. Also, keep in mind that if you use spark-submit, you can add your configurations when submitting instead of in your application code. (You can also change the Spark defaults on the machines running Spark, but again, this all depends on your use case.)
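
If you go the spark-submit route, a minimal sketch might look like the one below. The property keys and values are the ones from your earlier message; the main class com.example.MyBeamPipeline and the jar path are placeholders I made up, so substitute your own. As far as I know, spark-submit only passes along --conf properties whose names start with "spark." and ignores the rest with a warning, so keys such as my.field1 would likely need a "spark." prefix.

```shell
# Sketch only: the class name and jar path are hypothetical placeholders.
# Each --conf adds one Spark property at submit time, so the application
# code does not need to build its own SparkConf.
# Arguments after the jar are passed to the pipeline's main() as Beam options.
spark-submit \
  --class com.example.MyBeamPipeline \
  --master local \
  --conf spark.insightedge.space.name=insightedge-space \
  --conf spark.insightedge.space.lookup.group=insightedge \
  --conf spark.insightedge.space.lookup.locator=127.0.0.1:4174 \
  target/my-beam-pipeline-shaded.jar \
  --runner=SparkRunner
```
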

On Wed, Sep 27, 2017 at 2:03 PM Romain Manni-Bucau <[email protected]> wrote:

> You have the SparkPipelineOptions you can set
> (org.apache.beam.runners.spark.SparkPipelineOptions -
> https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java),
> most of these options are explained at
> https://beam.apache.org/documentation/runners/spark/. You can add custom
> PipelineOptions if you want to use them from your transforms:
> https://beam.apache.org/documentation/programming-guide/#setting-pipelineoptions-from-command-line-arguments
> .
>
> Hope it helps
>
>
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> | Blog
> <https://blog-rmannibucau.rhcloud.com> | Old Blog
> <http://rmannibucau.wordpress.com> | Github
> <https://github.com/rmannibucau> | LinkedIn
> <https://www.linkedin.com/in/rmannibucau> | JavaEE Factory
> <https://javaeefactory-rmannibucau.rhcloud.com>
>
> 2017-09-27 12:52 GMT+02:00 tal m <[email protected]>:
>
>> my last email wasn't clear, please ignore.
>> Thanks, it looks better (no errors or exceptions)
>> my problem now is how to set Spark conf to my pipeline, this is what i
>> have ?
>>
>> conf = SparkConf()
>> conf.setAppName("InsightEdge Python Example")
>> conf.set("my.field1", "XXX")
>>
>> conf.set("my.field2", "YYY")
>>
>> how can i send it to my pipeline, i guess within command line ?
>>
>> Thanks Tal
>>
>>
>> On Wed, Sep 27, 2017 at 1:45 PM, tal m <[email protected]> wrote:
>>
>>> Thanks, it looks better (no errors or exceptions)
>>> my problem now is how to set Spark conf to my pipeline, this is what i
>>> have ?
>>>
>>> conf = SparkConf()
>>> conf.setAppName("InsightEdge Python Example")
>>> conf.set("spark.insightedge.space.name", "insightedge-space")
>>> conf.set("spark.insightedge.space.lookup.group", "insightedge")
>>> conf.set("spark.insightedge.space.lookup.locator", "127.0.0.1:4174")
>>>
>>>
>>>
>>> On Tue, Sep 26, 2017 at 3:46 PM, Romain Manni-Bucau <
>>> [email protected]> wrote:
>>>
>>>> Yes, you need a coder for your product if it is passed to the output
>>>> for the next "apply". You can register it on the pipeline or through beam SPI.
>>>> Here is a sample to use java serialization:
>>>>
>>>> pipeline.getCoderRegistry().registerCoderForClass(Product.class,
>>>>     SerializableCoder.of(Product.class));
>>>>
>>>>
>>>>
>>>> Romain Manni-Bucau
>>>>
>>>> 2017-09-26 14:37 GMT+02:00 tal m <[email protected]>:
>>>>
>>>>> hi
>>>>> i tried what you wrote, the arguments that i use are:
>>>>> --sparkMaster=local --runner=SparkRunner
>>>>> i already have Spark running.
>>>>> now i'm getting the following error:
>>>>>
>>>>>
>>>>> .IllegalStateException: Unable to return a default Coder for
>>>>> ParDo(Anonymous)/ParMultiDo(Anonymous).out0 [PCollectio
>>>>>
>>>>>
>>>>> Thanks Tal
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Sep 26, 2017 at 12:32 PM, Romain Manni-Bucau <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi Tal,
>>>>>>
>>>>>> Did you try something like that:
>>>>>>
>>>>>> public static void main(final String[] args) {
>>>>>>     final Pipeline pipeline =
>>>>>>         Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
>>>>>>
>>>>>>     pipeline.apply(GenerateSequence.from(0).to(100000L))
>>>>>>         .apply(ParDo.of(new DoFn<Long, Product>() {
>>>>>>             @ProcessElement
>>>>>>             public void onElement(final ProcessContext context) {
>>>>>>                 final int i = context.element().intValue();
>>>>>>                 context.output(new Product(i, "Product #" + i));
>>>>>>             }
>>>>>>         }));
>>>>>>
>>>>>>     pipeline.run();
>>>>>> }
>>>>>>
>>>>>>
>>>>>> Then it is just a matter of having the beam dependency matching your
>>>>>> runner (target environment). For testing, the direct runner is enough, but
>>>>>> to run on spark you will need to import the spark one as a dependency.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Romain Manni-Bucau
>>>>>>
>>>>>> 2017-09-26 11:02 GMT+02:00 tal m <[email protected]>:
>>>>>>
>>>>>>> HI
>>>>>>> i looked at the links you sent me, and i haven't found any clue how
>>>>>>> to adapt it to my current code.
>>>>>>> my code is very simple:
>>>>>>>
>>>>>>> val sc = spark.sparkContext
>>>>>>>
>>>>>>> val productsNum = 100000
>>>>>>> println(s"Saving $productsNum products RDD to the space")
>>>>>>> val rdd = sc.parallelize(1 to productsNum).map { i =>
>>>>>>>   Product(i, "Description of product " + i, Random.nextInt(10),
>>>>>>>     Random.nextBoolean())
>>>>>>> }
>>>>>>>
>>>>>>> is it that simple to use beam instead of SparkContext ? i'm not familiar
>>>>>>> with Spark at all so i have no idea what the Spark runner is and how i can
>>>>>>> use it in my case, just need to make it work :).
>>>>>>>
>>>>>>> Thanks Tal
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Sep 26, 2017 at 11:57 AM, Aviem Zur <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Tal,
>>>>>>>>
>>>>>>>> Thanks for reaching out!
>>>>>>>>
>>>>>>>> Please take a look at our documentation:
>>>>>>>>
>>>>>>>> Quickstart guide (Java):
>>>>>>>> https://beam.apache.org/get-started/quickstart-java/
>>>>>>>> This guide will show you how to run our wordcount example using
>>>>>>>> any of the runners (for example, the direct runner, or the Spark runner in
>>>>>>>> your case).
>>>>>>>>
>>>>>>>> More reading:
>>>>>>>> Programming guide:
>>>>>>>> https://beam.apache.org/documentation/programming-guide/
>>>>>>>> Spark runner: https://beam.apache.org/documentation/runners/spark/
>>>>>>>>
>>>>>>>> Please let us know if you have further questions, and good luck
>>>>>>>> with your first try of Beam!
>>>>>>>>
>>>>>>>> Aviem.
>>>>>>>>
>>>>>>>> On Tue, Sep 26, 2017 at 11:47 AM tal m <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> hi
>>>>>>>>> i'm new at Spark and also at beam.
>>>>>>>>> currently i have Java code that uses Spark for reading some data
>>>>>>>>> from DB.
>>>>>>>>> my Spark code is using SparkSession.builder (.....) and also
>>>>>>>>> sparkContext.
>>>>>>>>> how can i make beam work similar to my current code, i just want
>>>>>>>>> to make it work for now.
>>>>>>>>> Thanks Tal
>>>>>>>>>
