You can set the options exposed by SparkPipelineOptions
(org.apache.beam.runners.spark.SparkPipelineOptions -
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java);
most of them are explained at
https://beam.apache.org/documentation/runners/spark/. If you want to read
your own values from your transforms, you can add custom PipelineOptions:
https://beam.apache.org/documentation/programming-guide/#setting-pipelineoptions-from-command-line-arguments
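
To make the command-line route concrete, here is a minimal sketch in plain
Java (it compiles without Beam) that turns key/value settings into the
--name=value arguments that PipelineOptionsFactory.fromArgs parses. The class
OptionArgs, its toArgs method, the MyOptions interface and the option names
myField1/myField2 are all hypothetical. Option names are derived from getter
names, so the dotted keys from your snippet are renamed to camelCase here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Assumed on the Beam side (hypothetical, not compiled here): a custom options
// interface whose getters match the argument names, e.g.
//   public interface MyOptions extends PipelineOptions {
//       String getMyField1();
//       void setMyField1(String value);
//       String getMyField2();
//       void setMyField2(String value);
//   }
final class OptionArgs {
    private OptionArgs() {}

    /** Turns each entry into a "--name=value" argument, keeping insertion order. */
    static String[] toArgs(final Map<String, String> settings) {
        final List<String> args = new ArrayList<>();
        for (final Map.Entry<String, String> entry : settings.entrySet()) {
            args.add("--" + entry.getKey() + "=" + entry.getValue());
        }
        return args.toArray(new String[0]);
    }

    public static void main(final String[] ignored) {
        final Map<String, String> settings = new LinkedHashMap<>();
        settings.put("runner", "SparkRunner");
        settings.put("sparkMaster", "local");
        settings.put("myField1", "XXX"); // hypothetical custom option
        settings.put("myField2", "YYY"); // hypothetical custom option
        // Prints: --runner=SparkRunner --sparkMaster=local --myField1=XXX --myField2=YYY
        System.out.println(String.join(" ", toArgs(settings)));
    }
}
```

You would then hand these arguments to
PipelineOptionsFactory.fromArgs(args).as(MyOptions.class) and read the values
inside a DoFn with context.getPipelineOptions().as(MyOptions.class).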

Hope it helps


Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://blog-rmannibucau.rhcloud.com> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | JavaEE Factory
<https://javaeefactory-rmannibucau.rhcloud.com>

2017-09-27 12:52 GMT+02:00 tal m <[email protected]>:

> My last email wasn't clear, please ignore it.
> Thanks, it looks better (no errors or exceptions).
> My problem now is how to set the Spark conf on my pipeline. This is what I
> have:
>
> conf = SparkConf();
>  conf.setAppName("InsightEdge Python Example")
>  conf.set("my.field1", "XXX")
>
>  conf.set("my.field2", "YYY")
>
> How can I pass it to my pipeline? I guess via the command line?
>
> Thanks Tal
>
>
> On Wed, Sep 27, 2017 at 1:45 PM, tal m <[email protected]> wrote:
>
>> Thanks, it looks better (no errors or exceptions).
>> My problem now is how to set the Spark conf on my pipeline. This is what I
>> have:
>>
>> conf = SparkConf();
>>  conf.setAppName("InsightEdge Python Example")
>>  conf.set("spark.insightedge.space.name", "insightedge-space")
>>  conf.set("spark.insightedge.space.lookup.group", "insightedge")
>>  conf.set("spark.insightedge.space.lookup.locator", "127.0.0.1:4174")
>>
>>
>>
>> On Tue, Sep 26, 2017 at 3:46 PM, Romain Manni-Bucau <
>> [email protected]> wrote:
>>
>>> Yes, you need a coder for Product if it is output to the next "apply".
>>> You can register it on the pipeline or through the Beam SPI. Here is a
>>> sample that uses Java serialization (Product must implement
>>> java.io.Serializable):
>>>
>>> pipeline.getCoderRegistry().registerCoderForClass(Product.class, 
>>> SerializableCoder.of(Product.class));
>>>
>>>
>>>
>>> Romain Manni-Bucau
>>>
>>> 2017-09-26 14:37 GMT+02:00 tal m <[email protected]>:
>>>
>>>> Hi,
>>>> I tried what you wrote. The arguments that I use are:
>>>> --sparkMaster=local --runner=SparkRunner
>>>> I already have Spark running.
>>>> Now I'm getting the following error:
>>>>
>>>>
>>>> .IllegalStateException: Unable to return a default Coder for
>>>> ParDo(Anonymous)/ParMultiDo(Anonymous).out0 [PCollectio
>>>>
>>>>
>>>> Thanks Tal
>>>>
>>>>
>>>>
>>>> On Tue, Sep 26, 2017 at 12:32 PM, Romain Manni-Bucau <
>>>> [email protected]> wrote:
>>>>
>>>>> Hi Tal,
>>>>>
>>>>> Did you try something like that:
>>>>>
>>>>> public static void main(final String[] args) {
>>>>>     final Pipeline pipeline = 
>>>>> Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
>>>>>
>>>>>     pipeline.apply(GenerateSequence.from(0).to(100000L))
>>>>>             // GenerateSequence emits Long values, so the DoFn input type is Long.
>>>>>             .apply(ParDo.of(new DoFn<Long, Product>() {
>>>>>                 @ProcessElement
>>>>>                 public void onElement(final ProcessContext context) {
>>>>>                     final int i = context.element().intValue();
>>>>>                     context.output(new Product(i, "Product #" + i));
>>>>>                 }
>>>>>             }));
>>>>>
>>>>>     pipeline.run();
>>>>> }
>>>>>
>>>>>
>>>>> After that, you only need the Beam runner dependency that matches your
>>>>> target environment. For testing, the direct runner is enough, but to
>>>>> run on Spark you need to add the Spark runner as a dependency.
>>>>>
>>>>>
>>>>>
>>>>> Romain Manni-Bucau
>>>>>
>>>>> 2017-09-26 11:02 GMT+02:00 tal m <[email protected]>:
>>>>>
>>>>>> Hi,
>>>>>> I looked at the links you sent me, and I haven't found any clue how
>>>>>> to adapt them to my current code.
>>>>>> My code is very simple:
>>>>>>
>>>>>> val sc = spark.sparkContext
>>>>>>
>>>>>> val productsNum = 100000
>>>>>> println(s"Saving $productsNum products RDD to the space")
>>>>>> val rdd = sc.parallelize(1 to productsNum).map { i =>
>>>>>>   Product(i, "Description of product " + i, Random.nextInt(10), 
>>>>>> Random.nextBoolean())
>>>>>> }
>>>>>>
>>>>>> Is it that simple to use Beam instead of SparkContext? I'm not familiar
>>>>>> with Spark at all, so I have no idea what the Spark runner is or how I
>>>>>> can use it in my case; I just need to make it work :).
>>>>>>
>>>>>> Thanks Tal
>>>>>>
>>>>>>
>>>>>> On Tue, Sep 26, 2017 at 11:57 AM, Aviem Zur <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Tal,
>>>>>>>
>>>>>>> Thanks for reaching out!
>>>>>>>
>>>>>>> Please take a look at our documentation:
>>>>>>>
>>>>>>> Quickstart guide (Java):
>>>>>>> https://beam.apache.org/get-started/quickstart-java/
>>>>>>> This guide will show you how to run our wordcount example using
>>>>>>> any of the runners (for example, the direct runner, or the Spark
>>>>>>> runner in your case).
>>>>>>>
>>>>>>> More reading:
>>>>>>> Programming guide:
>>>>>>> https://beam.apache.org/documentation/programming-guide/
>>>>>>> Spark runner: https://beam.apache.org/documentation/runners/spark/
>>>>>>>
>>>>>>> Please let us know if you have further questions, and good luck with
>>>>>>> your first try of Beam!
>>>>>>>
>>>>>>> Aviem.
>>>>>>>
>>>>>>> On Tue, Sep 26, 2017 at 11:47 AM tal m <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> I'm new to Spark and also to Beam.
>>>>>>>> Currently I have Java code that uses Spark to read some data
>>>>>>>> from a DB.
>>>>>>>> My Spark code uses SparkSession.builder (.....) and also
>>>>>>>> sparkContext.
>>>>>>>> How can I make Beam work similarly to my current code? I just want
>>>>>>>> to make it work for now.
>>>>>>>> Thanks, Tal
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
