How to set Description in UI SQL tab
Is there a way to set a description that displays in the UI SQL tab, like we can with sc.setJobDescription for jobs and stages?

Thanks,
Giri
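There is no dedicated setter for the SQL tab that I know of. For jobs and stages the description API looks like the sketch below; the SQL tab's description is typically taken from the call site, and overriding the call-site local properties before running the query is a workaround that is sometimes suggested. A minimal PySpark sketch, assuming a recent Spark version (the query and the description strings are placeholders, and the local-property behaviour should be verified against your version):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sql-tab-description-demo").getOrCreate()
sc = spark.sparkContext

# Sets the description shown on the Jobs/Stages tabs (the API mentioned above).
sc.setJobDescription("Nightly aggregation of purchase events")

# The SQL tab's description column is normally derived from the call site.
# Overriding the call-site local properties is a workaround people use to
# label it; whether it takes effect can depend on the Spark version, so
# treat this as an assumption to verify, not a documented SQL-tab API.
sc.setLocalProperty("callSite.short", "Nightly aggregation")
sc.setLocalProperty("callSite.long", "Nightly aggregation of purchase events, step 3 of the ETL")

spark.range(0, 1000).selectExpr("sum(id) AS total").collect()

# Clear the properties afterwards so later queries are not mislabelled.
sc.setLocalProperty("callSite.short", None)
sc.setLocalProperty("callSite.long", None)
```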
Re: [PySpark] Tagging descriptions
Thanks everyone. While working on tagging I stumbled upon another setback: there are about 5000 regexes I am dealing with, of which a couple of hundred use variable-length lookbehind (originally these worked on a JVM). To use them from a Python/PySpark UDF we either need to modify these regex rules so they work with Python, or move to a Scala/Java-based implementation. Does anyone have experience with variable-length lookbehind (quantifiers/alternations of variable length) in Python/PySpark? Any suggestions are much appreciated! (A sketch using the third-party `regex` package follows at the end of this thread.)

Thanks,
-Rishi

On Thu, May 14, 2020 at 2:57 PM Netanel Malka wrote:
> For Elasticsearch you can use the official Elastic connector:
> https://www.elastic.co/what-is/elasticsearch-hadoop
>
> Elasticsearch Spark connector docs:
> https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html

On Thu, May 14, 2020, 21:14 Amol Umbarkar wrote:
> Check out sparkNLP for tokenization. I am not sure about Solr or Elasticsearch, though.

On Thu, May 14, 2020 at 9:02 PM Rishi Shah wrote:
> This is great, thank you Zhang & Amol!
>
> Yes, we can have multiple tags per row, and multiple regexes applied to a single row as well. Would you have any example of working with Spark and search engines like Solr or Elasticsearch? Does Spark ML provide the expected tokenization support (I am yet to try Spark ML, still a beginner)?
>
> Any other reference material you found useful while working on a similar problem? Appreciate all the help!
>
> Thanks,
> -Rishi

On Thu, May 14, 2020 at 6:11 AM Amol Umbarkar wrote:
> Rishi,
> Just adding to Zhang's questions: are you expecting multiple tags per row? Do you check multiple regexes for a single tag?
>
> Let's say you had only one tag. Then, theoretically, you should be able to do this (see the PySpark sketch at the end of this thread):
> 1. Remove stop words or any irrelevant content.
> 2. Split the text into equal-sized chunk columns (e.g., if the max length is 1000 chars, split into 20 columns of 50 chars).
> 3. Distribute the work per column, producing a boolean (true/false) result for the single tag.
> 4. Merge the 20 resulting columns.
> 5. Repeat steps 3 and 4 for the other tags, or do them in parallel.
>
> Note on step 3: if you expect a single tag per row, you can repeat step 3 column by column and skip rows that already got a tag in a prior step.
>
> Secondly, if you expect similarity in the text, you could work on unique text values only (this might require a shuffle, hence it is expensive) and then join the end result back to the original data, using a hash of some kind as the join key. I would go for this approach only if the chance of repeated text is very high (which it could be in your case, this being transactional data).
>
> Not the full answer to your question, but hope this helps you brainstorm more.
>
> Thanks,
> Amol

On Wed, May 13, 2020 at 10:17 AM Rishi Shah wrote:
> Thanks ZHANG! Please find the details below:
>
> Number of rows: ~25B; row size is somewhere around ~3-5MB (it's Parquet-formatted data, so we only need to worry about the columns being tagged).
> Average length of the text to be parsed: ~300.
>
> Unfortunately I don't have sample data or regexes that I can share freely. However, about the data being parsed: assume these are purchases made online and we are trying to parse the transaction details. For example, purchases made on Amazon can be tagged to Amazon as well as to other vendors, etc.
>
> Appreciate your response!

On Tue, May 12, 2020 at 6:23 AM ZHANG Wei wrote:
> May I get some requirement details? Such as:
> 1. The row count and the size of one row
> 2. The average length of the text to be parsed by the regexes
> 3. A sample of the format of the text to be parsed
> 4. A sample of the current regexes
>
> --
> Cheers,
> -z

On Mon, 11 May 2020 18:40:49 -0400 Rishi Shah wrote:
> Hi All,
>
> I have a tagging problem at hand where we currently use regular expressions to tag records. Is there a recommended way to distribute and tag? The data is about 10TB.
>
> --
> Regards,
> Rishi Shah
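On the variable-length lookbehind question above: Python's built-in `re` module does not accept variable-length lookbehind, but the third-party `regex` package on PyPI does, so one option short of porting everything to Scala/Java is to call that package from a PySpark UDF. A minimal sketch; the column name, the sample patterns, and broadcasting the pattern map are illustrative assumptions, not something taken from the thread:

```python
import regex  # third-party "regex" package (pip install regex); supports variable-length lookbehind
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

spark = SparkSession.builder.appName("regex-tagging-sketch").getOrCreate()

# Hypothetical tag -> pattern map. The first pattern uses a variable-length
# lookbehind, which the stdlib `re` module rejects but `regex` accepts.
TAG_PATTERNS = {
    "amazon": r"(?<=purchased\s+(?:from|at)\s+)amazon",
    "grocery": r"\b(?:whole\s+foods|safeway)\b",
}
bc_patterns = spark.sparkContext.broadcast(TAG_PATTERNS)

_compiled_cache = {}

def _compiled():
    # Compile lazily on each executor and reuse the compiled patterns across rows.
    if not _compiled_cache:
        for tag, pattern in bc_patterns.value.items():
            _compiled_cache[tag] = regex.compile(pattern, regex.IGNORECASE)
    return _compiled_cache

@F.udf(returnType=ArrayType(StringType()))
def tag_text(text):
    # Return every tag whose pattern matches somewhere in the text.
    if text is None:
        return []
    return [tag for tag, pat in _compiled().items() if pat.search(text)]

df = spark.createDataFrame(
    [("purchased from amazon order #123",), ("WHOLE FOODS market 42",)],
    ["description"],
)
df.withColumn("tags", tag_text("description")).show(truncate=False)
```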
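And a rough PySpark sketch of steps 2-4 of the chunk-and-merge idea above: split the text column into fixed-size chunks, evaluate a tag predicate on each chunk column, and OR the per-chunk results back together. The column names, chunk size, and the `rlike` pattern are illustrative; note that naive fixed-size chunking can split a match across a chunk boundary, so a real implementation would need overlapping chunks or token-aware splitting:

```python
from functools import reduce

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("chunked-tagging-sketch").getOrCreate()

df = spark.createDataFrame(
    [("amazon marketplace order #123",), ("coffee shop downtown",)],
    ["description"],
)

CHUNK_SIZE = 10   # illustrative; the example above used 50-char chunks
NUM_CHUNKS = 4    # enough chunks to cover the longest expected text

# Step 2: split the text into equal-sized chunk columns (substring is 1-based).
chunked = df.select(
    "description",
    *[
        F.substring("description", i * CHUNK_SIZE + 1, CHUNK_SIZE).alias(f"chunk_{i}")
        for i in range(NUM_CHUNKS)
    ],
)

# Step 3: evaluate a (here, trivial) tag predicate independently on each chunk column.
flags = [F.col(f"chunk_{i}").rlike("(?i)amazon") for i in range(NUM_CHUNKS)]

# Step 4: merge the per-chunk booleans into a single tag column.
tagged = chunked.withColumn("tag_amazon", reduce(lambda a, b: a | b, flags))

tagged.select("description", "tag_amazon").show(truncate=False)
```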
Re: Using Spark Accumulators with Structured Streaming
The following Java code works in my cluster environment:

```java
.mapGroupsWithState((MapGroupsWithStateFunction) (key, values, state) -> {
        myAcc.add(1);
        <...>
        state.update(newState);
        return new LeadingCharCount(key, newState);
    },
    Encoders.LONG(),
    Encoders.bean(LeadingCharCount.class),
    GroupStateTimeout.ProcessingTimeTimeout())
```

It also works fine with my `StateUpdateTask`:

```java
.mapGroupsWithState(
    new StateUpdateTask(myAcc),
    Encoders.LONG(),
    Encoders.bean(LeadingCharCount.class),
    GroupStateTimeout.ProcessingTimeTimeout());

public class StateUpdateTask implements MapGroupsWithStateFunction {
    private LongAccumulator myAccInTask;

    public StateUpdateTask(LongAccumulator acc) {
        this.myAccInTask = acc;
    }

    @Override
    public LeadingCharCount call(String key, Iterator values, GroupState state) throws Exception {
        myAccInTask.add(1);
        <...>
        state.update(newState);
        return new LeadingCharCount(key, newState);
    }
}
```

--
Cheers,
-z

On Tue, 2 Jun 2020 10:28:36 +0800 ZHANG Wei wrote:
> Yes, verified on the cluster with 5 executors.

On Fri, 29 May 2020 11:16:12 -0700 Something Something wrote:
> Did you try this on the cluster? Note: this works just fine under 'local' mode.

On Thu, May 28, 2020 at 9:12 PM ZHANG Wei wrote:
> I can't reproduce the issue with my simple code:
>
> ```scala
> spark.streams.addListener(new StreamingQueryListener {
>   override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
>     println(event.progress.id + " is on progress")
>     println(s"My accu is ${myAcc.value} on query progress")
>   }
>   ...
> })
>
> def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
>   myAcc.add(1)
>   println(s">>> key: $key => state: ${state}")
>   ...
> }
>
> val wordCounts = words
>   .groupByKey(v => ...)
>   .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
>
> val query = wordCounts.writeStream
>   .outputMode(OutputMode.Update)
>   ...
> ```
>
> I'm wondering whether any errors can be found in the driver logs. Micro-batch exceptions won't terminate the running streaming job.
>
> For the following code, we have to make sure that `StateUpdateTask` is started:
>
> > .mapGroupsWithState(
> >     new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> >         appConfig, accumulators),
> >     Encoders.bean(ModelStateInfo.class),
> >     Encoders.bean(ModelUpdate.class),
> >     GroupStateTimeout.ProcessingTimeTimeout());
>
> --
> Cheers,
> -z

On Thu, 28 May 2020 19:59:31 +0530 Srinivas V wrote:
> Giving the code below:
> // accumulators is a class-level variable in the driver.
> sparkSession.streams().addListener(new StreamingQueryListener() {
>     @Override
>     public void onQueryStarted(QueryStartedEvent queryStarted) {
>         logger.info("Query started: " + queryStarted.id());
>     }
>
>     @Override
>     public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
>         logger.info("Query terminated: " + queryTerminated.id());
>     }
>
>     @Override
>     public void onQueryProgress(QueryProgressEvent queryProgress) {
>         accumulators.eventsReceived(queryProgress.progress().numInputRows());
>         long eventsReceived = 0;
>         long eventsExpired = 0;
>         long eventSentSuccess = 0;
>         try {
>             eventsReceived = accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
>             eventsExpired = accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
>             eventSentSuccess = accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
>         } catch (MissingKeyException e) {
>             logger.error("Accumulator key not found due to Exception {}", e.getMessage());
>         }
>         logger.info("Events Received:{}", eventsReceived);
>         logger.info("Events