Re: Structured Streaming to Kafka Topic
Hi Pankaj,

What version of Spark are you using? If you are using 2.4+ then there is a built-in function "to_json" which converts the columns of your dataset to JSON format.

https://spark.apache.org/docs/2.4.0/api/sql/#to_json

Akshay Bhardwaj
+91-97111-33849

On Wed, Mar 6, 2019 at 10:29 PM Pankaj Wahane wrote:
> Hi,
>
> I am using structured streaming for ETL.
>
> val data_stream = spark
>   .readStream // constantly expanding dataframe
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "sms_history")
>   .option("startingOffsets", "earliest") // begin from start of topic
>   .option("failOnDataLoss", "false")
>   .load()
>
> I transform this into a Dataset with the following schema.
>
> root
>  |-- accountId: long (nullable = true)
>  |-- countryId: long (nullable = true)
>  |-- credits: double (nullable = true)
>  |-- deliveryStatus: string (nullable = true)
>  |-- senderId: string (nullable = true)
>  |-- sentStatus: string (nullable = true)
>  |-- source: integer (nullable = true)
>  |-- createdOn: timestamp (nullable = true)
>  |-- send_success_credits: double (nullable = true)
>  |-- send_error_credits: double (nullable = true)
>  |-- delivered_credits: double (nullable = true)
>  |-- invalid_sd_credits: double (nullable = true)
>  |-- undelivered_credits: double (nullable = true)
>  |-- unknown_credits: double (nullable = true)
>
> Now I want to write this transformed stream to another Kafka topic. I have
> temporarily used a UDF that accepts all these columns as parameters and
> creates a JSON string for adding a column "value" for writing to Kafka.
>
> Is there an easier and cleaner way to do the same?
>
> Thanks,
> Pankaj
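For reference, a minimal sketch of the to_json approach suggested above (the dataset name `transformed`, the output topic name, and the checkpoint path are assumptions for illustration; the column list comes from the schema in the thread):

```scala
import org.apache.spark.sql.functions.{col, struct, to_json}

// Pack all columns into a single JSON string column named "value",
// which the Kafka sink expects, then write to the output topic.
val query = transformed
  .select(to_json(struct(col("*"))).alias("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "sms_history_enriched")            // hypothetical topic name
  .option("checkpointLocation", "/tmp/ckpt/sms_out")  // hypothetical path
  .start()
```

This replaces the per-column UDF entirely: struct(col("*")) captures every column of the dataset, and to_json serializes the struct in one step.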
spark structured streaming crash due to decompressing gzip file failure
Hi,

I have a structured streaming job which listens to an HDFS folder containing jsonl.gz files. The job crashed with the following error:

java.io.IOException: incorrect header check
    at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method)
    at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:225)
    at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
    at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
    at java.io.InputStream.read(InputStream.java:101)
    at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:182)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:218)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:176)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:152)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:192)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:186)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Is there a way to skip the gz files that cannot be decompressed? Exception handling does not seem to help. The only workaround I can think of is to decompress the gz files into another folder first and make the Spark streaming job listen to this new folder. But this workaround may not be better than using an unstructured streaming job to directly decompress the gz files, read the jsonl files, validate the records, and write the validated records into Parquet. Any idea is highly appreciated!
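One setting worth trying for this situation is Spark's `spark.sql.files.ignoreCorruptFiles`, which tells the file sources to skip files that throw while being read instead of failing the query. This is a sketch under assumptions: the schema and HDFS path are placeholders, and whether the flag covers a decompression failure partway through a gzip file may depend on the Spark version, so it should be verified against a deliberately corrupted file first.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder()
  .appName("jsonl-gz-stream")
  .getOrCreate()

// Skip files that fail while being read instead of crashing the whole query.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

// Placeholder schema and input path for the job described above.
val schema = StructType(Seq(StructField("record", StringType)))

val stream = spark.readStream
  .schema(schema)
  .json("hdfs:///data/incoming") // hypothetical folder of jsonl.gz files
```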
PysPark date_add function suggestion
I've been looking at the source code of the PySpark date_add function (https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions.html#date_add) and I'm wondering why the days input variable is not cast to a Java column like the start variable. This effectively means that when working with data frames, you can only add a single, fixed number of days to all of your dates. I think it would make more sense to cast the days variable to a Java column, so that you could add different numbers of days to different dates. The JVM function date_add has no problem doing this, because I can add a date column and an integer column using the expr function (expr("date_add(start, days)")). And if you wanted to add the same number of days everywhere, you could just make a lit column with that number. This argument applies to the functions date_sub and add_months as well.

Clay
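A sketch of the expr workaround described above, which routes the call through the JVM function so a column can be passed for the day count (the column names and sample data are illustrative):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [("2019-03-01", 1), ("2019-03-01", 30)],
    ["start", "days"],
)

# F.date_add(col, n) only accepts a literal int for n in this PySpark version,
# but expr() reaches the JVM date_add, which happily takes a column of days.
out = df.withColumn("end", F.expr("date_add(start, days)"))
```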
Structured Streaming to Kafka Topic
Hi,

I am using structured streaming for ETL.

val data_stream = spark
  .readStream // constantly expanding dataframe
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "sms_history")
  .option("startingOffsets", "earliest") // begin from start of topic
  .option("failOnDataLoss", "false")
  .load()

I transform this into a Dataset with the following schema.

root
 |-- accountId: long (nullable = true)
 |-- countryId: long (nullable = true)
 |-- credits: double (nullable = true)
 |-- deliveryStatus: string (nullable = true)
 |-- senderId: string (nullable = true)
 |-- sentStatus: string (nullable = true)
 |-- source: integer (nullable = true)
 |-- createdOn: timestamp (nullable = true)
 |-- send_success_credits: double (nullable = true)
 |-- send_error_credits: double (nullable = true)
 |-- delivered_credits: double (nullable = true)
 |-- invalid_sd_credits: double (nullable = true)
 |-- undelivered_credits: double (nullable = true)
 |-- unknown_credits: double (nullable = true)

Now I want to write this transformed stream to another Kafka topic. I have temporarily used a UDF that accepts all these columns as parameters and creates a JSON string for adding a column "value" for writing to Kafka.

Is there an easier and cleaner way to do the same?

Thanks,
Pankaj
4 Apache Events in 2019: DC Roadshow soon; next up Chicago, Las Vegas, and Berlin!
Dear Apache Enthusiast, (You’re receiving this because you are subscribed to one or more user mailing lists for an Apache Software Foundation project.) TL;DR: * Apache Roadshow DC is in 3 weeks. Register now at https://apachecon.com/usroadshowdc19/ * Registration for Apache Roadshow Chicago is open. http://apachecon.com/chiroadshow19 * The CFP for ApacheCon North America is now open. https://apachecon.com/acna19 * Save the date: ApacheCon Europe will be held in Berlin, October 22nd through 24th. https://apachecon.com/aceu19 Registration is open for two Apache Roadshows; these are smaller events with a more focused program and regional community engagement: Our Roadshow event in Washington DC takes place in under three weeks, on March 25th. We’ll be hosting a day-long event at the Fairfax campus of George Mason University. The roadshow is a full day of technical talks (two tracks) and an open source job fair featuring AWS, Bloomberg, dito, GridGain, Linode, and Security University. More details about the program, the job fair, and to register, visit https://apachecon.com/usroadshowdc19/ Apache Roadshow Chicago will be held May 13-14th at a number of venues in Chicago’s Logan Square neighborhood. This event will feature sessions in AdTech, FinTech and Insurance, startups, “Made in Chicago”, Project Shark Tank (innovations from the Apache Incubator), community diversity, and more. It’s a great way to learn about various Apache projects “at work” while playing at a brewery, a beercade, and a neighborhood bar. Sign up today at https://www.apachecon.com/chiroadshow19/ We’re delighted to announce that the Call for Presentations (CFP) is now open for ApacheCon North America in Las Vegas, September 9-13th! As the official conference series of the ASF, ApacheCon North America will feature over a dozen Apache project summits, including Cassandra, Cloudstack, Tomcat, Traffic Control, and more. 
We’re looking for talks in a wide variety of categories -- anything related to ASF projects and the Apache development process. The CFP closes at midnight on May 26th. In addition, the ASF will be celebrating its 20th Anniversary during the event. For more details and to submit a proposal for the CFP, visit https://apachecon.com/acna19/ . Registration will be opening soon. Be sure to mark your calendars for ApacheCon Europe, which will be held in Berlin, October 22-24th at the KulturBrauerei, a landmark of Berlin's industrial history. In addition to innovative content from our projects, we are collaborating with the Open Source Design community (https://opensourcedesign.net/) to offer a track on design this year. The CFP and registration will open soon at https://apachecon.com/aceu19/ . Sponsorship opportunities are available for all events, with details listed on each event’s site at http://apachecon.com/. We look forward to seeing you! Rich, for the ApacheCon Planners @apachecon - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Why does Apache Spark Master shutdown when Zookeeper expires the session
I guess this is a known issue, tracked by SPARK-15544 [1] and SPARK-23530 [2] (which seems to be a duplicate).

I guess that's the simplest implementation of H/A (since we don't bother with current state in the master) when a background process like supervisord restarts the process once it is no longer running; but if no such background process is set up, it may lead to all master processes eventually being shut down.

IMHO the safer approach is storing all information in ZK (the source of truth), with only the leader master allowed to read and write it. The follower masters just wait, and load the information when one of them becomes leader. That would require pretty substantial changes, though.

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

1. https://issues.apache.org/jira/browse/SPARK-15544
2. https://issues.apache.org/jira/browse/SPARK-23530

On Tue, Mar 5, 2019 at 10:02 PM, lokeshkumar wrote:
> As I understand, Apache Spark Master can be run in high-availability mode
> using Zookeeper. That is, multiple Spark masters can run in leader/follower
> mode and these are registered with Zookeeper.
>
> In our scenario Zookeeper is expiring the session of the Spark Master which
> is acting as leader. So the Spark Master which is leader receives this
> notification and deliberately shuts down.
>
> Can someone explain why the decision to shut down rather than retry
> was taken?
>
> And why does Kafka retry connecting to Zookeeper when it receives the same
> expiry notification?
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
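The "ZK as source of truth, followers wait" approach suggested above is essentially a leader latch. A rough illustration with Apache Curator follows; this is not how Spark's master is implemented, and the connection string and ZK path are made up for the sketch:

```scala
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.LeaderLatch
import org.apache.curator.retry.ExponentialBackoffRetry

val client = CuratorFrameworkFactory.newClient(
  "zk1:2181", new ExponentialBackoffRetry(1000, 3))
client.start()

// Every master instance competes for leadership; followers block in await().
val latch = new LeaderLatch(client, "/spark/master-leader")
latch.start()
latch.await() // returns once this process becomes the leader

// As leader: load the persisted state from ZK and begin serving.
// If the session expires, leadership is lost and another waiting
// master's await() returns, instead of every process shutting down.
```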
Re: "java.lang.AssertionError: assertion failed: Failed to get records for **** after polling for 180000" error
Hi,

The max bytes setting should be enough, because when the tasks fail, the job reads the data from Kafka as fast as normal afterwards. The request.timeout.ms I set is 180 seconds. I think it must be a timeout setting or a max bandwidth setting, given that it recovers and reads the same partition very fast after the tasks are marked failed.

Regards,
Junfeng Chen

On Wed, Mar 6, 2019 at 4:01 PM Akshay Bhardwaj <akshay.bhardwaj1...@gmail.com> wrote:
> Sorry, the previous message was sent incomplete.
>
> To better debug the issue, please check the below config properties:
>
> - Kafka consumer properties:
>   - max.partition.fetch.bytes within the Spark Kafka consumer. If not set
>     for the consumer, then the global config at broker level.
>   - request.timeout.ms
> - Spark configurations:
>   - spark.streaming.kafka.consumer.poll.ms
>   - spark.network.timeout (if the above is not set, then poll.ms
>     defaults to spark.network.timeout)
>
> Generally I have faced this issue if spark.streaming.kafka.consumer.poll.ms
> is less than request.timeout.ms
>
> Also, what is the average Kafka record message size in bytes?
>
> Akshay Bhardwaj
> +91-97111-33849
>
> On Wed, Mar 6, 2019 at 1:26 PM Akshay Bhardwaj <
> akshay.bhardwaj1...@gmail.com> wrote:
>
>> Hi,
>>
>> To better debug the issue, please check the below config properties:
>>
>> - max.partition.fetch.bytes within the Spark Kafka consumer. If not set
>>   for the consumer, then the global config at broker level.
>> - spark.streaming.kafka.consumer.poll.ms
>> - spark.network.timeout (if the above is not set, then poll.ms
>>   defaults to spark.network.timeout)
>>
>> Akshay Bhardwaj
>> +91-97111-33849
>>
>> On Wed, Mar 6, 2019 at 8:39 AM JF Chen wrote:
>>
>>> When my Kafka executor reads data from Kafka, sometimes it throws the
>>> error "java.lang.AssertionError: assertion failed: Failed to get records
>>> for **** after polling for 180000", which happens after 3 minutes of
>>> executing. The data waiting to be read is not huge, about 1GB. And other
>>> partitions read by other tasks are very fast; the error always occurs on
>>> some specific executor.
>>>
>>> Regards,
>>> Junfeng Chen
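For concreteness, the relationship between the settings discussed in this thread could be expressed like this (the values are illustrative, not recommendations; the bootstrap server is a placeholder):

```scala
import org.apache.spark.SparkConf

// Spark side: keep the consumer poll timeout above the broker request timeout,
// otherwise Spark gives up polling before the Kafka client would.
val conf = new SparkConf()
  .set("spark.streaming.kafka.consumer.poll.ms", "240000") // > request.timeout.ms below
  .set("spark.network.timeout", "300s")                    // fallback when poll.ms is unset

// Kafka consumer side (passed as kafkaParams to the direct stream).
val kafkaParams = Map[String, Object](
  "bootstrap.servers"         -> "localhost:9092",
  "request.timeout.ms"        -> "180000",  // the 180 s mentioned in the thread
  "max.partition.fetch.bytes" -> "1048576"  // 1 MB, the Kafka default
)
```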
Re: "java.lang.AssertionError: assertion failed: Failed to get records for **** after polling for 180000" error
Sorry, the previous message was sent incomplete.

To better debug the issue, please check the below config properties:

- Kafka consumer properties:
  - max.partition.fetch.bytes within the Spark Kafka consumer. If not set for the consumer, then the global config at broker level.
  - request.timeout.ms
- Spark configurations:
  - spark.streaming.kafka.consumer.poll.ms
  - spark.network.timeout (if the above is not set, then poll.ms defaults to spark.network.timeout)

Generally I have faced this issue if spark.streaming.kafka.consumer.poll.ms is less than request.timeout.ms

Also, what is the average Kafka record message size in bytes?

Akshay Bhardwaj
+91-97111-33849

On Wed, Mar 6, 2019 at 1:26 PM Akshay Bhardwaj <akshay.bhardwaj1...@gmail.com> wrote:
> Hi,
>
> To better debug the issue, please check the below config properties:
>
> - max.partition.fetch.bytes within the Spark Kafka consumer. If not set
>   for the consumer, then the global config at broker level.
> - spark.streaming.kafka.consumer.poll.ms
> - spark.network.timeout (if the above is not set, then poll.ms
>   defaults to spark.network.timeout)
>
> Akshay Bhardwaj
> +91-97111-33849
>
> On Wed, Mar 6, 2019 at 8:39 AM JF Chen wrote:
>
>> When my Kafka executor reads data from Kafka, sometimes it throws the
>> error "java.lang.AssertionError: assertion failed: Failed to get records
>> for **** after polling for 180000", which happens after 3 minutes of
>> executing. The data waiting to be read is not huge, about 1GB. And other
>> partitions read by other tasks are very fast; the error always occurs on
>> some specific executor.
>>
>> Regards,
>> Junfeng Chen