Spark Structured Streaming 2.4 - Kill and deploy in YARN

2020-08-10 Thread KhajaAsmath Mohammed
Hi,

I am looking for some information on how to gracefully kill a Spark
Structured Streaming Kafka job and redeploy it.

How do I kill a Spark Structured Streaming job in YARN?
Any suggestions on how to kill it gracefully?

I was able to monitor the job from the SQL tab, but how can I kill the job
when it is deployed in YARN without knowing the YARN application ID?

Thanks,
Asmath
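
A minimal sketch of one way to stop such a query gracefully from the driver
(the marker-file path, query name, broker, and topic below are made-up
placeholders, not part of the original job): poll for an external stop signal
and call StreamingQuery.stop(), so the driver shuts the query down and the
YARN application exits on its own.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("my-structured-stream").getOrCreate()

// Hypothetical query; broker, topic and query name are placeholders.
val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "my_topic")
  .load()
  .writeStream
  .format("console")
  .queryName("my_query")
  .start()

// Hypothetical stop marker on HDFS; creating this file signals the driver to stop.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val stopMarker = new Path("hdfs:///tmp/stop_my_query")

while (query.isActive) {
  if (fs.exists(stopMarker)) query.stop()  // stops the query from the driver
  Thread.sleep(10000)
}

On the YARN side, the application id can also be looked up by application name
with yarn application -list and killed with yarn application -kill, but that
kills the application outright rather than stopping the query first.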


Re: regexp_extract regex for extracting the columns from string

2020-08-10 Thread Enrico Minack
You can remove the <1000> first and then turn the string into a map
(interpreting the string as key-value pairs). From that map you can access
each key and turn it into a separate column:


import org.apache.spark.sql.functions._
import spark.implicits._  // assumes a SparkSession named `spark`, e.g. in spark-shell

Seq(("<1000> date=2020-08-01 time=20:50:04 name=processing id=123 session=new packt=20 orgin=null address=null dest=fgjglgl"))
  .toDF("string")
  // drop the leading "<1000> " token
  .withColumn("key-values", regexp_replace($"string", "^[^ ]+ ", ""))
  // parse the space-separated "key=value" pairs into a map
  .withColumn("map", expr("str_to_map(`key-values`, ' ', '=')"))
  .select(
    $"map"("date").as("date"),
    $"map"("time").as("time"),
    $"map"("name").as("name"),
    $"map"("id").as("id"),
    $"map"("session").as("session"),
    $"map"("packt").as("packt"),
    $"map"("orgin").as("orgin"),  // the sample data spells this key "orgin"
    $"map"("address").as("address"),
    $"map"("dest").as("dest")
  )
  .show(false)

Enrico


On 09.08.20 18:00, anbutech wrote:

Hi All,

I have the following info in the data column.

<1000> date=2020-08-01 time=20:50:04 name=processing id=123 session=new
packt=20 orgin=null address=null dest=fgjglgl

Here I want to create a separate column for each of the above key-value pairs
that come after the integer <1000> and are separated by spaces.
Is there any way to achieve this using the built-in regexp_extract function? I
don't want to do it using a UDF.
Apart from a UDF, is there any way to achieve it?


Thanks






Re: regexp_extract regex for extracting the columns from string

2020-08-10 Thread Patrick McCarthy
Can you simply do a string split on space, and then another on '='?
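
For example, a sketch along those lines using the same sample row (it assumes
the key=value tokens always appear in this fixed order, and that a SparkSession
named spark is in scope for the implicits):

import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(("<1000> date=2020-08-01 time=20:50:04 name=processing id=123 session=new packt=20 orgin=null address=null dest=fgjglgl"))
  .toDF("string")

val keys = Seq("date", "time", "name", "id", "session", "packt", "orgin", "address", "dest")

df
  // first split: whitespace-separated tokens; token 0 is the "<1000>" prefix
  .withColumn("tokens", split($"string", " "))
  // second split: each "key=value" token on '=', keeping the value part
  .select(keys.zipWithIndex.map { case (k, i) =>
    split($"tokens"(i + 1), "=")(1).as(k)
  }: _*)
  .show(false)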

On Sun, Aug 9, 2020 at 12:00 PM anbutech  wrote:

> Hi All,
>
> I have the following info in the data column.
>
> <1000> date=2020-08-01 time=20:50:04 name=processing id=123 session=new
> packt=20 orgin=null address=null dest=fgjglgl
>
> Here I want to create a separate column for each of the above key-value pairs
> that come after the integer <1000> and are separated by spaces.
> Is there any way to achieve this using the built-in regexp_extract function? I
> don't want to do it using a UDF.
> Apart from a UDF, is there any way to achieve it?
>
>
> Thanks
>
>
>
>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Streaming AVRO data in console: java.lang.ArrayIndexOutOfBoundsException

2020-08-10 Thread dwgw


Hi,
I am trying to stream a Kafka topic (in Avro format) to the console. I have
loaded the Avro data from the Kafka topic into a DataFrame, but when I try to
write the stream to the console I get the following error.

*scala>* val records = spark.
   readStream.
   format("kafka").
   option("kafka.bootstrap.servers", "broker1:9093").
   option("subscribe", "PERSON").  
   option("startingOffsets", "earliest").
   load()

records: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5
more fields]

*scala>* val jsonFormatSchema = new
String(Files.readAllBytes(Paths.get("/home/spark/person.avsc")))

jsonFormatSchema: String =
"{
  "type": "record",
  "name": "Person",
  "namespace": "io.confluent.connect.avro",
  "fields": [
...
...

*scala>* val output =
records.select(from_avro(col("value"),jsonFormatSchema).as("person"))
output: org.apache.spark.sql.DataFrame = [person: struct]

*scala>*  .select("icxsession.*")

res15: org.apache.spark.sql.DataFrame = [SESSION_ID: bigint,
VERSION_STARTSCN: bigint ... 46 more fields]

*Error occurs here:*

*scala>* output.writeStream
  .format("console")
  .outputMode("append")
  .start()
  .awaitTermination()

20/08/10 01:14:24 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 5.0
(TID 20, workstation.com, executor 2):
*java.lang.ArrayIndexOutOfBoundsException: 1405994075*
at
org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
at
org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at
org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at
org.apache.spark.sql.avro.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:50)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
at
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
at
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
at
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
at
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

[Stage 5:>  (0 + 1)
/ 1]20/08/10 01:14:25 ERROR scheduler.TaskSetManager: Task 0 in stage 5.0
failed 4 times; aborting job
20/08/10 01:14:25 ERROR v2.WriteToDataSourceV2Exec: Data source writer
org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@488b8521
is aborting.
20/08/10 01:14:25 ERROR v2.WriteToDataSourceV2Exec: Data source writer
org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@488b8521
aborted.
*20/08/10 01:14:25 ERROR streaming.MicroBatchExecution: Query [id =
5e8ffd55-fb54-45d1-8255-56ba810c1f51, runId =
1b7245de-de96-43e7-98ef-8bc62a6f697e] terminated with error
org.apache.spark.SparkException: Writing job aborted.*
at
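
A common cause of this kind of ArrayIndexOutOfBoundsException with a very large
index is that the topic is produced using the Confluent Schema Registry wire
format, in which every Avro payload is prefixed with a magic byte and a 4-byte
schema id that Spark's plain from_avro does not expect. Assuming that is the
producer format here (an assumption, not something the post confirms), a sketch
of stripping that 5-byte header before decoding, reusing the records DataFrame
and jsonFormatSchema from above:

import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions._

// Assumption: Confluent wire format (1 magic byte + 4-byte schema id) precedes
// the Avro body. substring is 1-based, so skip the first 5 bytes of the value.
val avroBody = expr("substring(value, 6, length(value) - 5)")

val output = records.select(from_avro(avroBody, jsonFormatSchema).as("person"))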

Re: Spark streaming receivers

2020-08-10 Thread Russell Spitzer
The direct approach (which is also available through DStreams) and Structured
Streaming use a different model: instead of being push-based streaming
solutions, they are, in general, pull-based.

On every batch the driver uses the configuration to create a number of
partitions, each is responsible for independently pulling a number of
records. The exact number of records and guarantees around the pull are
source and configuration dependent. Since the system is pull based, there
is no need for a receiver or block management system taking up resources.
Every task/partition contains all the information required to get the data
that it describes.

As an example with Kafka, the driver might decide that batch 1 contains all the
records between offset 1 and 100. It checks and sees that there are 10 Kafka
partitions, so it ends up making a Spark job which contains 10 tasks, each task
dedicated to a single Kafka partition. Each task will then independently ask
for 100 records from its Kafka partition. There will be no Spark resources used
outside of those required for those 10 tasks.
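
A minimal Structured Streaming sketch of that pull-based model (the broker
address, topic name, and rate cap are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("pull-based-example").getOrCreate()

// Each micro-batch, the driver plans one task per Kafka topic-partition and
// each task pulls its own slice of offsets directly from Kafka; no receiver
// or block-management resources are held between batches.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "events")
  .option("maxOffsetsPerTrigger", "1000")  // cap on records pulled per batch
  .load()

stream.writeStream
  .format("console")
  .start()
  .awaitTermination()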

On Sun, Aug 9, 2020, 10:44 PM Dark Crusader 
wrote:

> Hi Russell,
> This is super helpful. Thank you so much.
>
> Can you elaborate on the differences between Structured Streaming and
> DStreams? How would the number of receivers required, etc., change?
>
> On Sat, 8 Aug, 2020, 10:28 pm Russell Spitzer, 
> wrote:
>
>> Note, none of this applies to Direct streaming approaches, only receiver
>> based Dstreams.
>>
>> You can think of a receiver as a long running task that never finishes.
>> Each receiver is submitted to an executor slot somewhere; it then runs
>> indefinitely and internally has a method which passes records over to a
>> block management system. There is a timing that you set which decides when
>> each block is "done" and records after that time has passed go into the
>> next block (see the spark.streaming.blockInterval parameter). Once a block
>> is done it can be processed in the next Spark batch. The gap between a block
>> starting and a block being finished is why you can lose data in receiver-based
>> streaming without write-ahead logging. Usually your block interval is divisible into your batch
>> interval so you'll get X blocks per batch. Each block becomes one partition
>> of the job being done in a Streaming batch. Multiple receivers can be
>> unified into a single dstream, which just means the blocks produced by all
>> of those receivers are handled in the same Streaming batch.
>>
>> So if you have 5 different receivers, you need at minimum 6 executor
>> cores. 1 core for each receiver, and 1 core to actually do your processing
>> work. In a real-world case you probably want significantly more cores on
>> the processing side than just 1. Without repartitioning you will never have
>> more partitions than blocks per batch.
>>
>> A quick example
>>
>> I run 5 receivers with a block interval of 100ms and a Spark batch interval
>> of 1 second. I use union to group them all together, so I will most likely
>> end up with one Spark job for each batch every second running with 50
>> partitions (1000 ms / 100 ms per block = 10 blocks per receiver, times 5
>> receivers). If I have a total of 10 cores in the system, 5 of them are
>> running receivers, and the remaining 5 must process the 50 partitions of
>> data generated by the last second of work.
>>
>> And again, just to reiterate, if you are doing a direct streaming
>> approach or structured streaming, none of this applies.
>>
>> On Sat, Aug 8, 2020 at 10:03 AM Dark Crusader <
>> relinquisheddra...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I'm having some trouble figuring out how receivers tie into the Spark
>>> driver-executor structure.
>>> Do all executors have a receiver that is blocked as soon as it
>>> receives some stream data?
>>> Or can multiple streams of data be taken as input into a single executor?
>>>
>>> I have stream data coming in every second from 5 different sources. I want
>>> to aggregate data from each of them. Does this mean I need 5 executors, or
>>> does it have to do with threads on the executor?
>>>
>>> I might be mixing in a few concepts here. Any help would be appreciated.
>>> Thank you.
>>>
>>
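
A minimal receiver-based DStream sketch matching Russell's example above
(socket sources stand in for the five receivers; host names and ports are
illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Five receivers each hold one executor core for the lifetime of the job, so
// the application needs more than 5 cores to leave room for processing.
val conf = new SparkConf()
  .setAppName("receiver-example")
  .set("spark.streaming.blockInterval", "100ms")  // 10 blocks per receiver per 1s batch

val ssc = new StreamingContext(conf, Seconds(1))

// Five receiver-based input streams, unioned into one DStream; the blocks they
// produce become the partitions of each 1-second batch (5 x 10 = 50 here).
val sources = (1 to 5).map(i => ssc.socketTextStream("source-host", 9000 + i))
val unioned = ssc.union(sources)

unioned.count().print()

ssc.start()
ssc.awaitTermination()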