Sorry, I missed out a bit. Added it, highlighted in yellow.

On Thu, 14 Jan 2021 at 13:54, András Kolbert <kolbertand...@gmail.com> wrote:
> Thanks, Muru, very helpful suggestion! Delta Lake is amazing and has
> completely changed a few of my projects!
>
> One question regarding that.
> When I use the following statement, everything works fine and I can use
> Delta properly in the Spark context that Jupyter initiates automatically.
>
> export PYSPARK_DRIVER_PYTHON=jupyter
> export PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port=8890'
>
> PYSPARK_PYTHON=pyspark \
>   --master yarn \
>   --deploy-mode client \
>   --driver-memory 4g \
>   --executor-memory 16G \
>   --executor-cores 1 \
>   --num-executors 8 \
>   --conf spark.yarn.archive=hdfs://node-master:9000/libs/spark/jars/spark-libs.jar \
>   --jars hdfs://node-master:9000/libs/spark/common/spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar,hdfs://node-master:9000/libs/spark/common/delta-core_2.11-0.6.1.jar
>
> However, I would like to start with a local pyspark session, and only
> connect to YARN when a specific notebook is configured that way.
>
> 1)
>
> export PYSPARK_DRIVER_PYTHON=jupyter
> export PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port=8890'
>
> PYSPARK_PYTHON=pyspark
>
> 2)
>
> conf = spark.sparkContext._conf.setAll([
>     ('spark.app.name', 'Delta Demo'),
>     ('spark.yarn.jars', 'hdfs://node-master:9000/libs/spark/common/delta-core_2.11-0.6.1.jar'),
>     ('spark.master', 'yarn-client'),
>     ('spark.executor.memory', '16g'),
>     ('spark.executor.instances', '8'),
>     ('spark.executor.cores', '1'),
>     ('spark.driver.memory', '4g'),
>     ("spark.jars.packages", "io.delta:delta-core_2.11:0.6.1"),
>     ("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
>     ("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
> ])
> spark.sparkContext.stop()
>
> spark = SparkSession \
>     .builder \
>     .config(conf=conf) \
>     .getOrCreate()
> sc = spark.sparkContext
>
> spark.sparkContext.addPyFile("hdfs://node-master:9000/libs/spark/common/delta-core_2.11-0.6.1.jar")
> from delta.tables import *
>
> delta_path = "/data/delta-table"
> data = spark.range(0, 5)
> data.show()
> data.write.format("delta").mode("overwrite").save(delta_path)
>
> With this setup I keep getting the error
> 'java.lang.ClassNotFoundException: Failed to find data source: delta.'
>
> What did I miss in my configuration/env variables?
>
> Thanks
> Andras
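[Editor's note: not a verified fix for the error above, just a sketch of one common alternative. Jar and package settings such as spark.jars.packages generally have to be in place before the SparkContext is created, so instead of calling _conf.setAll() on an already-running context, the same settings can be passed to the builder up front. This assumes the notebook kernel has not already started a SparkContext; versions and paths are taken from the thread.]

    # Sketch only: configure Delta before any SparkContext exists.
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("Delta Demo")
        .master("yarn")                                   # client deploy mode is the default from a notebook driver
        .config("spark.executor.memory", "16g")
        .config("spark.executor.instances", "8")
        .config("spark.executor.cores", "1")
        .config("spark.driver.memory", "4g")
        # resolve the Delta package from a repository...
        .config("spark.jars.packages", "io.delta:delta-core_2.11:0.6.1")
        # ...or, alternatively, point at the jar already on HDFS (path from the thread):
        # .config("spark.jars", "hdfs://node-master:9000/libs/spark/common/delta-core_2.11-0.6.1.jar")
        .getOrCreate()
    )

    # Same toy write as in the thread; "delta" resolves only if the jar is on the classpath.
    spark.range(0, 5).write.format("delta").mode("overwrite").save("/data/delta-table")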
> On Sun, 10 Jan 2021, 3:33 am muru, <mmur...@gmail.com> wrote:
>
>> You could try Delta Lake or Apache Hudi for this use case.
>>
>> On Sat, Jan 9, 2021 at 12:32 PM András Kolbert <kolbertand...@gmail.com> wrote:
>>
>>> Sorry if my terminology is misleading.
>>>
>>> What I meant by "driver only" is to use a local pandas dataframe
>>> (collect the data to the master), and keep updating that instead of
>>> dealing with a Spark distributed dataframe for holding this data.
>>>
>>> For example, we have a dataframe with all users and their corresponding
>>> latest activity timestamp. After each streaming batch, aggregations are
>>> performed and the result is collected to the driver to update the latest
>>> activity timestamp for a subset of users.
>>>
>>> On Sat, 9 Jan 2021, 6:18 pm Artemis User, <arte...@dtechspace.com> wrote:
>>>
>>>> Could you please clarify what you mean by 1)? The driver is only
>>>> responsible for submitting the Spark job, not performing it.
>>>>
>>>> -- ND
>>>>
>>>> On 1/9/21 9:35 AM, András Kolbert wrote:
>>>> > Hi,
>>>> > I would like to get your advice on my use case.
>>>> > I have a few Spark streaming applications where I need to keep
>>>> > updating a dataframe after each batch. Each batch probably affects a
>>>> > small fraction of the dataframe (5k out of 200k records).
>>>> >
>>>> > The options I have been considering so far:
>>>> > 1) keep the dataframe on the driver, and update it after each batch
>>>> > 2) keep the dataframe distributed, and use checkpointing to mitigate lineage
>>>> >
>>>> > I solved previous use cases with option 2, but I am not sure it is
>>>> > optimal, as checkpointing is relatively expensive. I also wondered
>>>> > about HBase or some other quick-access storage; however, that is
>>>> > currently not in my stack.
>>>> >
>>>> > Curious to hear your thoughts.
>>>> >
>>>> > Andras
>>>> >
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>
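[Editor's note: a minimal, illustrative sketch of option 2 from the quoted thread above, i.e. keeping the state as a distributed DataFrame and checkpointing it every few batches to truncate the growing lineage. The table layout, checkpoint directory, and batch loop are hypothetical, not taken from the thread.]

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.appName("state-update-sketch").getOrCreate()
    spark.sparkContext.setCheckpointDir("/tmp/state-checkpoints")   # hypothetical path

    # Toy state table: one row per user with the latest activity timestamp.
    state_df = spark.createDataFrame([(1, 0), (2, 0)], ["user_id", "last_seen"])

    def update_state(state_df, batch_df, batch_no, checkpoint_every=10):
        """Merge the latest activity timestamps from a batch into the state."""
        updated = (
            state_df.alias("s")
            .join(batch_df.alias("b"), "user_id", "left")
            .select(
                "user_id",
                F.coalesce(F.col("b.last_seen"), F.col("s.last_seen")).alias("last_seen"),
            )
        )
        if batch_no % checkpoint_every == 0:
            # Materializes the plan and cuts the lineage, at the cost of a write
            # to the checkpoint directory.
            updated = updated.checkpoint()
        return updated

    # Example of one batch touching a small subset of users:
    batch = spark.createDataFrame([(2, 1610000000)], ["user_id", "last_seen"])
    state_df = update_state(state_df, batch, batch_no=1)
    state_df.show()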