You need to make sure the delta-core_2.11-0.6.1.jar file is in your $SPARK_HOME/jars folder.
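Alternatively, here is a rough, untested sketch of starting from a plain Python/Jupyter kernel (where no pyspark-launched context exists yet) and letting Spark pull Delta from Maven instead of shipping the jar by hand; it assumes Spark 2.4.x with Scala 2.11 and network access to Maven Central. The key point is that spark.jars.packages (or --jars/--packages) only takes effect if it is set before the first SparkContext/JVM is launched; stopping an already-running context and rebuilding the session, as in step 2) further down in this thread, does not re-resolve the jar, which is what leads to the 'Failed to find data source: delta' error.

from pyspark.sql import SparkSession

# Untested sketch: build the session from a plain Python kernel where no
# SparkContext/JVM exists yet, so package resolution still happens at launch.
spark = (SparkSession.builder
         .appName("Delta Demo")
         # Start locally; switch to yarn only in the notebooks that need the cluster.
         .master("local[*]")
         # Pulls delta-core from Maven; equivalent to --packages on the CLI.
         .config("spark.jars.packages", "io.delta:delta-core_2.11:0.6.1")
         # Optional: enables the Delta SQL commands.
         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         .getOrCreate())

spark.range(0, 5).write.format("delta").mode("overwrite").save("/data/delta-table")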
On Thu, Jan 14, 2021 at 4:59 AM András Kolbert <kolbertand...@gmail.com> wrote:

> Sorry, missed out a bit. Added, highlighted in yellow.
>
> On Thu, 14 Jan 2021 at 13:54, András Kolbert <kolbertand...@gmail.com> wrote:
>
>> Thanks, Muru, very helpful suggestion! Delta Lake is amazing; it has
>> completely changed a few of my projects!
>>
>> One question regarding that.
>> When I use the following statement, everything works fine and I can use
>> delta properly in the Spark context that Jupyter initiates automatically.
>>
>> export PYSPARK_DRIVER_PYTHON=jupyter
>> export PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port=8890'
>>
>> PYSPARK_PYTHON=pyspark \
>>   --master yarn \
>>   --deploy-mode client \
>>   --driver-memory 4g \
>>   --executor-memory 16G \
>>   --executor-cores 1 \
>>   --num-executors 8 \
>>   --conf spark.yarn.archive=hdfs://node-master:9000/libs/spark/jars/spark-libs.jar \
>>   --jars hdfs://node-master:9000/libs/spark/common/spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar,hdfs://node-master:9000/libs/spark/common/delta-core_2.11-0.6.1.jar
>>
>> However, I would like to have a local pyspark initially, and only connect
>> to YARN when the specific notebook is configured that way.
>>
>> 1)
>>
>> export PYSPARK_DRIVER_PYTHON=jupyter
>> export PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port=8890'
>>
>> PYSPARK_PYTHON=pyspark
>>
>> 2)
>> conf = spark.sparkContext._conf.setAll([
>>     ('spark.app.name', 'Delta Demo'),
>>     ('spark.yarn.jars', 'hdfs://node-master:9000/libs/spark/common/delta-core_2.11-0.6.1.jar'),
>>     ('spark.master', 'yarn-client'),
>>     ('spark.executor.memory', '16g'),
>>     ('spark.executor.instances', '8'),
>>     ('spark.executor.cores', '1'),
>>     ('spark.driver.memory', '4g'),
>>     ("spark.jars.packages", "io.delta:delta-core_2.11:0.6.1"),
>>     ("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
>>     ("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
>> ])
>> spark.sparkContext.stop()
>>
>> spark = SparkSession \
>>     .builder \
>>     .config(conf=conf) \
>>     .getOrCreate()
>> sc = spark.sparkContext
>>
>> spark.sparkContext.addPyFile("hdfs://node-master:9000/libs/spark/common/delta-core_2.11-0.6.1.jar")
>> from delta.tables import *
>> delta_path = "/data/delta-table"
>> data = spark.range(0, 5)
>> data.show()
>> data.write.format("delta").mode("overwrite").save(delta_path)
>>
>> This way, I keep facing the error message:
>> 'java.lang.ClassNotFoundException: Failed to find data source: delta.'
>>
>> What did I miss in my configuration/env variables?
>>
>> Thanks
>> Andras
>>
>> On Sun, 10 Jan 2021, 3:33 am muru, <mmur...@gmail.com> wrote:
>>
>>> You could try Delta Lake or Apache Hudi for this use case.
>>>
>>> On Sat, Jan 9, 2021 at 12:32 PM András Kolbert <kolbertand...@gmail.com> wrote:
>>>
>>>> Sorry if my terminology is misleading.
>>>>
>>>> What I meant by driver-only is to use a local pandas dataframe
>>>> (collect the data to the master), and keep updating that instead of
>>>> dealing with a distributed Spark dataframe for holding this data.
>>>>
>>>> For example, we have a dataframe with all users and their corresponding
>>>> latest activity timestamp. After each streaming batch, aggregations are
>>>> performed and the result is collected to the driver to update the latest
>>>> activity timestamp for a subset of users.
>>>>
>>>> On Sat, 9 Jan 2021, 6:18 pm Artemis User, <arte...@dtechspace.com> wrote:
>>>>
>>>>> Could you please clarify what you mean by 1)? The driver is only
>>>>> responsible for submitting the Spark job, not performing it.
>>>>>
>>>>> -- ND
>>>>>
>>>>> On 1/9/21 9:35 AM, András Kolbert wrote:
>>>>> > Hi,
>>>>> > I would like to get your advice on my use case.
>>>>> > I have a few Spark Streaming applications where I need to keep
>>>>> > updating a dataframe after each batch. Each batch probably affects a
>>>>> > small fraction of the dataframe (5k out of 200k records).
>>>>> >
>>>>> > The options I have been considering so far:
>>>>> > 1) keep the dataframe on the driver, and update it after each batch
>>>>> > 2) keep the dataframe distributed, and use checkpointing to mitigate
>>>>> > lineage
>>>>> >
>>>>> > I solved previous use cases with option 2, but I am not sure it is
>>>>> > the most optimal, as checkpointing is relatively expensive. I also
>>>>> > wondered about HBase or some sort of quick-access in-memory storage;
>>>>> > however, it is currently not in my stack.
>>>>> >
>>>>> > Curious to hear your thoughts
>>>>> >
>>>>> > Andras
>>>>>
>>>>> ---------------------------------------------------------------------
>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
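
Coming back to the original question in this thread (option 1 vs option 2 for the frequently-updated dataframe): a rough sketch of option 1, keeping the state as a driver-local pandas frame and merging each micro-batch's aggregate into it, might look like the following. The column names (user_id, event_time, last_seen) and the per-batch hook are hypothetical; the point is only that a ~5k-row aggregate is cheap to collect and merge, while the 200k-row state never has to be rebuilt as a distributed dataframe.

import pandas as pd

# Driver-local state: one row per user with the latest activity timestamp.
# user_id / event_time / last_seen are hypothetical column names.
state = pd.DataFrame(columns=["user_id", "last_seen"]).set_index("user_id")

def update_state(state, batch_df):
    # Aggregate inside Spark first so only the affected users (~5k rows)
    # are collected to the driver.
    updates = (batch_df
               .groupBy("user_id")
               .agg({"event_time": "max"})
               .withColumnRenamed("max(event_time)", "last_seen")
               .toPandas()
               .set_index("user_id"))
    # Overwrite timestamps for users already present in the state...
    state.update(updates)
    # ...and append users seen for the first time.
    new_users = updates.index.difference(state.index)
    return pd.concat([state, updates.loc[new_users]])

# Called once per micro-batch, e.g. from foreachRDD/foreachBatch:
# state = update_state(state, batch_df)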