Re: Using Spark Accumulators with Structured Streaming

2020-06-04 Thread ZHANG Wei
The following Java code works in my cluster environment:
```
.mapGroupsWithState((MapGroupsWithStateFunction) (key, values, state) -> {
        myAcc.add(1);
        <...>
        state.update(newState);
        return new LeadingCharCount(key, newState);
    },
    Encoders.LONG(),
    Encoders.bean(LeadingCharCount.class),
    GroupStateTimeout.ProcessingTimeTimeout())
```

Also works fine with my `StateUpdateTask`:
```
.mapGroupsWithState(
    new StateUpdateTask(myAcc),
    Encoders.LONG(),
    Encoders.bean(LeadingCharCount.class),
    GroupStateTimeout.ProcessingTimeTimeout());

public class StateUpdateTask
        implements MapGroupsWithStateFunction {

    private LongAccumulator myAccInTask;

    public StateUpdateTask(LongAccumulator acc) {
        this.myAccInTask = acc;
    }

    @Override
    public LeadingCharCount call(String key, Iterator values,
                                 GroupState state) throws Exception {
        myAccInTask.add(1);
        <...>
        state.update(newState);
        return new LeadingCharCount(key, newState);
    }
}
```
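
For reference, `myAcc` above is a plain driver-side `LongAccumulator`. A minimal sketch of how it might be created, mirroring the `sparkContext.longAccumulator(name)` call quoted further down this thread (Scala; in Java the equivalent is `spark.sparkContext().longAccumulator("myAcc")`):
```scala
// Created on the driver before the query starts; it is then captured by the
// mapGroupsWithState function above and read via myAcc.value on the driver.
val myAcc = spark.sparkContext.longAccumulator("myAcc")
```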

-- 
Cheers,
-z

On Tue, 2 Jun 2020 10:28:36 +0800
ZHANG Wei  wrote:

> Yes, verified on the cluster with 5 executors.
> 
> -- 
> Cheers,
> -z
> 
> On Fri, 29 May 2020 11:16:12 -0700
> Something Something  wrote:
> 
> > Did you try this on the Cluster? Note: This works just fine under 'Local'
> > mode.
> > 
> > On Thu, May 28, 2020 at 9:12 PM ZHANG Wei  wrote:
> > 
> > > I can't reproduce the issue with my simple code:
> > > ```scala
> > > spark.streams.addListener(new StreamingQueryListener {
> > >   override def onQueryProgress(event:
> > > StreamingQueryListener.QueryProgressEvent): Unit = {
> > > println(event.progress.id + " is on progress")
> > > println(s"My accu is ${myAcc.value} on query progress")
> > >   }
> > > ...
> > > })
> > >
> > > def mappingFunc(key: Long, values: Iterator[String], state:
> > > GroupState[Long]): ... = {
> > >   myAcc.add(1)
> > >   println(s">>> key: $key => state: ${state}")
> > > ...
> > > }
> > >
> > > val wordCounts = words
> > >   .groupByKey(v => ...)
> > >   .mapGroupsWithState(timeoutConf =
> > > GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
> > >
> > > val query = wordCounts.writeStream
> > >   .outputMode(OutputMode.Update)
> > > ...
> > > ```
> > >
> > > I'm wondering if there were any errors can be found from driver logs? The
> > > micro-batch
> > > exceptions won't terminate the streaming job running.
> > >
> > > For the following code, we have to make sure that `StateUpdateTask` is
> > > started:
> > > > .mapGroupsWithState(
> > > > new
> > > StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > > appConfig, accumulators),
> > > > Encoders.bean(ModelStateInfo.class),
> > > > Encoders.bean(ModelUpdate.class),
> > > > GroupStateTimeout.ProcessingTimeTimeout());
> > >
> > > --
> > > Cheers,
> > > -z
> > >
> > > On Thu, 28 May 2020 19:59:31 +0530
> > > Srinivas V  wrote:
> > >
> > > > Giving the code below:
> > > > //accumulators is a class level variable in driver.
> > > >
> > > >  sparkSession.streams().addListener(new StreamingQueryListener() {
> > > > @Override
> > > > public void onQueryStarted(QueryStartedEvent queryStarted) {
> > > > logger.info("Query started: " + queryStarted.id());
> > > > }
> > > > @Override
> > > > public void onQueryTerminated(QueryTerminatedEvent
> > > > queryTerminated) {
> > > > logger.info("Query terminated: " +
> > > queryTerminated.id());
> > > > }
> > > > @Override
> > > > public void onQueryProgress(QueryProgressEvent
> > > queryProgress) {
> > > >
> > > > accumulators.eventsReceived(queryProgress.progress().numInputRows());
> > > >

Re: Using Spark Accumulators with Structured Streaming

2020-06-01 Thread ZHANG Wei
Yes, verified on the cluster with 5 executors.

-- 
Cheers,
-z

On Fri, 29 May 2020 11:16:12 -0700
Something Something  wrote:

> Did you try this on the Cluster? Note: This works just fine under 'Local'
> mode.
> 
> On Thu, May 28, 2020 at 9:12 PM ZHANG Wei  wrote:
> 
> > I can't reproduce the issue with my simple code:
> > ```scala
> > spark.streams.addListener(new StreamingQueryListener {
> >   override def onQueryProgress(event:
> > StreamingQueryListener.QueryProgressEvent): Unit = {
> > println(event.progress.id + " is on progress")
> > println(s"My accu is ${myAcc.value} on query progress")
> >   }
> > ...
> > })
> >
> > def mappingFunc(key: Long, values: Iterator[String], state:
> > GroupState[Long]): ... = {
> >   myAcc.add(1)
> >   println(s">>> key: $key => state: ${state}")
> > ...
> > }
> >
> > val wordCounts = words
> >   .groupByKey(v => ...)
> >   .mapGroupsWithState(timeoutConf =
> > GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
> >
> > val query = wordCounts.writeStream
> >   .outputMode(OutputMode.Update)
> > ...
> > ```
> >
> > I'm wondering if there were any errors can be found from driver logs? The
> > micro-batch
> > exceptions won't terminate the streaming job running.
> >
> > For the following code, we have to make sure that `StateUpdateTask` is
> > started:
> > > .mapGroupsWithState(
> > > new
> > StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > appConfig, accumulators),
> > > Encoders.bean(ModelStateInfo.class),
> > > Encoders.bean(ModelUpdate.class),
> > > GroupStateTimeout.ProcessingTimeTimeout());
> >
> > --
> > Cheers,
> > -z
> >
> > On Thu, 28 May 2020 19:59:31 +0530
> > Srinivas V  wrote:
> >
> > > Giving the code below:
> > > //accumulators is a class level variable in driver.
> > >
> > >  sparkSession.streams().addListener(new StreamingQueryListener() {
> > > @Override
> > > public void onQueryStarted(QueryStartedEvent queryStarted) {
> > > logger.info("Query started: " + queryStarted.id());
> > > }
> > > @Override
> > > public void onQueryTerminated(QueryTerminatedEvent
> > > queryTerminated) {
> > > logger.info("Query terminated: " +
> > queryTerminated.id());
> > > }
> > > @Override
> > > public void onQueryProgress(QueryProgressEvent
> > queryProgress) {
> > >
> > > accumulators.eventsReceived(queryProgress.progress().numInputRows());
> > > long eventsReceived = 0;
> > > long eventsExpired = 0;
> > > long eventSentSuccess = 0;
> > > try {
> > > eventsReceived =
> > > accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
> > > eventsExpired =
> > > accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
> > > eventSentSuccess =
> > > accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
> > > } catch (MissingKeyException e) {
> > > logger.error("Accumulator key not found due to
> > > Exception {}", e.getMessage());
> > > }
> > >     logger.info("Events Received:{}", eventsReceived);
> > > logger.info("Events State Expired:{}", eventsExpired);
> > > logger.info("Events Sent Success:{}", eventSentSuccess);
> > > logger.info("Query made progress - batchId: {}
> > > numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{}
> > > durationMs:{}" ,
> > > queryProgress.progress().batchId(),
> > > queryProgress.progress().numInputRows(),
> > > queryProgress.progress().inputRowsPerSecond(),
> > >
> >  queryProgress.progress().processedRowsPerSecond(),
> > > queryProgress.progress().durationMs());
> > >
> > >
> > > On Thu, May 28, 2020 at 7:04 PM ZHANG Wei  wrote:

Re: Using Spark Accumulators with Structured Streaming

2020-05-28 Thread ZHANG Wei
I can't reproduce the issue with my simple code:
```scala
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryProgress(event: 
StreamingQueryListener.QueryProgressEvent): Unit = {
println(event.progress.id + " is on progress")
println(s"My accu is ${myAcc.value} on query progress")
  }
...
})

def mappingFunc(key: Long, values: Iterator[String], state: 
GroupState[Long]): ... = {
  myAcc.add(1)
  println(s">>> key: $key => state: ${state}")
...
}

val wordCounts = words
  .groupByKey(v => ...)
  .mapGroupsWithState(timeoutConf = 
GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)

val query = wordCounts.writeStream
  .outputMode(OutputMode.Update)
...
```

I'm wondering whether any errors can be found in the driver logs? Exceptions in a
micro-batch won't terminate the running streaming job.

For the following code, we have to make sure that `StateUpdateTask` is actually started:
> .mapGroupsWithState(
> new 
> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> appConfig, accumulators),
> Encoders.bean(ModelStateInfo.class),
> Encoders.bean(ModelUpdate.class),
> GroupStateTimeout.ProcessingTimeTimeout());

-- 
Cheers,
-z

On Thu, 28 May 2020 19:59:31 +0530
Srinivas V  wrote:

> Giving the code below:
> //accumulators is a class level variable in driver.
> 
>  sparkSession.streams().addListener(new StreamingQueryListener() {
> @Override
> public void onQueryStarted(QueryStartedEvent queryStarted) {
> logger.info("Query started: " + queryStarted.id());
> }
> @Override
> public void onQueryTerminated(QueryTerminatedEvent
> queryTerminated) {
> logger.info("Query terminated: " + queryTerminated.id());
> }
> @Override
> public void onQueryProgress(QueryProgressEvent queryProgress) {
> 
> accumulators.eventsReceived(queryProgress.progress().numInputRows());
> long eventsReceived = 0;
> long eventsExpired = 0;
> long eventSentSuccess = 0;
> try {
> eventsReceived =
> accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
> eventsExpired =
> accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
> eventSentSuccess =
> accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
> } catch (MissingKeyException e) {
> logger.error("Accumulator key not found due to
> Exception {}", e.getMessage());
> }
> logger.info("Events Received:{}", eventsReceived);
> logger.info("Events State Expired:{}", eventsExpired);
> logger.info("Events Sent Success:{}", eventSentSuccess);
> logger.info("Query made progress - batchId: {}
> numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{}
> durationMs:{}" ,
> queryProgress.progress().batchId(),
> queryProgress.progress().numInputRows(),
> queryProgress.progress().inputRowsPerSecond(),
> queryProgress.progress().processedRowsPerSecond(),
> queryProgress.progress().durationMs());
> 
> 
> On Thu, May 28, 2020 at 7:04 PM ZHANG Wei  wrote:
> 
> > May I get how the accumulator is accessed in the method
> > `onQueryProgress()`?
> >
> > AFAICT, the accumulator is incremented well. There is a way to verify that
> > in cluster like this:
> > ```
> > // Add the following while loop before invoking awaitTermination
> > while (true) {
> >   println("My acc: " + myAcc.value)
> >   Thread.sleep(5 * 1000)
> > }
> >
> > //query.awaitTermination()
> > ```
> >
> > And the accumulator value updated can be found from driver stdout.
> >
> > --
> > Cheers,
> > -z
> >
> > On Thu, 28 May 2020 17:12:48 +0530
> > Srinivas V  wrote:
> >
> > > yes, I am using stateful structured streaming. Yes similar to what you
> > do.
> > > This is in Java
> > > I do it this way:
> > > Dataset productUpdates = watermarkedDS
> > > .groupByKey(
> > > (MapFunction) event ->
> > > event.getId(), Encoders.STRING())
> > > .mapG

Re: Using Spark Accumulators with Structured Streaming

2020-05-28 Thread ZHANG Wei
May I get how the accumulator is accessed in the method `onQueryProgress()`?

AFAICT, the accumulator is incremented well. There is a way to verify that
in a cluster, like this:
```
// Add the following while loop before invoking awaitTermination
while (true) {
  println("My acc: " + myAcc.value)
  Thread.sleep(5 * 1000)
}

//query.awaitTermination()
```

And the updated accumulator value can be found in the driver's stdout.

-- 
Cheers,
-z

On Thu, 28 May 2020 17:12:48 +0530
Srinivas V  wrote:

> yes, I am using stateful structured streaming. Yes similar to what you do.
> This is in Java
> I do it this way:
> Dataset productUpdates = watermarkedDS
> .groupByKey(
> (MapFunction) event ->
> event.getId(), Encoders.STRING())
> .mapGroupsWithState(
> new
> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> appConfig, accumulators),
> Encoders.bean(ModelStateInfo.class),
> Encoders.bean(ModelUpdate.class),
> GroupStateTimeout.ProcessingTimeTimeout());
> 
> StateUpdateTask contains the update method.
> 
> On Thu, May 28, 2020 at 4:41 AM Something Something <
> mailinglist...@gmail.com> wrote:
> 
> > Yes, that's exactly how I am creating them.
> >
> > Question... Are you using 'Stateful Structured Streaming' in which you've
> > something like this?
> >
> > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > updateAcrossEvents
> >   )
> >
> > And updating the Accumulator inside 'updateAcrossEvents'? We're 
> > experiencing this only under 'Stateful Structured Streaming'. In other 
> > streaming applications it works as expected.
> >
> >
> >
> > On Wed, May 27, 2020 at 9:01 AM Srinivas V  wrote:
> >
> >> Yes, I am talking about Application specific Accumulators. Actually I am
> >> getting the values printed in my driver log as well as sent to Grafana. Not
> >> sure where and when I saw 0 before. My deploy mode is “client” on a yarn
> >> cluster(not local Mac) where I submit from master node. It should work the
> >> same for cluster mode as well.
> >> Create accumulators like this:
> >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
> >>
> >>
> >> On Tue, May 26, 2020 at 8:42 PM Something Something <
> >> mailinglist...@gmail.com> wrote:
> >>
> >>> Hmm... how would they go to Graphana if they are not getting computed in
> >>> your code? I am talking about the Application Specific Accumulators. The
> >>> other standard counters such as 'event.progress.inputRowsPerSecond' are
> >>> getting populated correctly!
> >>>
> >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V  wrote:
> >>>
> >>>> Hello,
> >>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
> >>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
> >>>> But one consolation is that when I send metrics to Graphana, the values
> >>>> are coming there.
> >>>>
> >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
> >>>> mailinglist...@gmail.com> wrote:
> >>>>
> >>>>> No this is not working even if I use LongAccumulator.
> >>>>>
> >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei  wrote:
> >>>>>
> >>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type should
> >>>>>> be atomic or thread safe. I'm wondering if the implementation for
> >>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to 
> >>>>>> replace
> >>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or 
> >>>>>> LongAccumulator[3]
> >>>>>> and test if the StreamingListener and other codes are able to work?
> >>>>>>
> >>>>>> ---
> >>>>>> Cheers,
> >>>>>> -z
> >>>>>> [1]
> >>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2

Re: Unit testing Spark/Scala code with Mockito

2020-05-20 Thread ZHANG Wei
AFAICT, it depends on the testing goal: Unit Test, Integration Test or E2E
Test.

For a Unit Test, it mostly tests an individual class or class methods.
Mockito can help mock and verify dependent instances or methods.

For an Integration Test, some Spark testing helper methods can set up the
environment, such as `runInterpreter`[1] for running code in the REPL. The
data source can be mocked by `Seq(...).toDS()` or by reading a local file,
with no need to access a Hive service.
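
For example, a minimal sketch of that approach (the columns and filter here are
hypothetical, not taken from the thread): run the transformation logic under
test against an in-memory Dataset instead of reading from HDFS or Hive.
```scala
import org.apache.spark.sql.SparkSession

// Local SparkSession just for the test
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("unit-test")
  .getOrCreate()
import spark.implicits._

// Mocked input standing in for the DataFrame normally read from XML/HDFS
val input = Seq(("1", "ok"), ("2", "bad")).toDF("id", "status")

// Placeholder for the transformation under test (e.g. Class B's filtering)
val good = input.filter($"status" === "ok")

assert(good.count() == 1)
```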

For an E2E Test, HDFS and Hive (normally a local mini version) have
to be set up to serve the real operations from Spark.
Just my 2 cents.

-- 
Cheers,
-z
[1] 
https://github.com/apache/spark/blob/a06768ec4d5059d1037086fe5495e5d23cde514b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala#L49

On Wed, 20 May 2020 15:36:06 +0100
Mich Talebzadeh  wrote:

> On a second note with regard Spark and read writes as I understand unit
> tests are not meant to test database connections. This should be done in
> integration tests to check that all the parts work together. Unit tests are
> just meant to test the functional logic, and not spark's ability to read
> from a database.
> 
> I would have thought that if the specific connectivity through third part
> tool (in my case reading XML file using Databricks jar) is required, then
> this should be done through Read Evaluate Print Loop – REPL environment of
> Spark Shell by writing some codec to quickly establish where the API
> successfully reads from the XML file.
> 
> Does this assertion sound correct?
> 
> thanks,
> 
> Mich
> 
> 
> 
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
> 
> 
> 
> 
> 
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
> 
> 
> 
> 
> On Wed, 20 May 2020 at 11:58, Mich Talebzadeh 
> wrote:
> 
> > Hi,
> >
> > I have a spark job that reads an XML file from HDFS, process it and port
> > data to Hive tables, one good and one exception table
> >
> > The Code itself works fine. I need to create Unit Test with Mockito
> > for it.. A unit
> > test should test functionality in isolation. Side effects from other
> > classes or the system should be eliminated for a unit test, if possible. So
> > basically there are three classes.
> >
> >
> >1. Class A, reads XML file and created a DF1 on it plus a DF2 on top
> >of DF1. Test data for XML file is already created
> >2. Class B, reads DF2 and post correct data through TempView and Spark
> >SQL to the underlying Hive table
> >3. Class C, read DF2 and post exception data again through TempView
> >and Spark SQL to the underlying Hive exception table
> >
> > I would like to know for cases covering tests for Class B and Class C what
> > Mockito format needs to be used..
> >
> > Thanks,
> >
> > Mich
> >
> >
> >
> >
> > *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> > loss, damage or destruction of data or any other property which may arise
> > from relying on this email's technical content is explicitly disclaimed.
> > The author will in no case be liable for any monetary damages arising from
> > such loss, damage or destruction.
> >
> >
> >

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: CSV data source : Garbled Japanese text and handling multilines

2020-05-20 Thread ZHANG Wei
May I get the CSV file's encoding, which can be checked with the `file` command?

-- 
Cheers,
-z

On Tue, 19 May 2020 09:24:24 +0900
Ashika Umagiliya  wrote:

> In my Spark job (spark 2.4.1) , I am reading CSV files on S3.These files
> contain Japanese characters.Also they can have ^M character (u000D) so I
> need to parse them as multiline.
> 
> First I used following code to read CSV files:
> 
> implicit class DataFrameReadImplicits (dataFrameReader: DataFrameReader) {
>  def readTeradataCSV(schema: StructType, s3Path: String) : DataFrame = {
> 
> dataFrameReader.option("delimiter", "\u0001")
>   .option("header", "false")
>   .option("inferSchema", "false")
>   .option("multiLine","true")
>   .option("encoding", "UTF-8")
>   .option("charset", "UTF-8")
>   .schema(schema)
>   .csv(s3Path)
>  }
>   }
> 
> But when I read DF using this method all the Japanese characters are garbled.
> 
> After doing some tests I found out that If I read the same S3 file
> using *"spark.sparkContext.textFile(path)"* Japanese characters
> encoded properly.
> 
> So I tried this way :
> 
> implicit class SparkSessionImplicits (spark : SparkSession) {
> def readTeradataCSV(schema: StructType, s3Path: String) = {
>   import spark.sqlContext.implicits._
>   spark.read.option("delimiter", "\u0001")
> .option("header", "false")
> .option("inferSchema", "false")
> .option("multiLine","true")
> .schema(schema)
> .csv(spark.sparkContext.textFile(s3Path).map(str =>
> str.replaceAll("\u000D"," ")).toDS())
> }
>   }
> 
> Now the encoding issue is fixed.However multilines doesn't work
> properly and lines are broken near ^M character , even though I tried
> to replace ^M using *str.replaceAll("\u000D"," ")*
> 
> Any tips on how to read Japanese characters using first method, or
> handle multi-lines using the second method ?

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Using Spark Accumulators with Structured Streaming

2020-05-15 Thread ZHANG Wei
There is a restriction in the AccumulatorV2 API [1]: the OUT type should be atomic
or thread safe. I'm wondering whether the implementation for `java.util.Map[T,
Long]` can meet that or not. Is there any chance to replace
CollectionLongAccumulator with CollectionAccumulator[2] or LongAccumulator[3] and
test whether the StreamingListener and the other code are able to work?

---
Cheers,
-z
[1] 
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] 
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] 
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
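
For instance, a quick sketch of what that replacement might look like (the
accumulator names are illustrative):
```scala
// Driver side: built-in accumulators are registered when created with a name
val longAcc = spark.sparkContext.longAccumulator("MyLongAcc")
val listAcc = spark.sparkContext.collectionAccumulator[String]("MyCollectionAcc")

// Executor side, e.g. inside the stateful mapping function
longAcc.add(1)
listAcc.add("some-event")
```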


From: Something Something 
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from Spark Development team tell me if this functionality is 
supported and tested? I've spent a lot of time on this but can't get it to 
work. Just to add more context, we've our own Accumulator class that extends 
from AccumulatorV2. In this class we keep track of one or more accumulators. 
Here's the definition:


class CollectionLongAccumulator[T]
extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins we register an instance of this class:

spark.sparkContext.register(myAccumulator, "MyAccumulator")

Is this working under Structured Streaming?

I will keep looking for alternate approaches but any help would be greatly 
appreciated. Thanks.



On Thu, May 14, 2020 at 2:36 PM Something Something 
mailto:mailinglist...@gmail.com>> wrote:

In my structured streaming job I am updating Spark Accumulators in the 
updateAcrossEvents method but they are always 0 when I try to print them in my 
StreamingListener. Here's the code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
updateAcrossEvents
  )


The accumulators get incremented in 'updateAcrossEvents'. I've a 
StreamingListener which writes values of the accumulators in 'onQueryProgress' 
method but in this method the Accumulators are ALWAYS ZERO!

When I added log statements in the updateAcrossEvents, I could see that these 
accumulators are getting incremented as expected.

This only happens when I run in the 'Cluster' mode. In Local mode it works fine 
which implies that the Accumulators are not getting distributed correctly - or 
something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an 
"Action". That's not a solution here. This is a 'Stateful Structured Streaming' 
job. Yes, I am also 'registering' them in SparkContext.




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [PySpark] Tagging descriptions

2020-05-13 Thread ZHANG Wei
AFAICT, from the data size (25B rows, key cell ~300-char string), it looks
like a common Spark job. But the regex might be complex; I guess there
are lots of items to match, as in (apple|banana|cola|...), from the purchase
list. Regex matching is a CPU-heavy computing task. If the current
performance with more partitions doesn't meet the requirement, keyword
indexing might help a little -- tokenize the purchase list first and index
the tokens (like a search engine) rather than running the RegEx on them
directly. There are also several search engines that work well
with Spark, such as Elasticsearch and Solr.
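
As a rough sketch of the tokenize-and-index idea (the column names and the
keyword list below are hypothetical), in Scala -- the same DataFrame API is
available from PySpark:
```scala
import org.apache.spark.sql.functions.{broadcast, col, explode, lower, split}
import spark.implicits._  // spark: an existing SparkSession

// Small vendor keyword table; broadcasting keeps the join shuffle-free
val keywords = Seq("amazon", "walmart", "target").toDF("keyword")

// Tokenize each purchase description once, then join the tokens against
// the keyword table instead of running one huge RegEx alternation
val tagged = transactions  // hypothetical input DataFrame with a "description" column
  .withColumn("token", explode(split(lower(col("description")), "\\s+")))
  .join(broadcast(keywords), col("token") === col("keyword"), "left_outer")
```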

-- 
Cheers,
-z

On Wed, 13 May 2020 00:47:02 -0400
Rishi Shah  wrote:

> Thanks ZHANG! Please find details below:
> 
> # of rows: ~25B, row size would be somewhere around ~3-5MB (it's a parquet
> formatted data so, need to worry about only the columns to be tagged)
> 
> avg length of the text to be parsed : ~300
> 
> Unfortunately don't have sample data or regex which I can share freely.
> However about data being parsed - assume these are purchases made online
> and we are trying to parse the transaction details. Like purchases made on
> amazon can be tagged to amazon as well as other vendors etc.
> 
> Appreciate your response!
> 
> 
> 
> On Tue, May 12, 2020 at 6:23 AM ZHANG Wei  wrote:
> 
> > May I get some requirement details?
> >
> > Such as:
> > 1. The row count and one row data size
> > 2. The avg length of text to be parsed by RegEx
> > 3. The sample format of text to be parsed
> > 4. The sample of current RegEx
> >
> > --
> > Cheers,
> > -z
> >
> > On Mon, 11 May 2020 18:40:49 -0400
> > Rishi Shah  wrote:
> >
> > > Hi All,
> > >
> > > I have a tagging problem at hand where we currently use regular
> > expressions
> > > to tag records. Is there a recommended way to distribute & tag? Data is
> > > about 10TB large.
> > >
> > > --
> > > Regards,
> > >
> > > Rishi Shah
> >
> 
> 
> -- 
> Regards,
> 
> Rishi Shah

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [PySpark] Tagging descriptions

2020-05-12 Thread ZHANG Wei
May I get some requirement details?

Such as:
1. The row count and one row data size
2. The avg length of text to be parsed by RegEx
3. The sample format of text to be parsed
4. The sample of current RegEx

-- 
Cheers,
-z

On Mon, 11 May 2020 18:40:49 -0400
Rishi Shah  wrote:

> Hi All,
> 
> I have a tagging problem at hand where we currently use regular expressions
> to tag records. Is there a recommended way to distribute & tag? Data is
> about 10TB large.
> 
> -- 
> Regards,
> 
> Rishi Shah

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: PyArrow Exception in Pandas UDF GROUPEDAGG()

2020-05-07 Thread ZHANG Wei
AFAICT, there might be data skew: some partitions got too many rows,
which hit the out-of-memory limitation. Trying .groupBy().count()
or .aggregateByKey().count() may help check each partition's data size.
If there is no data skew, increasing the .groupBy() parameter
`numPartitions` is worth a try.
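
For example, a quick check might look like this (a sketch, with `df` being the
wide input DataFrame and `label_col` the grouping column from the sample below):
```scala
import org.apache.spark.sql.functions.desc

// Per-key row counts: a heavily skewed key will show up at the top
df.groupBy("label_col").count().orderBy(desc("count")).show(20, false)
```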

-- 
Cheers,
-z

On Wed, 6 May 2020 00:07:58 +
Gautham Acharya  wrote:

> Hi everyone,
> 
> I'm running a job that runs a Pandas UDF to GROUP BY a large matrix.
> 
> The GROUP BY function runs on a wide dataset. The first column of the dataset 
> contains string labels that are GROUPed on. The remaining columns are numeric 
> values that are aggregated in the Pandas UDF. The dataset is very wide, with 
> 50,000 columns and 3 million rows.
> 
> --
> | label_col | num_col_0 | num_col_1 | num_col_2 |  --- | num_col_5|
> |   label_a  | 2.0| 5.6   |  7.123  |
> |   label_b  | 11.0  | 1.4   |  2.345  |
> |   label_a  | 3.1| 6.2   |  5.444  |
> 
> 
> 
> My job runs fine on smaller datasets, with the same number of columns but 
> fewer rows. However, when run on a dataset with 3 million rows, I see the 
> following exception:
> 
> 20/05/05 23:36:27 ERROR Executor: Exception in task 66.1 in stage 12.0 (TID 
> 2358)
> org.apache.spark.api.python.PythonException: Traceback (most recent call 
> last):
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/worker.py",
>  line 377, in main
> process()
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/worker.py",
>  line 372, in process
> serializer.dump_stream(func(split_index, iterator), outfile)
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/serializers.py",
>  line 286, in dump_stream
> for series in iterator:
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/serializers.py",
>  line 303, in load_stream
> for batch in reader:
>   File "pyarrow/ipc.pxi", line 266, in __iter__
>   File "pyarrow/ipc.pxi", line 282, in 
> pyarrow.lib._CRecordBatchReader.read_next_batch
>   File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
> pyarrow.lib.ArrowIOError: read length must be positive or -1
> 
> Looking at this issue, it 
> looks like PyArrow has a 2GB limit for each shard that is sent to the 
> grouping function.
> 
> I'm currently running this job on 4 nodes with 16cores and 64GB of memory 
> each.
> 
> I've attached the full error log here as well. What are some workarounds that 
> I can do to get this job running? Unfortunately, we are running up to a 
> production release and this is becoming a severe blocker.
> 
> Thanks,
> Gautham
> 
> 
> 
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Structured Streaming] NullPointerException in long running query

2020-04-29 Thread ZHANG Wei
Is there any chance we could also print the least recent failure in the stage, in the
same way as the following most recent failure, before the driver stacktrace?

> >>   Caused by: org.apache.spark.SparkException: Job aborted due to stage
> >> failure: Task 10 in stage 1.0 failed 4 times, most recent failure: Lost
> >> task 10.3 in stage 1.0 (TID 81, spark6, executor 1):
> >> java.lang.NullPointerException
> >> Driver stacktrace:

-- 
Cheers,
-z

On Tue, 28 Apr 2020 23:48:17 -0700
"Shixiong(Ryan) Zhu"  wrote:

> The stack trace is omitted by JVM when an exception is thrown too
> many times. This usually happens when you have multiple Spark tasks on the
> same executor JVM throwing the same exception. See
> https://stackoverflow.com/a/3010106
> 
> Best Regards,
> Ryan
> 
> 
> On Tue, Apr 28, 2020 at 10:45 PM lec ssmi  wrote:
> 
> > It should be a problem of my data quality. It's curious why the
> > driver-side exception stack has no specific exception information.
> >
> > Edgardo Szrajber  于2020年4月28日周二 下午3:32写道:
> >
> >> The exception occured while aborting the stage. It might be interesting
> >> to try to understand the reason for the abortion.
> >> Maybe timeout? How long the query run?
> >> Bentzi
> >>
> >> Sent from Yahoo Mail on Android
> >> 
> >>
> >> On Tue, Apr 28, 2020 at 9:25, Jungtaek Lim
> >>  wrote:
> >> The root cause of exception is occurred in executor side "Lost task 10.3
> >> in stage 1.0 (TID 81, spark6, executor 1)" so you may need to check there.
> >>
> >> On Tue, Apr 28, 2020 at 2:52 PM lec ssmi  wrote:
> >>
> >> Hi:
> >>   One of my long-running queries occasionally encountered the following
> >> exception:
> >>
> >>
> >>   Caused by: org.apache.spark.SparkException: Job aborted due to stage
> >> failure: Task 10 in stage 1.0 failed 4 times, most recent failure: Lost
> >> task 10.3 in stage 1.0 (TID 81, spark6, executor 1):
> >> java.lang.NullPointerException
> >> Driver stacktrace:
> >> at org.apache.spark.scheduler.DAGScheduler.org
> >> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
> >> at
> >> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
> >> at
> >> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
> >> at
> >> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> >> at
> >> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
> >> at
> >> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
> >> at
> >> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
> >> at scala.Option.foreach(Option.scala:257)
> >> at
> >> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
> >> at
> >> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
> >> at
> >> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
> >> at
> >> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
> >> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> >> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
> >> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
> >> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
> >> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
> >> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
> >> at
> >> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:929)
> >> at
> >> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:927)
> >> at
> >> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> >> at
> >> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> >> at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
> >> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:927)
> >> at
> >> org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49)
> >> at
> >> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:475)
> >> at
> >> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
> >> at
> >> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:473)
> >> at
> >> 

Re: Filtering on multiple columns in spark

2020-04-29 Thread ZHANG Wei
AFAICT, maybe Spark SQL built-in functions[1] can help as below:

scala> df.show()
++---+
| age|   name|
++---+
|null|Michael|
|  30|   Andy|
|  19| Justin|
++---+


scala> df.filter("length(name) == 4 or substring(name, 1, 1) == 'J'").show()
+---+--+
|age|  name|
+---+--+
| 30|  Andy|
| 19|Justin|
+---+--+
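
The Column API equivalent might look like this (a sketch assuming Spark 2.x,
where the not-equal operator on Column is `=!=` rather than `!==`, applied to
the `newDF` from the quoted code below):
```scala
import org.apache.spark.sql.functions.{col, length, substring}

val rejectedDF = newDF
  .filter((length(col("target_mobile_no")) =!= 10) ||
          (substring(col("target_mobile_no"), 1, 1) =!= "7"))
```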


-- 
Cheers,
-z
[1] https://spark.apache.org/docs/latest/api/sql/index.html

On Wed, 29 Apr 2020 08:45:26 +0100
Mich Talebzadeh  wrote:

> Hi,
> 
> 
> 
> Trying to filter a dataframe with multiple conditions using OR "||" as below
> 
> 
> 
>   val rejectedDF = newDF.withColumn("target_mobile_no",
> col("target_mobile_no").cast(StringType)).
> 
>filter(length(col("target_mobile_no")) !== 10 ||
> substring(col("target_mobile_no"),1,1) !== "7")
> 
> 
> 
> This throws this error
> 
> 
> 
> res12: org.apache.spark.sql.DataFrame = []
> 
> :49: error: value || is not a member of Int
> 
>   filter(length(col("target_mobile_no")) !== 10 ||
> substring(col("target_mobile_no"),1,1) !== "7")
> 
> 
> 
> Try another way
> 
> 
> 
> val rejectedDF = newDF.withColumn("target_mobile_no",
> col("target_mobile_no").cast(StringType)).
> 
>filter(length(col("target_mobile_no")) !=== 10 ||
> substring(col("target_mobile_no"),1,1) !=== "7")
> 
>   rejectedDF.createOrReplaceTempView("tmp")
> 
> 
> 
> Tried few options but I am still getting this error
> 
> 
> 
> :49: error: value !=== is not a member of
> org.apache.spark.sql.Column
> 
>   filter(length(col("target_mobile_no")) !=== 10 ||
> substring(col("target_mobile_no"),1,1) !=== "7")
> 
>  ^
> 
> :49: error: value || is not a member of Int
> 
>   filter(length(col("target_mobile_no")) !=== 10 ||
> substring(col("target_mobile_no"),1,1) !=== "7")
> 
> 
> 
> I can create a dataframe for each filter but that does not look efficient
> to me?
> 
> 
> 
> Thanks
> 
> 
> 
> Dr Mich Talebzadeh
> 
> 
> 
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
> 
> 
> 
> http://talebzadehmich.wordpress.com
> 
> 
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark ORC store written timestamp as column

2020-04-24 Thread ZHANG Wei
From what I think I understand, the OrcOutputWriter leverages orc-core
to write. I'm wondering whether ORC supports row-level metadata or not. If
not, maybe org.apache.orc.Writer::addRowBatch() can be overridden to
record the metadata after the RowBatch is written.

-- 
Cheers,
-z

On Thu, 16 Apr 2020 04:47:31 +
Manjunath Shetty H  wrote:

> Hi All,
> 
> Is there anyway to store the exact written timestamp in the ORC file through 
> spark ?.
> Use case something like `current_timestamp()` function in SQL. Generating in 
> the program will not be equal to actual write time in ORC/hdfs file.
> 
> Any suggestions will be helpful.
> 
> 
> Thanks
> Manjunath

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Save Spark dataframe as dynamic partitioned table in Hive

2020-04-24 Thread ZHANG Wei
AFAICT, we can use spark.sql(s"select $name ..."), where `name` is a value in
the Scala context [1].

-- 
Cheers,
-z

[1] https://docs.scala-lang.org/overviews/core/string-interpolation.html
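
For example, a sketch of how the INSERT might be built with the s-interpolator
(quoting the value since `broadcastId` is a STRING partition column, and
selecting `brand` last as the dynamic partition, per Patrick's suggestion in the
thread below):
```scala
val broadcastValue = "123456789"

val sqltext = s"""
  INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId = '$broadcastValue', brand)
  SELECT
      ocis_party_id AS partyId
    , target_mobile_no AS phoneNumber
    , brand
  FROM tmp
  """
spark.sql(sqltext)
```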

On Fri, 17 Apr 2020 00:10:59 +0100
Mich Talebzadeh  wrote:

> Thanks Patrick,
> 
> The partition  broadcastId is static as defined as a value below
> 
> 
> val broadcastValue = "123456789"  // I assume this will be sent as a
> constant for the batch
> 
> // Create a DF on top of XML
> val df = spark.read.
> format("com.databricks.spark.xml").
> option("rootTag", "hierarchy").
> option("rowTag", "sms_request").
> load("/tmp/broadcast.xml")
> 
> // add this constant column to dataframe itself
> val newDF = df.withColumn("broadcastId", lit(broadcastValue))
> 
> newDF.show(100,false)
> 
> newDF.createOrReplaceTempView("tmp")
> 
> // Need to create and populate target Parquet table
> michtest.BroadcastStaging
> //
> HiveContext.sql("""DROP TABLE IF EXISTS michtest.BroadcastStaging""")
> 
>   var sqltext = """
>   CREATE TABLE IF NOT EXISTS michtest.BroadcastStaging (
>  partyId STRING
>, phoneNumber STRING
>   )
>   PARTITIONED BY (
>  broadcastId STRING
>, brand STRING
> )
>   STORED AS PARQUET
>   """
>   HiveContext.sql(sqltext)
>   //
>   // Put data in Hive table
>   //
>  // Dynamic partitioning is disabled by default. We turn it on
>  spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
> 
> // Now put static partition (broadcastId) first and dynamic partition
> (brand)last
> 
>   sqltext = """
>   INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId =
> broadcastValue, brand)
>   SELECT
>   ocis_party_id AS partyId
> , target_mobile_no AS phoneNumber
> , broadcastId
> , brand
>   FROM tmp
>   """
>   spark.sql(sqltext)
> 
> Still not working properly
> 
> sqltext: String =
>   INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId =
> broadcastValue, brand)
>   SELECT
>   ocis_party_id AS partyId
> , target_mobile_no AS phoneNumber
> , broadcastId
> , brand
>   FROM tmp
> 
> scala>   spark.sql(sqltext)
> org.apache.spark.sql.catalyst.parser.ParseException:
> missing STRING at ','(line 2, pos 85)
> 
> == SQL ==
> 
>   INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId =
> broadcastValue, brand)
> -^^^
>   SELECT
>   ocis_party_id AS partyId
> , target_mobile_no AS phoneNumber
> , broadcastId
> , brand
>   FROM tmp
> 
>   at
> org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:241)
>   at
> org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:117)
>   at
> org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:48)
>   at
> org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:69)
>   at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
>   ... 55 elided
> 
> 
> The thing is that if I replace broadcastId = broadcastValue with  broadcastId
> = " 123456789" it works!
> 
> 
> Thanks,
> 
> 
> Dr Mich Talebzadeh
> 
> 
> 
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
> 
> 
> 
> http://talebzadehmich.wordpress.com
> 
> 
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
> 
> 
> 
> 
> On Thu, 16 Apr 2020 at 13:25, Patrick McCarthy
>  wrote:
> 
> > What happens if you change your insert statement to be
> >
> >   INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId =
> > broadcastValue, brand)
> >
> > and then add the value for brand into the select as
> >   SELECT
> >   ocis_party_id AS partyId
> > , target_mobile_no AS phoneNumber
> > , brand
> > You may need to rearrange the order of the partitions to put dynamic
> > partitions before static.
> >
> > On Wed, Apr 15, 2020 at 7:48 PM Mich Talebzadeh 
> > wrote:
> >
> >> Hi,
> >>
> >> I have an XML file that is read into Spark using Databa bricks jar file
> >>
> >> spark-xml_2.11-0.9.0.jar
> >>
> >> Doing some tests
> >>
> >> This is the format of XML (one row here)
> >>
> >> //*
> >> 
> >> SKY
> >> 0123456789
> >> 123456789
> >> XYZ
> >> GLX
> >> 12345678
> >> 
> >> 
> >> 
> >> 
> >> */
> >>
> >> OK I am trying to insert data into a hive partitioned table through spark
> >> as follows:
> >>
> >> import org.apache.spark.sql.DataFrame
> >> import org.apache.spark.sql.functions._
> >> import 

Re: Spark hangs while reading from jdbc - does nothing Removing Guess work from trouble shooting

2020-04-23 Thread ZHANG Wei
That's not a deadlock. They are just trying to acquire the same monitor lock, and
there are 3 threads: one has acquired it, and the others are waiting for the lock to be
released. It's a common scenario. You have to check the monitor lock object from the
call stack source code. There should be some operations after holding the lock,
and a cause to wait for something, which could be a hint.
> After refreshing a couple of times, I notice the lock is being swapped 
> between these 3. The other 2 will be blocked by whoever gets this lock, in a 
> cycle of 160 has lock -> 161 -> 159 -> 160

But back to your case, those are normal SparkUI HTTP service behaviors:
> - SparkUI-160- acceptor-id-ServerConnector@id(HTTP/1.1) | RUNNABLE | Monitor 
> - SparkUI-161-acceptor-id-ServerConnector@id(HTTP/1.1) | BLOCKED | Blocked by 
> Thread(Some(160)) Lock
> -  SparkUI-159-acceptor-id-ServerConnector@id(HTTP/1.1) | BLOCKED | Blocked 
> by Thread(Some(160)) Lock

Since I don't have your core dump, I'll just use a dump from my local environment
as a demo, with comments inline:

  29
SparkUI-29-acceptor-0@105d657c-ServerConnector@1e9a8907{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
  RUNNABLE    Monitor(java.lang.Object@1195548344})
  sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
   ^^ The native accept0() 
can be understood as Linux accept() [1]; this can block.
  sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
  sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250) 
=> holding Monitor(java.lang.Object@1195548344})
   I can find 
`synchronized(this.lock)` in this source location context
  
org.spark_project.jetty.server.ServerConnector.accept(ServerConnector.java:397)
  
org.spark_project.jetty.server.AbstractConnector$Acceptor.run(AbstractConnector.java:601)
  
org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
  
org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
  java.lang.Thread.run(Thread.java:748)

So you might have to look for other clues; just let those "SparkUI-" threads go, or
check the log to see whether anything is actually doing work.

Just my 2 cents.

--
Cheers,
-z

[1] -- https://linux.die.net/man/2/accept


From: Jungtaek Lim 
Sent: Wednesday, April 22, 2020 11:21
To: Ruijing Li
Cc: Gabor Somogyi; Mich Talebzadeh; ZHANG Wei; user
Subject: Re: Spark hangs while reading from jdbc - does nothing Removing Guess 
work from trouble shooting

No, that's not a thing to apologize for. It's just your call - less context 
would bring less reaction and interest.

On Wed, Apr 22, 2020 at 11:50 AM Ruijing Li 
mailto:liruijin...@gmail.com>> wrote:
I apologize, but I cannot share it, even if it is just typical spark libraries. 
I definitely understand that limits debugging help, but wanted to understand if 
anyone has encountered a similar issue.

On Tue, Apr 21, 2020 at 7:12 PM Jungtaek Lim 
mailto:kabhwan.opensou...@gmail.com>> wrote:
If there's no third party libraries in the dump then why not share the thread 
dump? (I mean, the output of jstack)

stack trace would be more helpful to find which thing acquired lock and which 
other things are waiting for acquiring lock, if we suspect deadlock.

On Wed, Apr 22, 2020 at 2:38 AM Ruijing Li 
mailto:liruijin...@gmail.com>> wrote:
After refreshing a couple of times, I notice the lock is being swapped between 
these 3. The other 2 will be blocked by whoever gets this lock, in a cycle of 
160 has lock -> 161 -> 159 -> 160

On Tue, Apr 21, 2020 at 10:33 AM Ruijing Li 
mailto:liruijin...@gmail.com>> wrote:
In thread dump, I do see this
- SparkUI-160- acceptor-id-ServerConnector@id(HTTP/1.1) | RUNNABLE | Monitor
- SparkUI-161-acceptor-id-ServerConnector@id(HTTP/1.1) | BLOCKED | Blocked by 
Thread(Some(160)) Lock
-  SparkUI-159-acceptor-id-ServerConnector@id(HTTP/1.1) | BLOCKED | Blocked by 
Thread(Some(160)) Lock

Could the fact that 160 has the monitor but is not running be causing a 
deadlock preventing the job from finishing?

I do see my Finalizer and main method are waiting. I don’t see any other 
threads from 3rd party libraries or my code in the dump. I do see spark context 
cleaner has timed waiting.

Thanks


On Tue, Apr 21, 2020 at 9:58 AM Ruijing Li 
mailto:liruijin...@gmail.com>> wrote:
Strangely enough I found an old issue that is the exact same issue as mine
https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-18343

Re: What is the best way to take the top N entries from a hive table/data source?

2020-04-22 Thread ZHANG Wei
The performance issue might be caused by the parquet table's partition count, which is
only 3. The reader uses that partition count to parallelize the extraction.

Refer to the log you provided:
> spark.sql("select * from db.table limit 100").explain(false)
> == Physical Plan ==
> CollectLimit 100
> +- FileScan parquet ... 806 more fields] Batched: false, Format: Parquet, 
> Location: CatalogFileIndex[...], PartitionCount: 3, PartitionFilters: [], 
> PushedFilters: [], ReadSchema:.
...PartitionCount: 3,...

According to the first email:
> val df = spark.sql("select * from table limit n")
> df.write.parquet()

You can try to recreate the parquet table with more partitions. Hope this page 
https://mungingdata.com/apache-spark/partitionby/ can help you.
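
For instance, one way to do that, as a rough sketch (the paths and the
partition count here are made up):
```scala
// Rewrite the source data with more partitions so that readers can
// parallelize across more than 3 tasks
spark.read.parquet("/path/to/db.table")
  .repartition(200)
  .write
  .mode("overwrite")
  .parquet("/path/to/db.table_repartitioned")
```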

---
Cheers,
-z

From: Yeikel 
Sent: Wednesday, April 22, 2020 12:17
To: user@spark.apache.org
Subject: Re: What is the best way to take the top N entries from a hive 
table/data source?

Hi Zhang. Thank you for your response

While your answer clarifies my confusion with `CollectLimit` it still does
not clarify what is the recommended way to extract large amounts of data
(but not all the records) from a source and maintain a high level of
parallelism.

For example , at some instances trying to extract 1 million records from a
table with over 100M records , I see my cluster using 1-2 cores out of the
hundreds that I have available.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





Re: What is the best way to take the top N entries from a hive table/data source?

2020-04-21 Thread ZHANG Wei
https://github.com/apache/spark/pull/7334 may explain the question as below:

>  This patch preserves this optimization by treating logical Limit operators 
> specially when they appear as the terminal operator in a query plan: if a 
> Limit is the final operator, then we will plan a special CollectLimit 
> physical operator which implements the old take()-based logic.

For `spark.sql("select * from db.table limit 100").explain(false)`, `limit` 
is the final operator;
for `spark.sql("select * from db.table limit 
100").repartition(1000).explain(false)`, `repartition` is the final 
operator. If you add a `.limit()` operation after `repartition`, such as 
`spark.sql("select * from db.table limit 
100").repartition(1000).limit(1000).explain(false)`, the `CollectLimit` 
will show again.

---
Cheers,
-z


From: Yeikel 
Sent: Wednesday, April 15, 2020 2:45
To: user@spark.apache.org
Subject: Re: What is the best way to take the top N entries from a hive 
table/data source?

Looking at the results of explain, I can see a CollectLimit step. Does that
work the same way as a regular .collect() ? (where all records are sent to
the driver?)


spark.sql("select * from db.table limit 100").explain(false)
== Physical Plan ==
CollectLimit 100
+- FileScan parquet ... 806 more fields] Batched: false, Format: Parquet,
Location: CatalogFileIndex[...], PartitionCount: 3, PartitionFilters: [],
PushedFilters: [], ReadSchema:.
db: Unit = ()

The number of partitions is 1 so that makes sense.

spark.sql("select * from db.table limit 100").rdd.partitions.size = 1

As a follow up , I tried to repartition the resultant dataframe and while I
can't see the CollectLimit step anymore , It did not make any difference in
the job. I still saw a big task at the end that ends up failing.

spark.sql("select * from db.table limit
100").repartition(1000).explain(false)

Exchange RoundRobinPartitioning(1000)
+- GlobalLimit 100
   +- Exchange SinglePartition
  +- LocalLimit 100  -> Is this a collect?





--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





Re: Cross Region Apache Spark Setup

2020-04-20 Thread ZHANG Wei
There might be 3 options:

1. Just as you expect: only ONE application, ONE RDD, with regioned containers and
executors automatically allocated and distributed. The ResourceProfile
(https://issues.apache.org/jira/browse/SPARK-27495) may meet the requirement,
treating Region as a type of resource just like GPU, but you have to wait for
the full feature to be finished. And I can imagine the troubleshooting challenges.
2. Label YARN nodes with a region tag, group them into queues, and submit the
jobs for different regions into dedicated queues (with the --queue argument
when submitting).
3. Build separate Spark clusters with independent YARN Resource Managers per
region, such as a UK cluster, a US-east cluster and a US-west cluster. It looks dirty,
but it is easy to deploy and manage, and you can schedule the jobs by each region's
busy and idle hours to get more performance and lower cost.

Just my 2 cents

---
Cheers,
-z


From: Stone Zhong 
Sent: Wednesday, April 15, 2020 4:31
To: user@spark.apache.org
Subject: Cross Region Apache Spark Setup

Hi,

I am trying to setup a cross region Apache Spark cluster. All my data are 
stored in Amazon S3 and well partitioned by region.

For example, I have parquet file at
S3://mybucket/sales_fact.parquet/us-west
S3://mybucket/sales_fact.parquet/us-east
S3://mybucket/sales_fact.parquet/uk

And my cluster have nodes in us-west, us-east and uk region -- basically I have 
node in all region that I supported.

When I have code like:

df = spark.read.parquet("S3://mybucket/sales_fact.parquet/*")
print(df.count()) #1
print(df.select("product_id").distinct().count()) #2

For #1, I expect only us-west nodes read data partition in us-west, and etc, 
and spark to add 3 regional count and return me a total count. I do not expect 
large cross region data transfer in this case.
For #2, I expect only us-west nodes read data partition in us-west, and etc. 
Each region, do the distinct() locally first, and merge 3 "product_id" list and 
do a distinct() again, I am ok with the necessary cross-region data transfer 
for merging the distinct product_ids

Can anyone please share the best practice? Is it possible to config the Apache 
Spark to work in such a way?

Any idea and help is appreciated!

Thanks,
Stone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Can I run Spark executors in a Hadoop cluster from a Kubernetes container

2020-04-20 Thread ZHANG Wei
Looks like you'd like to submit a Spark job from outside the Spark cluster. Apache Livy
[https://livy.incubator.apache.org/] is worth a try; it provides a REST
service for Spark in a Hadoop cluster.

Cheers,
-z


From: mailford...@gmail.com 
Sent: Thursday, April 16, 2020 20:26
To: user
Subject: Can I run Spark executors in a Hadoop cluster from a Kubernetes 
container

Hi,
I want to deploy Spark client in a Kubernetes container. Further on , I want to 
run the spark job in a Hadoop cluster (meaning the resources of the Hadoop 
cluster will be leveraged) but call it from the K8S container. My question is 
whether this mode of implementation possible? Do let me know please.
Thanks,
Debu

Sent from my iPhone
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





Re: How does spark sql evaluate case statements?

2020-04-16 Thread ZHANG Wei
Are you looking for this: https://spark.apache.org/docs/2.4.0/api/sql/#when ?

The code generated will look like this in a `do { ... } while (false)` loop:

  do {
${cond.code}
if (!${cond.isNull} && ${cond.value}) {
  ${res.code}
  $resultState = (byte)(${res.isNull} ? $HAS_NULL : $HAS_NONNULL);
  ${ev.value} = ${res.value};
  continue;
}


  } while (false)

Refer to: 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala#L208

Here is a full generated code sample for `spark.sql("select CASE WHEN age IS 
NULL THEN 'unknown' WHEN age < 30 THEN 'young' WHEN age < 40 THEN 'middle' ELSE 
'senior' END, name  from people").show()` :

/* 034 */   byte project_caseWhenResultState_0 = -1;
/* 035 */   do {
/* 036 */ if (!false && scan_isNull_0) {
/* 037 */   project_caseWhenResultState_0 = (byte)(false ? 1 : 0);
/* 038 */   project_mutableStateArray_0[0] = ((UTF8String) 
references[1] /* literal */); 
/* 039 */   continue;
/* 040 */ }
/* 041 */
/* 042 */ boolean project_isNull_4 = true;
/* 043 */ boolean project_value_4 = false;
/* 044 */
/* 045 */ if (!scan_isNull_0) {
/* 046 */   project_isNull_4 = false; // resultCode could change 
nullability.
/* 047 */   project_value_4 = scan_value_0 < 30L;
/* 048 */
/* 049 */ }
/* 050 */ if (!project_isNull_4 && project_value_4) {
/* 051 */   project_caseWhenResultState_0 = (byte)(false ? 1 : 0);
/* 052 */   project_mutableStateArray_0[0] = ((UTF8String) 
references[2] /* literal */); 
/* 053 */   continue;
/* 054 */ }
/* 055 */
/* 056 */ boolean project_isNull_8 = true;
/* 057 */ boolean project_value_8 = false;
/* 058 */
/* 059 */ if (!scan_isNull_0) {
/* 060 */   project_isNull_8 = false; // resultCode could change 
nullability.
/* 061 */   project_value_8 = scan_value_0 < 40L;
/* 062 */
/* 063 */ }
/* 064 */ if (!project_isNull_8 && project_value_8) {
/* 065 */   project_caseWhenResultState_0 = (byte)(false ? 1 : 0);
/* 066 */   project_mutableStateArray_0[0] = ((UTF8String) 
references[3] /* literal */); 
/* 067 */   continue;
/* 068 */ }
/* 069 */
/* 070 */ project_caseWhenResultState_0 = (byte)(false ? 1 : 0);
/* 071 */ project_mutableStateArray_0[0] = ((UTF8String) references[4] 
/* literal */);
/* 072 */
/* 073 */   } while (false);
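
For reference, the equivalent Column expression that produces this kind of
generated code (a sketch using `when`/`otherwise` against the same `people`
view as the SQL above):
```scala
import org.apache.spark.sql.functions.{col, when}

val labelled = spark.table("people").select(
  when(col("age").isNull, "unknown")
    .when(col("age") < 30, "young")
    .when(col("age") < 40, "middle")
    .otherwise("senior")
    .as("age_band"),
  col("name"))
```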


Cheers,
-z

From: Yeikel 
Sent: Wednesday, April 15, 2020 2:22
To: user@spark.apache.org
Subject: Re: How does spark sql evaluate case statements?

I do not know the answer to this question so I am also looking for it,  but
@kant maybe the generated code can help with this.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





Re: Is there any way to set the location of the history for the spark-shell per session?

2020-04-16 Thread ZHANG Wei
You are welcome!

It's not in the Spark source code. It's in the Scala source:

https://github.com/scala/scala/blob/2.11.x/src/repl-jline/scala/tools/nsc/interpreter/jline/FileBackedHistory.scala#L26

Reference Code:

  // For a history file in the standard location, always try to restrict permission,
  // creating an empty file if none exists.
  // For a user-specified location, only lock down permissions if we're the ones
  // creating it, otherwise responsibility for permissions is up to the caller.
  protected lazy val historyFile: File = File {
propOrNone("scala.shell.histfile").map(Path.apply) match {
  case Some(p) => if (!p.exists) secure(p) else p
  case None => secure(Path(userHome) / FileBackedHistory.defaultFileName)
}
  }

Cheers,
-z


From: Yeikel 
Sent: Friday, April 17, 2020 5:24
To: user@spark.apache.org
Subject: Re: Is there any way to set the location of the history for the 
spark-shell per session?

Thank you. That's what I was looking for.

I only found a PR from Scala when I googled it, so if you remember your
source, please do share it.

Thanks!




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





Re: How to pass a constant value to a partitioned hive table in spark

2020-04-16 Thread ZHANG Wei
> scala>   spark.sql($sqltext)
> :41: error: not found: value $sqltext
>  spark.sql($sqltext)
              ^
              +-- `$sqltext` is shell-style syntax; in Scala, reference the variable simply as `sqltext`

Try this:

scala> spark.sql(sqltext)
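And if the idea is to substitute `$broadcastValue` inside the SQL text itself, Scala string interpolation (an `s` prefix on the triple-quoted string) is one way to do it. A rough sketch, assuming the temp view `tmp` and the partitioned table from your snippet already exist:

```scala
val broadcastValue = "123456789"

// s-interpolated string: $broadcastValue is substituted by Scala before Spark parses the SQL.
// Note there is no leading $ before INSERT.
val sqltext = s"""
  INSERT INTO TABLE michtest.BroadcastStaging
  PARTITION (broadcastId = '$broadcastValue', brand = 'dummy')
  SELECT
      ocis_party_id    AS partyId
    , target_mobile_no AS phoneNumber
  FROM tmp
"""

spark.sql(sqltext)
```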

-- 
Cheers,
-z

On Thu, 16 Apr 2020 08:49:40 +0100
Mich Talebzadeh  wrote:

> I have a variable to be passed to a column of partition as shown below
> 
> val broadcastValue = "123456789"  // I assume this will be sent as a
> constant for the batch
> // Create a DF on top of XML
> 
> df.createOrReplaceTempView("tmp")
> // Need to create and populate target Parquet table
> michtest.BroadcastStaging
> //
> HiveContext.sql("""DROP TABLE IF EXISTS michtest.BroadcastStaging""")
> 
>   var sqltext = """
>   CREATE TABLE IF NOT EXISTS michtest.BroadcastStaging (
>  partyId STRING
>, phoneNumber STRING
>   )
>   PARTITIONED BY (
>  broadcastId STRING
>, brand STRING)
>   STORED AS PARQUET
>   """
>   HiveContext.sql(sqltext)
> 
> // Now insert the data from temp table
>   //
>   // Put data in Hive table
>   //
>  // Dynamic partitioning is disabled by default. We turn it on
>  spark.sql("SET hive.exec.dynamic.partition = true")
>  spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict ")
> 
>   sqltext = """
> 
>   $INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId =
> $broadcastValue, brand = "dummy")
>   SELECT
>   ocis_party_id AS partyId
> , target_mobile_no AS phoneNumber
>   FROM tmp
>   """
>   spark.sql($sqltext)
> 
> 
> However, this does not work!
> 
> 
> scala>   sqltext = """
>  |   $INSERT INTO TABLE michtest.BroadcastStaging PARTITION
> (broadcastId = $broadcastValue, brand = "dummy")
>  |   SELECT
>  |   ocis_party_id AS partyId
>  | , target_mobile_no AS phoneNumber
>  |   FROM tmp
>  |   """
> sqltext: String =
>   $INSERT INTO TABLE michtest.BroadcastStaging PARTITION (broadcastId =
> $broadcastValue, brand = "dummy")
>   SELECT
>   ocis_party_id AS partyId
> , target_mobile_no AS phoneNumber
>   FROM tmp
> 
> 
> scala>   spark.sql($sqltext)
> :41: error: not found: value $sqltext
>  spark.sql($sqltext)
> 
> 
> Any ideas?
> 
> 
> Thanks
> 
> 
> Dr Mich Talebzadeh
> 
> 
> 
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> 
> 
> 
> http://talebzadehmich.wordpress.com
> 
> 
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark Core]: Does an executor only cache the partitions it requires for its computations or always the full RDD?

2020-04-16 Thread ZHANG Wei
As far as I know, if you are talking about RDD.cache(), the answer is that each
executor only caches the partitions it requires, not the full RDD.
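A quick way to see this for yourself (a sketch for spark-shell; the partition count and data are arbitrary):

```scala
// Cache an RDD, materialize it, then inspect what each executor actually stores.
val rdd = sc.parallelize(1 to 1000000, numSlices = 8).cache()
rdd.count()   // triggers computation; each executor caches only the blocks it computed

// The Storage tab of the Spark UI (or the REST endpoint
// /api/v1/applications/<app-id>/storage/rdd) lists the cached blocks
// as rdd_<rddId>_<partition>, spread across executors rather than
// one full copy of the RDD per executor.
```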

Cheers,
-z


From: zwithouta 
Sent: Tuesday, April 14, 2020 18:28
To: user@spark.apache.org
Subject: [Spark Core]: Does an executor only cache the partitions it requires 
for its computations or always the full RDD?

Provided caching is activated for an RDD, does each executor of a cluster only
cache the partitions it requires for its computations or always the full
RDD?



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





Re: Spark hangs while reading from jdbc - does nothing Removing Guess work from trouble shooting

2020-04-16 Thread ZHANG Wei
The Thread Dump table in the Spark UI can provide some clues for finding
thread lock issues, such as:

  Thread ID | Thread Name                  | Thread State | Thread Locks
  13        | NonBlockingInputStreamThread | WAITING      | Blocked by Thread Some(48) Lock(jline.internal.NonBlockingInputStream@103008951})
  48        | Thread-16                    | RUNNABLE     | Monitor(jline.internal.NonBlockingInputStream@103008951})

And each thread row shows its call stack when clicked; then you can
check the root cause of the held locks, like this (Thread 48 above):

  org.fusesource.jansi.internal.Kernel32.ReadConsoleInputW(Native Method)
  org.fusesource.jansi.internal.Kernel32.readConsoleInputHelper(Kernel32.java:811)
  org.fusesource.jansi.internal.Kernel32.readConsoleKeyInput(Kernel32.java:842)
  org.fusesource.jansi.internal.WindowsSupport.readConsoleInput(WindowsSupport.java:97)
  jline.WindowsTerminal.readConsoleInput(WindowsTerminal.java:222)

Hope it can help you.
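By the way, if you want the periodic dumps suggested below without clicking through the UI, here is a rough sketch that polls the executor thread-dump REST endpoint, assuming your Spark version exposes /applications/<app-id>/executors/<executor-id>/threads; the host, port, app id and executor id are placeholders:

```scala
// Rough sketch: sample an executor's thread dump once per second via the monitoring REST API.
// All identifiers below (host, port, app id, executor id) are placeholders.
import scala.io.Source

val app  = "app-20200416123456-0001"
val exec = "1"
val url  = s"http://driver-host:4040/api/v1/applications/$app/executors/$exec/threads"

for (i <- 1 to 30) {            // 30 samples, one per second
  println(s"=== sample $i ===")
  println(Source.fromURL(url).mkString)
  Thread.sleep(1000)
}
```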

-- 
Cheers,
-z

On Thu, 16 Apr 2020 16:36:42 +0900
Jungtaek Lim  wrote:

> Do thread dump continuously, per specific period (like 1s) and see the
> change of stack / lock for each thread. (This is not easy to be done in UI
> so maybe doing manually would be the only option. Not sure Spark UI will
> provide the same, haven't used at all.)
> 
> It will tell which thread is being blocked (even it's shown as running) and
> which point to look at.
> 
> On Thu, Apr 16, 2020 at 4:29 PM Ruijing Li  wrote:
> 
> > Once I do. thread dump, what should I be looking for to tell where it is
> > hanging? Seeing a lot of timed_waiting and waiting on driver. Driver is
> > also being blocked by spark UI. If there are no tasks, is there a point to
> > do thread dump of executors?
> >
> > On Tue, Apr 14, 2020 at 4:49 AM Gabor Somogyi 
> > wrote:
> >
> >> The simplest way is to do thread dump which doesn't require any fancy
> >> tool (it's available on Spark UI).
> >> Without thread dump it's hard to say anything...
> >>
> >>
> >> On Tue, Apr 14, 2020 at 11:32 AM jane thorpe 
> >> wrote:
> >>
> >>> Here is another tool I use, Logic Analyser  7:55
> >>> https://youtu.be/LnzuMJLZRdU
> >>>
> >>> you could take some suggestions for improving performance  queries.
> >>> https://dzone.com/articles/why-you-should-not-use-select-in-sql-query-1
> >>>
> >>>
> >>> Jane thorpe
> >>> janethor...@aol.com
> >>>
> >>>
> >>> -Original Message-
> >>> From: jane thorpe 
> >>> To: janethorpe1 ; mich.talebzadeh <
> >>> mich.talebza...@gmail.com>; liruijing09 ; user <
> >>> user@spark.apache.org>
> >>> Sent: Mon, 13 Apr 2020 8:32
> >>> Subject: Re: Spark hangs while reading from jdbc - does nothing Removing
> >>> Guess work from trouble shooting
> >>>
> >>>
> >>>
> >>> This tool may be useful for you to trouble shoot your problems away.
> >>>
> >>>
> >>> https://www.javacodegeeks.com/2020/04/simplifying-apm-remove-the-guesswork-from-troubleshooting.html
> >>>
> >>>
> >>> "APM tools typically use a waterfall-type view to show the blocking
> >>> time of different components cascading through the control flow within an
> >>> application.
> >>> These types of visualizations are useful, and AppOptics has them, but
> >>> they can be difficult to understand for those of us without a PhD."
> >>>
> >>> Especially helpful if you want to understand through visualisation and
> >>> you do not have a PhD.
> >>>
> >>>
> >>> Jane thorpe
> >>> janethor...@aol.com
> >>>
> >>>
> >>> -Original Message-
> >>> From: jane thorpe 
> >>> To: mich.talebzadeh ; liruijing09 <
> >>> liruijin...@gmail.com>; user 
> >>> CC: user 
> >>> Sent: Sun, 12 Apr 2020 4:35
> >>> Subject: Re: Spark hangs while reading from jdbc - does nothing
> >>>
> >>> You seem to be implying the error is intermittent.
> >>> You seem to be implying data is being ingested  via JDBC. So the
> >>> connection has proven itself to be working unless no data is arriving from
> >>> the  JDBC channel at all.  If no data is arriving then one could say it
> >>> could be  the JDBC.
> >>> If the error is intermittent  then it is likely a resource involved in
> >>> processing is filling to capacity.
> >>> Try reducing the data ingestion volume and see if that completes, then
> >>> increase the data ingested  incrementally.
> >>> I assume you have  run the job on small amount of data so you have
> >>> completed your prototype stage successfully.
> >>>
> >>> --
> >>> On Saturday, 11 April 2020 Mich Talebzadeh 
> >>> wrote:
> >>> Hi,
> >>>
> >>> Have you checked your JDBC connections from Spark to Oracle. What is
> >>> Oracle saying? Is it doing anything or hanging?
> >>>
> >>> set pagesize 
> >>> set linesize 140
> >>> set heading off
> >>> select SUBSTR(name,1,8) || ' sessions as on '||TO_CHAR(CURRENT_DATE,
> >>> 'MON DD  HH:MI AM') from v$database;
> >>> set heading on
> >>> column spid heading "OS PID" format a6
> >>> column process format a13 heading "Client ProcID"
> 

Re: Is there any way to set the location of the history for the spark-shell per session?

2020-04-16 Thread ZHANG Wei
From my understanding, you are talking about spark-shell command history,
aren't you?

If yes, you can try adding `--conf
'spark.driver.extraJavaOptions=-Dscala.shell.histfile=<path to history file>'` to the
spark-shell command arguments, since Spark shell is leveraging the Scala REPL
JLine file-backed history settings.
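
For example, something like this should keep this session's history in its own file (the path is just an illustration):

    spark-shell --conf 'spark.driver.extraJavaOptions=-Dscala.shell.histfile=/tmp/my_session.history'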

Cheers,
-z


From: Yeikel 
Sent: Wednesday, April 15, 2020 2:20
To: user@spark.apache.org
Subject: Is there any way to set the location of the history for the 
spark-shell per session?

In my team, we get elevated access to our Spark cluster using a common
username, which means that we all share the same history. I am not sure if
this is common, but unfortunately there is nothing I can do about it.

Is there any option to set the location of the history?  I am looking for
something like spark-shell --history=...path or something similar that can
be reused for other sessions.

By default , history seems to be stored in $HOME/.scala_history and I did
not see anything in the documentation about it , but maybe there is an
undocumented way to do it.

Thanks for your help!



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





Re: Spark Streaming not working

2020-04-14 Thread ZHANG Wei
Here is the assertion error message format:

  s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")

You might have to check the Kafka service against the error log:

> 20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0 (TID 24)
> java.lang.AssertionError: assertion failed: Failed to get records for 
> spark-executor-service-spark-ingestion dice-ingestion 11 0 after polling for 
> 12
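
If the brokers turn out to be healthy but just slow to answer, one knob that sometimes helps (an assumption on my side, not something verified against your setup) is the poll timeout used by the kafka-0-10 consumer on the executors:

```scala
// Sketch only: raise the executor-side Kafka poll timeout; the value is an example.
// When unset, it falls back to spark.network.timeout.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("my-streaming-app")   // hypothetical app name
  .set("spark.streaming.kafka.consumer.poll.ms", "180000")
```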

Cheers,
-z


From: Debabrata Ghosh 
Sent: Saturday, April 11, 2020 2:25
To: user
Subject: Re: Spark Streaming not working

Any solution please ?

On Fri, Apr 10, 2020 at 11:04 PM Debabrata Ghosh 
mailto:mailford...@gmail.com>> wrote:
Hi,
I have a spark streaming application where Kafka is producing records 
but unfortunately spark streaming isn't able to consume those.

I am hitting the following error:

20/04/10 17:28:04 ERROR Executor: Exception in task 0.5 in stage 0.0 (TID 24)
java.lang.AssertionError: assertion failed: Failed to get records for 
spark-executor-service-spark-ingestion dice-ingestion 11 0 after polling for 
12
at scala.Predef$.assert(Predef.scala:170)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)

Would you please be able to help with a resolution.

Thanks,
Debu

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark interrupts S3 request backoff

2020-04-14 Thread ZHANG Wei
I will make a guess: it's not interrupted, it's killed by the driver or the
resource manager because the executor has been sleeping for a long time.

You may have to find the root cause in the driver and failed executor log 
contexts.

--
Cheers,
-z


From: Lian Jiang 
Sent: Monday, April 13, 2020 10:43
To: user
Subject: Spark interrupts S3 request backoff

Hi,

My Spark job failed when reading parquet files from S3 due to 503 slow down. 
According to 
https://docs.aws.amazon.com/AmazonS3/latest/dev/optimizing-performance.html, I 
can use backoff to mitigate this issue. However, spark seems to interrupt the 
backoff sleeping (see "sleep interrupted"). Is there a way (e.g. some settings) 
to make spark not interrupt the backoff? Appreciate any hints.



20/04/12 20:15:37 WARN TaskSetManager: Lost task 3347.0 in stage 155.0 (TID 
128138, ip-100-101-44-35.us-west-2.compute.internal, executor 34): 
org.apache.spark.sql.execution.datasources.FileDownloadException: Failed to 
download file path: 
s3://mybucket/myprefix/part-00178-d0a0d51f-f98e-4b9d-8d00-bb3b9acd9a47-c000.snappy.parquet,
 range: 0-19231, partition values: [empty row], isDataPresent: false
at 
org.apache.spark.sql.execution.datasources.AsyncFileDownloader.next(AsyncFileDownloader.scala:142)
at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.getNextFile(FileScanRDD.scala:248)
at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:172)
at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:130)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Suppressed: 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
 Slow Down (Service: Amazon S3; Status Code: 503; Error Code: 503 Slow Down; 
Request ID: CECE220993AE7F89; S3 Extended Request ID: 
UlQe4dEuBR1YWJUthSlrbV9phyqxUNHQEw7tsJ5zu+oNIH+nGlGHfAv7EKkQRUVP8tw8x918A4Y=), 
S3 Extended Request ID: 
UlQe4dEuBR1YWJUthSlrbV9phyqxUNHQEw7tsJ5zu+oNIH+nGlGHfAv7EKkQRUVP8tw8x918A4Y=
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
at 

Re: [External Sender] Re: Driver pods stuck in running state indefinitely

2020-04-12 Thread Zhang Wei
I would like to suggest double-checking the name resolution by
logging into the failed node and trying the ping command:

ping spark-1586333186571-driver-svc.fractal-segmentation.svc

Just my 2 cents.

-- 
Cheers,
-z

On Fri, 10 Apr 2020 13:03:46 -0400
"Prudhvi Chennuru (CONT)"  wrote:

> No, there was no internal domain issue. As I mentioned I saw this issue
> only on a few nodes on the cluster.
> 
> On Thu, Apr 9, 2020 at 10:49 PM Wei Zhang  wrote:
> 
> > Are there any internal domain name resolving issues?
> >
> > > Caused by:  java.net.UnknownHostException:
> > spark-1586333186571-driver-svc.fractal-segmentation.svc
> >
> > -z
> > 
> > From: Prudhvi Chennuru (CONT) 
> > Sent: Friday, April 10, 2020 2:44
> > To: user
> > Subject: Driver pods stuck in running state indefinitely
> >
> >
> > Hi,
> >
> >We are running spark batch jobs on K8s.
> >Kubernetes version: 1.11.5 ,
> >spark version: 2.3.2,
> >   docker version: 19.3.8
> >
> >Issue: Few Driver pods are stuck in running state indefinitely with
> > error
> >
> >```
> >The Initial job has not accepted any resources; check your cluster UI
> > to ensure that workers are registered and have sufficient resources.
> >```
> >
> > Below is the log of the errored out executor pods
> >
> >   ```
> >Exception in thread "main"
> > java.lang.reflect.UndeclaredThrowableException
> > at
> > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1858)
> > at
> > org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:63)
> > at
> > org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
> > at
> > org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:293)
> > at
> > org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
> > Caused by: org.apache.spark.SparkException: Exception thrown in
> > awaitResult:
> > at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
> > at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
> > at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)
> > at
> > org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:201)
> > at
> > org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)
> > at
> > org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:63)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at javax.security.auth.Subject.doAs(Subject.java:422)
> > at
> > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1840)
> > ... 4 more
> > Caused by: java.io.IOException: Failed to connect to
> > spark-1586333186571-driver-svc.fractal-segmentation.svc:7078
> > at
> > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
> > at
> > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
> > at
> > org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198)
> > at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
> > at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
> > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > at
> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> > at
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by: java.net.UnknownHostException:
> > spark-1586333186571-driver-svc.fractal-segmentation.svc
> > at java.net.InetAddress.getAllByName0(InetAddress.java:1280)
> > at java.net.InetAddress.getAllByName(InetAddress.java:1192)
> > at java.net.InetAddress.getAllByName(InetAddress.java:1126)
> > at java.net.InetAddress.getByName(InetAddress.java:1076)
> > at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:146)
> > at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:143)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at io.netty.util.internal.SocketUtils.addressByName(SocketUtils.java:143)
> > at
> > io.netty.resolver.DefaultNameResolver.doResolve(DefaultNameResolver.java:43)
> > at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:63)
> > at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:55)
> > at
> > io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:57)
> > at
> > io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:32)
> > at
> > io.netty.resolver.AbstractAddressResolver.resolve(AbstractAddressResolver.java:108)
> > at io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:208)
> > at io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:49)
> > at