Re: Using Spark Accumulators with Structured Streaming

2020-06-07 Thread Something Something
Great. I guess the trick is to use a separate class such as
'StateUpdateTask'. I will try that. My challenge is to convert this into
Scala; will try it out and report back. Thanks for the tips.
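
For the archives, here is a rough, untested sketch of what that Scala
conversion might look like; the `events` Dataset, the grouping key, and the
`LeadingCharCount` case class are assumptions based on the Java code quoted
below:

```scala
// Rough sketch only: assumes a SparkSession `spark` and a streaming
// Dataset[String] called `events`; the grouping key is made up.
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import spark.implicits._

case class LeadingCharCount(key: String, count: Long)

// Register the accumulator on the driver; the mapping function captures it.
val myAcc = spark.sparkContext.longAccumulator("myAcc")

def updateState(key: String, values: Iterator[String],
                state: GroupState[Long]): LeadingCharCount = {
  myAcc.add(1)                                   // incremented on the executors
  val newState = state.getOption.getOrElse(0L) + values.size
  state.update(newState)
  LeadingCharCount(key, newState)
}

val counts = events
  .groupByKey(word => word.take(1))              // assumed grouping key
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(updateState)
```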

On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei  wrote:

> The following Java codes can work in my cluster environment:
> ```
> .mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
> myAcc.add(1);
> <...>
> state.update(newState);
> return new LeadingCharCount(key, newState);
> },
> Encoders.LONG(),
> Encoders.bean(LeadingCharCount.class),
> GroupStateTimeout.ProcessingTimeTimeout())
> ```
>
> Also works fine with my `StateUpdateTask`:
> ```
> .mapGroupsWithState(
> new StateUpdateTask(myAcc),
> Encoders.LONG(),
> Encoders.bean(LeadingCharCount.class),
> GroupStateTimeout.ProcessingTimeTimeout());
>
> public class StateUpdateTask
> implements MapGroupsWithStateFunction<String, String, Long, LeadingCharCount> {
> private LongAccumulator myAccInTask;
>
> public StateUpdateTask(LongAccumulator acc) {
> this.myAccInTask = acc;
> }
>
> @Override
> public LeadingCharCount call(String key, Iterator<String> values,
> GroupState<Long> state) throws Exception {
> myAccInTask.add(1);
> <...>
> state.update(newState);
> return new LeadingCharCount(key, newState);
> }
> }
> ```
>
> --
> Cheers,
> -z
>
> On Tue, 2 Jun 2020 10:28:36 +0800
> ZHANG Wei  wrote:
>
> > Yes, verified on the cluster with 5 executors.
> >
> > --
> > Cheers,
> > -z
> >
> > On Fri, 29 May 2020 11:16:12 -0700
> > Something Something  wrote:
> >
> > > Did you try this on the Cluster? Note: This works just fine under
> > > 'Local' mode.
> > >
> > > On Thu, May 28, 2020 at 9:12 PM ZHANG Wei  wrote:
> > >
> > > > I can't reproduce the issue with my simple code:
> > > > ```scala
> > > > spark.streams.addListener(new StreamingQueryListener {
> > > >   override def onQueryProgress(event:
> > > > StreamingQueryListener.QueryProgressEvent): Unit = {
> > > > println(event.progress.id + " is on progress")
> > > > println(s"My accu is ${myAcc.value} on query progress")
> > > >   }
> > > > ...
> > > > })
> > > >
> > > > def mappingFunc(key: Long, values: Iterator[String], state:
> > > > GroupState[Long]): ... = {
> > > >   myAcc.add(1)
> > > >   println(s">>> key: $key => state: ${state}")
> > > > ...
> > > > }
> > > >
> > > > val wordCounts = words
> > > >   .groupByKey(v => ...)
> > > >   .mapGroupsWithState(timeoutConf =
> > > > GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
> > > >
> > > > val query = wordCounts.writeStream
> > > >   .outputMode(OutputMode.Update)
> > > > ...
> > > > ```
> > > >
> > > > I'm wondering if any errors can be found in the driver logs. The
> > > > micro-batch exceptions won't terminate the running streaming job.
> > > >
> > > > For the following code, we have to make sure that `StateUpdateTask`
> > > > is started:
> > > > > .mapGroupsWithState(
> > > > >     new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > > >         appConfig, accumulators),
> > > > >     Encoders.bean(ModelStateInfo.class),
> > > > >     Encoders.bean(ModelUpdate.class),
> > > > >     GroupStateTimeout.ProcessingTimeTimeout());
> > > >
> > > > --
> > > > Cheers,
> > > > -z
> > > >
> > > > On Thu, 28 May 2020 19:59:31 +0530
> > > > Srinivas V  wrote:
> > > >
> > > > > Giving the code below:
> > > > > //accumulators is a class level variable in driver.
> > > > >
> > > > > sparkSession.streams().addListener(new StreamingQueryListener() {
> > > > >     @Override
> > > > >     public void onQueryStarted(QueryStartedEvent queryStarted) {
> > > > >         logger.info("Query started: " + queryStarted.id());
> > > > >     }
> > > > >     @Override
> > > > >     public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
> > > > >         logger.info("Query terminated: " + queryTerminated.id());
> > > > >     }
> > > > >     @Override
> > > > >     public void onQueryProgress(QueryProgressEvent queryProgress) {
> > > > >         accumulators.eventsReceived(queryProgress.progress().numInputRows());
> > > > >         long eventsReceived = 0;
> > > > >         long eventsExpired = 0;
> > > > >         long eventSentSuccess = 0;
> > > > >         try {
> > > > >             eventsReceived =
> > > > >                 accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
> > > > 

Re: unsubscribe

2020-06-07 Thread Wesley




Please send an empty email to user-unsubscr...@spark.apache.org to
unsubscribe.


thanks.






unsubscribe

2020-06-07 Thread Marian Zsemlye
unsubscribe please


Mgr. Marian Zsemlye
Lead Software Developer

PROFECT Slovakia s.r.o.
Prievozská 4D
821 09 Bratislava

Mobile: +421 903 401 153
E-Mail: marian.zsem...@profect.sk





unsubscribe

2020-06-07 Thread Arkadiy Ver
unsubscribe


Re: NoClassDefFoundError: scala/Product$class

2020-06-07 Thread charles_cai
org.bdgenomics.adam is one of the components of GATK, and I just downloaded
the release version from its GitHub website. However, when I build a new
Docker image with Spark 2.4.5 and Scala 2.12.4, it works well, which confuses
me.
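
For reference, a quick way to see which Scala binary version each side is
using (a sketch; run inside spark-shell):

```scala
// Print the versions actually in use in the running session.
println(s"Spark version: ${spark.version}")
println(s"Scala version: ${scala.util.Properties.versionString}")

// The adam jar's artifact suffix (e.g. _2.12 vs _2.11) should match the
// Scala version of the Spark build it runs on.
```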


root@master2:~# pyspark 
Python 2.7.17 (default, Apr 15 2020, 17:20:14) 
[GCC 7.5.0] on linux2
Type "help", "copyright", "credits" or "license" for more information.
20/06/08 01:44:16 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/

Using Python version 2.7.17 (default, Apr 15 2020 17:20:14)
SparkSession available as 'spark'.


root@master2:~# scala -version
Scala code runner version 2.12.4 -- Copyright 2002-2017, LAMP/EPFL and
Lightbend, Inc.




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark :- Update record in partition.

2020-06-07 Thread ayan guha
Hi

Please look at delta.io, which is a companion open source project. It
addresses the exact use case you are after.
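
For example, once the data is stored in Delta format, a single-row update
looks roughly like this (a sketch only; the path and column names are made
up, and it needs the delta-core package on the classpath):

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.{col, lit}

// Hypothetical Delta table path and columns, just to illustrate the API.
val orders = DeltaTable.forPath(spark, "/data/orders")

orders.update(
  col("order_id") === 42,                 // condition selecting the row(s)
  Map("status" -> lit("CANCELLED"))       // column(s) to set
)
```

Delta still rewrites the affected files underneath, but it does the
bookkeeping transactionally, so you don't have to overwrite partitions
yourself.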

On Mon, Jun 8, 2020 at 2:35 AM Sunil Kalra  wrote:

> Hi All,
>
> If i have to update a record in partition using spark, do i have to read
> the whole partition and update the row and overwrite the partition?
>
> Is there a way to only update 1 row like DBMS. Otherwise 1 row update
> takes a long time to rewrite the whole partition ?
>
> Thanks
> Sunil
>
>
>
>
>

-- 
Best Regards,
Ayan Guha


Re: Structured Streaming using File Source - How to handle live files

2020-06-07 Thread Jungtaek Lim
Hi Nick,

I guess that's by design - Spark assumes an input file will not be
modified once it is placed on the input path. This makes it easy for Spark to
track the list of processed vs. unprocessed files. If input files could be
modified, Spark would have to enumerate all the files and track
how many lines/bytes it has read "per file", and in the worst case it might
read an incomplete line (if the writer doesn't guarantee complete writes) and
crash or produce incorrect results.

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Mon, Jun 8, 2020 at 2:43 AM ArtemisDev  wrote:

> We were trying to use structured streaming from file source, but had
> problems getting the files read by Spark properly.  We have another
> process generating the data files in the Spark data source directory on
> a continuous basis.  What we have observed was that the moment a data
> file is created before the data producing process finished, it was read
> by Spark immediately without reaching the EOF.  Then Spark will never
> revisit the file.  So we only ended up with empty data content.  The
> only way to make it to work is to produce the data files in a separate
> directory (e.g. /tmp) and move them to the Spark's file source dir after
> the data generation completes.
>
> My questions:  Is this a behavior by design or is there any way to
> control the Spark streaming process not to import a file while it is
> still being used by another process?  In other words, do we have to use
> the tmp dir to move data files around or can the data producing process
> and Spark share the same directory?
>
> Thanks!
>
> -- Nick
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Structured Streaming using File Source - How to handle live files

2020-06-07 Thread ArtemisDev
We were trying to use structured streaming from a file source, but had 
problems getting the files read by Spark properly.  We have another 
process generating the data files in the Spark data source directory on 
a continuous basis.  What we have observed is that the moment a data 
file is created, before the data-producing process has finished, it is read 
by Spark immediately without reaching EOF.  Spark then never 
revisits the file, so we only end up with empty data content.  The 
only way to make it work is to produce the data files in a separate 
directory (e.g. /tmp) and move them into Spark's file source directory after 
the data generation completes.


My questions:  Is this behavior by design, or is there any way to 
tell the Spark streaming process not to import a file while it is 
still being written by another process?  In other words, do we have to use 
the tmp dir to move data files around (as sketched below), or can the 
data-producing process and Spark share the same directory?
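
For reference, the tmp-dir workaround looks roughly like this in our case - a
sketch with made-up paths, and it assumes the staging and source directories
are on the same filesystem:

```scala
import java.nio.file.{Files, Paths, StandardCopyOption}

// 1. The producer writes the file completely under a staging directory.
val staged = Paths.get("/tmp/staging/part-0001.json")
// 2. Only when it is complete do we move it into the directory Spark watches.
val target = Paths.get("/data/stream-input/part-0001.json")

// ATOMIC_MOVE makes the file appear in the source dir all at once, so Spark
// never sees a half-written file.
Files.move(staged, target, StandardCopyOption.ATOMIC_MOVE)
```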


Thanks!

-- Nick


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark :- Update record in partition.

2020-06-07 Thread Sunil Kalra
Hi All,

If I have to update a record in a partition using Spark, do I have to read
the whole partition, update the row, and overwrite the partition?

Is there a way to update only one row, as in a DBMS? Otherwise a one-row
update takes a long time because it rewrites the whole partition.
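
For context, the rewrite-the-whole-partition approach I am referring to looks
roughly like this (a sketch with made-up paths and columns):

```scala
import org.apache.spark.sql.functions.{col, lit, when}

// Hypothetical layout: /data/orders/order_date=2020-06-01/...
val partitionPath = "/data/orders/order_date=2020-06-01"

// Read only the affected partition and patch the single row.
val updated = spark.read.parquet(partitionPath)
  .withColumn("status",
    when(col("order_id") === 42, lit("CANCELLED"))
      .otherwise(col("status")))

// Spark won't overwrite a path it is reading from, so write to a staging
// location first and then swap the directories outside of Spark.
updated.write.mode("overwrite").parquet(partitionPath + ".tmp")
```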

Thanks
Sunil