Re: [Spark SQL] Can Structured Streaming in Python connect to Cassandra?
You don't need to use foreachBatch to write to Cassandra. You just need to use Spark Cassandra Connector version 2.5.0 or higher - it natively supports writing streaming data into Cassandra. Here is the announcement: https://www.datastax.com/blog/advanced-apache-cassandra-analytics-now-open-all

guillaume farcy at "Mon, 21 Mar 2022 16:33:51 +0100" wrote:

gf> Hello,
gf> I am a student and I am currently doing a big data project.
gf> Here is my code: https://gist.github.com/Balykoo/262d94a7073d5a7e16dfb0d0a576b9c3
gf> My project is to retrieve messages from a Twitch chat and send them into Kafka; Spark then reads the Kafka topic to perform the processing in the provided gist.
gf> I want to send these messages into Cassandra.
gf> I tested a first solution on line 72 which works, but when there are too many messages Spark crashes - probably because my function connects to Cassandra each time it is called.
gf> I tried the object approach to share the connection object, but without success:
gf> _pickle.PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object
gf> Can you please tell me how to do this?
gf> Or at least give me some advice?
gf> Sincerely,
gf> FARCY Guillaume.

--
With best wishes,
Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)
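For reference, a minimal sketch of what such a direct streaming write looks like with Spark Cassandra Connector 2.5+ (shown in Scala; the same DataFrame options apply from PySpark). The keyspace, table, checkpoint path, and the messagesDF DataFrame are assumed placeholders, not names from the original code:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("twitch-to-cassandra")
      .config("spark.cassandra.connection.host", "127.0.0.1")  // assumption: local Cassandra node
      .getOrCreate()

    // messagesDF stands in for the streaming DataFrame read from the Kafka topic
    val query = messagesDF.writeStream
      .format("org.apache.spark.sql.cassandra")
      .option("checkpointLocation", "/tmp/checkpoints/twitch")  // placeholder path
      .option("keyspace", "twitch")                             // placeholder keyspace
      .option("table", "messages")                              // placeholder table
      .start()

    query.awaitTermination()

Because the connector manages its Cassandra sessions internally, there is no per-record connection object to serialize, which avoids the pickling error seen with the foreach approach.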
Re: Spark 3.2.0 upgrade
Show how you execute your code - either you didn't package it as an uber-jar, or you didn't provide all necessary dependencies if you're using the `--jars` option. You may also try the `-assembly` variant of the connector when submitting your application.

Amit Sharma at "Fri, 21 Jan 2022 11:17:38 -0500" wrote:

AS> Hello, I tried using the Cassandra unshaded connector and the normal connector; both give the same error at runtime while connecting to Cassandra.
AS> "com.datastax.spark" %% "spark-cassandra-connector-unshaded" % "2.4.2"
AS> Or
AS> "com.datastax.spark" %% "spark-cassandra-connector" % "3.1.0"
AS> Russ, a similar issue is reported here as well, but with no solution:
AS> https://community.datastax.com/questions/3519/issue-with-spring-boot-starter-data-cassandra-and.html
AS> Caused by: java.lang.ClassNotFoundException: com.codahale.metrics.JmxReporter
AS>     at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
AS>     at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
AS>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
AS> On Thu, Jan 20, 2022 at 5:17 PM Amit Sharma wrote:
AS> Hello, I am trying to upgrade my project from Spark 2.3.3 to Spark 3.2.0. While running the application locally I am getting the error below.
AS> Could you please let me know which version of the Cassandra connector I should use? I am using the shaded connector below, but I think that is causing the issue:
AS> "com.datastax.spark" %% "spark-cassandra-connector-unshaded" % "2.4.2"
AS> Caused by: java.lang.ClassNotFoundException: com.codahale.metrics.JmxReporter
AS>     at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
AS>     at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
AS>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
AS> Thanks
AS> Amit

--
With best wishes,
Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)
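For illustration, a sketch of what the `-assembly` variant looks like as an sbt dependency (assuming a Scala 2.12 build; the artifact name and version here are my example, not taken from the thread). The assembly jar bundles the connector together with its transitive dependencies, which helps avoid missing classes at runtime:

    // build.sbt - replaces the "spark-cassandra-connector-unshaded" dependency
    libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector-assembly" % "3.1.0"

The equivalent when submitting instead of bundling would be passing the same coordinates with the explicit Scala suffix (e.g. spark-cassandra-connector-assembly_2.12:3.1.0) via --packages.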
Re: Unable to use WriteStream to write to delta file.
>> JVM stack trace:
>>
>>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
>>   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>>   at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>>   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
>>   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>>   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>>   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
>>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
>>   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>>   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>>   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
>>   at org.apache.spark.sql.execution.streaming.OneTimeExecutor.execute(TriggerExecutor.scala:39)
>>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
>>   at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
>>   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>>   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
>>   at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
>>
>> Python traceback:
>>
>>   File "C:\Users\agundapaneni\Development\ModernDataEstate\tests\test_mdefbasic.py", line 60, in <module>
>>     obj.test_ingest_incremental_data_batch1()
>>   File "C:\Users\agundapaneni\Development\ModernDataEstate\tests\test_mdefbasic.py", line 56, in test_ingest_incremental_data_batch1
>>     mdef.ingest_incremental_data('example', entity, self.schemas['studentattendance'], 'school_year')
>>   File "C:\Users\agundapaneni\Development\ModernDataEstate/src\MDEFBasic.py", line 109, in ingest_incremental_data
>>     query.awaitTermination()  # block until query is terminated, with stop() or with error; a StreamingQueryException will be thrown if an exception occurs.
>>   File "C:\Users\agundapaneni\Development\ModernDataEstate\.tox\default\lib\site-packages\pyspark\sql\streaming.py", line 101, in awaitTermination
>>     return self._jsq.awaitTermination()
>>   File "C:\Users\agundapaneni\Development\ModernDataEstate\.tox\default\lib\site-packages\py4j\java_gateway.py", line 1309, in __call__
>>     return_value = get_return_value(
>>   File "C:\Users\agundapaneni\Development\ModernDataEstate\.tox\default\lib\site-packages\pyspark\sql\utils.py", line 117, in deco
>>     raise converted from None
>>
>> pyspark.sql.utils.StreamingQueryException: org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkFieldNames(Lscala/collection/Seq;)V
>>
>> === Streaming Query ===

--
With best wishes,
Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)
Re: Spark Structured Streaming Continuous Trigger on multiple sinks
Just don't call .awaitTermination() on each query - it blocks execution of the next line of code. Assign the result of each .start() to its own variable (or collect them into a list/array), and then wait for the streams with spark.streams.awaitAnyTermination() or something like this (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries).

S at "Wed, 25 Aug 2021 14:14:48 +0530" wrote:

S> Hello,
S> I have a structured streaming job that needs to be able to write to multiple sinks. We are using the Continuous Trigger, not the Microbatch Trigger.
S> 1. When we use the foreach method:
S> dataset1.writeStream.foreach(kafka ForEachWriter logic).trigger(ContinuousMode).start().awaitTermination()
S> dataset1.writeStream.foreach(mongo ForEachWriter logic).trigger(ContinuousMode).start().awaitTermination()
S> the first statement blocks the second one for obvious reasons, so this does not serve our purpose.
S> 2. The next step would be to use foreachBatch, but that is not supported in continuous mode.
S> 3. The next attempt was to use something like this for both sinks:
S> dataset1.writeStream.format("kafka").trigger(ContinuousMode).start().awaitTermination()
S> dataset1.writeStream.format("mongo").trigger(ContinuousMode).start().awaitTermination()
S> This does not work either: only the first query works, the second one does not.
S> Is there any solution to the problem of writing to multiple sinks in Continuous Trigger mode using Structured Streaming?

--
With best wishes,
Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)
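A minimal Scala sketch of the suggested pattern - dataset1, kafkaWriter, and mongoWriter stand in for the dataset and ForeachWriter logic from the question, spark is the active SparkSession, and the trigger interval is an arbitrary example:

    import org.apache.spark.sql.streaming.Trigger

    // start both queries without blocking on either of them
    val kafkaQuery = dataset1.writeStream
      .foreach(kafkaWriter)                      // hypothetical ForeachWriter for Kafka
      .trigger(Trigger.Continuous("1 second"))
      .start()

    val mongoQuery = dataset1.writeStream
      .foreach(mongoWriter)                      // hypothetical ForeachWriter for MongoDB
      .trigger(Trigger.Continuous("1 second"))
      .start()

    // block until any active streaming query terminates (or use awaitAnyTermination(timeoutMs))
    spark.streams.awaitAnyTermination()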
Re: Cassandra row deletion
Yes, you can do it using the RDD API of the Spark Cassandra Connector: https://github.com/datastax/spark-cassandra-connector/blob/b2.5/doc/5_saving.md#deleting-rows-and-columns Depending on whether you're deleting only specific columns or full rows, it's worth looking at the keyColumns parameter - deletion by primary or partition key is the most efficient way to delete in Cassandra...

Amit Sharma at "Sat, 4 Jul 2020 10:44:00 -0400" wrote:

AS> Hi, I have to delete certain rows from Cassandra during my Spark batch process. Is there any way to delete rows using the Spark Cassandra connector?
AS> Thanks
AS> Amit

--
With best wishes,
Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)
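As an illustration, a minimal Scala sketch of deleting whole rows by primary key with the connector's RDD API - the keyspace, table, and key column names are placeholders, not from the original question:

    import com.datastax.spark.connector._
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("cassandra-delete-example")
      .set("spark.cassandra.connection.host", "127.0.0.1")   // assumption: local Cassandra node
    val sc = new SparkContext(conf)

    // primary-key values of the rows to delete; (user_id, event_id) is a made-up key
    val keysToDelete = sc.parallelize(Seq((1, 100), (2, 200)))

    // delete full rows identified by their primary key columns
    keysToDelete.deleteFromCassandra("my_keyspace", "my_table",
      keyColumns = SomeColumns("user_id", "event_id"))

Passing deleteColumns instead would remove only the listed columns from those rows rather than the whole row.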
Re: Kafka Zeppelin integration
Can you post what settings you have configured for the Spark interpreter? I recently did a demo of using Zeppelin 0.9.0 preview1 + Structured Streaming + Kafka, running in distributed mode on DSE Analytics, and everything just worked... P.S. Here is the notebook if you're interested: https://github.com/alexott/zeppelin-demos/blob/master/cassandra-day-russia/Cassandra%20Day%20Russia%20Streaming%20demo.zpln

silav...@dtechspace.com at "Fri, 19 Jun 2020 19:41:45 -0700" wrote:

s> Hi, here is my question: Spark code run on Zeppelin is unable to find the Kafka source even though a dependency is specified. Is there any way to fix this? The Zeppelin version is 0.9.0, the Spark version is 2.4.6, and the Kafka version is 2.4.1. I have specified the dependency in the packages and added a jar file that contained the kafka stream 010.

--
With best wishes,
Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)
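For context, the interpreter setting that usually matters for this kind of error is the Kafka SQL source package on the Spark interpreter. A sketch of what that typically looks like for Spark 2.4.6, assuming the default Scala 2.11 build (the exact coordinates are my example, not taken from the thread):

    spark.jars.packages  org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.6

The structured streaming "kafka" source lives in spark-sql-kafka-0-10, not in the spark-streaming-kafka-0-10 (DStream) jar, which is a common cause of the "Failed to find data source: kafka" error.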
Re: Spark structured streaming - performance tuning
Just to clarify - I didn't write this explicitly in my answer: when you're working with Kafka, every Kafka partition is mapped to a Spark partition, and in Spark every partition is mapped to a task. But you can use `coalesce` to decrease the number of Spark partitions, so you'll have fewer tasks...

Srinivas V at "Sat, 18 Apr 2020 10:32:33 +0530" wrote:

SV> Thank you Alex. I will check it out and let you know if I have any questions.
SV> On Fri, Apr 17, 2020 at 11:36 PM Alex Ott wrote:
SV> http://shop.oreilly.com/product/0636920047568.do has quite good information on it. For Kafka, you need to start with the approximation that processing each partition is a separate task that needs to be executed, so you need to plan the number of cores correspondingly.
SV> Srinivas V at "Thu, 16 Apr 2020 22:49:15 +0530" wrote:
SV> Hello,
SV> Can someone point me to a good video or document which talks about performance tuning for a structured streaming app?
SV> I am looking especially at listening to Kafka topics, say 5 topics each with 100 partitions.
SV> Trying to figure out the best cluster size and number of executors and cores required.

--
With best wishes,
Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)
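A minimal Scala sketch of that coalesce suggestion - the broker address, topic names, and target partition count are arbitrary placeholders:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("kafka-coalesce-example").getOrCreate()

    val kafkaDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder broker
      .option("subscribe", "topic1,topic2")                 // placeholder topics
      .load()
      // 500 Kafka partitions would otherwise become 500 Spark partitions (= 500 tasks);
      // coalesce reduces that to 50 Spark partitions without triggering a full shuffle
      .coalesce(50)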
Re: Spark structured streaming - performance tuning
http://shop.oreilly.com/product/0636920047568.do has quite good information on it. For Kafka, you need to start with the approximation that processing each partition is a separate task that needs to be executed, so you need to plan the number of cores correspondingly.

Srinivas V at "Thu, 16 Apr 2020 22:49:15 +0530" wrote:

SV> Hello,
SV> Can someone point me to a good video or document which talks about performance tuning for a structured streaming app?
SV> I am looking especially at listening to Kafka topics, say 5 topics each with 100 partitions.
SV> Trying to figure out the best cluster size and number of executors and cores required.

--
With best wishes,
Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)
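To make that approximation concrete with the numbers from the question (purely illustrative sizing, not a recommendation): 5 topics x 100 partitions = 500 Kafka partitions, so each micro-batch can spawn up to 500 tasks. With, say, 25 executors x 5 cores = 125 cores, those tasks would run in roughly 500 / 125 = 4 waves per micro-batch; getting closer to 500 cores (or reducing the partition count with coalesce) brings that down toward a single wave.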