spark-shell - modes

2017-08-06 Thread karan alang
Hello all - I had a basic question about the modes in which spark-shell can
be run.

When I run the following command, does Spark run in local mode, i.e. outside
of YARN and using only the local cores (since the '--master' option is
missing)?

./bin/spark-shell --driver-memory 512m --executor-memory 512m

Similarly, when I run the following:

1) ./bin/spark-shell --master yarn-client --driver-memory 512m
--executor-memory 512m

   - Spark runs in client mode, with resources managed by YARN.

2) ./bin/spark-shell --master yarn-cluster --driver-memory 512m
--executor-memory 512m

   - Spark runs in cluster mode, with resources managed by YARN.


Re: kafka setting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-06 Thread Cody Koeninger
If your complaint is about offsets being committed that you didn't
expect... auto commit being false on executors shouldn't have anything
to do with that.  Executors shouldn't be auto-committing, that's why
it's being overridden.

What you've said and the code you posted aren't really enough to explain
what your issue is. For example:

is this line
// save the rdd to Cassandra database
a blocking call?

are you sure that the rdd foreach isn't being retried and succeeding the
second time around, etc.?

On Sat, Aug 5, 2017 at 5:10 PM, shyla deshpande
 wrote:
> Hello All,
> I am using spark 2.0.2 and spark-streaming-kafka-0-10_2.11 .
>
> I am setting enable.auto.commit to false, and manually want to commit the
> offsets after my output operation is successful. So when a exception is
> raised during during the processing I do not want the offsets to be
> committed. But looks like the offsets are automatically committed even when
> the exception is raised and thereby I am losing data.
> In my logs I see,  WARN  overriding enable.auto.commit to false for
> executor.  But I don't want it to override. Please help.
>
> My code looks like..
>
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> brokers,
>   "key.deserializer" -> classOf[StringDeserializer],
>   "value.deserializer" -> classOf[StringDeserializer],
>   "group.id" -> "Group1",
>   "auto.offset.reset" -> offsetresetparameter,
>   "enable.auto.commit" -> (false: java.lang.Boolean)
> )
>
> val myTopics = Array("topic1")
> val stream1 = KafkaUtils.createDirectStream[String, String](
>   ssc,
>   PreferConsistent,
>   Subscribe[String, String](myTopics, kafkaParams)
> )
>
> stream1.foreachRDD { (rdd, time) =>
> val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> try {
> //save the rdd to Cassandra database
>
>   stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> } catch {
>   case ex: Exception => {
> println(ex.toString + "!! Bad Data, Unable to persist into
> table !" + errorOffsetRangesToString(offsetRanges))
>   }
> }
> }
>
> ssc.start()
> ssc.awaitTermination()




Re: spark-shell - modes

2017-08-06 Thread karan alang
Update - it seems spark-shell does not support yarn-cluster mode (I guess
because it is an interactive shell).

The only supported modes are yarn-client and local.

Please let me know if my understanding is incorrect.
Thanks!
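
For reference, a quick way to confirm which mode a given launch actually got
is to check from inside the shell itself (a small sketch, assuming Spark 2.x,
where spark-shell pre-creates sc):

// run inside spark-shell after launching it with or without --master
sc.master    // e.g. "local[*]" when no --master was given, "yarn" for --master yarn-client
sc.isLocal   // true only when running in local mode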


On Sun, Aug 6, 2017 at 10:07 AM, karan alang  wrote:

> Hello all - i'd a basic question on the modes in which spark-shell can be
> run ..
>
> when i run the following command,
> does Spark run in local mode i.e. outside of YARN & using the local cores ?
> (since '--master' option is missing)
>
> ./bin/spark-shell --driver-memory 512m --executor-memory 512m
>
> Similarly, when i run the following -
>
> 1) ./bin/spark-shell --master yarn-client --driver-memory 512m
> --executor-memory 512m
>
>- Spark is run in Client mode & resources managed by YARN.
>
> 2) ./bin/spark-shell --master yarn-cluster --driver-memory 512m
> --executor-memory 512m
>
> - Spark is run in Cluster mode & resources managed by YARN.
>
>


Re: kafka setting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-06 Thread shyla deshpande
Thanks, Cody, for your response.

All I want to do is commit the offsets only if I am successfully able to
write to the Cassandra database.

The line // save the rdd to Cassandra database is
rdd.map { record => ()}.saveToCassandra("kayspace1", "table1")

What do you mean by "Executors shouldn't be auto-committing, that's why it's
being overridden"? It is the executors that do the mapping and the saving to
Cassandra. The status of success or failure of this operation is known only
on the executor, and that is where I want to commit the Kafka offsets. If this
is not what I should be doing, then what is the right way?

On Sun, Aug 6, 2017 at 9:21 AM, Cody Koeninger  wrote:

> If your complaint is about offsets being committed that you didn't
> expect... auto commit being false on executors shouldn't have anything
> to do with that.  Executors shouldn't be auto-committing, that's why
> it's being overridden.
>
> What you've said and the code you posted isn't really enough to
> explain what your issue is, e.g.
>
> is this line
> // save the rdd to Cassandra database
> a blocking call
>
> are you sure that the rdd foreach isn't being retried and succeeding
> the second time around, etc
>
> On Sat, Aug 5, 2017 at 5:10 PM, shyla deshpande
>  wrote:
> > Hello All,
> > I am using spark 2.0.2 and spark-streaming-kafka-0-10_2.11 .
> >
> > I am setting enable.auto.commit to false, and manually want to commit the
> > offsets after my output operation is successful. So when a exception is
> > raised during during the processing I do not want the offsets to be
> > committed. But looks like the offsets are automatically committed even
> when
> > the exception is raised and thereby I am losing data.
> > In my logs I see,  WARN  overriding enable.auto.commit to false for
> > executor.  But I don't want it to override. Please help.
> >
> > My code looks like..
> >
> > val kafkaParams = Map[String, Object](
> >   "bootstrap.servers" -> brokers,
> >   "key.deserializer" -> classOf[StringDeserializer],
> >   "value.deserializer" -> classOf[StringDeserializer],
> >   "group.id" -> "Group1",
> >   "auto.offset.reset" -> offsetresetparameter,
> >   "enable.auto.commit" -> (false: java.lang.Boolean)
> > )
> >
> > val myTopics = Array("topic1")
> > val stream1 = KafkaUtils.createDirectStream[String, String](
> >   ssc,
> >   PreferConsistent,
> >   Subscribe[String, String](myTopics, kafkaParams)
> > )
> >
> > stream1.foreachRDD { (rdd, time) =>
> > val offsetRanges = rdd.asInstanceOf[
> HasOffsetRanges].offsetRanges
> > try {
> > //save the rdd to Cassandra database
> >
> >   stream1.asInstanceOf[CanCommitOffsets].commitAsync(
> offsetRanges)
> > } catch {
> >   case ex: Exception => {
> > println(ex.toString + "!! Bad Data, Unable to persist
> into
> > table !" + errorOffsetRangesToString(offsetRanges))
> >   }
> > }
> > }
> >
> > ssc.start()
> > ssc.awaitTermination()
>


Re: SPARK Issue in Standalone cluster

2017-08-06 Thread Marco Mistroni
Sengupta,
further to this, if you try the following notebook in Databricks Cloud, it
will read a .csv file, write it to a Parquet file and read it again (just to
count the number of rows stored).
Please note that the path to the csv file might differ for you.
So, what you will need to do is:
1 - create an account at community.cloud.databricks.com
2 - upload the .csv file via the Data section of your Databricks private cluster
3 - run the script; it will store the data on the distributed filesystem
of the Databricks cloud (DBFS)

It's worth investing in this free Databricks cloud, as it can create a
cluster for you with minimal effort and it's a very easy way to test your
Spark scripts on a real cluster.

Hope this helps
kr

##
from pyspark.sql import SQLContext

from random import randint
from time import sleep
from pyspark.sql.session import SparkSession
import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
logger.addHandler(ch)

import sys


def read_parquet_file(parquetFileName, sqlContext):
    logger.info('Reading now the parquet files we just created...:%s', parquetFileName)
    parquet_data = sqlContext.read.parquet(parquetFileName)
    logger.info('Parquet file has %s', parquet_data.count())


def dataprocessing(filePath, count, sqlContext):
    logger.info('Iter count is:%s', count)
    if count == 0:
        print('exiting')
    else:
        df_traffic_tmp = sqlContext.read.format("csv").option("header", 'true').load(filePath)
        logger.info('#DataSet has:%s', df_traffic_tmp.count())
        logger.info('Writing to a parquet file')
        parquetFileName = "dbfs:/myParquetDf2.parquet"
        df_traffic_tmp.write.parquet(parquetFileName)
        sleepInterval = randint(10, 100)
        logger.info('#Sleeping for %s', sleepInterval)
        sleep(sleepInterval)
        read_parquet_file(parquetFileName, sqlContext)
        dataprocessing(filePath, count - 1, sqlContext)


filename = '/FileStore/tables/wb4y1wrv1502027870004/tree_addhealth.csv'  # This path might differ for you
iterations = 1
logger.info('--')
logger.info('Filename:%s', filename)
logger.info('Iterations:%s', iterations)
logger.info('--')

logger.info('Starting spark..Loading from %s for %s iterations', filename, iterations)
logger.info('Starting up')
sc = SparkSession.builder.appName("Data Processing").getOrCreate()
logger.info('Initializing sqlContext')
sqlContext = SQLContext(sc)
dataprocessing(filename, iterations, sqlContext)
logger.info('Out of here..')
##


On Sat, Aug 5, 2017 at 9:09 PM, Marco Mistroni  wrote:

> Uh believe me there are lots of ppl on this list who will send u code
> snippets if u ask... 
>
> Yes that is what Steve pointed out, suggesting also that for that simple
> exercise you should perform all operations on a spark standalone instead
> (or alt. Use an nfs on the cluster)
> I'd agree with his suggestion
> I suggest u another alternative:
> https://community.cloud.databricks.com/
>
> That's a ready made cluster and you can run your spark app as well store
> data on the cluster (well I haven't tried myself but I assume it's
> possible).   Try that out... I will try ur script there as I have an
> account there (though I guess I'll get there before me.)
>
> Try that out and let me know if u get stuck
> Kr
>
> On Aug 5, 2017 8:40 PM, "Gourav Sengupta" 
> wrote:
>
>> Hi Marco,
>>
>> For the first time in several years FOR THE VERY FIRST TIME. I am seeing
>> someone actually executing code and providing response. It feel wonderful
>> that at least someone considered to respond back by executing code and just
>> did not filter out each and every technical details to brood only on my
>> superb social skills, while claiming the reason for ignoring technical
>> details is that it elementary. I think that Steve also is the first person
>> who could answer the WHY of an elementary question instead of saying that
>> is how it is and pointed out to the correct documentation.
>>
>> That code works fantastically. But the problem which I have tried to find
>> out is while writing out the data and not reading it.
>>
>>
>> So if you see try to read the data from the same folder which has the
>> same file across all the nodes then it will work fine. In fact that is what
>> should work.
>>
>> What does not work is that if you try to write back the file and then
>> read it once again from the location you have written that is when the
>> issue starts happening.
>>
>> Therefore if in my code you were to save the pandas dataframe as a CSV
>> file and then read it then you will find the following observations:
>>
>> FOLLOWING WILL FAIL SINCE THE FILE IS NOT IN ALL THE NODES
>> 

Re: SPARK Issue in Standalone cluster

2017-08-06 Thread Gourav Sengupta
Hi Marco,

thanks a ton, I will surely use those alternatives.


Regards,
Gourav Sengupta

On Sun, Aug 6, 2017 at 3:45 PM, Marco Mistroni  wrote:

> Sengupta
>  further to this, if you try the following notebook in databricks cloud,
> it will read a .csv file , write to a parquet file and read it again (just
> to count the number of rows stored)
> Please note that the path to the csv file might differ for you.
> So, what you will need todo is
> 1 - create an account to community.cloud.databricks.com
> 2 - upload the .csv file onto the Data of your databricks private cluster
> 3  - run the script. that will store the data on the distrubuted
> filesystem of the databricks cloudn (dbfs)
>
> It's worth investing in this free databricks cloud as it can create a
> cluster for you with minimal effort, and it's  a very easy way to test your
> spark scripts on a real cluster
>
> hope this helps
> kr
>
> ##
> from pyspark.sql import SQLContext
>
> from random import randint
> from time import sleep
> from pyspark.sql.session import SparkSession
> import logging
> logger = logging.getLogger(__name__)
> logger.setLevel(logging.INFO)
> ch = logging.StreamHandler()
> logger.addHandler(ch)
>
>
> import sys
>
> def read_parquet_file(parquetFileName):
>   logger.info('Reading now the parquet files we just created...:%s',
> parquetFileName)
>   parquet_data = sqlContext.read.parquet(parquetFileName)
>   logger.info('Parquet file has %s', parquet_data.count())
>
> def dataprocessing(filePath, count, sqlContext):
> logger.info( 'Iter count is:%s' , count)
> if count == 0:
> print 'exiting'
> else:
> df_traffic_tmp = sqlContext.read.format("csv").
> option("header",'true').load(filePath)
> logger.info( '#DataSet has:%s' ,
> df_traffic_tmp.count())
> logger.info('WRting to a parquet file')
> parquetFileName = "dbfs:/myParquetDf2.parquet"
> df_traffic_tmp.write.parquet(parquetFileName)
> sleepInterval = randint(10,100)
> logger.info( '#Sleeping for %s' ,
> sleepInterval)
> sleep(sleepInterval)
> read_parquet_file(parquetFileName)
> dataprocessing(filePath, count-1, sqlContext)
>
> filename = '/FileStore/tables/wb4y1wrv1502027870004/tree_addhealth.csv'#This
> path might differ for you
> iterations = 1
> logger.info('--')
> logger.info('Filename:%s', filename)
> logger.info('Iterations:%s', iterations )
> logger.info('--')
>
> logger.info ('Initializing sqlContext')
> logger.info( 'Starting spark..Loading from%s for %s
> iterations' , filename, iterations)
> logger.info(  'Starting up')
> sc = SparkSession.builder.appName("Data Processsing").getOrCreate()
> logger.info ('Initializing sqlContext')
> sqlContext = SQLContext(sc)
> dataprocessing(filename, iterations, sqlContext)
> logger.info('Out of here..')
> ##
>
>
> On Sat, Aug 5, 2017 at 9:09 PM, Marco Mistroni 
> wrote:
>
>> Uh believe me there are lots of ppl on this list who will send u code
>> snippets if u ask... 
>>
>> Yes that is what Steve pointed out, suggesting also that for that simple
>> exercise you should perform all operations on a spark standalone instead
>> (or alt. Use an nfs on the cluster)
>> I'd agree with his suggestion
>> I suggest u another alternative:
>> https://community.cloud.databricks.com/
>>
>> That's a ready made cluster and you can run your spark app as well store
>> data on the cluster (well I haven't tried myself but I assume it's
>> possible).   Try that out... I will try ur script there as I have an
>> account there (though I guess I'll get there before me.)
>>
>> Try that out and let me know if u get stuck
>> Kr
>>
>> On Aug 5, 2017 8:40 PM, "Gourav Sengupta" 
>> wrote:
>>
>>> Hi Marco,
>>>
>>> For the first time in several years FOR THE VERY FIRST TIME. I am seeing
>>> someone actually executing code and providing response. It feel wonderful
>>> that at least someone considered to respond back by executing code and just
>>> did not filter out each and every technical details to brood only on my
>>> superb social skills, while claiming the reason for ignoring technical
>>> details is that it elementary. I think that Steve also is the first person
>>> who could answer the WHY of an elementary question instead of saying that
>>> is how it is and pointed out to the correct documentation.
>>>
>>> That code works fantastically. But the problem which I have tried to
>>> find out is while writing out the data and not reading it.
>>>
>>>
>>> So if you see try to read the data from the same folder which has the
>>> same file across all the nodes then it will work fine. In fact that is what
>>> should work.
>>>
>>> What does not work is that if you try to write back the file and then
>>> 

Re: kafka setting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-06 Thread Cody Koeninger
I mean that the kafka consumers running on the executors should not be
automatically committing, because the fact that a message was read by
the consumer has no bearing on whether it was actually successfully
processed after reading.

It sounds to me like you're confused about where code is running.
foreachRDD runs on the driver, not the executor.

http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
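
A minimal sketch of what that driver/executor split looks like for the code
in this thread (assuming the DataStax spark-cassandra-connector, whose
saveToCassandra is a blocking action, and a hypothetical toRow helper;
stream1 is the direct stream from the quoted code below):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
import com.datastax.spark.connector._  // adds saveToCassandra to RDDs

stream1.foreachRDD { rdd =>
  // this closure runs on the driver
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  try {
    // the map/save work is executed on the executors, but saveToCassandra is a
    // blocking action: it returns on the driver only after the write job has
    // finished, and throws on the driver if that job ultimately fails
    rdd.map(record => toRow(record)).saveToCassandra("keyspace1", "table1")
    // reached only when the write job succeeded, so committing here is safe
    stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  } catch {
    case ex: Exception =>
      // no commit: the same offsets will be reprocessed after a restart
      println(s"Write failed, not committing offsets: $ex")
  }
}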

On Sun, Aug 6, 2017 at 12:55 PM, shyla deshpande
 wrote:
> Thanks Cody for your response.
>
> All I want to do is, commit the offsets only if I am successfully able to
> write to cassandra database.
>
> The line //save the rdd to Cassandra database is
> rdd.map { record => ()}.saveToCassandra("kayspace1", "table1")
>
> What do you mean by Executors shouldn't be auto-committing, that's why it's
> being overridden. It is the executors that do the mapping and saving to
> cassandra. The status of success or failure of this operation is known only
> on the executor and thats where I want to commit the kafka offsets. If this
> is not what I sould be doing, then  what is the right way?
>
> On Sun, Aug 6, 2017 at 9:21 AM, Cody Koeninger  wrote:
>>
>> If your complaint is about offsets being committed that you didn't
>> expect... auto commit being false on executors shouldn't have anything
>> to do with that.  Executors shouldn't be auto-committing, that's why
>> it's being overridden.
>>
>> What you've said and the code you posted isn't really enough to
>> explain what your issue is, e.g.
>>
>> is this line
>> // save the rdd to Cassandra database
>> a blocking call
>>
>> are you sure that the rdd foreach isn't being retried and succeeding
>> the second time around, etc
>>
>> On Sat, Aug 5, 2017 at 5:10 PM, shyla deshpande
>>  wrote:
>> > Hello All,
>> > I am using spark 2.0.2 and spark-streaming-kafka-0-10_2.11 .
>> >
>> > I am setting enable.auto.commit to false, and manually want to commit
>> > the
>> > offsets after my output operation is successful. So when a exception is
>> > raised during during the processing I do not want the offsets to be
>> > committed. But looks like the offsets are automatically committed even
>> > when
>> > the exception is raised and thereby I am losing data.
>> > In my logs I see,  WARN  overriding enable.auto.commit to false for
>> > executor.  But I don't want it to override. Please help.
>> >
>> > My code looks like..
>> >
>> > val kafkaParams = Map[String, Object](
>> >   "bootstrap.servers" -> brokers,
>> >   "key.deserializer" -> classOf[StringDeserializer],
>> >   "value.deserializer" -> classOf[StringDeserializer],
>> >   "group.id" -> "Group1",
>> >   "auto.offset.reset" -> offsetresetparameter,
>> >   "enable.auto.commit" -> (false: java.lang.Boolean)
>> > )
>> >
>> > val myTopics = Array("topic1")
>> > val stream1 = KafkaUtils.createDirectStream[String, String](
>> >   ssc,
>> >   PreferConsistent,
>> >   Subscribe[String, String](myTopics, kafkaParams)
>> > )
>> >
>> > stream1.foreachRDD { (rdd, time) =>
>> > val offsetRanges =
>> > rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>> > try {
>> > //save the rdd to Cassandra database
>> >
>> >
>> > stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>> > } catch {
>> >   case ex: Exception => {
>> > println(ex.toString + "!! Bad Data, Unable to persist
>> > into
>> > table !" + errorOffsetRangesToString(offsetRanges))
>> >   }
>> > }
>> > }
>> >
>> > ssc.start()
>> > ssc.awaitTermination()
>
>







How can I split a dataset into multiple datasets

2017-08-06 Thread Jone Zhang
val schema = StructType(
  Seq(
  StructField("app", StringType, nullable = true),
  StructField("server", StringType, nullable = true),
  StructField("file", StringType, nullable = true),
  StructField("...", StringType, nullable = true)
  )
)
val row = ...
val dataset = session.createDataFrame(row, schema)

How can I split the dataset into multiple datasets by the composite key
(app, server, file), as follows:
Map[(app, server, file), Dataset]


Thanks.


Re: How can I split a dataset into multiple datasets

2017-08-06 Thread Deepak Sharma
This can be mapped as below:
dataset.map(x => ((x(0), x(1), x(2)), x))

This works with a DataFrame of Rows, but I haven't tried it with a Dataset.
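
For completeness, a sketch of one way to actually materialise such a map,
assuming the set of distinct (app, server, file) keys is small enough to
collect to the driver (each value is just a filtered view of the original
dataset, evaluated lazily):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

val byKey: Map[(String, String, String), DataFrame] =
  dataset.select("app", "server", "file").distinct().collect().map { r =>
    val key = (r.getString(0), r.getString(1), r.getString(2))
    key -> dataset.filter(col("app") === key._1 &&
                          col("server") === key._2 &&
                          col("file") === key._3)
  }.toMap

If the goal is only to write each group out separately,
dataset.write.partitionBy("app", "server", "file") avoids collecting the keys
at all.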
Thanks
Deepak

On Mon, Aug 7, 2017 at 8:21 AM, Jone Zhang  wrote:

> val schema = StructType(
>   Seq(
>   StructField("app", StringType, nullable = true),
>   StructField("server", StringType, nullable = true),
>   StructField("file", StringType, nullable = true),
>   StructField("...", StringType, nullable = true)
>   )
> )
> val row = ...
> val dataset = session.createDataFrame(row, schema)
>
> How can i split dataset to dataset array by composite key(app,
> server,file) as follow
> mapdataset>
>
>
> Thanks.
>
>
>
>
>


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: kafka setting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-06 Thread shyla deshpande
rdd.map { record => ()}.saveToCassandra("keyspace1", "table1")  --> runs on
the executors.

stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) --> runs on
the driver.

Is this the reason why the Kafka offsets are committed even when an exception
is raised? If so, is there a way to commit the offsets only when there are no
exceptions?



On Sun, Aug 6, 2017 at 10:23 PM, shyla deshpande 
wrote:

> Thanks again Cody,
>
> My understanding is all the code inside foreachRDD is running on the
> driver except for
> rdd.map { record => ()}.saveToCassandra("keyspace1", "table1").
>
> When the exception is raised, I was thinking I won't be committing the
> offsets, but the offsets are committed all the time independent of whether
> an exception was raised or not.
>
> It will be helpful if you can explain this behavior.
>
>
> On Sun, Aug 6, 2017 at 5:19 PM, Cody Koeninger  wrote:
>
>> I mean that the kafka consumers running on the executors should not be
>> automatically committing, because the fact that a message was read by
>> the consumer has no bearing on whether it was actually successfully
>> processed after reading.
>>
>> It sounds to me like you're confused about where code is running.
>> foreachRDD runs on the driver, not the executor.
>>
>> http://spark.apache.org/docs/latest/streaming-programming-gu
>> ide.html#design-patterns-for-using-foreachrdd
>>
>> On Sun, Aug 6, 2017 at 12:55 PM, shyla deshpande
>>  wrote:
>> > Thanks Cody for your response.
>> >
>> > All I want to do is, commit the offsets only if I am successfully able
>> to
>> > write to cassandra database.
>> >
>> > The line //save the rdd to Cassandra database is
>> > rdd.map { record => ()}.saveToCassandra("kayspace1", "table1")
>> >
>> > What do you mean by Executors shouldn't be auto-committing, that's why
>> it's
>> > being overridden. It is the executors that do the mapping and saving to
>> > cassandra. The status of success or failure of this operation is known
>> only
>> > on the executor and thats where I want to commit the kafka offsets. If
>> this
>> > is not what I sould be doing, then  what is the right way?
>> >
>> > On Sun, Aug 6, 2017 at 9:21 AM, Cody Koeninger 
>> wrote:
>> >>
>> >> If your complaint is about offsets being committed that you didn't
>> >> expect... auto commit being false on executors shouldn't have anything
>> >> to do with that.  Executors shouldn't be auto-committing, that's why
>> >> it's being overridden.
>> >>
>> >> What you've said and the code you posted isn't really enough to
>> >> explain what your issue is, e.g.
>> >>
>> >> is this line
>> >> // save the rdd to Cassandra database
>> >> a blocking call
>> >>
>> >> are you sure that the rdd foreach isn't being retried and succeeding
>> >> the second time around, etc
>> >>
>> >> On Sat, Aug 5, 2017 at 5:10 PM, shyla deshpande
>> >>  wrote:
>> >> > Hello All,
>> >> > I am using spark 2.0.2 and spark-streaming-kafka-0-10_2.11 .
>> >> >
>> >> > I am setting enable.auto.commit to false, and manually want to commit
>> >> > the
>> >> > offsets after my output operation is successful. So when a exception
>> is
>> >> > raised during during the processing I do not want the offsets to be
>> >> > committed. But looks like the offsets are automatically committed
>> even
>> >> > when
>> >> > the exception is raised and thereby I am losing data.
>> >> > In my logs I see,  WARN  overriding enable.auto.commit to false for
>> >> > executor.  But I don't want it to override. Please help.
>> >> >
>> >> > My code looks like..
>> >> >
>> >> > val kafkaParams = Map[String, Object](
>> >> >   "bootstrap.servers" -> brokers,
>> >> >   "key.deserializer" -> classOf[StringDeserializer],
>> >> >   "value.deserializer" -> classOf[StringDeserializer],
>> >> >   "group.id" -> "Group1",
>> >> >   "auto.offset.reset" -> offsetresetparameter,
>> >> >   "enable.auto.commit" -> (false: java.lang.Boolean)
>> >> > )
>> >> >
>> >> > val myTopics = Array("topic1")
>> >> > val stream1 = KafkaUtils.createDirectStream[String, String](
>> >> >   ssc,
>> >> >   PreferConsistent,
>> >> >   Subscribe[String, String](myTopics, kafkaParams)
>> >> > )
>> >> >
>> >> > stream1.foreachRDD { (rdd, time) =>
>> >> > val offsetRanges =
>> >> > rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>> >> > try {
>> >> > //save the rdd to Cassandra database
>> >> >
>> >> >
>> >> > stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>> >> > } catch {
>> >> >   case ex: Exception => {
>> >> > println(ex.toString + "!! Bad Data, Unable to persist
>> >> > into
>> >> > table !" + errorOffsetRangesToString(offsetRanges))
>> >> >   }
>> >> > }
>> >> > }
>> >> >
>> >> > ssc.start()
>> >> > ssc.awaitTermination()
>> >
>> >
>>
>
>


Re: kafka setting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-06 Thread shyla deshpande
Thanks again, Cody.

My understanding is that all the code inside foreachRDD runs on the driver,
except for
rdd.map { record => ()}.saveToCassandra("keyspace1", "table1").

When the exception is raised, I was thinking I would not be committing the
offsets, but the offsets are committed every time, independent of whether an
exception was raised or not.

It would be helpful if you could explain this behavior.


On Sun, Aug 6, 2017 at 5:19 PM, Cody Koeninger  wrote:

> I mean that the kafka consumers running on the executors should not be
> automatically committing, because the fact that a message was read by
> the consumer has no bearing on whether it was actually successfully
> processed after reading.
>
> It sounds to me like you're confused about where code is running.
> foreachRDD runs on the driver, not the executor.
>
> http://spark.apache.org/docs/latest/streaming-programming-
> guide.html#design-patterns-for-using-foreachrdd
>
> On Sun, Aug 6, 2017 at 12:55 PM, shyla deshpande
>  wrote:
> > Thanks Cody for your response.
> >
> > All I want to do is, commit the offsets only if I am successfully able to
> > write to cassandra database.
> >
> > The line //save the rdd to Cassandra database is
> > rdd.map { record => ()}.saveToCassandra("kayspace1", "table1")
> >
> > What do you mean by Executors shouldn't be auto-committing, that's why
> it's
> > being overridden. It is the executors that do the mapping and saving to
> > cassandra. The status of success or failure of this operation is known
> only
> > on the executor and thats where I want to commit the kafka offsets. If
> this
> > is not what I sould be doing, then  what is the right way?
> >
> > On Sun, Aug 6, 2017 at 9:21 AM, Cody Koeninger 
> wrote:
> >>
> >> If your complaint is about offsets being committed that you didn't
> >> expect... auto commit being false on executors shouldn't have anything
> >> to do with that.  Executors shouldn't be auto-committing, that's why
> >> it's being overridden.
> >>
> >> What you've said and the code you posted isn't really enough to
> >> explain what your issue is, e.g.
> >>
> >> is this line
> >> // save the rdd to Cassandra database
> >> a blocking call
> >>
> >> are you sure that the rdd foreach isn't being retried and succeeding
> >> the second time around, etc
> >>
> >> On Sat, Aug 5, 2017 at 5:10 PM, shyla deshpande
> >>  wrote:
> >> > Hello All,
> >> > I am using spark 2.0.2 and spark-streaming-kafka-0-10_2.11 .
> >> >
> >> > I am setting enable.auto.commit to false, and manually want to commit
> >> > the
> >> > offsets after my output operation is successful. So when a exception
> is
> >> > raised during during the processing I do not want the offsets to be
> >> > committed. But looks like the offsets are automatically committed even
> >> > when
> >> > the exception is raised and thereby I am losing data.
> >> > In my logs I see,  WARN  overriding enable.auto.commit to false for
> >> > executor.  But I don't want it to override. Please help.
> >> >
> >> > My code looks like..
> >> >
> >> > val kafkaParams = Map[String, Object](
> >> >   "bootstrap.servers" -> brokers,
> >> >   "key.deserializer" -> classOf[StringDeserializer],
> >> >   "value.deserializer" -> classOf[StringDeserializer],
> >> >   "group.id" -> "Group1",
> >> >   "auto.offset.reset" -> offsetresetparameter,
> >> >   "enable.auto.commit" -> (false: java.lang.Boolean)
> >> > )
> >> >
> >> > val myTopics = Array("topic1")
> >> > val stream1 = KafkaUtils.createDirectStream[String, String](
> >> >   ssc,
> >> >   PreferConsistent,
> >> >   Subscribe[String, String](myTopics, kafkaParams)
> >> > )
> >> >
> >> > stream1.foreachRDD { (rdd, time) =>
> >> > val offsetRanges =
> >> > rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> >> > try {
> >> > //save the rdd to Cassandra database
> >> >
> >> >
> >> > stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> >> > } catch {
> >> >   case ex: Exception => {
> >> > println(ex.toString + "!! Bad Data, Unable to persist
> >> > into
> >> > table !" + errorOffsetRangesToString(offsetRanges))
> >> >   }
> >> > }
> >> > }
> >> >
> >> > ssc.start()
> >> > ssc.awaitTermination()
> >
> >
>


spark-shell not getting launched - Queue's AM resource limit exceeded.

2017-08-06 Thread karan alang
Hello - I have HDP 2.5.x and I'm trying to launch spark-shell.
The ApplicationMaster gets launched, but YARN is not able to assign containers.

*Command ->*

./bin/spark-shell --master yarn-client --driver-memory 512m
--executor-memory 512m

*Error ->*

[Sun Aug 06 19:33:29 + 2017] Application is added to the scheduler and
is not yet activated. Queue's AM resource limit exceeded. Details : AM
Partition = ; AM Resource Request = ; Queue Resource Limit for AM = ; User AM
Resource Limit of the queue = ; Queue AM Resource
Usage = ;

Any ideas on what parameters to change?

Please note -> in YARN, the parameter
*yarn.scheduler.capacity.maximum-am-resource-percent = 0.9*, so the AM should
have sufficient resources available to be assigned containers.