Re: Structured Streaming to Kafka Topic

2019-03-06 Thread Akshay Bhardwaj
Hi Pankaj,

What version of Spark are you using?

If you are using Spark 2.4+ then there is a built-in function "to_json" which
converts a struct of your dataset's columns into a JSON string.
https://spark.apache.org/docs/2.4.0/api/sql/#to_json
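
For example, a minimal sketch (assuming your transformed Dataset is named "dataset"; the output topic name and checkpoint path below are placeholders):

import org.apache.spark.sql.functions.{col, struct, to_json}

// Pack all columns into a struct, serialize it to a JSON string column named
// "value" (the column the Kafka sink expects), and write the stream out.
val query = dataset
  .select(to_json(struct(dataset.columns.map(col): _*)).alias("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "sms_history_transformed")            // placeholder topic
  .option("checkpointLocation", "/tmp/checkpoints/sms")   // placeholder path
  .start()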

Akshay Bhardwaj
+91-97111-33849


On Wed, Mar 6, 2019 at 10:29 PM Pankaj Wahane  wrote:

> Hi,
>
> I am using structured streaming for ETL.
>
> val data_stream = spark
>   .readStream // constantly expanding dataframe
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "sms_history")
>   .option("startingOffsets", "earliest") // begin from start of topic
>   .option("failOnDataLoss", "false")
>   .load()
>
> I transform this into a Dataset with the following schema.
>
> root
>  |-- accountId: long (nullable = true)
>  |-- countryId: long (nullable = true)
>  |-- credits: double (nullable = true)
>  |-- deliveryStatus: string (nullable = true)
>  |-- senderId: string (nullable = true)
>  |-- sentStatus: string (nullable = true)
>  |-- source: integer (nullable = true)
>  |-- createdOn: timestamp (nullable = true)
>  |-- send_success_credits: double (nullable = true)
>  |-- send_error_credits: double (nullable = true)
>  |-- delivered_credits: double (nullable = true)
>  |-- invalid_sd_credits: double (nullable = true)
>  |-- undelivered_credits: double (nullable = true)
>  |-- unknown_credits: double (nullable = true)
>
>
> Now I want to write this transformed stream to another Kafka topic. I have
> temporarily used a UDF that accepts all these columns as parameters and
> creates a JSON string that is added as a "value" column for writing to Kafka.
>
> Is there an easier and cleaner way to do the same?
>
>
> Thanks,
> Pankaj
>
>


spark structured streaming crash due to decompressing gzip file failure

2019-03-06 Thread Lian Jiang
Hi,

I have a structured streaming job which listens to an HDFS folder containing
jsonl.gz files. The job crashed with the error:

java.io.IOException: incorrect header check
    at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method)
    at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:225)
    at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
    at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
    at java.io.InputStream.read(InputStream.java:101)
    at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:182)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:218)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:176)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:152)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:192)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:186)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)


Is there a way to skip the gz files that cannot be decompressed? Exception
handling does not seem to help. The only workaround I can think of is to
decompress the gz files into another folder first and make the Spark
streaming job listen to this new folder. But this workaround may not be
better than simply using a plain (non-structured-streaming) job to
directly decompress the gz files, read the jsonl files, validate the records,
and write the validated records to Parquet.
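
A minimal sketch of a pre-check variant of that workaround (assuming the files can be inspected before being moved into the watched folder; the helper name is made up):

import java.io.{BufferedInputStream, FileInputStream, IOException}
import java.util.zip.GZIPInputStream

// Returns true only if the whole .gz file decompresses cleanly, so corrupt
// files can be quarantined instead of being moved into the folder that the
// streaming query watches.
def isReadableGzip(path: String): Boolean = {
  val buf = new Array[Byte](8192)
  try {
    val in = new GZIPInputStream(new BufferedInputStream(new FileInputStream(path)))
    try {
      while (in.read(buf) != -1) {} // drain the stream; throws on a corrupt header or member
      true
    } finally {
      in.close()
    }
  } catch {
    case _: IOException => false
  }
}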

Any idea is highly appreciated!


PySpark date_add function suggestion

2019-03-06 Thread William Creger
I've been looking at the source code of the PySpark date_add function
(https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions.html#date_add)
and I'm wondering why the days input variable is not cast to a Java Column
like the start variable. This effectively means that when working with data
frames, you can only add the same number of days to all of your dates. I think it
would make more sense to cast the days variable to a Java Column, so that you
could add different numbers of days to different dates. The JVM function date_add has no
problem doing this, because I can add a date column and an integer column using the expr
function (expr("date_add(start, days)")). And if you wanted to add the same
number of days everywhere, you could just make a lit column with that number. This argument
applies to the functions date_sub and add_months as well.
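
For illustration, a small Scala sketch of the expr-based workaround (the DataFrame and its column names are made up):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder().master("local[*]").appName("date-add-sketch").getOrCreate()
import spark.implicits._

// Hypothetical data: each row should get a different number of days added.
val df = Seq(("2019-03-01", 1), ("2019-03-01", 30)).toDF("start", "days")
  .withColumn("start", $"start".cast("date"))

// The SQL expression form accepts a per-row days column, unlike the typed
// date_add(col, int) API, which only takes a literal number of days.
df.withColumn("shifted", expr("date_add(start, days)")).show()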

Clay





Structured Streaming to Kafka Topic

2019-03-06 Thread Pankaj Wahane
Hi,

I am using structured streaming for ETL.


val data_stream = spark
  .readStream // constantly expanding dataframe
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "sms_history")
  .option("startingOffsets", "earliest") // begin from start of topic
  .option("failOnDataLoss", "false")
  .load()

I transform this into a Dataset with the following schema.

root
 |-- accountId: long (nullable = true)
 |-- countryId: long (nullable = true)
 |-- credits: double (nullable = true)
 |-- deliveryStatus: string (nullable = true)
 |-- senderId: string (nullable = true)
 |-- sentStatus: string (nullable = true)
 |-- source: integer (nullable = true)
 |-- createdOn: timestamp (nullable = true)
 |-- send_success_credits: double (nullable = true)
 |-- send_error_credits: double (nullable = true)
 |-- delivered_credits: double (nullable = true)
 |-- invalid_sd_credits: double (nullable = true)
 |-- undelivered_credits: double (nullable = true)
 |-- unknown_credits: double (nullable = true)


Now I want to write this transformed stream to another Kafka topic. I have
temporarily used a UDF that accepts all these columns as parameters and creates
a JSON string that is added as a "value" column for writing to Kafka.

Is there an easier and cleaner way to do the same?


Thanks,
Pankaj



4 Apache Events in 2019: DC Roadshow soon; next up Chicago, Las Vegas, and Berlin!

2019-03-06 Thread Rich Bowen
Dear Apache Enthusiast,

(You’re receiving this because you are subscribed to one or more user
mailing lists for an Apache Software Foundation project.)

TL;DR:
 * Apache Roadshow DC is in 3 weeks. Register now at
https://apachecon.com/usroadshowdc19/
 * Registration for Apache Roadshow Chicago is open.
http://apachecon.com/chiroadshow19
 * The CFP for ApacheCon North America is now open.
https://apachecon.com/acna19
 * Save the date: ApacheCon Europe will be held in Berlin, October 22nd
through 24th.  https://apachecon.com/aceu19


Registration is open for two Apache Roadshows; these are smaller events
with a more focused program and regional community engagement:

Our Roadshow event in Washington DC takes place in under three weeks, on
March 25th. We’ll be hosting a day-long event at the Fairfax campus of
George Mason University. The roadshow is a full day of technical talks
(two tracks) and an open source job fair featuring AWS, Bloomberg, dito,
GridGain, Linode, and Security University. For more details about the
program and the job fair, and to register, visit
https://apachecon.com/usroadshowdc19/

Apache Roadshow Chicago will be held May 13-14th at a number of venues
in Chicago’s Logan Square neighborhood. This event will feature sessions
in AdTech, FinTech and Insurance, startups, “Made in Chicago”, Project
Shark Tank (innovations from the Apache Incubator), community diversity,
and more. It’s a great way to learn about various Apache projects “at
work” while playing at a brewery, a beercade, and a neighborhood bar.
Sign up today at https://www.apachecon.com/chiroadshow19/

We’re delighted to announce that the Call for Presentations (CFP) is now
open for ApacheCon North America in Las Vegas, September 9-13th! As the
official conference series of the ASF, ApacheCon North America will
feature over a dozen Apache project summits, including Cassandra,
Cloudstack, Tomcat, Traffic Control, and more. We’re looking for talks
in a wide variety of categories -- anything related to ASF projects and
the Apache development process. The CFP closes at midnight on May 26th.
In addition, the ASF will be celebrating its 20th Anniversary during the
event. For more details and to submit a proposal for the CFP, visit
https://apachecon.com/acna19/ . Registration will be opening soon.

Be sure to mark your calendars for ApacheCon Europe, which will be held
in Berlin, October 22-24th at the KulturBrauerei, a landmark of Berlin's
industrial history. In addition to innovative content from our projects,
we are collaborating with the Open Source Design community
(https://opensourcedesign.net/) to offer a track on design this year.
The CFP and registration will open soon at https://apachecon.com/aceu19/ .

Sponsorship opportunities are available for all events, with details
listed on each event’s site at http://apachecon.com/.

We look forward to seeing you!

Rich, for the ApacheCon Planners
@apachecon





Re: Why does Apache Spark Master shut down when Zookeeper expires the session

2019-03-06 Thread Jungtaek Lim
I guess this is a known issue and is being tracked by SPARK-15544
[1] and SPARK-23530 [2] (the latter seems to be a duplicate).

I guess that's the simplest implementation of H/A (since we don't have to bother
with the current state in the master), assuming a background process like
supervisord restarts the master process when it is no longer running; but if no
such background process is set up, it may eventually lead to all master
processes being shut down.

IMHO the safer approach is storing all information in ZK (the source of truth)
so that only the leader master can read and write it. The other follower masters
just wait and load the information when one of them becomes the leader. That
would require pretty substantial changes, though.
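
A minimal sketch of that pattern (not Spark's actual implementation; it assumes Apache Curator's LeaderLatch recipe, and the ZooKeeper connect string and election path are placeholders):

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.{LeaderLatch, LeaderLatchListener}
import org.apache.curator.retry.ExponentialBackoffRetry

object LeaderElectionSketch extends App {
  // Keep retrying the ZooKeeper connection instead of giving up on session expiry.
  val client = CuratorFrameworkFactory.newClient(
    "localhost:2181",                      // placeholder ZK connect string
    new ExponentialBackoffRetry(1000, 29))
  client.start()

  // Re-enter the election on leadership changes rather than exiting the process.
  val latch = new LeaderLatch(client, "/sketch/master-election")
  latch.addListener(new LeaderLatchListener {
    override def isLeader(): Unit = println("became leader: load state from ZK and serve")
    override def notLeader(): Unit = println("lost leadership: stand by as a follower")
  })
  latch.start()
  Thread.sleep(Long.MaxValue)
}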

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

1. https://issues.apache.org/jira/browse/SPARK-15544
2. https://issues.apache.org/jira/browse/SPARK-23530

On Tue, Mar 5, 2019 at 10:02 PM lokeshkumar wrote:

> As I understand, Apache Spark Master can be run in high availability mode
> using Zookeeper. That is, multiple Spark masters can run in Leader/Follower
> mode and these modes are registered with Zookeeper.
>
> In our scenario Zookeeper is expiring the session of the Spark Master which is
> acting as Leader. So the Spark Master which is the leader receives this
> notification and shuts down deliberately.
>
> Can someone explain why this decision of shutting down rather than retrying
> has been taken?
>
> And why does Kafka retry connecting to Zookeeper when it receives the same
> Expiry notification?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
>
>


Re: "java.lang.AssertionError: assertion failed: Failed to get records for **** after polling for 180000" error

2019-03-06 Thread JF Chen
Hi,
The max bytes setting should be enough, because when the failed tasks are
retried they read the data from Kafka as fast as normal.
The request.timeout.ms I set is 180 seconds.
I suspect the cause is a timeout setting or a max-bandwidth setting, because
the job recovers and reads the same partition very quickly after the tasks are
marked as failed.

Regard,
Junfeng Chen


On Wed, Mar 6, 2019 at 4:01 PM Akshay Bhardwaj <
akshay.bhardwaj1...@gmail.com> wrote:

> Sorry message sent as incomplete.
>
> To better debug the issue, please check the below config properties:
>
>    - At Kafka consumer properties
>       - max.partition.fetch.bytes within the Spark Kafka consumer. If not set
>       for the consumer, the global config at the broker level applies.
>       - request.timeout.ms
>    - At Spark's configurations
>       - spark.streaming.kafka.consumer.poll.ms
>       - spark.network.timeout (if the above is not set, poll.ms defaults
>       to spark.network.timeout)
>
>
> Generally I have faced this issue if spark.streaming.kafka.consumer.poll.ms
> is less than request.timeout.ms
>
> Also, what is the average kafka record message size in bytes?
>
>
>
> Akshay Bhardwaj
> +91-97111-33849
>
>
> On Wed, Mar 6, 2019 at 1:26 PM Akshay Bhardwaj <
> akshay.bhardwaj1...@gmail.com> wrote:
>
>> Hi,
>>
>> To better debug the issue, please check the below config properties:
>>
>>    - max.partition.fetch.bytes within the Spark Kafka consumer. If not set
>>    for the consumer, the global config at the broker level applies.
>>    - spark.streaming.kafka.consumer.poll.ms
>>       - spark.network.timeout (if the above is not set, poll.ms defaults
>>       to spark.network.timeout)
>>
>> Akshay Bhardwaj
>> +91-97111-33849
>>
>>
>> On Wed, Mar 6, 2019 at 8:39 AM JF Chen  wrote:
>>
>>> When my Kafka executor reads data from Kafka, sometimes it throws the
>>> error "java.lang.AssertionError: assertion failed: Failed to get records
>>> for  after polling for 180000", which happens after 3 minutes of executing.
>>> The data waiting to be read is not that large, about 1GB. Other
>>> partitions read by other tasks are very fast; the error always occurs on
>>> some specific executor.
>>>
>>> Regard,
>>> Junfeng Chen
>>>
>>


Re: "java.lang.AssertionError: assertion failed: Failed to get records for **** after polling for 180000" error

2019-03-06 Thread Akshay Bhardwaj
Sorry message sent as incomplete.

To better debug the issue, please check the below config properties:

   - At Kafka consumer properties
      - max.partition.fetch.bytes within the Spark Kafka consumer. If not set
      for the consumer, the global config at the broker level applies.
      - request.timeout.ms
   - At Spark's configurations
      - spark.streaming.kafka.consumer.poll.ms
      - spark.network.timeout (if the above is not set, poll.ms defaults
      to spark.network.timeout)


Generally I have faced this issue if spark.streaming.kafka.consumer.poll.ms is
less than request.timeout.ms
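
For reference, a sketch of where these settings go (assuming the spark-streaming-kafka-0-10 direct stream API; the broker address, topic, group id, and timeout values below are placeholders):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val conf = new SparkConf()
  .setAppName("kafka-timeout-sketch")
  // keep the poll timeout larger than the consumer's request.timeout.ms
  .set("spark.streaming.kafka.consumer.poll.ms", "240000")
  .set("spark.network.timeout", "300s")
val ssc = new StreamingContext(conf, Seconds(30))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "debug-group",
  "request.timeout.ms" -> "180000",          // the 180 seconds mentioned above
  "max.partition.fetch.bytes" -> "1048576"   // per-partition fetch cap
)

// The direct stream picks up the consumer properties from kafkaParams and the
// poll/network timeouts from the SparkConf above.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("some_topic"), kafkaParams))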

Also, what is the average kafka record message size in bytes?



Akshay Bhardwaj
+91-97111-33849


On Wed, Mar 6, 2019 at 1:26 PM Akshay Bhardwaj <
akshay.bhardwaj1...@gmail.com> wrote:

> Hi,
>
> To better debug the issue, please check the below config properties:
>
>    - max.partition.fetch.bytes within the Spark Kafka consumer. If not set
>    for the consumer, the global config at the broker level applies.
>    - spark.streaming.kafka.consumer.poll.ms
>       - spark.network.timeout (if the above is not set, poll.ms defaults
>       to spark.network.timeout)
>
> Akshay Bhardwaj
> +91-97111-33849
>
>
> On Wed, Mar 6, 2019 at 8:39 AM JF Chen  wrote:
>
>> When my Kafka executor reads data from Kafka, sometimes it throws the
>> error "java.lang.AssertionError: assertion failed: Failed to get records
>> for  after polling for 180000", which happens after 3 minutes of executing.
>> The data waiting to be read is not that large, about 1GB. Other
>> partitions read by other tasks are very fast; the error always occurs on
>> some specific executor.
>>
>> Regard,
>> Junfeng Chen
>>
>