I am using the Jupyter Docker Stacks image with Spark.
I started a new notebook and ran this code:

import multiprocessing
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

import time

t1 = time.time()
number_cores = int(multiprocessing.cpu_count())
memory_gb = 4


def get_spark_session(app_name: str, conf: SparkConf):
    conf.setMaster("local[{}]".format(number_cores))
    conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
        "spark.sql.adaptive.enabled", "True"
    ).set(
        "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
    ).set(
        "spark.sql.repl.eagerEval.maxNumRows", "100"
    ).set(
        "sc.setLogLevel", "ERROR"
    ).set(
        "spark.executor.memory", "8g")

    return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()


spark = get_spark_session("My_app", SparkConf())

This gives me the following at http://HOSTIP:4040/environment/:

[image: image.png]

So it works.
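
The same check can be done from the notebook instead of the web UI. The following is a minimal sketch, assuming `spark` is the session created above; the helper names `pick_settings` and `show_effective_settings` and the `KEYS_OF_INTEREST` list are made up for this illustration. It relies only on `spark.sparkContext.getConf().getAll()`, which returns the configuration as (key, value) pairs.

```python
from typing import Dict, Iterable, Tuple

# Hypothetical selection of keys worth checking; adjust as needed.
KEYS_OF_INTEREST = (
    "spark.master",
    "spark.driver.memory",
    "spark.executor.memory",
    "spark.serializer",
)


def pick_settings(
    pairs: Iterable[Tuple[str, str]], keys=KEYS_OF_INTEREST
) -> Dict[str, str]:
    """Return only the requested keys from an iterable of (key, value) pairs."""
    found = dict(pairs)
    return {key: found.get(key, "<not set>") for key in keys}


def show_effective_settings(spark) -> Dict[str, str]:
    """Print and return the selected settings of a running SparkSession."""
    settings = pick_settings(spark.sparkContext.getConf().getAll())
    for key, value in settings.items():
        print(f"{key} = {value}")
    return settings
```

Calling `show_effective_settings(spark)` in a notebook cell should list the same values as the Environment tab. Two hedged notes: in local mode the executors run inside the driver JVM, so "spark.driver.memory" is the setting that matters and "spark.executor.memory" likely has no visible effect; and "sc.setLogLevel" is not a Spark property, so the log level is normally changed with `spark.sparkContext.setLogLevel("ERROR")` after the session exists.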

Sat, 5 Nov 2022 at 19:21, 张健BJ <zhangjia...@datagrand.com> wrote:

> OK, thank you very much :) I also have two questions.
> Does spark.read.format("jdbc") read all the data from the database at once,
> and does it require a limit? In my tests, local memory usage did not change
> significantly as the amount of data increased. Why?
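
A hedged note on the JDBC question above: `load()` is lazy, so it only resolves the schema, and rows are fetched when an action such as the write runs. Without partitioning options the table is read through a single connection into one partition. The sketch below shows how a partitioned read could be configured with Spark's documented JDBC options ("partitionColumn", "lowerBound", "upperBound", "numPartitions", "fetchsize"). The helper names, the choice of partition column, and the bounds are assumptions for illustration, not values from this thread.

```python
from typing import Dict


def partitioned_jdbc_options(
    url: str,
    table: str,
    user: str,
    password: str,
    partition_column: str,
    lower_bound: int,
    upper_bound: int,
    num_partitions: int = 4,
    fetch_size: int = 1000,
) -> Dict[str, str]:
    """Build the option map for a partitioned JDBC read (all values as strings)."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if lower_bound >= upper_bound:
        raise ValueError("lower_bound must be smaller than upper_bound")
    return {
        "url": url,
        "dbtable": table,
        "user": user,
        "password": password,
        # Numeric, date, or timestamp column used to split the read.
        "partitionColumn": partition_column,
        "lowerBound": str(lower_bound),
        "upperBound": str(upper_bound),
        "numPartitions": str(num_partitions),
        # Hint for how many rows the JDBC driver fetches per round trip.
        "fetchsize": str(fetch_size),
    }


def read_table(spark, options: Dict[str, str]):
    """Return a lazy DataFrame; no rows are read until an action runs."""
    return spark.read.format("jdbc").options(**options).load()
```

The bounds only decide how the partition ranges are split; they do not filter rows. How much memory a single fetch uses also depends on the JDBC driver, so the effect of "fetchsize" should be checked against the driver in use.
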
>
> In addition, I tried setting "spark.driver.memory" and "spark.executor.memory"
> to 4g in local mode, but the setting did not seem to take effect: memory usage
> always stayed at about 1g. The code is as follows:
>
> import multiprocessing
> from pyspark.sql import SparkSession
> from pyspark import SparkConf, SparkContext
>
> import time
>
> t1 = time.time()
> number_cores = int(multiprocessing.cpu_count())
> memory_gb = 4
>
>
> def get_spark_session(app_name: str, conf: SparkConf):
>     conf.setMaster("local[{}]".format(number_cores))
>     conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
>         "spark.sql.adaptive.enabled", "True"
>     ).set(
>         "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
>     ).set(
>         "spark.sql.repl.eagerEval.maxNumRows", "100"
>     ).set(
>         "sc.setLogLevel", "ERROR"
>     ).set(
>         "spark.executor.memory", "4g")
>
>     return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>
>
> spark = get_spark_session("My_app", SparkConf())
>
> df = spark.read.format("jdbc").options(
>     url='jdbc:mysql://127.0.0.1:63306/recommend?useSSL=false',
>     driver='com.mysql.jdbc.Driver',
>     dbtable="item_info",
>     user="root",
>     password="root"
> ).load()
> my_url = 'jdbc:mysql://127.0.0.1:63306/etl?useSSL=false'
> auth_mysql = {'user': 'root', 'password': 'root'}
> df = df.withColumnRenamed("id", "itemid").withColumnRenamed("category", "cateid") \
>     .withColumnRenamed('weight', 'score').withColumnRenamed('tag', 'item_tags') \
>     .withColumnRenamed('modify_time', 'item_modify_time').withColumnRenamed('start_time', 'dg_start_time') \
>     .withColumnRenamed('end_time', 'dg_end_time')
> df = df.select(
>     ['itemid', 'cateid', 'title', 'score', 'item_tags', 'item_modify_time',
>      'dg_start_time', 'dg_end_time']).limit(20)
> df.write.jdbc(my_url, 'item_info', mode='append', properties=auth_mysql)
> print(time.time() - t1)
>
> ------------------------------------------------------------------
> From: Bjørn Jørgensen <bjornjorgen...@gmail.com>
> Sent: Saturday, 5 November 2022, 04:51
> To: Sean Owen <sro...@gmail.com>
> Cc: 张健BJ <zhangjia...@datagrand.com>; user <user@spark.apache.org>
> Subject: Re: spark - local question
>
> Yes, Spark in local mode works :)
> One tip:
> If you just start it, the default settings are one core and 1 GB.
>
> I'm using this function to start Spark in local mode with all cores and the
> maximum RAM:
>
> import multiprocessing
> import os
> from pyspark.sql import SparkSession
> from pyspark import SparkConf, SparkContext
>
>
> number_cores = int(multiprocessing.cpu_count())
>
> mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # e.g. 4015976448
> memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3 (3.74 truncated by int())
>
>
> def get_spark_session(app_name: str, conf: SparkConf):
>     conf.setMaster("local[{}]".format(number_cores))
>     conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
>         "spark.sql.adaptive.enabled", "True"
>     ).set(
>         "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
>     ).set(
>         "spark.sql.repl.eagerEval.maxNumRows", "100"
>     ).set(
>         "sc.setLogLevel", "ERROR"
>     )
>
>     return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>
>
> spark = get_spark_session("My_app", SparkConf())
>
>
>
> Now when you type spark you will see something like this.
>
>
> SparkSession - in-memory
>
> SparkContext
>
> Spark UI
>
> Version v3.4.0-SNAPSHOT
> Master  local[16]
> AppName My_app
>
>
> Mon, 31 Oct 2022 at 14:50, Sean Owen <sro...@gmail.com> wrote:
> Sure, as stable and available as your machine is. If you don't need fault
> tolerance or scale beyond one machine, sure.
>
> On Mon, Oct 31, 2022 at 8:43 AM 张健BJ <zhangjia...@datagrand.com> wrote:
> Dear developers,
>     I have a question about PySpark local mode. Can it be used in
> production, and will it cause unexpected problems?
> The scenario is as follows:
>
> Our team wants to develop an ETL component in Python that can transfer data
> between various data sources.
>
> If there is no YARN environment, can we read data from Database A and write
> it to Database B in local mode? Will this be guaranteed to be stable
> and available?
>
>
>
> Thanks,
> Look forward to your reply
>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297
