How to set Description in UI SQL tab

2020-06-04 Thread gpatcham
Is there a way we can set a description to display in the UI SQL tab, like we
can with sc.setJobDescription for jobs and stages?
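
For context, this is roughly what we do today for jobs and stages (a minimal
PySpark sketch; the description string and query are just placeholders):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Description shown for the resulting jobs and stages in the UI.
spark.sparkContext.setJobDescription("nightly revenue rollup")
spark.sql("SELECT count(*) FROM range(10)").collect()  # placeholder query
spark.sparkContext.setJobDescription(None)  # clear it afterwards
```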

Thanks
Giri






Re: [PySpark] Tagging descriptions

2020-06-04 Thread Rishi Shah
Thanks everyone. While working on tagging I stumbled upon another setback:
there are about 5000 regexes I am dealing with, of which a couple of hundred
have variable-length lookbehind (originally these worked on a JVM). In order to
use these with a Python/PySpark UDF, we need to either modify these regex rules
so that they work with Python or move this to a Scala/Java-based
implementation.

Does anyone have any experience with variable-length lookbehind
(quantifiers/alternations of variable length) in Python/PySpark? Any
suggestions are much appreciated!
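
For concreteness, the direction I am leaning toward is the third-party `regex`
package (which, unlike the stdlib `re`, accepts variable-length lookbehind)
inside a UDF. A minimal sketch, with a made-up pattern and column name,
assuming `regex` is installed on every executor:

```python
import regex  # third-party PyPI package; stdlib `re` rejects variable-length lookbehind

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

spark = SparkSession.builder.getOrCreate()

# Made-up rule with a variable-length lookbehind (\s+ inside the lookbehind).
PATTERN = r"(?<=paid\s+to\s+)amazon"

@udf(BooleanType())
def has_match(text):
    return bool(regex.search(PATTERN, text)) if text else False

df = spark.createDataFrame([("paid   to amazon",), ("coffee shop",)], ["text"])
df.withColumn("hit", has_match("text")).show()
```

The other route would be to keep the rules on the JVM, e.g. via Spark SQL's
`rlike`/`regexp_extract`, which use Java regexes.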

Thanks,
-Rishi

On Thu, May 14, 2020 at 2:57 PM Netanel Malka  wrote:

> For Elasticsearch you can use the official Elastic connector.
> https://www.elastic.co/what-is/elasticsearch-hadoop
>
> Elastic spark connector docs:
> https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html
>
>
>
> On Thu, May 14, 2020, 21:14 Amol Umbarkar  wrote:
>
>> Check out Spark NLP for tokenization. I am not sure about Solr or
>> Elasticsearch though.
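>>
>> Spark ML itself also ships a basic tokenizer if you want something lighter
>> than Spark NLP; a rough sketch, with made-up data and column names:
>>
>> ```python
>> from pyspark.sql import SparkSession
>> from pyspark.ml.feature import RegexTokenizer
>>
>> spark = SparkSession.builder.getOrCreate()
>> df = spark.createDataFrame([("AMZN Mktp US order 123",)], ["text"])
>>
>> # Split the `text` column on non-word characters into a `tokens` array column.
>> tokenizer = RegexTokenizer(inputCol="text", outputCol="tokens", pattern="\\W+")
>> tokenizer.transform(df).show(truncate=False)
>> ```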
>>
>> On Thu, May 14, 2020 at 9:02 PM Rishi Shah 
>> wrote:
>>
>>> This is great, thank you Zhang & Amol!
>>>
>>> Yes, we can have multiple tags per row, and multiple regexes applied to a
>>> single row as well. Would you have any example of working with Spark &
>>> search engines like Solr, Elasticsearch? Does Spark ML provide
>>> tokenization support as expected (I am yet to try Spark ML, still a
>>> beginner)?
>>>
>>> Any other reference material you found useful while working on a similar
>>> problem? Appreciate all the help!
>>>
>>> Thanks,
>>> -Rishi
>>>
>>>
>>> On Thu, May 14, 2020 at 6:11 AM Amol Umbarkar 
>>> wrote:
>>>
 Rishi,
 Just adding to Zhang's questions.

 Are you expecting multiple tags per row?
 Do you check multiple regexes for a single tag?

 Let's say you had only one tag; then theoretically you could do this:

 1. Remove stop words or any other irrelevant content.
 2. Split the text into equal-sized chunk columns (e.g., if the max length is
 1000 chars, split into 20 columns of 50 chars each).
 3. Distribute the work for each column, producing a binary (true/false) result
 for a single tag.
 4. Merge the 20 resulting columns.
 5. Repeat for the other tags, or run steps 3 and 4 for them in parallel.

 Note on 3: if you expect a single tag per row, then you can repeat step 3
 column by column and skip rows that already got tags in a prior step. (A rough
 sketch of steps 2-4 follows below.)
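
 A rough sketch of steps 2-4 for one tag (the pattern, chunk sizes, and data
 below are made up; it also assumes a match never spans a chunk boundary):

```python
from functools import reduce

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("AMZN Mktp US order 123",), ("corner coffee shop",)], ["text"])

chunk_size, n_chunks = 50, 20  # e.g. max length 1000 chars -> 20 columns of 50
pattern = r"(?i)\bamzn\b"      # made-up regex for a single tag

# Step 2: split the text into equal-sized chunk columns (substring is 1-based).
chunks = [F.substring("text", i * chunk_size + 1, chunk_size).alias(f"c{i}")
          for i in range(n_chunks)]
df = df.select("text", *chunks)

# Steps 3-4: evaluate the regex per chunk column, then OR the booleans together.
flags = [F.col(f"c{i}").rlike(pattern) for i in range(n_chunks)]
df.withColumn("tag_hit", reduce(lambda a, b: a | b, flags)) \
  .select("text", "tag_hit") \
  .show(truncate=False)
```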

 Secondly, if you expect similarity in the text (of some kind), then you could
 just work on the unique text values (might require a shuffle, hence expensive)
 and then join the end result back to the original data. You could use a hash
 of some kind to join back. Though I would go for this approach only if the
 chances of similarity in the text are very high (which could well be the case
 here, since it is transactional data).
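
 A rough sketch of that dedupe-and-join-back idea (made-up names and pattern,
 using a sha2 hash of the text as the join key):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("AMZN Mktp US order 123",), ("AMZN Mktp US order 123",), ("corner coffee shop",)],
    ["text"])
pattern = r"(?i)\bamzn\b"  # made-up regex for a single tag

# Tag only the distinct text values, keyed by a hash of the text.
distinct_tagged = (df.select("text").distinct()
                   .withColumn("text_hash", F.sha2("text", 256))
                   .withColumn("tag_hit", F.col("text").rlike(pattern))
                   .select("text_hash", "tag_hit"))

# Join the per-unique-text result back onto the full data via the hash.
result = (df.withColumn("text_hash", F.sha2("text", 256))
          .join(distinct_tagged, on="text_hash", how="left")
          .drop("text_hash"))
result.show(truncate=False)
```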

 Not the full answer to your question but hope this helps you brainstorm
 more.

 Thanks,
 Amol





 On Wed, May 13, 2020 at 10:17 AM Rishi Shah 
 wrote:

> Thanks ZHANG! Please find details below:
>
> # of rows: ~25B; row size would be somewhere around ~3-5MB (it's
> parquet-formatted data, so we need to worry only about the columns to be
> tagged)
>
> avg length of the text to be parsed: ~300
>
> Unfortunately I don't have sample data or regexes that I can share
> freely. However, about the data being parsed: assume these are purchases made
> online and we are trying to parse the transaction details. For example,
> purchases made on Amazon can be tagged to Amazon as well as to other vendors,
> etc.
>
> Appreciate your response!
>
>
>
> On Tue, May 12, 2020 at 6:23 AM ZHANG Wei  wrote:
>
>> May I get some requirement details?
>>
>> Such as:
>> 1. The row count and one row data size
>> 2. The avg length of text to be parsed by RegEx
>> 3. The sample format of text to be parsed
>> 4. The sample of current RegEx
>>
>> --
>> Cheers,
>> -z
>>
>> On Mon, 11 May 2020 18:40:49 -0400
>> Rishi Shah  wrote:
>>
>> > Hi All,
>> >
>> > I have a tagging problem at hand where we currently use regular
>> > expressions to tag records. Is there a recommended way to distribute &
>> > tag? Data is about 10TB large.
>> >
>> > --
>> > Regards,
>> >
>> > Rishi Shah
>>
>
>
> --
> Regards,
>
> Rishi Shah
>

>>>
>>> --
>>> Regards,
>>>
>>> Rishi Shah
>>>
>>

-- 
Regards,

Rishi Shah


Re: Using Spark Accumulators with Structured Streaming

2020-06-04 Thread ZHANG Wei
The following Java code works in my cluster environment:
```
.mapGroupsWithState((MapGroupsWithStateFunction) (key, values, state) -> {
        myAcc.add(1);
        <...>
        state.update(newState);
        return new LeadingCharCount(key, newState);
    },
    Encoders.LONG(),
    Encoders.bean(LeadingCharCount.class),
    GroupStateTimeout.ProcessingTimeTimeout())
```

Also works fine with my `StateUpdateTask`:
```
.mapGroupsWithState(
        new StateUpdateTask(myAcc),
        Encoders.LONG(),
        Encoders.bean(LeadingCharCount.class),
        GroupStateTimeout.ProcessingTimeTimeout());

public class StateUpdateTask
        implements MapGroupsWithStateFunction {
    private LongAccumulator myAccInTask;

    public StateUpdateTask(LongAccumulator acc) {
        this.myAccInTask = acc;
    }

    @Override
    public LeadingCharCount call(String key, Iterator values,
            GroupState state) throws Exception {
        myAccInTask.add(1);
        <...>
        state.update(newState);
        return new LeadingCharCount(key, newState);
    }
}
```

-- 
Cheers,
-z

On Tue, 2 Jun 2020 10:28:36 +0800
ZHANG Wei  wrote:

> Yes, verified on the cluster with 5 executors.
> 
> -- 
> Cheers,
> -z
> 
> On Fri, 29 May 2020 11:16:12 -0700
> Something Something  wrote:
> 
> > Did you try this on the Cluster? Note: This works just fine under 'Local'
> > mode.
> > 
> > On Thu, May 28, 2020 at 9:12 PM ZHANG Wei  wrote:
> > 
> > > I can't reproduce the issue with my simple code:
> > > ```scala
> > > spark.streams.addListener(new StreamingQueryListener {
> > >   override def onQueryProgress(event:
> > > StreamingQueryListener.QueryProgressEvent): Unit = {
> > > println(event.progress.id + " is on progress")
> > > println(s"My accu is ${myAcc.value} on query progress")
> > >   }
> > > ...
> > > })
> > >
> > > def mappingFunc(key: Long, values: Iterator[String], state:
> > > GroupState[Long]): ... = {
> > >   myAcc.add(1)
> > >   println(s">>> key: $key => state: ${state}")
> > > ...
> > > }
> > >
> > > val wordCounts = words
> > >   .groupByKey(v => ...)
> > >   .mapGroupsWithState(timeoutConf =
> > > GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
> > >
> > > val query = wordCounts.writeStream
> > >   .outputMode(OutputMode.Update)
> > > ...
> > > ```
> > >
> > > I'm wondering if there are any errors to be found in the driver logs. The
> > > micro-batch exceptions won't terminate the running streaming job.
> > >
> > > For the following code, we have to make sure that `StateUpdateTask` is
> > > started:
> > > > .mapGroupsWithState(
> > > > new
> > > StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > > appConfig, accumulators),
> > > > Encoders.bean(ModelStateInfo.class),
> > > > Encoders.bean(ModelUpdate.class),
> > > > GroupStateTimeout.ProcessingTimeTimeout());
> > >
> > > --
> > > Cheers,
> > > -z
> > >
> > > On Thu, 28 May 2020 19:59:31 +0530
> > > Srinivas V  wrote:
> > >
> > > > Giving the code below:
> > > > //accumulators is a class level variable in driver.
> > > >
> > > >  sparkSession.streams().addListener(new StreamingQueryListener() {
> > > > @Override
> > > > public void onQueryStarted(QueryStartedEvent queryStarted) {
> > > > logger.info("Query started: " + queryStarted.id());
> > > > }
> > > > @Override
> > > > public void onQueryTerminated(QueryTerminatedEvent
> > > > queryTerminated) {
> > > > logger.info("Query terminated: " +
> > > queryTerminated.id());
> > > > }
> > > > @Override
> > > > public void onQueryProgress(QueryProgressEvent
> > > queryProgress) {
> > > >
> > > > accumulators.eventsReceived(queryProgress.progress().numInputRows());
> > > > long eventsReceived = 0;
> > > > long eventsExpired = 0;
> > > > long eventSentSuccess = 0;
> > > > try {
> > > > eventsReceived =
> > > > accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
> > > > eventsExpired =
> > > > accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
> > > > eventSentSuccess =
> > > > accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
> > > > } catch (MissingKeyException e) {
> > > > logger.error("Accumulator key not found due to
> > > > Exception {}", e.getMessage());
> > > > }
> > > > logger.info("Events Received:{}", eventsReceived);
> > > > logger.info("Events