[Spark structured streaming] Use of (flat)mapgroupswithstate takes long time

2018-01-18 Thread chris-sw
Hi,

I recently did some experiments with stateful structured streaming using
flatMapGroupsWithState. The streaming application is quite simple: it
receives data from Kafka, feeds it to the stateful operator
(flatMapGroupsWithState) and sinks the output to the console.
During a test with small datasets (3-5 records per batch) I experienced long
batch runs.
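
For reference, here is a minimal sketch of the kind of pipeline described
above (topic, field and class names are illustrative, not taken from the
actual application):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

val spark = SparkSession.builder().appName("stateful-demo").getOrCreate()
import spark.implicits._

case class Event(key: String, value: String)
case class KeyCount(key: String, count: Long)

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .as[Event]

def countPerKey(key: String, rows: Iterator[Event],
                state: GroupState[Long]): Iterator[KeyCount] = {
  val newCount = state.getOption.getOrElse(0L) + rows.size
  state.update(newCount)
  Iterator(KeyCount(key, newCount))
}

// Note: the state store is partitioned by spark.sql.shuffle.partitions (200 by default),
// so every micro-batch launches one state-store task per partition even for tiny inputs,
// which lines up with the many near-empty tasks visible in the log below.
val counted = events
  .groupByKey(_.key)
  .flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.NoTimeout)(countPerKey)

val query = counted.writeStream
  .outputMode("update")
  .format("console")
  .start()
query.awaitTermination()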

Taking a look at the log, I see an explosion of tasks with entries like these:
-
2018-01-18 13:33:46,668 [Executor task launch worker for task 287] INFO
org.apache.spark.executor.Executor - Running task 85.0 in stage 3.0 (TID
287)
2018-01-18 13:33:46,672 [Executor task launch worker for task 287] INFO
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider
- Retrieved version 1 of HDFSStateStoreProvider[id = (op=0, part=85), dir =
/tmp/temporary-8b418cec-0378-4324-a916-6e3864500d56/state/0/85] for update
2018-01-18 13:33:46,672 [Executor task launch worker for task 287] INFO
org.apache.spark.storage.ShuffleBlockFetcherIterator - Getting 0 non-empty
blocks out of 1 blocks
2018-01-18 13:33:46,672 [Executor task launch worker for task 287] INFO
org.apache.spark.storage.ShuffleBlockFetcherIterator - Started 0 remote
fetches in 0 ms
2018-01-18 13:33:46,691 [Executor task launch worker for task 287] INFO
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider
- Committed version 2 for
HDFSStateStore[id=(op=0,part=85),dir=/tmp/temporary-8b418cec-0378-4324-a916-6e3864500d56/state/0/85]
to file
/tmp/temporary-8b418cec-0378-4324-a916-6e3864500d56/state/0/85/2.delta
2018-01-18 13:33:46,691 [Executor task launch worker for task 287] INFO
org.apache.spark.executor.Executor - Finished task 85.0 in stage 3.0 (TID
287). 2212 bytes result sent to driver
-

A batch run takes approx. 5 seconds, and it seems most of that time is spent
on tasks like the ones above.
I created several apps using the non-Spark SQL approach with mapWithState
but never experienced these long batch runs.

Has anyone had this experience as well, or can anyone help me find a solution
for these long runs?

Regards,

Chris



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Unsubscribe

2018-01-18 Thread Yash Sharma
Please send mail to user-unsubscr...@spark.apache.org to unsubscribe.

Cheers

On Fri., 19 Jan. 2018, 5:11 pm Anu B Nair,  wrote:

>


Re: Unsubscribe

2018-01-18 Thread Yash Sharma
Please send mail to user-unsubscr...@spark.apache.org to unsubscribe.

Cheers

On Fri., 19 Jan. 2018, 5:28 pm Sbf xyz,  wrote:

>


Unsubscribe

2018-01-18 Thread Sbf xyz



Unsubscribe

2018-01-18 Thread Anu B Nair



[Structured Streaming]: Structured Streaming into Redshift sink

2018-01-18 Thread Somasundaram Sekar
Is it possible to write a DataFrame backed by a Kafka streaming source into
AWS Redshift? We have in the past used
https://github.com/databricks/spark-redshift to write into Redshift, but I
presume it will not work with *writeStream*. Writing with the JDBC
connector via a ForeachWriter may also not be a good idea, given the way
Redshift works.



One possible approach that I have come across, from a Yelp blog post (
https://engineeringblog.yelp.com/2016/10/redshift-connector.html), is to
write the files into S3 and then invoke Redshift COPY (
https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html) with a *manifest
file listing the S3 object paths*. In the case of Structured Streaming, how
can I control the files I write to S3, and have a separate trigger create a
manifest file after writing, say, 5 files into S3?
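
For the S3 staging part specifically, here is a hedged sketch of what the
built-in file sink already gives you (bucket names, paths and the trigger
interval are illustrative, and kafkaDf stands for the Kafka-backed streaming
DataFrame):

import org.apache.spark.sql.streaming.Trigger

// Each trigger writes a new batch of files under the output path and records the committed
// files in <path>/_spark_metadata, so the trigger interval controls how often a fresh set
// of objects lands in S3.
val query = kafkaDf
  .selectExpr("CAST(value AS STRING) AS value")
  .writeStream
  .format("parquet")  // or "csv"/"json", whatever the COPY command expects
  .option("path", "s3a://my-bucket/staging/events/")
  .option("checkpointLocation", "s3a://my-bucket/checkpoints/events/")
  .trigger(Trigger.ProcessingTime("5 minutes"))
  .start()

// A separate job could then list the newly committed objects (from _spark_metadata or a
// plain S3 listing), write the COPY manifest, and issue the Redshift COPY over JDBC.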



Any other possible solutions are also appreciated. Thanks in advance.



Regards,

Somasundaram S



Re: "Got wrong record after seeking to offset" issue

2018-01-18 Thread Justin Miller
Yeah, I saw that after I sent that e-mail out. I actually remembered another
ticket that I had commented on, which turned out to be unrelated to the issue I
was seeing at the time. It may be related to the current issue:

https://issues.apache.org/jira/browse/SPARK-17147 


We are compacting topics, but only offset topics. We just updated our message 
version to 0.10 today as our last non-Spark project was brought up to 0.11 
(Storm based).

Justin
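
For anyone who wants to try Cody's suggestion below of consuming the
particular failing offset with a plain consumer, here is a minimal sketch
(broker, topic, partition and offset are placeholders loosely echoing the
error message):

import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "broker:9092")
props.put("group.id", "offset-check")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
val tp = new TopicPartition("my-topic", 76)   // partition taken from the error message
consumer.assign(Collections.singletonList(tp))
consumer.seek(tp, 1759148155L)                // offset the executor was seeking to

// On a compacted topic (or one containing transactional control records) the first record
// returned can have a larger offset than the one requested, which is what the Spark
// assertion trips on.
consumer.poll(10000L).records(tp).asScala.take(5).foreach { r =>
  println(s"offset=${r.offset} key=${r.key}")
}
consumer.close()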

> On Jan 18, 2018, at 1:39 PM, Cody Koeninger  wrote:
> 
> https://kafka.apache.org/documentation/#compaction
> 
> On Thu, Jan 18, 2018 at 1:17 AM, Justin Miller
>  wrote:
>> By compacted do you mean compression? If so, then we did recently turn on lz4
>> compression. If there’s another meaning, and there’s a command I can run to
>> check compaction, I’m happy to give that a shot too.
>> 
>> I’ll try consuming from the failed offset if/when the problem manifests
>> itself again.
>> 
>> Thanks!
>> Justin
>> 
>> 
>> On Wednesday, January 17, 2018, Cody Koeninger  wrote:
>>> 
>>> That means the consumer on the executor tried to seek to the specified
>>> offset, but the message that was returned did not have a matching
>>> offset.  If the executor can't get the messages the driver told it to
>>> get, something's generally wrong.
>>> 
>>> What happens when you try to consume the particular failing offset
>>> from another  (e.g. commandline) consumer?
>>> 
>>> Is the topic in question compacted?
>>> 
>>> 
>>> 
>>> On Tue, Jan 16, 2018 at 11:10 PM, Justin Miller
>>>  wrote:
 Greetings all,
 
 I’ve recently started hitting on the following error in Spark Streaming
 in Kafka. Adjusting maxRetries and spark.streaming.kafka.consumer.poll.ms
 even to five minutes doesn’t seem to be helping. The problem only 
 manifested
 in the last few days, restarting with a new consumer group seems to remedy
 the issue for a few hours (< retention, which is 12 hours).
 
 Error:
 Caused by: java.lang.AssertionError: assertion failed: Got wrong record
 for spark-executor-  76 even after seeking to
 offset 1759148155
at scala.Predef$.assert(Predef.scala:170)
at
 org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:85)
at
 org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
at
 org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
 
 I guess my questions are: why is that assertion a job killer rather than a
 warning, and is there anything I can tweak settings-wise that may keep it at
 bay?
 
 I wouldn’t be surprised if this issue were exacerbated by the volume we
 do on Kafka topics (~150k/sec on the persister that’s crashing).
 
 Thank you!
 Justin
 
 
 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org
 



Re: "Got wrong record after seeking to offset" issue

2018-01-18 Thread Cody Koeninger
https://kafka.apache.org/documentation/#compaction

On Thu, Jan 18, 2018 at 1:17 AM, Justin Miller
 wrote:
> By compacted do you mean compression? If so, then we did recently turn on lz4
> compression. If there’s another meaning, and there’s a command I can run to
> check compaction, I’m happy to give that a shot too.
>
> I’ll try consuming from the failed offset if/when the problem manifests
> itself again.
>
> Thanks!
> Justin
>
>
> On Wednesday, January 17, 2018, Cody Koeninger  wrote:
>>
>> That means the consumer on the executor tried to seek to the specified
>> offset, but the message that was returned did not have a matching
>> offset.  If the executor can't get the messages the driver told it to
>> get, something's generally wrong.
>>
>> What happens when you try to consume the particular failing offset
>> from another  (e.g. commandline) consumer?
>>
>> Is the topic in question compacted?
>>
>>
>>
>> On Tue, Jan 16, 2018 at 11:10 PM, Justin Miller
>>  wrote:
>> > Greetings all,
>> >
>> > I’ve recently started hitting on the following error in Spark Streaming
>> > in Kafka. Adjusting maxRetries and spark.streaming.kafka.consumer.poll.ms
>> > even to five minutes doesn’t seem to be helping. The problem only 
>> > manifested
>> > in the last few days, restarting with a new consumer group seems to remedy
>> > the issue for a few hours (< retention, which is 12 hours).
>> >
>> > Error:
>> > Caused by: java.lang.AssertionError: assertion failed: Got wrong record
>> > for spark-executor-  76 even after seeking to
>> > offset 1759148155
>> > at scala.Predef$.assert(Predef.scala:170)
>> > at
>> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:85)
>> > at
>> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
>> > at
>> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>> > at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>> >
>> > I guess my questions are: why is that assertion a job killer rather than a
>> > warning, and is there anything I can tweak settings-wise that may keep it at
>> > bay?
>> >
>> > I wouldn’t be surprised if this issue were exacerbated by the volume we
>> > do on Kafka topics (~150k/sec on the persister that’s crashing).
>> >
>> > Thank you!
>> > Justin
>> >
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Reading Hive RCFiles?

2018-01-18 Thread Michael Segel
No idea on how that last line of garbage got in the message. 


> On Jan 18, 2018, at 9:32 AM, Michael Segel  wrote:
> 
> Hi, 
> 
> I’m trying to find out if there’s a simple way for Spark to be able to read 
> an RCFile. 
> 
> I know I can create a table in Hive, then drop the files into that directory 
> and use a SQL context to read the file from Hive; however, I wanted to read 
> the file directly. 
> 
> Not a lot of details to go on… even the Apache site’s links are broken. 
> See :
> https://cwiki.apache.org/confluence/display/Hive/RCFile
> 
> Then try to follow the Javadoc link. 
> 
> 
> Any suggestions? 
> 
> Thx
> 
> -Mike
> 
> 


Reading Hive RCFiles?

2018-01-18 Thread Michael Segel
Hi, 

I’m trying to find out if there’s a simple way for Spark to be able to read an 
RCFile. 

I know I can create a table in Hive, then drop the files into that directory 
and use a SQL context to read the file from Hive; however, I wanted to read 
the file directly. 

Not a lot of details to go on… even the Apache site’s links are broken. 
See :
https://cwiki.apache.org/confluence/display/Hive/RCFile

Then try to follow the Javadoc link. 


Any suggestions? 

Thx

-Mike
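
One possibly more direct route is to go through Hive's own input format
classes with sc.hadoopFile. This is an untested sketch: it assumes the
hive-exec/hive-serde jars are on the classpath and that the columns are plain
text-encoded, so treat the class names and the decoding as assumptions rather
than a recipe.

import org.apache.hadoop.hive.ql.io.RCFileInputFormat
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable
import org.apache.hadoop.io.LongWritable

// Read the raw RCFile rows: each value is one row as an array of column byte ranges.
val rows = spark.sparkContext.hadoopFile(
  "hdfs:///path/to/rcfiles",
  classOf[RCFileInputFormat[LongWritable, BytesRefArrayWritable]],
  classOf[LongWritable],
  classOf[BytesRefArrayWritable])

// Decode immediately inside the map (the Writables are reused between records); this
// assumes a text-based SerDe wrote the table, other SerDes need their own decoding.
val decoded = rows.map { case (_, row) =>
  (0 until row.size).map { i =>
    val ref = row.get(i)
    new String(ref.getData, ref.getStart, ref.getLength, "UTF-8")
  }
}
decoded.take(5).foreach(println)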



Re: Spark Stream is corrupted

2018-01-18 Thread KhajaAsmath Mohammed
Any solutions for this problem, please?

Sent from my iPhone

> On Jan 17, 2018, at 10:39 PM, KhajaAsmath Mohammed  
> wrote:
> 
> Hi,
> 
> I have created a streaming context from a checkpoint, but it always throws a 
> "stream is corrupted" error when I restart the Spark streaming job. Any 
> solution for this?
> 
> private def createStreamingContext(
>     sparkCheckpointDir: String, sparkSession: SparkSession,
>     batchDuration: Int, config: com.typesafe.config.Config) = {
>   val topics = config.getString(Constants.Properties.KafkaTopics)
>   val topicsSet = topics.split(",").toSet
>   val kafkaParams = Map[String, String]("metadata.broker.list" ->
>     config.getString(Constants.Properties.KafkaBrokerList))
>   val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(batchDuration))
>   val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,
>     StringDecoder](ssc, kafkaParams, topicsSet)
>   val datapointDStream = messages.map(_._2).map(TransformDatapoint.parseDataPointText)
>   lazy val sqlCont = sparkSession.sqlContext
> 
>   hiveDBInstance = config.getString("hiveDBInstance")
> 
>   TransformDatapoint.readDstreamData(sparkSession, sqlCont, datapointDStream,
>     runMode, includeIndex, indexNum, datapointTmpTableName,
>     fencedDPTmpTableName, fencedVINDPTmpTableName, hiveDBInstance)
> 
>   ssc.checkpoint(sparkCheckpointDir)
>   ssc
> }
> 
> 
> // calling the streaming context method
> 
> val streamingContext = StreamingContext.getOrCreate(
>   config.getString(Constants.Properties.CheckPointDir),
>   () => createStreamingContext(config.getString(Constants.Properties.CheckPointDir),
>     sparkSession, config.getInt(Constants.Properties.BatchInterval), config))
> 
> ERROR:
> org.apache.spark.SparkException: Failed to read checkpoint from directory 
> hdfs://prodnameservice1/user/yyy1k78/KafkaCheckPointNTDSC
> 
> java.io.IOException: Stream is corrupted
> 
> 
> Thanks,
> Asmath


Re: StreamingLogisticRegressionWithSGD : Multiclass Classification : Options

2018-01-18 Thread Patrick McCarthy
As a hack, you could train a number of one-vs-all classifiers and then,
post hoc, select the class with the highest prediction probability.
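
A rough sketch of that one-vs-rest idea with StreamingLogisticRegressionWithSGD,
where numClasses, numFeatures, trainingStream (a DStream[LabeledPoint] with
labels 0..numClasses-1) and testStream (a DStream[Vector]) are assumed to exist:

import org.apache.spark.mllib.classification.StreamingLogisticRegressionWithSGD
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.dstream.DStream

// One binary model per class.
val models = (0 until numClasses).map { k =>
  val m = new StreamingLogisticRegressionWithSGD()
    .setInitialWeights(Vectors.zeros(numFeatures))
  // Relabel: 1.0 if the example belongs to class k, 0.0 otherwise.
  m.trainOn(trainingStream.map(lp =>
    LabeledPoint(if (lp.label == k.toDouble) 1.0 else 0.0, lp.features)))
  m
}

// Score each feature vector against every binary model and keep the argmax
// (capturing the models in the closure mirrors what predictOn does internally).
val predicted: DStream[(Vector, Int)] = testStream.map { features =>
  val scores = models.map { m =>
    // clearThreshold() makes predict() return the raw probability instead of a 0/1 label.
    m.latestModel().clearThreshold().predict(features)
  }
  (features, scores.indexOf(scores.max))
}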

On Thu, Jan 18, 2018 at 12:17 AM, Sundeep Kumar Mehta  wrote:

> Hi,
>
> I was looking for a multi-class logistic regression classifier on
> streaming data. Do we have any alternative options, or a library/GitHub
> project, as StreamingLogisticRegressionWithSGD only supports binary
> classification?
>
> Regards
> Sundeep
>


Re: Writing data in HDFS high available cluster

2018-01-18 Thread Subhash Sriram
Hi Soheil,

We have a high-availability cluster as well, but I never have to specify the 
active namenode when writing, only the nameservice (cluster) name. It works 
regardless of which namenode is active.
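
In other words, something along these lines (a sketch; "mycluster" is a
hypothetical nameservice and df an arbitrary DataFrame):

// This assumes the hdfs-site.xml visible to Spark defines the usual HA properties, e.g.
//   dfs.nameservices = mycluster
//   dfs.ha.namenodes.mycluster = nn1,nn2
//   dfs.namenode.rpc-address.mycluster.nn1 / .nn2 = host:port of each namenode
//   dfs.client.failover.proxy.provider.mycluster = ...ConfiguredFailoverProxyProvider
// The HDFS client then fails over between the namenodes on its own.
df.write.parquet("hdfs://mycluster/data/output")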

Hope that helps.

Thanks,
Subhash 

Sent from my iPhone

> On Jan 18, 2018, at 5:49 AM, Soheil Pourbafrani  wrote:
> 
> I have an HDFS high-availability cluster with two namenodes, one active and 
> one standby. When I want to write data to HDFS I use the active namenode's 
> address. Now, my question is: what happens if the active namenode fails while 
> Spark is writing data? Is there any way to set both the active and standby 
> namenodes in Spark for writing data?

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Writing data in HDFS high available cluster

2018-01-18 Thread Soheil Pourbafrani
I have an HDFS high-availability cluster with two namenodes, one active and
one standby. When I want to write data to HDFS I use the active namenode's
address. Now, my question is: what happens if the active namenode fails while
Spark is writing data? Is there any way to set both the active and standby
namenodes in Spark for writing data?


Re: good materiala to learn apache spark

2018-01-18 Thread Marco Mistroni
Jacek Laskowski, on this mailing list, wrote a book which is available
online.
Hth

On Jan 18, 2018 6:16 AM, "Manuel Sopena Ballesteros" <
manuel...@garvan.org.au> wrote:

> Dear Spark community,
>
>
>
> I would like to learn more about Apache Spark. I have a Hortonworks HDP
> platform and have run a few Spark jobs in a cluster, but now I need to know
> in more depth how Spark works.
>
>
>
> My main interest is the sysadmin and operational side of Spark and its
> ecosystem.
>
>
>
> Is there any material?
>
>
>
> Thank you very much
>
>
>
> *Manuel Sopena Ballesteros *| Big data Engineer
> *Garvan Institute of Medical Research *
> The Kinghorn Cancer Centre, 370 Victoria Street, Darlinghurst, NSW 2010
> 
> *T:* + 61 (0)2 9355 5760 <+61%202%209355%205760> | *F:* +61 (0)2 9295 8507
> <+61%202%209295%208507> | *E:* manuel...@garvan.org.au
>
>


Does Spark and Hive use Same SQL parser : ANTLR

2018-01-18 Thread Pralabh Kumar
Hi


Do Hive and Spark use the same SQL parser provided by ANTLR? Do they
generate the same logical plan?

Please help on the same.
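
Not an authoritative answer, but one way to see for yourself what Spark's own
parser produces for a given statement (the table and query here are just an
example):

// Spark 2.x ships its own ANTLR4 grammar (SqlBase.g4) and builds Catalyst plans from it,
// so the easiest comparison is to print the plans on both sides for the same SQL.
val df = spark.sql("SELECT dept, count(*) AS cnt FROM employees GROUP BY dept")
println(df.queryExecution.logical)   // the logical plan as parsed by Spark
df.explain(true)                     // parsed, analyzed, optimized and physical plans

// On the Hive side, EXPLAIN EXTENDED <query> shows the plan Hive's own parser generates.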


Regards
Pralabh Kumar


Re: Spark application on yarn cluster clarification

2018-01-18 Thread Fawze Abujaber
Hi Soheil,

The ResourceManager and NodeManagers are enough; of course you also need the
DataNode and NameNode roles to be able to access the data.

On Thu, 18 Jan 2018 at 10:12 Soheil Pourbafrani 
wrote:

> I am setting up a YARN cluster to run Spark applications on, but I'm a bit
> confused!
>
> Say I have a 4-node YARN cluster with one ResourceManager and 3
> NodeManagers, and Spark is installed on all 4 nodes.
>
> Now my question is: when I want to submit a Spark application to the YARN
> cluster, do the Spark daemons (both master and slaves) need to be running,
> or does running just the ResourceManager and NodeManagers suffice?
>
> Thanks
>


Writing to Redshift from Kafka Streaming source

2018-01-18 Thread Somasundaram Sekar
Hi,



Is it possible to write a DataFrame backed by a Kafka streaming source into
AWS Redshift? We have in the past used
https://github.com/databricks/spark-redshift to write into Redshift, but I
presume it will not work with DataFrame.writeStream(). Writing with the
JDBC connector via a ForeachWriter may also not be a good idea, given the
way Redshift works.



One possible approach that I have come across, from a Yelp blog post (
https://engineeringblog.yelp.com/2016/10/redshift-connector.html), is to
write the files into S3 and then invoke Redshift COPY (
https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html) with a manifest
file listing the S3 object paths. In the case of Structured Streaming, how
can I control the files I write to S3, and have a separate trigger create a
manifest file after writing, say, 5 files into S3?



Any other possible solutions are also appreciated. Thanks in advance.



Regards,

Somasundaram S



Spark application on yarn cluster clarification

2018-01-18 Thread Soheil Pourbafrani
I am setting up a YARN cluster to run Spark applications on, but I'm a bit
confused!

Say I have a 4-node YARN cluster with one ResourceManager and 3 NodeManagers,
and Spark is installed on all 4 nodes.

Now my question is: when I want to submit a Spark application to the YARN
cluster, do the Spark daemons (both master and slaves) need to be running, or
does running just the ResourceManager and NodeManagers suffice?

Thanks


spark linear regression model fit result is different from statsmodels linear model.

2018-01-18 Thread TonyHu
I want to do a VIF check on Spark, so I have to get R^2 (the coefficient of
determination) from a linear regression model. But the result is very
different from the R^2 given by a statsmodels linear model, and I don't know why.
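
Without seeing the code it is hard to say, but in my experience the usual
culprits are the fit settings rather than the R^2 computation itself:
statsmodels' OLS only fits an intercept if you add a constant column (and
reports an uncentered R^2 otherwise), while any non-zero regParam on the Spark
side changes the fit. A sketch of a Spark ML configuration that should be
comparable to a plain OLS (trainingDf is assumed to have "features" and
"label" columns):

import org.apache.spark.ml.regression.LinearRegression

val lr = new LinearRegression()
  .setRegParam(0.0)        // no regularization, to match an ordinary least-squares fit
  .setElasticNetParam(0.0)
  .setFitIntercept(true)
  .setSolver("normal")     // closed-form normal-equation solver

val model = lr.fit(trainingDf)
println(model.summary.r2)  // coefficient of determination of the training fit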




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org