RE: mapPartitioningWithIndex in Dataframe

2017-08-05 Thread Mendelson, Assaf
First, I believe you mean the Dataset API rather than the DataFrame API.
You can easily add the partition index as a new column to your dataframe using 
spark_partition_id().
Then a normal mapPartitions should work fine (i.e. create an appropriate case 
class which includes the partition id and then call mapPartitions).
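Translated to PySpark as an illustrative sketch (the suggestion above is really Scala/Dataset with a case class; all names here are hypothetical, and the Spark calls are left as comments since they need a live SparkSession):

```python
# Hypothetical sketch of the suggestion above, not anyone's actual code.
# The Spark calls are commented out (they need a SparkSession); the
# per-partition logic is plain Python over any iterator of row-like dicts.

# from pyspark.sql import functions as F
# df = df.withColumn("partition_id", F.spark_partition_id())

def process_partition(rows):
    # Each row already carries its partition index in the "partition_id"
    # column, so a plain mapPartitions sees the same information that
    # mapPartitionsWithIndex would have passed in as `index`.
    for row in rows:
        yield (row["partition_id"], row["value"])

# result_rdd = df.rdd.mapPartitions(process_partition)
```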

Thanks,
  Assaf.

From: Lalwani, Jayesh [mailto:jayesh.lalw...@capitalone.com]
Sent: Thursday, August 03, 2017 5:20 PM
To: user@spark.apache.org
Subject: mapPartitioningWithIndex in Dataframe

Are there any plans to add mapPartitionsWithIndex to the DataFrame API? Or is 
there any way to implement my own mapPartitionsWithIndex for a DataFrame?

I am implementing something logically similar to the randomSplit function. In 
2.1, randomSplit internally calls df.mapPartitionsWithIndex and assigns a 
different seed to every partition by adding the partition's index to the seed. 
I want to get a partition-specific seed too.
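The per-partition seed scheme described above (add the partition's index to a base seed) can be sketched in plain Python; `partition_rng` is a hypothetical helper for illustration, not Spark's actual internal code:

```python
import random

def partition_rng(base_seed, partition_index):
    # Derive a deterministic, partition-specific generator by adding the
    # partition's index to the base seed -- the scheme described above.
    return random.Random(base_seed + partition_index)

# The same (seed, index) pair always reproduces the same sequence,
# while different partitions draw from different sequences.
same_1 = partition_rng(42, 3).random()
same_2 = partition_rng(42, 3).random()
other = partition_rng(42, 4).random()
```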

The problem is that rdd.mapPartitionsWithIndex doesn't work in streaming. 
df.mapPartitions works, but I don't get the index.

Is there a way to extend Spark to add mapPartitionsWithIndex at the DataFrame 
level?
I was digging into the 2.2 code a bit, and it looks like in 2.2 all the 
DataFrame APIs have been changed to be based around SparkStrategy. I couldn't 
figure out how I can add my own custom strategy. Is there any documentation 
around this? If it makes sense to add this to Spark, I would be excited to make 
a contribution.



The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates and may only be used solely in performance of 
work or services for Capital One. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed. If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.


Kafka setting enable.auto.commit to false is being overridden and I lose data. Please help!

2017-08-05 Thread shyla deshpande
Hello All,
I am using Spark 2.0.2 and spark-streaming-kafka-0-10_2.11.

I am setting enable.auto.commit to false and want to commit the offsets
manually after my output operation is successful, so when an exception
is raised during processing I do not want the offsets to be committed.
But it looks like the offsets are committed automatically even when an
exception is raised, and I am thereby losing data.
In my logs I see: WARN overriding enable.auto.commit to false for
executor. But I don't want it to be overridden. Please help.

My code looks like..

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> brokers,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "Group1",
  "auto.offset.reset" -> offsetresetparameter,
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val myTopics = Array("topic1")
val stream1 = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](myTopics, kafkaParams)
)

stream1.foreachRDD { (rdd, time) =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  try {
    // save the rdd to the Cassandra database

    stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  } catch {
    case ex: Exception =>
      println(ex.toString + "!! Bad Data, Unable to persist into table !" +
        errorOffsetRangesToString(offsetRanges))
  }
}

ssc.start()
ssc.awaitTermination()


Trying to connect Spark 1.6 to Hive

2017-08-05 Thread toletum
Hi everybody,
I'm trying to connect Spark to Hive.
Hive uses a Derby server for the metastore_db.

$SPARK_HOME/conf/hive-site.xml:

  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:derby://derby:1527/metastore_db;create=true</value>
    <description>JDBC connect string for a JDBC metastore</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>org.apache.derby.jdbc.ClientDriver</value>
    <description>Driver class name for a JDBC metastore</description>
  </property>
I have copied derby.jar, derbyclient.jar, and derbytools.jar to $SPARK_HOME/lib,
and added the 3 jars to the CLASSPATH too:
$SPARK_HOMElib/derby.jar:$SPARK_HOME/lib/derbytools.jar:$SPARK_HOME/lib/derbyclient.jar
But spark-sql says:
org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException: 
The specified datastore driver ("org.apache.derby.jdbc.ClientDriver") was not 
found in the CLASSPATH. Please check your CLASSPATH specification, and the name 
of the driver.
java finds the class:
$ java org.apache.derby.jdbc.ClientDriver
Error: Main method not found in class org.apache.derby.jdbc.ClientDriver, 
please define the main method as:
   public static void main(String[] args)
or a JavaFX application class must extend javafx.application.Application
It seems Spark can't find the driver.


Re: SPARK Issue in Standalone cluster

2017-08-05 Thread Marco Mistroni
Uh believe me there are lots of ppl on this list who will send u code
snippets if u ask... 

Yes that is what Steve pointed out, suggesting also that for that simple
exercise you should perform all operations on a spark standalone instead
(or alt. Use an nfs on the cluster)
I'd agree with his suggestion
I suggest u another alternative:
https://community.cloud.databricks.com/

That's a ready made cluster and you can run your spark app as well store
data on the cluster (well I haven't tried myself but I assume it's
possible).   Try that out... I will try ur script there as I have an
account there (though I guess I'll get there before me.)

Try that out and let me know if u get stuck
Kr

On Aug 5, 2017 8:40 PM, "Gourav Sengupta"  wrote:

> [...]

Re: SPARK Issue in Standalone cluster

2017-08-05 Thread Gourav Sengupta
Hi Marco,

For the first time in several years, FOR THE VERY FIRST TIME, I am seeing
someone actually executing code and providing a response. It feels wonderful
that at least someone considered responding by executing the code and did
not filter out each and every technical detail to brood only on my
superb social skills, while claiming that the reason for ignoring technical
details is that they are elementary. I think that Steve is also the first
person who could answer the WHY of an elementary question instead of saying
that is how it is, and who pointed to the correct documentation.

That code works fantastically. But the problem I have been trying to track
down is in writing out the data, not reading it.


So if you try to read the data from the same folder, which has the same
file across all the nodes, it will work fine. In fact, that is what
should work.

What does not work: if you try to write the file back and then read it
again from the location you have written to, that is when the issue
starts.

Therefore, if in my code you were to save the pandas dataframe as a CSV file
and then read it back, you will find the following observations:

FOLLOWING WILL FAIL SINCE THE FILE IS NOT IN ALL THE NODES
----------------------------------------------------------
pandasdf = pandas.DataFrame(numpy.random.randn(1, 4), columns=list('ABCD'))
pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv",
                header=True, sep=",", index=0)
testdf = spark.read.load("/Users/gouravsengupta/Development/spark/sparkdata/testdir/")
testdf.cache()
testdf.count()
----------------------------------------------------------


FOLLOWING WILL WORK BUT THE PROCESS WILL NOT AT ALL USE THE NODE IN WHICH
THE DATA DOES NOT EXIST
----------------------------------------------------------
pandasdf = pandas.DataFrame(numpy.random.randn(1, 4), columns=list('ABCD'))
pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv",
                header=True, sep=",", index=0)
testdf = spark.read.load("file:///Users/gouravsengupta/Development/spark/sparkdata/testdir/")
testdf.cache()
testdf.count()
----------------------------------------------------------


if you execute my code, you will also, surprisingly, see that the writes
on the nodes other than the master node never complete moving the
files from the _temporary folder to the final one.


Regards,
Gourav Sengupta



On Fri, Aug 4, 2017 at 9:45 PM, Marco Mistroni  wrote:

> Hello
>  please have a look at this. It's a simple script that just reads a
> dataframe n times, sleeping at random intervals. I used it to test memory
> issues that another user was experiencing on a spark cluster
>
> you should run it like this, e.g.
> spark-submit dataprocessing_Sample-2.py <input file> <number of iterations>
>
> i ran it on the cluster like this
>
> ./spark-submit --master spark://ec2-54-218-113-119.us-west-2.compute.amazonaws.com:7077 \
>   /root/pyscripts/dataprocessing_Sample-2.py \
>   file:///root/pyscripts/tree_addhealth.csv
>
> hth, ping me back if you have issues
> i do agree with Steve's comments: if you want to test your spark
> scripts just for playing, do it on a standalone server on your localhost.
> Moving to a cluster is just a matter of deploying your script and making sure
> you have a common place where to read and store the data. The sysadmin
> should give you this when they set up the cluster...
>
> kr
>
>
>
>
> On Fri, Aug 4, 2017 at 4:50 PM, Gourav Sengupta  > wrote:
>
>> Hi Marco,
>>
>> I am sincerely obliged for your kind time and response. Can you please
>> try the solution that you have so kindly suggested?
>>
>> It will be a lot of help if you could kindly execute the code that I have
>> given. I don't think that anyone has yet.
>>
>> There are lots of fine responses to my question here, but if you read the
>> last response from Simon, it comes the closest to being satisfactory. I am
>> sure even he did not execute the code, but at least he came quite close to
>> understanding what the problem is.
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>>
>> On Thu, Aug 3, 2017 at 7:59 PM, Marco Mistroni 
>> wrote:
>>
>>> Hello
>>>  my 2 cents here, hope it helps
>>> If you want just to play around with Spark, I'd leave Hadoop out;
>>> it's an unnecessary dependency that you don't need 

PySpark Streaming keeps dying

2017-08-05 Thread Riccardo Ferrari
Hi list,

I have Spark 2.2.0 in standalone mode and Python 3.6. It is a very small
testing cluster with two nodes.
I am running (trying to run) a streaming job that simply reads from Kafka,
applies an ML model, and stores the result back into Kafka.
The job is run with the following parameters:
"--conf spark.cores.max=2 --conf spark.executor.cores=2 --conf
spark.executor.memory=2g"

The problem I'm facing is that very often the job crashes with this exception:

17/08/05 00:19:00 ERROR Utils: Uncaught exception in thread stdout writer
for /opt/spark/miniconda2/envs/pyspark36/bin/python
java.lang.AssertionError: assertion failed: Block rdd_474_0 is not locked
for reading
at scala.Predef$.assert(Predef.scala:170)
at
org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:720)
at
org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:516)
at
org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at
org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
at
org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
at
org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
17/08/05 00:19:00 ERROR SparkUncaughtExceptionHandler: Uncaught exception
in thread Thread[stdout writer for
/opt/spark/miniconda2/envs/pyspark36/bin/python,5,main]
java.lang.AssertionError: assertion failed: Block rdd_474_0 is not locked
for reading
at scala.Predef$.assert(Predef.scala:170)
at
org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:720)
at
org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:516)
at
org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at
org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
at
org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
at
org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)

The stream is created via:
directKafkaStream = KafkaUtils.createDirectStream(ssc,...

The processing:
directKafkaStream.cache().foreachRDD(self._process)

where self._process:

   - puts the RDD into a Dataframe
   - apply a model.transform
   - store it back
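As an illustrative sketch only (all names — `spark`, `self.model`, `producer`, the field names — are assumptions, not the poster's code), the three steps above might look like this; the Spark/Kafka calls are commented out since they need a running cluster, while the payload serialization is plain Python:

```python
import json

def to_kafka_payload(prediction_row):
    # Serialize one prediction (a plain dict here) into the JSON payload
    # that would be written back to Kafka. Field names are illustrative.
    return json.dumps({"id": prediction_row["id"],
                       "prediction": prediction_row["prediction"]})

# def _process(self, rdd):
#     if rdd.isEmpty():
#         return
#     df = spark.createDataFrame(rdd)           # 1. put the RDD into a DataFrame
#     predictions = self.model.transform(df)    # 2. apply model.transform
#     for row in predictions.collect():         # 3. store it back
#         producer.send("output-topic", to_kafka_payload(row.asDict()))
```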

Has anyone experienced this?
Any suggestion on how to attack the problem?
I am not sure it is a resource constraint, as I tried raising cores and
memory with no luck.

Any hint much appreciated,