Re: Spark Yarn executor container memory

2016-08-15 Thread Jörn Franke
Both are part of the heap.

> On 16 Aug 2016, at 04:26, Lan Jiang  wrote:
> 
> Hello,
> 
> My understanding is that YARN executor container memory is based on 
> "spark.executor.memory" + “spark.yarn.executor.memoryOverhead”. The first one 
> is for heap memory and second one is for offheap memory. The 
> spark.executor.memory is used by -Xmx to set the max heap size. Now my 
> question is why it does not count permgen size and memory used by stack. They 
> are not part of the max heap size. IMHO, YARN executor container memory 
> should be set to:  spark.executor.memory  + [-XX:MaxPermSize] + 
> number_of_threads * [-Xss] + spark.yarn.executor.memoryOverhead. What did I 
> miss?
> 
> Lan
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





Re: [ANNOUNCE] Apache Bahir 2.0.0

2016-08-15 Thread Mridul Muralidharan
Congratulations, great job everyone !

Regards,
Mridul

On Mon, Aug 15, 2016 at 2:19 PM, Luciano Resende  wrote:
> The Apache Bahir PMC is pleased to announce the release of Apache Bahir
> 2.0.0  which is our first major release and provides the following
> extensions for Apache Spark 2.0.0 :
>
> Akka Streaming
> MQTT Streaming and Structured Streaming
> Twitter Streaming
> ZeroMQ Streaming
>
> For more information about Apache Bahir and to download the release:
>
> http://bahir.apache.org
>
> Thanks,
>
> The Apache Bahir PMC

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Apache Spark toDebugString producing different output for python and scala repl

2016-08-15 Thread Saisai Shao
The implementation of RDDs inside the Python API and the Scala API is slightly
different, so the difference in the RDD lineage you printed is expected.

On Tue, Aug 16, 2016 at 10:58 AM, DEEPAK SHARMA  wrote:

> Hi All,
>
>
> Below is the small piece of code in scala and python REPL in Apache
> Spark.However I am getting different output in both the language when I
> execute toDebugString.I am using cloudera quick start VM.
>
> PYTHON
>
> rdd2 = sc.textFile('file:/home/training/training_materials/
> data/frostroad.txt').map(lambda x:x.upper()).filter(lambda x : 'THE' in x)
>
> print rdd2.toDebugString()(1) PythonRDD[56] at RDD at PythonRDD.scala:42 []
>  |  file:/home/training/training_materials/data/frostroad.txt 
> MapPartitionsRDD[55] at textFile at NativeMethodAccessorImpl.java:-2 []
>  |  file:/home/training/training_materials/data/frostroad.txt HadoopRDD[54] 
> at textFile at ..
>
> SCALA
>
>  val rdd2 = 
> sc.textFile("file:/home/training/training_materials/data/frostroad.txt").map(x
>  => x.toUpperCase()).filter(x => x.contains("THE"))
>
>
>
> rdd2.toDebugString
> res1: String = (1) MapPartitionsRDD[3] at filter at :21 []
>  |  MapPartitionsRDD[2] at map at :21 []
>  |  file:/home/training/training_materials/data/frostroad.txt 
> MapPartitionsRDD[1] at textFile at :21 []
>  |  file:/home/training/training_materials/data/frostroad.txt HadoopRDD[0] at 
> textFile at <
>
>
> Also one of cloudera slides say that the default partitions  is 2 however
> its 1 (looking at output of toDebugString).
>
>
> Appreciate any help.
>
>
> Thanks
>
> Deepak Sharma
>
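
For readers comparing the two lineages: PySpark pipelines the Python-side map and
filter into a single PythonRDD over the underlying HadoopRDD, while the Scala REPL
shows each narrow transformation as its own MapPartitionsRDD, which is why the Python
lineage is shorter. As for the partition count, a hedged sketch (spark-shell assumed,
path taken from the question) of checking it directly rather than reading it off
toDebugString:

```scala
// Assumes the spark-shell `sc`; the file path is the one from the question.
val rdd2 = sc.textFile("file:/home/training/training_materials/data/frostroad.txt")
  .map(_.toUpperCase)
  .filter(_.contains("THE"))

// textFile's minPartitions defaults to math.min(sc.defaultParallelism, 2), so on a
// 1-core local master a single small file can legitimately come back as 1 partition.
println(sc.defaultParallelism)
println(rdd2.partitions.length)   // actual partition count, independent of the lineage string
```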


Re: Apache Spark toDebugString producing different output for python and scala repl

2016-08-15 Thread DEEPAK SHARMA
Hi All,


Below is a small piece of code in the Scala and Python REPLs in Apache
Spark. However, I am getting different output in the two languages when I execute
toDebugString. I am using the Cloudera QuickStart VM.

PYTHON

rdd2 = 
sc.textFile('file:/home/training/training_materials/data/frostroad.txt').map(lambda
 x:x.upper()).filter(lambda x : 'THE' in x)

print rdd2.toDebugString()
(1) PythonRDD[56] at RDD at PythonRDD.scala:42 []
 |  file:/home/training/training_materials/data/frostroad.txt 
MapPartitionsRDD[55] at textFile at NativeMethodAccessorImpl.java:-2 []
 |  file:/home/training/training_materials/data/frostroad.txt HadoopRDD[54] at 
textFile at ..

SCALA

 val rdd2 = 
sc.textFile("file:/home/training/training_materials/data/frostroad.txt").map(x 
=> x.toUpperCase()).filter(x => x.contains("THE"))



rdd2.toDebugString
res1: String =
(1) MapPartitionsRDD[3] at filter at :21 []
 |  MapPartitionsRDD[2] at map at :21 []
 |  file:/home/training/training_materials/data/frostroad.txt 
MapPartitionsRDD[1] at textFile at :21 []
 |  file:/home/training/training_materials/data/frostroad.txt HadoopRDD[0] at 
textFile at <


Also, one of the Cloudera slides says that the default number of partitions is 2,
however it is 1 (looking at the output of toDebugString).


Appreciate any help.


Thanks

Deepak Sharma


Spark Yarn executor container memory

2016-08-15 Thread Lan Jiang
Hello,

My understanding is that YARN executor container memory is based on 
"spark.executor.memory" + “spark.yarn.executor.memoryOverhead”. The first one 
is for heap memory and second one is for offheap memory. The 
spark.executor.memory is used by -Xmx to set the max heap size. Now my question 
is why it does not count permgen size and memory used by stack. They are not 
part of the max heap size. IMHO, YARN executor container memory should be set 
to:  spark.executor.memory  + [-XX:MaxPermSize] + number_of_threads * [-Xss] + 
spark.yarn.executor.memoryOverhead. What did I miss?

Lan
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
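
A hedged sketch of the arithmetic behind the container request, based on the
documented behaviour of spark.yarn.executor.memoryOverhead in Spark 1.x/2.0: the
overhead, rather than separate terms, is the budget expected to absorb
PermGen/Metaspace, thread stacks, direct buffers and other off-heap usage. The
numbers below are illustrative.

```scala
// Illustrative values; the formula mirrors the documented defaults, not exact Spark internals.
val executorMemoryMb = 4096                                        // spark.executor.memory
val overheadMb = math.max((0.10 * executorMemoryMb).toInt, 384)    // spark.yarn.executor.memoryOverhead default
val containerRequestMb = executorMemoryMb + overheadMb             // what is requested from the YARN RM

println(s"-Xmx${executorMemoryMb}m heap + ${overheadMb}m overhead = ${containerRequestMb}m YARN container")
```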



Re: read kafka offset from spark checkpoint

2016-08-15 Thread Cody Koeninger
No, you really shouldn't rely on checkpoints if you can't afford to
reprocess from the beginning of your retention (or lose data and start
from the latest messages).

If you're in a real bind, you might be able to get something out of
the serialized data in the checkpoint, but it'd probably be easier and
faster to just grep through the output logs for the last processed
offsets (assuming you're logging at info level).  Look for "Computing
topic "

On Mon, Aug 15, 2016 at 4:14 PM, Shifeng Xiao  wrote:
> Hi folks,
>
> We  are using kafka + spark streaming in our data pipeline,  but sometimes
> we have to clean up checkpoint from hdfs before we restart spark streaming
> application, otherwise the application fails to start.
>
> That means we are losing data when we clean up checkpoint, is there a way to
> read kafka offset from checkpoint so that we might be able tp process the
> data from that offset to avoid losing data.
>
> Thanks

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
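
A minimal sketch of managing Kafka offsets explicitly instead of relying on
checkpoint recovery, using the Spark Streaming Kafka 0.8 direct-stream API (the
pattern documented in the Kafka integration guide). The offset store
(loadOffsets/saveOffsets), topic and broker address are hypothetical placeholders;
back them with ZooKeeper, a database, or a file on HDFS in a real job.

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

// Hypothetical offset store -- replace with ZooKeeper, a database table, or HDFS.
def loadOffsets(): Map[TopicAndPartition, Long] =
  Map(TopicAndPartition("mytopic", 0) -> 0L)
def saveOffsets(offsets: Map[TopicAndPartition, Long]): Unit =
  println(s"committing $offsets")

val ssc = new StreamingContext(
  new SparkConf().setMaster("local[2]").setAppName("offset-demo"), Seconds(30))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

// Start the direct stream from the offsets persisted last time, not from a checkpoint.
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, loadOffsets(), messageHandler)

stream.foreachRDD { rdd =>
  // Capture the offset ranges before any transformation loses the KafkaRDD type.
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd here ...
  saveOffsets(ranges.map(r => TopicAndPartition(r.topic, r.partition) -> r.untilOffset).toMap)
}

ssc.start()
ssc.awaitTermination()
```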



Re: Sum array values by row in new column

2016-08-15 Thread Mike Metzger
Assuming you know the number of elements in the list, this should work:

df.withColumn('total', df["_1"].getItem(0) + df["_1"].getItem(1) +
df["_1"].getItem(2))

Mike

On Mon, Aug 15, 2016 at 12:02 PM, Javier Rey  wrote:

> Hi everyone,
>
> I have one dataframe with one column this column is an array of numbers,
> how can I sum each array by row a obtain a new column with sum? in pyspark.
>
> Example:
>
> +------------+
> |     numbers|
> +------------+
> |[10, 20, 30]|
> |[40, 50, 60]|
> |[70, 80, 90]|
> +------------+
>
> The idea is obtain the same df with a new column with totals:
>
> +------------+-----+
> |     numbers|     |
> +------------+-----+
> |[10, 20, 30]|60   |
> |[40, 50, 60]|150  |
> |[70, 80, 90]|240  |
> +------------+-----+
>
> Regards!
>
> Samir
>
>
>
>
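
The getItem approach above needs the array length up front. For arrays of arbitrary
length, a hedged sketch of a UDF-based alternative, written in Scala (the PySpark
analogue is a Python udf); the column name "numbers" comes from the question,
everything else is illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().master("local[*]").appName("sum-array").getOrCreate()
import spark.implicits._

val df = Seq(Tuple1(Seq(10, 20, 30)), Tuple1(Seq(40, 50, 60)), Tuple1(Seq(70, 80, 90))).toDF("numbers")

// The UDF receives the whole array, so it works whatever the array length is.
val sumArray = udf((xs: Seq[Int]) => xs.sum)
df.withColumn("total", sumArray($"numbers")).show()
```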


Re: [ANNOUNCE] Apache Bahir 2.0.0

2016-08-15 Thread Mridul Muralidharan
Congratulations, great job everyone !

Regards
Mridul

On Monday, August 15, 2016, Luciano Resende  wrote:

> The Apache Bahir PMC is pleased to announce the release of Apache Bahir
> 2.0.0  which is our first major release and provides the following
> extensions for Apache Spark 2.0.0 :
>
> Akka Streaming
> MQTT Streaming and Structured Streaming
> Twitter Streaming
> ZeroMQ Streaming
>
> For more information about Apache Bahir and to download the release:
>
> http://bahir.apache.org
>
> Thanks,
>
> The Apache Bahir PMC
>


Re: [ANNOUNCE] Apache Bahir 2.0.0

2016-08-15 Thread Chris Mattmann
Great work Luciano!



On 8/15/16, 2:19 PM, "Luciano Resende"  wrote:

The Apache Bahir PMC is pleased to announce the release of Apache Bahir
2.0.0  which is our first major release and provides the following
extensions for Apache Spark 2.0.0 :

Akka Streaming
MQTT Streaming and Structured Streaming
Twitter Streaming
ZeroMQ Streaming

For more information about Apache Bahir and to download the release:

http://bahir.apache.org

Thanks,

The Apache Bahir PMC




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



SizeEstimator for python

2016-08-15 Thread Maurin Lenglart
Hi,
Is there a way to estimate the size of a dataframe in python?
Something similar to 
https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/util/SizeEstimator.html
 ?

thanks


[ANNOUNCE] Apache Bahir 2.0.0

2016-08-15 Thread Luciano Resende
The Apache Bahir PMC is pleased to announce the release of Apache Bahir
2.0.0  which is our first major release and provides the following
extensions for Apache Spark 2.0.0 :

Akka Streaming
MQTT Streaming and Structured Streaming
Twitter Streaming
ZeroMQ Streaming

For more information about Apache Bahir and to download the release:

http://bahir.apache.org

Thanks,

The Apache Bahir PMC


read kafka offset from spark checkpoint

2016-08-15 Thread Shifeng Xiao
Hi folks,

We  are using kafka + spark streaming in our data pipeline,  but sometimes
we have to clean up checkpoint from hdfs before we restart spark streaming
application, otherwise the application fails to start.

That means we are losing data when we clean up the checkpoint. Is there a way
to read the Kafka offset from the checkpoint so that we might be able to process
the data from that offset and avoid losing data?

Thanks


Spark 2.0.0 OOM error at beginning of RDD map on AWS

2016-08-15 Thread Arun Luthra
I got this OOM error in Spark local mode. The error seems to have occurred at
the start of a stage (all of the stages on the UI showed as complete; there
were more stages to do but they had not shown up on the UI yet).

There appears to be ~100G of free memory at the time of the error.

Spark 2.0.0
200G driver memory
local[30]
8 /mntX/tmp directories for spark.local.dir
"spark.sql.shuffle.partitions", "500"
"spark.driver.maxResultSize","500"
"spark.default.parallelism", "1000"

The line number for the error is at an RDD map operation where there are
some potentially large Map objects that are going to be accessed by each
record. Does it matter if they are broadcast variables or not? I imagine
not, because in local mode they should be available in memory to every
executor/core.

Possibly related:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-ClosureCleaner-or-java-serializer-OOM-when-trying-to-grow-td24796.html

Exception in thread "main" java.lang.OutOfMemoryError
at
java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.map(RDD.scala:365)
at abc.Abc$.main(abc.scala:395)
at abc.Abc.main(abc.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
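
One hedged reading of the stack trace, not the poster's actual code: the failure is
in ClosureCleaner.ensureSerializable, i.e. while Java-serializing the closure passed
to rdd.map, and a closure that captures multi-gigabyte Map objects can blow past the
roughly 2 GB ByteArrayOutputStream limit no matter how much system memory is free,
local mode included. Broadcasting the maps keeps them out of the serialized closure;
all names and values below are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[30]").setAppName("oom-sketch"))

// Stands in for the large Map objects; in the real job this could be gigabytes.
val bigLookup: Map[String, Int] = Map("a" -> 1, "b" -> 2)
val bigLookupBc = sc.broadcast(bigLookup)          // serialized once, not inside every task closure

val records = sc.parallelize(Seq("a", "b", "c"))
val mapped = records.map { key =>
  bigLookupBc.value.getOrElse(key, 0)              // the closure only captures the small broadcast handle
}
println(mapped.collect().toSeq)
```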


Re: How to do nested for-each loops across RDDs ?

2016-08-15 Thread Eric Ho
Thanks Daniel.
Do you have any code fragments on using CoGroup or Join across 2 RDDs?
I don't think that an index would help much because this is an N x M
operation, examining each cell of each RDD. Each comparison is complex, as
it needs to peer into a complex JSON.


On Mon, Aug 15, 2016 at 1:24 PM, Daniel Imberman 
wrote:

> There's no real way of doing nested for-loops with RDD's because the whole
> idea is that you could have so much data in the RDD that it would be really
> ugly to store it all in one worker.
>
> There are, however, ways to handle what you're asking about.
>
> I would personally use something like CoGroup or Join between the two
> RDDs. if index matters, you can use ZipWithIndex on both before you join
> and then see which indexes match up.
>
> On Mon, Aug 15, 2016 at 1:15 PM Eric Ho  wrote:
>
>> I've nested foreach loops like this:
>>
>>   for i in A[i] do:
>> for j in B[j] do:
>>   append B[j] to some list if B[j] 'matches' A[i] in some fashion.
>>
>> Each element in A or B is some complex structure like:
>> (
>>   some complex JSON,
>>   some number
>> )
>>
>> Question: if A and B were represented as RRDs (e.g. RRD(A) and RRD(B)),
>> how would my code look ?
>> Are there any RRD operators that would allow me to loop thru both RRDs
>> like the above procedural code ?
>> I can't find any RRD operators nor any code fragments that would allow me
>> to do this.
>>
>> Thing is: by that time I composed RRD(A), this RRD would have contain
>> elements in array B as well as array A.
>> Same argument for RRD(B).
>>
>> Any pointers much appreciated.
>>
>> Thanks.
>>
>>
>> --
>>
>> -eric ho
>>
>>


-- 

-eric ho
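
A hedged sketch of the join/cogroup approach Daniel describes, assuming the "match"
can be expressed as (or at least narrowed by) a key extracted from each element; the
element types, key field and sample data are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

case class ElemA(json: String, key: Long)   // illustrative element types
case class ElemB(json: String, key: Long)

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("join-sketch"))
val rddA: RDD[ElemA] = sc.parallelize(Seq(ElemA("""{"a":1}""", 1L), ElemA("""{"a":2}""", 2L)))
val rddB: RDD[ElemB] = sc.parallelize(Seq(ElemB("""{"b":1}""", 1L), ElemB("""{"b":3}""", 3L)))

// Key both sides by whatever field drives the match, then join: only matching pairs survive.
val matched: RDD[(Long, (ElemA, ElemB))] =
  rddA.map(a => (a.key, a)).join(rddB.map(b => (b.key, b)))

// cogroup keeps every key with the grouped values from both sides, useful when the
// match needs more inspection (e.g. peering into the JSON) than plain key equality.
val grouped = rddA.map(a => (a.key, a)).cogroup(rddB.map(b => (b.key, b)))

// If the comparison is positional (i-th element of A against i-th of B), align by index instead.
val aligned = rddA.zipWithIndex.map(_.swap).join(rddB.zipWithIndex.map(_.swap))

matched.collect().foreach(println)
```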


Re: How to do nested for-each loops across RDDs ?

2016-08-15 Thread Daniel Imberman
There's no real way of doing nested for-loops with RDD's because the whole
idea is that you could have so much data in the RDD that it would be really
ugly to store it all in one worker.

There are, however, ways to handle what you're asking about.

I would personally use something like CoGroup or Join between the two RDDs.
if index matters, you can use ZipWithIndex on both before you join and then
see which indexes match up.

On Mon, Aug 15, 2016 at 1:15 PM Eric Ho  wrote:

> I've nested foreach loops like this:
>
>   for i in A[i] do:
> for j in B[j] do:
>   append B[j] to some list if B[j] 'matches' A[i] in some fashion.
>
> Each element in A or B is some complex structure like:
> (
>   some complex JSON,
>   some number
> )
>
> Question: if A and B were represented as RRDs (e.g. RRD(A) and RRD(B)),
> how would my code look ?
> Are there any RRD operators that would allow me to loop thru both RRDs
> like the above procedural code ?
> I can't find any RRD operators nor any code fragments that would allow me
> to do this.
>
> Thing is: by that time I composed RRD(A), this RRD would have contain
> elements in array B as well as array A.
> Same argument for RRD(B).
>
> Any pointers much appreciated.
>
> Thanks.
>
>
> --
>
> -eric ho
>
>


How to do nested for-each loops across RDDs ?

2016-08-15 Thread Eric Ho
I've got nested foreach loops like this:

  for i in A[i] do:
    for j in B[j] do:
      append B[j] to some list if B[j] 'matches' A[i] in some fashion.

Each element in A or B is some complex structure like:
(
  some complex JSON,
  some number
)

Question: if A and B were represented as RDDs (e.g. RDD(A) and RDD(B)), how
would my code look?
Are there any RDD operators that would allow me to loop through both RDDs like
the above procedural code?
I can't find any RDD operators nor any code fragments that would allow me
to do this.

Thing is: by the time I composed RDD(A), this RDD would have to contain
elements of array B as well as array A.
Same argument for RDD(B).

Any pointers much appreciated.

Thanks.


-- 

-eric ho


Re: Change nullable property in Dataset schema

2016-08-15 Thread Koert Kuipers
why do you want the array to have nullable = false? what is the benefit?

On Wed, Aug 3, 2016 at 10:45 AM, Kazuaki Ishizaki 
wrote:

> Dear all,
> Would it be possible to let me know how to change nullable property in
> Dataset?
>
> When I looked for how to change nullable property in Dataframe schema, I
> found the following approaches.
> http://stackoverflow.com/questions/33193958/change-
> nullable-property-of-column-in-spark-dataframe
> https://github.com/apache/spark/pull/13873(Not merged yet)
>
> However, I cannot find how to change nullable property in Dataset schema.
> Even when I wrote the following program, nullable property for "value:
> array" in ds2.schema is not changed.
> If my understanding is correct, current Spark 2.0 uses an
> ExpressionEncoder that is generated based on Dataset[T] at
> https://github.com/apache/spark/blob/master/sql/
> catalyst/src/main/scala/org/apache/spark/sql/catalyst/
> encoders/ExpressionEncoder.scala#L46
>
> class Test extends QueryTest with SharedSQLContext {
>   import testImplicits._
>   test("test") {
> val ds1 = sparkContext.parallelize(Seq(Array(1, 1), Array(2, 2),
> Array(3, 3)), 1).toDS
> val schema = new StructType().add("array", ArrayType(IntegerType,
> false), false)
> val inputObject = BoundReference(0, 
> ScalaReflection.dataTypeFor[Array[Int]],
> false)
> val encoder = new ExpressionEncoder[Array[Int]](schema, true,
>   ScalaReflection.serializerFor[Array[Int]](inputObject).flatten,
>   ScalaReflection.deserializerFor[Array[Int]],
>   ClassTag[Array[Int]](classOf[Array[Int]]))
> val ds2 = ds1.map(e => e)(encoder)
> ds1.printSchema
> ds2.printSchema
>   }
> }
>
> root
>  |-- value: array (nullable = true)
>  ||-- element: integer (containsNull = false)
>
> root
>  |-- value: array (nullable = true) // Expected
> (nullable = false)
>  ||-- element: integer (containsNull = false)
>
>
> Kazuaki Ishizaki
>


Re: Number of tasks on executors become negative after executor failures

2016-08-15 Thread Sean Owen
-dev (this is appropriate for user@)

Probably https://issues.apache.org/jira/browse/SPARK-10141 or
https://issues.apache.org/jira/browse/SPARK-11334 but those aren't
resolved. Feel free to jump in.


On Mon, Aug 15, 2016 at 8:13 PM, Rachana Srivastava <
rachana.srivast...@markmonitor.com> wrote:

> *Summary:*
>
> I am running Spark 1.5 on CDH5.5.1.  Under extreme load intermittently I
> am getting this connection failure exception and later negative executor in
> the Spark UI.
>
>
>
> *Exception:*
>
> TRACE: org.apache.hadoop.hbase.ipc.AbstractRpcClient - Call: Multi,
> callTime: 76ms
>
> INFO : org.apache.spark.network.client.TransportClientFactory - Found
> inactive connection to /xxx.xxx.xxx., creating a new one.
>
> ERROR: org.apache.spark.network.shuffle.RetryingBlockFetcher - Exception
> while beginning fetch of 1 outstanding blocks (after 1 retries)
>
> java.io.IOException: Failed to connect to /xxx.xxx.xxx.
>
> at org.apache.spark.network.client.TransportClientFactory.
> createClient(TransportClientFactory.java:193)
>
> at org.apache.spark.network.client.TransportClientFactory.
> createClient(TransportClientFactory.java:156)
>
> at org.apache.spark.network.netty.
> NettyBlockTransferService$$anon$1.createAndStart(
> NettyBlockTransferService.scala:88)
>
> at org.apache.spark.network.shuffle.RetryingBlockFetcher.
> fetchAllOutstanding(RetryingBlockFetcher.java:140)
>
> at org.apache.spark.network.shuffle.RetryingBlockFetcher.
> access$200(RetryingBlockFetcher.java:43)
>
> at org.apache.spark.network.shuffle.RetryingBlockFetcher$
> 1.run(RetryingBlockFetcher.java:170)
>
> at java.util.concurrent.Executors$RunnableAdapter.
> call(Executors.java:471)
>
> at java.util.concurrent.FutureTask.run(FutureTask.
> java:262)
>
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1145)
>
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:615)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.net.ConnectException: Connection refused:
> /xxx.xxx.xxx.
>
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native
> Method)
>
> at sun.nio.ch.SocketChannelImpl.finishConnect(
> SocketChannelImpl.java:739)
>
> at io.netty.channel.socket.nio.NioSocketChannel.
> doFinishConnect(NioSocketChannel.java:224)
>
> at io.netty.channel.nio.AbstractNioChannel$
> AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
>
> at io.netty.channel.nio.NioEventLoop.processSelectedKey(
> NioEventLoop.java:528)
>
> at io.netty.channel.nio.NioEventLoop.
> processSelectedKeysOptimized(NioEventLoop.java:468)
>
> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(
> NioEventLoop.java:382)
>
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.
> java:354)
>
> at io.netty.util.concurrent.SingleThreadEventExecutor$2.
> run(SingleThreadEventExecutor.java:111)
>
> ... 1 more
>
>
>
>
>
> *Related Defects*:
>
> https://issues.apache.org/jira/browse/SPARK-2319
>
> https://issues.apache.org/jira/browse/SPARK-9591
>
>
>
>
>
>


Number of tasks on executors become negative after executor failures

2016-08-15 Thread Rachana Srivastava
Summary:
I am running Spark 1.5 on CDH5.5.1.  Under extreme load intermittently I am 
getting this connection failure exception and later negative executor in the 
Spark UI.

Exception:
TRACE: org.apache.hadoop.hbase.ipc.AbstractRpcClient - Call: Multi, callTime: 
76ms
INFO : org.apache.spark.network.client.TransportClientFactory - Found inactive 
connection to /xxx.xxx.xxx., creating a new one.
ERROR: org.apache.spark.network.shuffle.RetryingBlockFetcher - Exception while 
beginning fetch of 1 outstanding blocks (after 1 retries)
java.io.IOException: Failed to connect to /xxx.xxx.xxx.
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at 
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: /xxx.xxx.xxx.
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at 
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 more


Related Defects:
https://issues.apache.org/jira/browse/SPARK-2319
https://issues.apache.org/jira/browse/SPARK-9591




Re: how to do nested loops over 2 arrays but use Two RDDs instead ?

2016-08-15 Thread Jörn Franke
Depends on the size of the arrays, but is what you want to achieve similar
to a join?

> On 15 Aug 2016, at 20:12, Eric Ho  wrote:
> 
> Hi,
> 
> I've two nested-for loops like this:
> 
> for all elements in Array A do:
> 
> for all elements in Array B do:
> 
> compare a[3] with b[4] see if they 'match' and if match, return that element;
> 
> If I were to represent Arrays A and B as 2 separate RDDs, how would my code 
> look like ? 
> 
> I couldn't find any RDD functions that would do this for me efficiently. I 
> don't really want elements of RDD(A) and RDD(B) flying all over the network 
> piecemeal...
> 
> THanks.
> 
> -- 
> 
> -eric ho
> 


how to do nested loops over 2 arrays but use Two RDDs instead ?

2016-08-15 Thread Eric Ho
Hi,

I've two nested-for loops like this:

for all elements in Array A do:
  for all elements in Array B do:
    compare a[3] with b[4] to see if they 'match' and, if they match, return that element;

If I were to represent Arrays A and B as 2 separate RDDs, how would my code
look?

I couldn't find any RDD functions that would do this for me efficiently. I
don't really want elements of RDD(A) and RDD(B) flying all over the network
piecemeal...
Thanks.

-- 

-eric ho


Sum array values by row in new column

2016-08-15 Thread Javier Rey
Hi everyone,

I have one dataframe with one column; this column is an array of numbers.
How can I sum each array by row and obtain a new column with the sum, in PySpark?

Example:

+------------+
|     numbers|
+------------+
|[10, 20, 30]|
|[40, 50, 60]|
|[70, 80, 90]|
+------------+

The idea is to obtain the same df with a new column with the totals:

+------------+-----+
|     numbers|     |
+------------+-----+
|[10, 20, 30]|60   |
|[40, 50, 60]|150  |
|[70, 80, 90]|240  |
+------------+-----+

Regards!

Samir


Re: class not found exception Logging while running JavaKMeansExample

2016-08-15 Thread Ted Yu
Logging has become private in 2.0 release:

private[spark] trait Logging {

On Mon, Aug 15, 2016 at 9:48 AM, subash basnet  wrote:

> Hello all,
>
> I am trying to run JavaKMeansExample of the spark example project. I am
> getting the classnotfound exception error:
> *Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/spark/internal/Logging*
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
> at jcom.dfki.spark.kmeans.KMeansSpark.JavaKMeansExample.
> main(JavaKMeansExample.java:43)
> *Caused by: java.lang.ClassNotFoundException:
> org.apache.spark.internal.Logging*
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
> I have added all the logging related dependencies as below:
>
>   org.slf4j : slf4j-api : ${slf4j.version}
>   org.slf4j : slf4j-log4j12 : ${slf4j.version} (scope ${hadoop.deps.scope})
>   org.slf4j : jul-to-slf4j : ${slf4j.version}
>   org.slf4j : jcl-over-slf4j : ${slf4j.version}
>   log4j : log4j : ${log4j.version}
>   commons-logging : commons-logging : 1.2
>
> What depedencies could I be missing, any idea?
>
> Regards,
> Subash Basnet
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>


class not found exception Logging while running JavaKMeansExample

2016-08-15 Thread subash basnet
Hello all,

I am trying to run JavaKMeansExample of the spark example project. I am
getting the classnotfound exception error:
*Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/spark/internal/Logging*
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
at
jcom.dfki.spark.kmeans.KMeansSpark.JavaKMeansExample.main(JavaKMeansExample.java:43)
*Caused by: java.lang.ClassNotFoundException:
org.apache.spark.internal.Logging*
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

I have added all the logging related dependencies as below:

  org.slf4j : slf4j-api : ${slf4j.version}
  org.slf4j : slf4j-log4j12 : ${slf4j.version} (scope ${hadoop.deps.scope})
  org.slf4j : jul-to-slf4j : ${slf4j.version}
  org.slf4j : jcl-over-slf4j : ${slf4j.version}
  log4j : log4j : ${log4j.version}
  commons-logging : commons-logging : 1.2

What dependencies could I be missing, any idea?

Regards,
Subash Basnet

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.apache.spark</groupId>
  <artifactId>gettingStarted</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>gettingStarted</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <slf4j.version>1.7.16</slf4j.version>
    <log4j.version>1.2.17</log4j.version>
    <!-- other property values in the original pom: 3.3.3, 3.4.1, 3.2.2, 2.11.8,
         2.11, 1.10, 2.4, 64m, 512m, 512m -->
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
      <scope>${hadoop.deps.scope}</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>${slf4j.version}</version>
      <scope>${hadoop.deps.scope}</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>jul-to-slf4j</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>jcl-over-slf4j</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>${log4j.version}</version>
      <scope>${hadoop.deps.scope}</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.6.0</version>
    </dependency>
    <dependency>
      <groupId>commons-logging</groupId>
      <artifactId>commons-logging</artifactId>
      <version>1.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>2.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.10</artifactId>
      <version>2.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.1.3</version>
    </dependency>
  </dependencies>
</project>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Submitting jobs to YARN from outside EMR -- config & S3 impl

2016-08-15 Thread Everett Anderson
Hi,

We're currently using an EMR cluster (which uses YARN) but submitting Spark
jobs to it using spark-submit from different machines outside the cluster.
We haven't had time to investigate using something like Livy yet.

We also have a need to use a mix of cluster and client modes in this
configuration.

Three things we've struggled with here are

   1. Configuring spark-submit with the necessary master node host & ports
   2. Setting up the cluster to support file staging
   3. S3 implementation choices

I'm curious -- how do others handle these?

Here's what we're doing in case it helps anybody --

*Configuring spark-submit *

As far as I can tell, you can't tell spark-submit the YARN resource manager
info on the command-line with --conf properties. You must set a
SPARK_CONF_DIR or HADOOP_CONF_DIR environment variable pointing to a local
directory with core-site.xml, yarn-site.xml, and optionally hive-site.xml.

However, these setting files will override what's on the cluster, so you
have to be careful and try to assemble just what you need (since you might
use differently configured clusters).

A starting point is to start a cluster and grab the files out of
/etc/hadoop/conf and then whittle them down.

*Setting up the cluster to support file staging*

Out of the box, spark-submit will fail when trying to stage files because
the cluster will try to put them in /user/(local user name on the machine
the job was submitted from). That directory and user won't exist on the
cluster.

I think spark.yarn.stagingDir can change the directory, but you seem to
need to setup your cluster with a bootstrap action to create and give fully
open write permissions.

*S3 implementation choices*

Back in the "Role-based S3 access outside of EMR' thread, we talked about
using S3A when running with the local master on an EC2 instance, which
works in Hadoop 2.7+ with the right libraries.

AWS provides their own Hadoop FileSystem implementation for S3 called
EMRFS, and the default EMR cluster setup uses it for "s3://" scheme URIs.
As far as I know, they haven't released this library for use elsewhere. It
supports "consistency view", which uses a DynamoDB to overcome any S3
list-key inconsistency/lag for I/O ops from the cluster. Presumably, also,
they maintain it and its config, and keep them up to date and performing
well.

If you use cluster mode and "s3://" scheme URIs, things work fine.

However, if you use client mode, it seems like Spark will try to use the
Hadoop "s3://" scheme FileSystem on the submitting host for something, and
it will fail because the default implementation won't know the credentials.
One work-around is to set environment variables or Hadoop conf properties
with your secret keys (!).

Another solution is to use the S3A implementation in Hadoop 2.7.x or later.
However, if you use "s3a://" scheme URIs, they'll also be used on the
cluster -- you'll use the S3A implementation for cluster operations instead
of the EMRFS implementation.

Similarly, if you change core-site.xml locally to use the S3A
implementation for "s3://" scheme URIs, that will cause the cluster to also
use the S3A implementation, when it could have used EMRFS.

Haven't figured out how to work around this, yet, or if it's important.


Re: call a mysql stored procedure from spark

2016-08-15 Thread Mich Talebzadeh
Well that is not the best way as you have to wait for RDBMS to process and
populate the temp table.

A sounder way would be to write a shell script that talks to the RDBMS first
and creates and populates that table.

Once ready, the same shell script can kick off the Spark job to read the temp
table, which is ready to be used.

What you are doing is basic ETL, and that temp table could be just a staging
table.

The advantage of this method is that you can be sure that data is ready
before opening the JDBC connection in Spark.

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 15 August 2016 at 15:55, sujeet jog  wrote:

> Thanks Michael, Michael,
>
> Ayan
> rightly said, yes this stored procedure is invoked from driver, this
> creates the temporary table is DB, the reason being i want to load some
> specific data after processing it, i do not wish to bring it in spark,
> instead want to keep the processing at DB level,  later once the temp table
> is prepared, i would load it via sparkSQL in the executor to process
> further.
>
>
> On Mon, Aug 15, 2016 at 4:24 AM, ayan guha  wrote:
>
>> More than technical feasibility, I would ask why to invoke a stored
>> procedure for every row? If not, jdbcRdd is moot point.
>>
>> In case stored procedure should be invoked from driver, it can be easily
>> done. Or at most for each partition, at each executor.
>> On 15 Aug 2016 03:06, "Mich Talebzadeh" 
>> wrote:
>>
>>> Hi,
>>>
>>> The link deals with JDBC and states:
>>>
>>> [image: Inline images 1]
>>>
>>> So it is only SQL. It lacks functionality on Stored procedures with
>>> returning result set.
>>>
>>> This is on an Oracle table
>>>
>>> scala>  var _ORACLEserver = "jdbc:oracle:thin:@rhes564:1521:mydb12"
>>> _ORACLEserver: String = jdbc:oracle:thin:@rhes564:1521:mydb12
>>> scala>  var _username = "scratchpad"
>>> _username: String = scratchpad
>>> scala> var _password = "xxx"
>>> _password: String = oracle
>>>
>>> scala> val s = HiveContext.read.format("jdbc").options(
>>>  | Map("url" -> _ORACLEserver,
>>>  | *"dbtable" -> "exec weights_sp",*
>>>  | "user" -> _username,
>>>  | "password" -> _password)).load
>>> java.sql.SQLSyntaxErrorException: ORA-00942: table or view does not
>>> exist
>>>
>>>
>>> and that stored procedure exists in Oracle
>>>
>>> scratch...@mydb12.mich.LOCAL> desc weights_sp
>>> PROCEDURE weights_sp
>>>
>>>
>>> HTH
>>>
>>>
>>>
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 14 August 2016 at 17:42, Michael Armbrust 
>>> wrote:
>>>
 As described here
 ,
 you can use the DataSource API to connect to an external database using
 JDBC.  While the dbtable option is usually just a table name, it can
 also be any valid SQL command that returns a table when enclosed in
 (parentheses).  I'm not certain, but I'd expect you could use this feature
 to invoke a stored procedure and return the results as a DataFrame.

 On Sat, Aug 13, 2016 at 10:40 AM, sujeet jog 
 wrote:

> Hi,
>
> Is there a way to call a stored procedure using spark ?
>
>
> thanks,
> Sujeet
>


>>>
>
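
A hedged sketch of the "(query) as table" option Michael mentions, here used to pull
the DB-side staging/temp table (or a query over it) straight into a DataFrame once
the stored procedure has populated it. The Spark 2.0 SparkSession API is shown;
connection details and the table name are illustrative, and the MySQL JDBC driver
must be on the classpath.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("staging-read").getOrCreate()

// dbtable can be a table name or any parenthesised SELECT (with an alias for MySQL).
val staged = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://dbhost:3306/mydb")
  .option("dbtable", "(SELECT id, metric, value FROM temp_results) AS t")
  .option("user", "app_user")
  .option("password", "app_password")
  .load()

staged.show()
```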


Re: call a mysql stored procedure from spark

2016-08-15 Thread sujeet jog
Thanks Michael, Michael,

Ayan rightly said it: yes, this stored procedure is invoked from the driver, and it
creates the temporary table in the DB. The reason is that I want to load some
specific data after processing it; I do not wish to bring it into Spark, and
instead want to keep the processing at the DB level. Later, once the temp table
is prepared, I would load it via Spark SQL in the executor to process it
further.


On Mon, Aug 15, 2016 at 4:24 AM, ayan guha  wrote:

> More than technical feasibility, I would ask why to invoke a stored
> procedure for every row? If not, jdbcRdd is moot point.
>
> In case stored procedure should be invoked from driver, it can be easily
> done. Or at most for each partition, at each executor.
> On 15 Aug 2016 03:06, "Mich Talebzadeh"  wrote:
>
>> Hi,
>>
>> The link deals with JDBC and states:
>>
>> [image: Inline images 1]
>>
>> So it is only SQL. It lacks functionality on Stored procedures with
>> returning result set.
>>
>> This is on an Oracle table
>>
>> scala>  var _ORACLEserver = "jdbc:oracle:thin:@rhes564:1521:mydb12"
>> _ORACLEserver: String = jdbc:oracle:thin:@rhes564:1521:mydb12
>> scala>  var _username = "scratchpad"
>> _username: String = scratchpad
>> scala> var _password = "xxx"
>> _password: String = oracle
>>
>> scala> val s = HiveContext.read.format("jdbc").options(
>>  | Map("url" -> _ORACLEserver,
>>  | *"dbtable" -> "exec weights_sp",*
>>  | "user" -> _username,
>>  | "password" -> _password)).load
>> java.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist
>>
>>
>> and that stored procedure exists in Oracle
>>
>> scratch...@mydb12.mich.LOCAL> desc weights_sp
>> PROCEDURE weights_sp
>>
>>
>> HTH
>>
>>
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 14 August 2016 at 17:42, Michael Armbrust 
>> wrote:
>>
>>> As described here
>>> ,
>>> you can use the DataSource API to connect to an external database using
>>> JDBC.  While the dbtable option is usually just a table name, it can
>>> also be any valid SQL command that returns a table when enclosed in
>>> (parentheses).  I'm not certain, but I'd expect you could use this feature
>>> to invoke a stored procedure and return the results as a DataFrame.
>>>
>>> On Sat, Aug 13, 2016 at 10:40 AM, sujeet jog 
>>> wrote:
>>>
 Hi,

 Is there a way to call a stored procedure using spark ?


 thanks,
 Sujeet

>>>
>>>
>>


Re: parallel processing with JDBC

2016-08-15 Thread Madabhattula Rajesh Kumar
Hi Mich,

Thank you

Regards,,
Rajesh

On Mon, Aug 15, 2016 at 6:35 PM, Mich Talebzadeh 
wrote:

> Ok Rajesh
>
> This is standalone.
>
> In that case it ought to be at least 4 connections as one executor will
> use one worker.
>
> I am hesitant in here as you can see with (at least) as with Standalone
> mode you may end up with more executors on each worker.
>
> But try it and see whether numPartitions" -> "4" is good or you can
> change this to something higher.
>
>
> HTH
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 15 August 2016 at 12:19, Madabhattula Rajesh Kumar  > wrote:
>
>> Hi Mich,
>>
>> Thank you for detailed explanation. One more question
>>
>> In my cluster, I have one master and 4 workers. In this case, 4
>> connections will be opened to Oracle ?
>>
>> Regards,
>> Rajesh
>>
>> On Mon, Aug 15, 2016 at 3:59 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> It happens that the number of parallel processes open from Spark to
>>> RDBMS is determined by the number of executors.
>>>
>>> I just tested this.
>>>
>>> With Yarn client using to executors I see two connections to RDBMS
>>>
>>>
>>> EXECUTIONS USERNAME   SID SERIAL# USERS_EXECUTING SQL_TEXT
>>> -- -- --- --- ---
>>> --
>>>  1 SCRATCHPAD 443   62565   1 SELECT
>>> "RANDOMISED","RANDOM_STRING","PADDING","CLU
>>>
>>> STERED","ID","SCATTERED","SMALL_VC" FROM (SELECT t
>>>   o_char(ID) AS ID,
>>> to_char(CLUSTERED) AS CLUSTERED,
>>>
>>> to_char(SCATTERED) AS SCATTERED, to_char(RANDOMIS
>>>   ED) AS
>>> RANDOMISED, RANDOM_STRING, SMALL_VC, PADDIN
>>>   G FROM
>>> scratchpad.dummy) WHERE ID >= 2301 AND
>>>   ID < 2401
>>>  1 SCRATCHPAD 406   46793   1 SELECT
>>> "RANDOMISED","RANDOM_STRING","PADDING","CLU
>>>
>>> STERED","ID","SCATTERED","SMALL_VC" FROM (SELECT t
>>>   o_char(ID) AS ID,
>>> to_char(CLUSTERED) AS CLUSTERED,
>>>
>>> to_char(SCATTERED) AS SCATTERED, to_char(RANDOMIS
>>>   ED) AS
>>> RANDOMISED, RANDOM_STRING, SMALL_VC, PADDIN
>>>   G FROM
>>> scratchpad.dummy) WHERE ID >= 2401 AND
>>>   ID < 2501
>>>
>>> So it  sounds like (can someone else independently confirm this) that
>>> regardless of what one specifies in "numPartitions" one ends up one
>>> connection from one Spark executor to RDBMS.
>>>
>>> HTH
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 15 August 2016 at 09:12, Mich Talebzadeh 
>>> wrote:
>>>
 Hi.

 This is a very good question

 I did some tests on this.

 If you are joining two tables then you are creating a result set based
 on some conditions. In this case what I normally do is to specify an ID
 column from either tables and will base my partitioning on that ID column.
 This is pretty straight forward. So bring back your ID column and base you
 lower and upper limit on that ID value

 "partitionColumn" -> "ID",
 "lowerBound" -> "1",
 "upperBound" -> "1",
 "numPartitions" -> "100",



 Also I have noticed that regardless of the number of partitions you
 specify at the RDBMS site, the number of parallel connections will be
 limited and the result set will be partitioned accordingly. For example
 with 

Re: parallel processing with JDBC

2016-08-15 Thread Mich Talebzadeh
Ok Rajesh

This is standalone.

In that case it ought to be at least 4 connections as one executor will use
one worker.

I am hesitant in here as you can see with (at least) as with Standalone
mode you may end up with more executors on each worker.

But try it and see whether numPartitions" -> "4" is good or you can change
this to something higher.


HTH



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 15 August 2016 at 12:19, Madabhattula Rajesh Kumar 
wrote:

> Hi Mich,
>
> Thank you for detailed explanation. One more question
>
> In my cluster, I have one master and 4 workers. In this case, 4
> connections will be opened to Oracle ?
>
> Regards,
> Rajesh
>
> On Mon, Aug 15, 2016 at 3:59 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> It happens that the number of parallel processes open from Spark to RDBMS
>> is determined by the number of executors.
>>
>> I just tested this.
>>
>> With Yarn client using to executors I see two connections to RDBMS
>>
>>
>> EXECUTIONS USERNAME   SID SERIAL# USERS_EXECUTING SQL_TEXT
>> -- -- --- --- ---
>> --
>>  1 SCRATCHPAD 443   62565   1 SELECT
>> "RANDOMISED","RANDOM_STRING","PADDING","CLU
>>
>> STERED","ID","SCATTERED","SMALL_VC" FROM (SELECT t
>>   o_char(ID) AS ID,
>> to_char(CLUSTERED) AS CLUSTERED,
>>
>> to_char(SCATTERED) AS SCATTERED, to_char(RANDOMIS
>>   ED) AS RANDOMISED,
>> RANDOM_STRING, SMALL_VC, PADDIN
>>   G FROM
>> scratchpad.dummy) WHERE ID >= 2301 AND
>>   ID < 2401
>>  1 SCRATCHPAD 406   46793   1 SELECT
>> "RANDOMISED","RANDOM_STRING","PADDING","CLU
>>
>> STERED","ID","SCATTERED","SMALL_VC" FROM (SELECT t
>>   o_char(ID) AS ID,
>> to_char(CLUSTERED) AS CLUSTERED,
>>
>> to_char(SCATTERED) AS SCATTERED, to_char(RANDOMIS
>>   ED) AS RANDOMISED,
>> RANDOM_STRING, SMALL_VC, PADDIN
>>   G FROM
>> scratchpad.dummy) WHERE ID >= 2401 AND
>>   ID < 2501
>>
>> So it  sounds like (can someone else independently confirm this) that
>> regardless of what one specifies in "numPartitions" one ends up one
>> connection from one Spark executor to RDBMS.
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 15 August 2016 at 09:12, Mich Talebzadeh 
>> wrote:
>>
>>> Hi.
>>>
>>> This is a very good question
>>>
>>> I did some tests on this.
>>>
>>> If you are joining two tables then you are creating a result set based
>>> on some conditions. In this case what I normally do is to specify an ID
>>> column from either tables and will base my partitioning on that ID column.
>>> This is pretty straight forward. So bring back your ID column and base you
>>> lower and upper limit on that ID value
>>>
>>> "partitionColumn" -> "ID",
>>> "lowerBound" -> "1",
>>> "upperBound" -> "1",
>>> "numPartitions" -> "100",
>>>
>>>
>>>
>>> Also I have noticed that regardless of the number of partitions you
>>> specify at the RDBMS site, the number of parallel connections will be
>>> limited and the result set will be partitioned accordingly. For example
>>> with numberPartitions=100, I see only 8 connections in Oracle coming from
>>> Spark connection.
>>>
>>> scala> val s = HiveContext.read.format("jdbc").options(
>>>  | Map("url" -> _ORACLEserver,
>>>  | "dbtable" -> "(SELECT to_char(ID) AS ID, to_char(CLUSTERED) AS
>>> CLUSTERED, to_char(SCATTERED) AS 
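
A hedged sketch of the partitioned JDBC read being discussed, using the standard
options of the JDBC data source (Spark 1.6/2.0). The Oracle URL, table and ID bounds
are illustrative; Spark issues one SELECT per partition, each with its own ID-range
predicate, and how many of those actually run concurrently depends on the available
executor cores, which matches Mich's observation.

```scala
// Assumes the spark-shell sqlContext and an Oracle JDBC driver on the classpath.
val jdbcDF = sqlContext.read.format("jdbc").options(Map(
  "url"             -> "jdbc:oracle:thin:@dbhost:1521:mydb",
  "dbtable"         -> "scratchpad.dummy",
  "user"            -> "scratchpad",
  "password"        -> "xxx",
  "partitionColumn" -> "ID",        // must be a numeric column
  "lowerBound"      -> "1",
  "upperBound"      -> "100000000",
  "numPartitions"   -> "4"
)).load()

println(jdbcDF.rdd.getNumPartitions)   // one partition (and one range query) per slice
```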

Re: Does Spark SQL support indexes?

2016-08-15 Thread Mich Talebzadeh
Brave and wise answer :)



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 15 August 2016 at 12:24, Gourav Sengupta 
wrote:

> I think that I have scratched a hornet's nest here. If you are comfortable
> mentioning faster way to access data as indexes then its fine. And everyone
> is and in the foreseeable future going to continue to use indexes.
>
> When I think about reaching data faster, I just refer to the methods
> available currently as algorithms.
>
>
> Regards,
> Gourav
>
> On Mon, Aug 15, 2016 at 11:59 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> My two cents
>>
>> Indexes on any form and shape are there to speed up the query whether it
>> is classical index (B-tree), store-index (data and stats stored together),
>> like Oracle Exalytics, SAP Hana, Hive ORC tables or in-memory databases
>> (hash index). Indexes are there to speed up the access path in some form
>> and shape.
>>
>> The issue with indexes on Big data is that HDFS lacks the ability to
>> co-locate blocks, so that is a bit of a challenge and may be one of the
>> reasons that indexes are not as common in Big Data world as others.
>> However, that is changing. Bottom line it sounds like Big Data has to
>> perform on par with a transaction database. in retrieving the queries and
>> very fast access path.
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 15 August 2016 at 11:19, u...@moosheimer.com 
>> wrote:
>>
>>> So you mean HBase, Cassandra, Hana, Elasticsearch and so on do not use
>>> idexes?
>>> There might be some very interesting new concepts I've missed?
>>>
>>> Could you be more precise?
>>>
>>> ;-)
>>>
>>> Regards,
>>> Uwe
>>>
>>>
>>>
>>> Am 15.08.2016 um 11:59 schrieb Gourav Sengupta:
>>>
>>> The world has moved in from indexes, materialized views, and other
>>> single processor non-distributed system algorithms. Nice that you are not
>>> asking questions regarding hierarchical file systems.
>>>
>>>
>>> Regards,
>>> Gourav
>>>
>>> On Sun, Aug 14, 2016 at 4:03 AM, Taotao.Li 
>>> wrote:
>>>

 hi, guys, does Spark SQL support indexes?  if so, how can I create an
 index on my temp table? if not, how can I handle some specific queries on a
 very large table? it would iterate all the table even though all I want is
 just a small piece of that table.

 great thanks,


 *___*
 Quant | Engineer | Boy
 *___*
 *blog*:http://litaotao.github.io
 
 *github*: www.github.com/litaotao



>>>
>>>
>>
>


Re: Does Spark SQL support indexes?

2016-08-15 Thread Gourav Sengupta
I think that I have scratched a hornet's nest here. If you are comfortable
referring to faster ways of accessing data as indexes, then it's fine. And everyone
is, for the foreseeable future, going to continue to use indexes.

When I think about reaching data faster, I just refer to the methods
available currently as algorithms.


Regards,
Gourav

On Mon, Aug 15, 2016 at 11:59 AM, Mich Talebzadeh  wrote:

> My two cents
>
> Indexes on any form and shape are there to speed up the query whether it
> is classical index (B-tree), store-index (data and stats stored together),
> like Oracle Exalytics, SAP Hana, Hive ORC tables or in-memory databases
> (hash index). Indexes are there to speed up the access path in some form
> and shape.
>
> The issue with indexes on Big data is that HDFS lacks the ability to
> co-locate blocks, so that is a bit of a challenge and may be one of the
> reasons that indexes are not as common in Big Data world as others.
> However, that is changing. Bottom line it sounds like Big Data has to
> perform on par with a transaction database. in retrieving the queries and
> very fast access path.
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 15 August 2016 at 11:19, u...@moosheimer.com  wrote:
>
>> So you mean HBase, Cassandra, Hana, Elasticsearch and so on do not use
>> idexes?
>> There might be some very interesting new concepts I've missed?
>>
>> Could you be more precise?
>>
>> ;-)
>>
>> Regards,
>> Uwe
>>
>>
>>
>> Am 15.08.2016 um 11:59 schrieb Gourav Sengupta:
>>
>> The world has moved in from indexes, materialized views, and other single
>> processor non-distributed system algorithms. Nice that you are not asking
>> questions regarding hierarchical file systems.
>>
>>
>> Regards,
>> Gourav
>>
>> On Sun, Aug 14, 2016 at 4:03 AM, Taotao.Li 
>> wrote:
>>
>>>
>>> hi, guys, does Spark SQL support indexes?  if so, how can I create an
>>> index on my temp table? if not, how can I handle some specific queries on a
>>> very large table? it would iterate all the table even though all I want is
>>> just a small piece of that table.
>>>
>>> great thanks,
>>>
>>>
>>> *___*
>>> Quant | Engineer | Boy
>>> *___*
>>> *blog*:http://litaotao.github.io
>>> 
>>> *github*: www.github.com/litaotao
>>>
>>>
>>>
>>
>>
>


Re: Accessing HBase through Spark with Security enabled

2016-08-15 Thread Steve Loughran

On 15 Aug 2016, at 08:29, Aneela Saleem 
> wrote:

Thanks Jacek!

I have already set hbase.security.authentication property set to kerberos, 
since Hbase with kerberos is working fine.

I tested again after correcting the typo but got same error. Following is the 
code, Please have a look:

System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
System.setProperty("java.security.auth.login.config", 
"/etc/hbase/conf/zk-jaas.conf");
val hconf = HBaseConfiguration.create()
val tableName = "emp"
hconf.set("hbase.zookeeper.quorum", "hadoop-master")
hconf.set(TableInputFormat.INPUT_TABLE, tableName)
hconf.set("hbase.zookeeper.property.clientPort", "2181")
hconf.set("hbase.master", "hadoop-master:6")
hconf.set("hadoop.security.authentication", "kerberos")
hconf.addResource(new Path("/etc/hbase/conf/core-site.xml"))
hconf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))

spark should be automatically picking those up from the classpath; adding them 
to your  own hconf isn't going to have any effect on the hbase config used to 
extract the hbase token on Yarn app launch. That all needs to be set up at the 
time the Spark cluster/app is launched. If you are running

There's a little diagnostics tool, kdiag, which will be in future Hadoop 
versions —It's available as a standalone JAR for others to use

https://github.com/steveloughran/kdiag

This may help verify things like your keytab/login details


val conf = new SparkConf()
conf.set("spark.yarn.security.tokens.hbase.enabled", "true")
conf.set("spark.authenticate", "true")
conf.set("spark.authenticate.secret","None")
val sc = new SparkContext(conf)
val hBaseRDD = sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])

val count = hBaseRDD.count()
print("HBase RDD count:" + count)



On Sat, Aug 13, 2016 at 8:36 PM, Jacek Laskowski 
> wrote:
Hi Aneela,

My (little to no) understanding of how to make it work is to use
hbase.security.authentication property set to kerberos (see [1]).


Nobody understands kerberos; you are not alone. And the more you understand of 
Kerberos, the less you want to.

Spark on YARN uses it to get the tokens for Hive, HBase et al (see
[2]). It happens when Client starts conversation to YARN RM (see [3]).

You should not do that yourself (and BTW you've got a typo in
spark.yarn.security.tokens.habse.enabled setting). I think that the
entire code you pasted matches the code Spark's doing itself before
requesting resources from YARN.

Give it a shot and report back since I've never worked in such a
configuration and would love improving in this (security) area.
Thanks!

[1] 
http://www.cloudera.com/documentation/enterprise/5-5-x/topics/cdh_sg_hbase_authentication.html#concept_zyz_vg5_nt__section_s1l_nwv_ls
[2] 
https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala#L58
[3] 
https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L396



[2] is the code from last week; SPARK-14743. The predecessor code was pretty 
similar though: make an RPC call to HBase to ask for an HBase delegation token 
to be handed off to the YARN app; it requires the user to be Kerberos 
authenticated first.


Pozdrawiam,
Jacek Laskowski

>> > 2016-08-07 20:43:57,617 WARN
>> > [hconnection-0x24b5fa45-metaLookup-shared--pool2-t1] ipc.RpcClientImpl:
>> > Exception encountered while connecting to the server :
>> > javax.security.sasl.SaslException: GSS initiate failed [Caused by
>> > GSSException: No valid credentials provided (Mechanism level: Failed to
>> > find
>> > any Kerberos tgt)]
>> > 2016-08-07 20:43:57,619 ERROR
>> > [hconnection-0x24b5fa45-metaLookup-shared--pool2-t1] ipc.RpcClientImpl:
>> > SASL
>> > authentication failed. The most likely cause is missing or invalid
>> > credentials. Consider 'kinit'.
>> > javax.security.sasl.SaslException: GSS initiate failed [Caused by
>> > GSSException: No valid credentials provided (Mechanism level: Failed to
>> > find
>> > any Kerberos tgt)]
>> >   at
>> >
>> > com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
>> >   at
>> >
>> > org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
>> >   at
>> >
>> > org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:617)
>> >   at
>> >
>> > org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$700(RpcClientImpl.java:162)
>> >   at
>> >
>> > org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:743)
>> >   at
>> >
>> > org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:740)
>> >   at java.security.AccessController.doPrivileged(Native Method)
>> >  

Re: parallel processing with JDBC

2016-08-15 Thread Madabhattula Rajesh Kumar
Hi Mich,

Thank you for the detailed explanation. One more question:

In my cluster, I have one master and 4 workers. In this case, will 4 connections
be opened to Oracle?

Regards,
Rajesh
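A quick way to check this on a running cluster (an illustrative sketch with placeholder
connection values, assuming the spark-shell's sqlContext and sc): each JDBC partition is
read by one task, so the number of simultaneous Oracle connections is bounded by how many
of those tasks run at once, i.e. the total executor cores, rather than by the number of
worker machines as such.

val df = sqlContext.read.format("jdbc").options(Map(
  "url" -> "jdbc:oracle:thin:@//oracle-host:1521/MYDB",  // placeholder
  "dbtable" -> "scratchpad.dummy",
  "partitionColumn" -> "ID",
  "lowerBound" -> "1",
  "upperBound" -> "1000000",
  "numPartitions" -> "100",
  "user" -> "scratchpad",          // placeholder
  "password" -> "changeme")).load  // placeholder

println(df.rdd.getNumPartitions)  // total JDBC partitions, i.e. connections opened over the whole read
println(sc.defaultParallelism)    // rough upper bound on how many of them are open at the same time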

On Mon, Aug 15, 2016 at 3:59 PM, Mich Talebzadeh 
wrote:

> It happens that the number of parallel processes open from Spark to RDBMS
> is determined by the number of executors.
>
> I just tested this.
>
> With Yarn client using to executors I see two connections to RDBMS
>
>
> EXECUTIONS USERNAME   SID SERIAL# USERS_EXECUTING SQL_TEXT
> -- -- --- --- ---
> --
>  1 SCRATCHPAD 443   62565   1 SELECT
> "RANDOMISED","RANDOM_STRING","PADDING","CLU
>
> STERED","ID","SCATTERED","SMALL_VC" FROM (SELECT t
>   o_char(ID) AS ID,
> to_char(CLUSTERED) AS CLUSTERED,
>to_char(SCATTERED)
> AS SCATTERED, to_char(RANDOMIS
>   ED) AS RANDOMISED,
> RANDOM_STRING, SMALL_VC, PADDIN
>   G FROM
> scratchpad.dummy) WHERE ID >= 2301 AND
>   ID < 2401
>  1 SCRATCHPAD 406   46793   1 SELECT
> "RANDOMISED","RANDOM_STRING","PADDING","CLU
>
> STERED","ID","SCATTERED","SMALL_VC" FROM (SELECT t
>   o_char(ID) AS ID,
> to_char(CLUSTERED) AS CLUSTERED,
>to_char(SCATTERED)
> AS SCATTERED, to_char(RANDOMIS
>   ED) AS RANDOMISED,
> RANDOM_STRING, SMALL_VC, PADDIN
>   G FROM
> scratchpad.dummy) WHERE ID >= 2401 AND
>   ID < 2501
>
> So it  sounds like (can someone else independently confirm this) that
> regardless of what one specifies in "numPartitions" one ends up one
> connection from one Spark executor to RDBMS.
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 15 August 2016 at 09:12, Mich Talebzadeh 
> wrote:
>
>> Hi.
>>
>> This is a very good question
>>
>> I did some tests on this.
>>
>> If you are joining two tables then you are creating a result set based on
>> some conditions. In this case what I normally do is to specify an ID column
>> from either tables and will base my partitioning on that ID column. This is
>> pretty straight forward. So bring back your ID column and base you lower
>> and upper limit on that ID value
>>
>> "partitionColumn" -> "ID",
>> "lowerBound" -> "1",
>> "upperBound" -> "1",
>> "numPartitions" -> "100",
>>
>>
>>
>> Also I have noticed that regardless of the number of partitions you
>> specify at the RDBMS site, the number of parallel connections will be
>> limited and the result set will be partitioned accordingly. For example
>> with numberPartitions=100, I see only 8 connections in Oracle coming from
>> Spark connection.
>>
>> scala> val s = HiveContext.read.format("jdbc").options(
>>  | Map("url" -> _ORACLEserver,
>>  | "dbtable" -> "(SELECT to_char(ID) AS ID, to_char(CLUSTERED) AS
>> CLUSTERED, to_char(SCATTERED) AS SCATTERED, to_char(RANDOMISED) AS
>> RANDOMISED, RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy)",
>>  | "partitionColumn" -> "ID",
>>  | "lowerBound" -> "1",
>>  | "upperBound" -> "1",
>>  | "numPartitions" -> "100",
>>  | "user" -> _username,
>>  | "password" -> _password)).load
>> s: org.apache.spark.sql.DataFrame = [ID: string, CLUSTERED: string ... 5
>> more fields]
>> scala> s.toJavaRDD.partitions.size()
>> res1: Int = 100
>>
>> This also seems to set the number of partitions. I still think that the
>> emphasis has to be on getting data from RDBMS as quickly as possible. The
>> partitioning does work. In below the login scratchpad has multiple
>> connections to Oracle and does the range selection OK
>>
>>  1 SCRATCHPAD  45   43048   1 SELECT
>> "SMALL_VC","CLUSTERED","PADDING","RANDOM_ST
>>
>> RING","ID","SCATTERED","RANDOMISED" FROM (SELECT t
>>   o_char(ID) AS ID,
>> to_char(CLUSTERED) AS 

Re: Does Spark SQL support indexes?

2016-08-15 Thread Mich Talebzadeh
My two cents

Indexes in any form and shape are there to speed up the query, whether it is
a classical index (B-tree), a storage index (data and stats stored together)
as in Oracle Exalytics, SAP HANA or Hive ORC tables, or an in-memory database
(hash index). Indexes are there to speed up the access path in some form
or shape.

The issue with indexes on Big Data is that HDFS lacks the ability to
co-locate blocks, so that is a bit of a challenge and may be one of the
reasons that indexes are not as common in the Big Data world as elsewhere.
However, that is changing. Bottom line: it sounds like Big Data has to
perform on par with a transactional database, retrieving query results through
a very fast access path.
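In practice, the closest thing to an index in plain Spark SQL today is laying the data out
so the engine can skip most of it. A rough sketch of that idea (made-up paths and column
names), using partitioned Parquet so a filter on the partition column only reads the
matching directories:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("pruning-sketch").master("local[2]").getOrCreate()
import spark.implicits._

val events = Seq((1, "2016-08-01", 10.0), (2, "2016-08-02", 20.0))
  .toDF("id", "event_date", "amount")

// write the table partitioned by the column most queries filter on
events.write.mode("overwrite").partitionBy("event_date").parquet("/tmp/events_by_date")

// only the matching partition directory is scanned, no full table iteration
val oneDay = spark.read.parquet("/tmp/events_by_date").filter($"event_date" === "2016-08-02")
oneDay.explain()  // inspect the physical plan to confirm the partition filter is applied

Bucketing (bucketBy with saveAsTable) and the min/max statistics inside ORC/Parquet files
play a similar role within each partition.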

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 15 August 2016 at 11:19, u...@moosheimer.com  wrote:

> So you mean HBase, Cassandra, Hana, Elasticsearch and so on do not use
> idexes?
> There might be some very interesting new concepts I've missed?
>
> Could you be more precise?
>
> ;-)
>
> Regards,
> Uwe
>
>
>
> Am 15.08.2016 um 11:59 schrieb Gourav Sengupta:
>
> The world has moved in from indexes, materialized views, and other single
> processor non-distributed system algorithms. Nice that you are not asking
> questions regarding hierarchical file systems.
>
>
> Regards,
> Gourav
>
> On Sun, Aug 14, 2016 at 4:03 AM, Taotao.Li 
> wrote:
>
>>
>> hi, guys, does Spark SQL support indexes?  if so, how can I create an
>> index on my temp table? if not, how can I handle some specific queries on a
>> very large table? it would iterate all the table even though all I want is
>> just a small piece of that table.
>>
>> great thanks,
>>
>>
>> *___*
>> Quant | Engineer | Boy
>> *___*
>> *blog*:http://litaotao.github.io
>> 
>> *github*: www.github.com/litaotao
>>
>>
>>
>
>


Linear regression, weights constraint

2016-08-15 Thread letaiv
Hi all, 

Is there any approach to add constraints on the weights in linear regression?
What I need is least squares regression with non-negative constraints on the
coefficients/weights.

Thanks in advance. 
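One workaround, in case no built-in option exists: projected gradient descent for
non-negative least squares can be hand-rolled in a few lines of plain Scala (minimise
||Xw - y||^2 subject to w >= 0 by clipping negative coefficients after each gradient
step). The sketch below is an illustration for small dense problems only, not a Spark
MLlib API:

// X is row-major, y the targets; returns non-negative weights
def nnls(x: Array[Array[Double]], y: Array[Double],
         steps: Int = 10000, lr: Double = 0.05): Array[Double] = {
  val n = x.length
  val d = x(0).length
  val w = Array.fill(d)(0.0)
  for (_ <- 1 to steps) {
    val grad = Array.fill(d)(0.0)
    for (i <- 0 until n) {
      val err = (0 until d).map(j => x(i)(j) * w(j)).sum - y(i)
      for (j <- 0 until d) grad(j) += 2.0 * err * x(i)(j) / n
    }
    // gradient step followed by projection onto w >= 0
    for (j <- 0 until d) w(j) = math.max(0.0, w(j) - lr * grad(j))
  }
  w
}

// toy check: y = 2*x1 + 0*x2, so the fitted weights should come out close to (2, 0)
val X = Array(Array(1.0, 0.0), Array(2.0, 1.0), Array(3.0, 2.0))
val y = Array(2.0, 4.0, 6.0)
println(nnls(X, y).mkString(", "))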





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Linear-regression-weights-constraint-tp27535.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: parallel processing with JDBC

2016-08-15 Thread Mich Talebzadeh
It happens that the number of parallel processes open from Spark to RDBMS
is determined by the number of executors.

I just tested this.

With Yarn client mode using two executors I see two connections to the RDBMS


EXECUTIONS USERNAME   SID SERIAL# USERS_EXECUTING SQL_TEXT
-- -- --- --- ---
--
 1 SCRATCHPAD 443   62565   1 SELECT
"RANDOMISED","RANDOM_STRING","PADDING","CLU

STERED","ID","SCATTERED","SMALL_VC" FROM (SELECT t
  o_char(ID) AS ID,
to_char(CLUSTERED) AS CLUSTERED,
   to_char(SCATTERED)
AS SCATTERED, to_char(RANDOMIS
  ED) AS RANDOMISED,
RANDOM_STRING, SMALL_VC, PADDIN
  G FROM
scratchpad.dummy) WHERE ID >= 2301 AND
  ID < 2401
 1 SCRATCHPAD 406   46793   1 SELECT
"RANDOMISED","RANDOM_STRING","PADDING","CLU

STERED","ID","SCATTERED","SMALL_VC" FROM (SELECT t
  o_char(ID) AS ID,
to_char(CLUSTERED) AS CLUSTERED,
   to_char(SCATTERED)
AS SCATTERED, to_char(RANDOMIS
  ED) AS RANDOMISED,
RANDOM_STRING, SMALL_VC, PADDIN
  G FROM
scratchpad.dummy) WHERE ID >= 2401 AND
  ID < 2501

So it sounds like (can someone else independently confirm this?) that,
regardless of what one specifies in "numPartitions", one ends up with one
connection per Spark executor to the RDBMS.

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 15 August 2016 at 09:12, Mich Talebzadeh 
wrote:

> Hi.
>
> This is a very good question
>
> I did some tests on this.
>
> If you are joining two tables then you are creating a result set based on
> some conditions. In this case what I normally do is to specify an ID column
> from either tables and will base my partitioning on that ID column. This is
> pretty straight forward. So bring back your ID column and base you lower
> and upper limit on that ID value
>
> "partitionColumn" -> "ID",
> "lowerBound" -> "1",
> "upperBound" -> "1",
> "numPartitions" -> "100",
>
>
>
> Also I have noticed that regardless of the number of partitions you
> specify at the RDBMS site, the number of parallel connections will be
> limited and the result set will be partitioned accordingly. For example
> with numberPartitions=100, I see only 8 connections in Oracle coming from
> Spark connection.
>
> scala> val s = HiveContext.read.format("jdbc").options(
>  | Map("url" -> _ORACLEserver,
>  | "dbtable" -> "(SELECT to_char(ID) AS ID, to_char(CLUSTERED) AS
> CLUSTERED, to_char(SCATTERED) AS SCATTERED, to_char(RANDOMISED) AS
> RANDOMISED, RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy)",
>  | "partitionColumn" -> "ID",
>  | "lowerBound" -> "1",
>  | "upperBound" -> "1",
>  | "numPartitions" -> "100",
>  | "user" -> _username,
>  | "password" -> _password)).load
> s: org.apache.spark.sql.DataFrame = [ID: string, CLUSTERED: string ... 5
> more fields]
> scala> s.toJavaRDD.partitions.size()
> res1: Int = 100
>
> This also seems to set the number of partitions. I still think that the
> emphasis has to be on getting data from RDBMS as quickly as possible. The
> partitioning does work. In below the login scratchpad has multiple
> connections to Oracle and does the range selection OK
>
>  1 SCRATCHPAD  45   43048   1 SELECT
> "SMALL_VC","CLUSTERED","PADDING","RANDOM_ST
>
> RING","ID","SCATTERED","RANDOMISED" FROM (SELECT t
>   o_char(ID) AS ID,
> to_char(CLUSTERED) AS CLUSTERED,
>to_char(SCATTERED)
> AS SCATTERED, to_char(RANDOMIS
>   ED) AS RANDOMISED,
> RANDOM_STRING, SMALL_VC, PADDIN
>   G FROM
> scratchpad.dummy)
> *WHERE ID >= 1601
> AND  ID < 1701*
>
> HTH
>
>
>
>
>
>
>
> Dr 

Re: Does Spark SQL support indexes?

2016-08-15 Thread u...@moosheimer.com
So you mean HBase, Cassandra, HANA, Elasticsearch and so on do not use
indexes?
There might be some very interesting new concepts I've missed?

Could you be more precise?

;-)

Regards,
Uwe


Am 15.08.2016 um 11:59 schrieb Gourav Sengupta:
> The world has moved in from indexes, materialized views, and other
> single processor non-distributed system algorithms. Nice that you are
> not asking questions regarding hierarchical file systems.
>
>
> Regards,
> Gourav 
>
> On Sun, Aug 14, 2016 at 4:03 AM, Taotao.Li  > wrote:
>
>
> hi, guys, does Spark SQL support indexes?  if so, how can I create
> an index on my temp table? if not, how can I handle some specific
> queries on a very large table? it would iterate all the table even
> though all I want is just a small piece of that table.
>
> great thanks, 
>
>
> *___*
> Quant | Engineer | Boy
> *___*
> */blog/*:http://litaotao.github.io
> 
> */github/*: www.github.com/litaotao 
>
>
>



RE: Does Spark SQL support indexes?

2016-08-15 Thread Ashic Mahtab
Guess the good people in the Cassandra world are stuck in the past making 
indexes, materialized views, etc. better with every release :)

From: mich.talebza...@gmail.com
Date: Mon, 15 Aug 2016 11:11:03 +0100
Subject: Re: Does Spark SQL support indexes?
To: gourav.sengu...@gmail.com
CC: charles.up...@gmail.com; user@spark.apache.org

Are you sure about that Gourav :)



Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com
Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction
of data or any other property which may arise from relying on this email's 
technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from such
loss, damage or destruction.  



On 15 August 2016 at 10:59, Gourav Sengupta  wrote:
The world has moved in from indexes, materialized views, and other single 
processor non-distributed system algorithms. Nice that you are not asking 
questions regarding hierarchical file systems.

Regards,Gourav 
On Sun, Aug 14, 2016 at 4:03 AM, Taotao.Li  wrote:
hi, guys, does Spark SQL support indexes?  if so, how can I create an index on 
my temp table? if not, how can I handle some specific queries on a very large 
table? it would iterate all the table even though all I want is just a small 
piece of that table.
great thanks, 

___Quant | Engineer | Boy
___blog:http://litaotao.github.iogithub: 
www.github.com/litaotao






  

Re: Does Spark SQL support indexes?

2016-08-15 Thread Mich Talebzadeh
Are you sure about that Gourav :)



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 15 August 2016 at 10:59, Gourav Sengupta 
wrote:

> The world has moved in from indexes, materialized views, and other single
> processor non-distributed system algorithms. Nice that you are not asking
> questions regarding hierarchical file systems.
>
>
> Regards,
> Gourav
>
> On Sun, Aug 14, 2016 at 4:03 AM, Taotao.Li 
> wrote:
>
>>
>> hi, guys, does Spark SQL support indexes?  if so, how can I create an
>> index on my temp table? if not, how can I handle some specific queries on a
>> very large table? it would iterate all the table even though all I want is
>> just a small piece of that table.
>>
>> great thanks,
>>
>>
>> *___*
>> Quant | Engineer | Boy
>> *___*
>> *blog*:http://litaotao.github.io
>> 
>> *github*: www.github.com/litaotao
>>
>>
>>
>


Re: Does Spark SQL support indexes?

2016-08-15 Thread Gourav Sengupta
The world has moved on from indexes, materialized views, and other
single-processor, non-distributed system algorithms. Nice that you are not
asking questions regarding hierarchical file systems.


Regards,
Gourav

On Sun, Aug 14, 2016 at 4:03 AM, Taotao.Li  wrote:

>
> hi, guys, does Spark SQL support indexes?  if so, how can I create an
> index on my temp table? if not, how can I handle some specific queries on a
> very large table? it would iterate all the table even though all I want is
> just a small piece of that table.
>
> great thanks,
>
>
> *___*
> Quant | Engineer | Boy
> *___*
> *blog*:http://litaotao.github.io
> 
> *github*: www.github.com/litaotao
>
>
>


RE: Simulate serialization when running local

2016-08-15 Thread Ashic Mahtab
Thanks Miguel...will have a read.
Thanks Jacek...that looks incredibly useful.

:)
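For reference, a minimal sketch of what that could look like in a test (illustrative
names; as far as I can tell local-cluster starts separate worker JVMs, so closures, task
results and broadcast variables really do travel through serialization, and it needs a
built Spark distribution available locally):

import org.apache.spark.{SparkConf, SparkContext}

// 2 workers, 1 core and 1024 MB each: local-cluster[numWorkers, coresPerWorker, memoryPerWorkerMB]
val conf = new SparkConf()
  .setAppName("serialization-smoke-test")
  .setMaster("local-cluster[2,1,1024]")
val sc = new SparkContext(conf)

// any non-serializable state captured by this closure should now be reported,
// the same way it would be on a real cluster
val total = sc.parallelize(1 to 100).map(_ * 2).sum()
println(total)

sc.stop()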

Subject: Re: Simulate serialization when running local
From: mig...@zero-x.co
Date: Sun, 14 Aug 2016 21:07:41 -0700
CC: as...@live.com; user@spark.apache.org
To: ja...@japila.pl

Hi Ashic,
Absolutely.  Serialization errors can be caught locally by taking a test driven 
approach.  I have written a blog post about this as I believe it's important to 
develop spark applications this way.  
If you're interested you can find my post at 
https://medium.com/@therevoltingx/test-driven-development-w-apache-spark-746082b44941#.egnvmicyb
Thanks 

Sent from my iPhone
On Aug 14, 2016, at 6:07 PM, Jacek Laskowski  wrote:

Hi Ashic,

Yes, there is one - local-cluster[N, cores, memory] - that you can use
for simulating a Spark cluster of [N, cores, memory] locally.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L2478

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Wed, Aug 10, 2016 at 10:24 AM, Ashic Mahtab  wrote:
Hi,
Is there a way to simulate "networked" spark when running local (i.e.
master=local[4])? Ideally, some setting that'll ensure any "Task not
serializable" errors are caught during local testing? I seem to vaguely
remember something, but am having trouble pinpointing it.

Cheers,
Ashic.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

  

Re: spark ml : auc on extreme distributed data

2016-08-15 Thread Sean Owen
Class imbalance can be an issue for algorithms, but decision forests
should in general cope reasonably well with imbalanced classes. By
default, positive and negative classes are treated 'equally' however,
and that may not reflect reality in some cases. Upsampling the
under-represented case is a crude but effective way to counter this.
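For reference, upsampling is a one-liner on a DataFrame. A rough sketch with toy data
(label 1.0 standing in for the rare positive class; on a real 97:3 split the fraction
would be around 97.0 / 3.0):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("upsample-sketch").master("local[2]").getOrCreate()
import spark.implicits._

val train = Seq((1.0, 0.3), (0.0, 0.1), (0.0, 0.2), (0.0, 0.5), (0.0, 0.9))
  .toDF("label", "feature")

val positives = train.filter(col("label") === 1.0)
val negatives = train.filter(col("label") === 0.0)

// replicate the positives (with replacement) until the two classes are roughly balanced
val balanced = negatives.union(positives.sample(withReplacement = true, fraction = 4.0))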

Of course the model depends on the data distribution, but it also
depends on the data itself. And the ROC curve depends on the model
and data. There is no inherent relationship between the class balance
and the ROC curve though.

AUC for a random-guessing classifier should be ~0.5. 0.8 is generally
good. I could believe that this doesn't change much just because you
changed parameters or representation.

This isn't really a Spark question per se so you might get some other
answers on the Data Science or Stats StackExchange.

On Mon, Aug 15, 2016 at 5:11 AM, Zhiliang Zhu
 wrote:
> Hi All,
>
> Here I have lot of data with around 1,000,000 rows, 97% of them are negative
> class and 3% of them are positive class .
> I applied Random Forest algorithm to build the model and predict the testing
> data.
>
> For the data preparation,
> i. firstly randomly split all the data as training data and testing data by
> 0.7 : 0.3
> ii. let the testing data unchanged, its negative and positive class ratio
> would still be 97:3
> iii. try to make the training data negative and positive class ratio as
> 50:50, by way of sample algorithm in the different classes
> iv. get RF model by training data and predict testing data
>
> by modifying algorithm parameters and feature work (PCA etc ), it seems that
> the auc on the testing data is always above 0.8, or much more higher ...
>
> Then I lose into some confusion... It seems that the model or auc depends a
> lot on the original data distribution...
> In effect, I would like to know, for this data distribution, how its auc
> would be for random guess?
> What the auc would be for any kind of data distribution?
>
> Thanks in advance~~

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Can not find usage of classTag variable defined in abstract class AtomicType in spark project

2016-08-15 Thread Andy Zhao
While reading the Spark source code, I found an abstract class AtomicType. It's
defined like this:

protected[sql] abstract class AtomicType extends DataType {
  private[sql] type InternalType
  private[sql] val tag: TypeTag[InternalType]
  private[sql] val ordering: Ordering[InternalType]

  @transient private[sql] val classTag = ScalaReflectionLock.synchronized {
    val mirror = runtimeMirror(Utils.getSparkClassLoader)
    ClassTag[InternalType](mirror.runtimeClass(tag.tpe))
  }
}
I understand that classTag is used to get runtime type information for the type
member InternalType, but I cannot find how this classTag is used in
subclasses of AtomicType. And I also cannot find any reference to this
variable in the Spark project.

Can anyone tell me how this classTag is used in the Spark project?
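For context, here is a small standalone sketch (illustrative names only) of the same
TypeTag-to-ClassTag trick and the kind of consumer a ClassTag field like this typically
has, e.g. building an Array of the abstract type at runtime:

import scala.reflect.ClassTag
import scala.reflect.runtime.universe._

// derive a ClassTag from a TypeTag the same way AtomicType does
def classTagOf[T](implicit tag: TypeTag[T]): ClassTag[T] = {
  val mirror = runtimeMirror(getClass.getClassLoader)
  ClassTag[T](mirror.runtimeClass(tag.tpe))
}

// creating an Array[T] needs a ClassTag, which is one typical consumer of such a field
def emptyBuffer[T: TypeTag](n: Int): Array[T] = {
  implicit val ct: ClassTag[T] = classTagOf[T]
  new Array[T](n)
}

println(emptyBuffer[Int](4).getClass)  // prints class [I (a primitive int array)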





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-not-find-usage-of-classTag-variable-defined-in-abstract-class-AtomicType-in-spark-project-tp27534.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: parallel processing with JDBC

2016-08-15 Thread Mich Talebzadeh
Hi.

This is a very good question

I did some tests on this.

If you are joining two tables then you are creating a result set based on
some conditions. In this case what I normally do is to specify an ID column
from either table and base my partitioning on that ID column. This is
pretty straightforward. So bring back your ID column and base your lower
and upper limits on that ID value:

"partitionColumn" -> "ID",
"lowerBound" -> "1",
"upperBound" -> "1",
"numPartitions" -> "100",



Also I have noticed that regardless of the number of partitions you specify
on the RDBMS side, the number of parallel connections will be limited and
the result set will be partitioned accordingly. For example, with
numPartitions=100 I see only 8 connections in Oracle coming from the Spark
connection.

scala> val s = HiveContext.read.format("jdbc").options(
 | Map("url" -> _ORACLEserver,
 | "dbtable" -> "(SELECT to_char(ID) AS ID, to_char(CLUSTERED) AS
CLUSTERED, to_char(SCATTERED) AS SCATTERED, to_char(RANDOMISED) AS
RANDOMISED, RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy)",
 | "partitionColumn" -> "ID",
 | "lowerBound" -> "1",
 | "upperBound" -> "1",
 | "numPartitions" -> "100",
 | "user" -> _username,
 | "password" -> _password)).load
s: org.apache.spark.sql.DataFrame = [ID: string, CLUSTERED: string ... 5
more fields]
scala> s.toJavaRDD.partitions.size()
res1: Int = 100

This also seems to set the number of partitions. I still think that the
emphasis has to be on getting data out of the RDBMS as quickly as possible. The
partitioning does work. Below, the login scratchpad has multiple
connections to Oracle and does the range selection OK:

 1 SCRATCHPAD  45   43048   1 SELECT
"SMALL_VC","CLUSTERED","PADDING","RANDOM_ST

RING","ID","SCATTERED","RANDOMISED" FROM (SELECT t
  o_char(ID) AS ID,
to_char(CLUSTERED) AS CLUSTERED,
   to_char(SCATTERED)
AS SCATTERED, to_char(RANDOMIS
  ED) AS RANDOMISED,
RANDOM_STRING, SMALL_VC, PADDIN
  G FROM
scratchpad.dummy)
*WHERE ID >= 1601
AND  ID < 1701*

HTH







Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 15 August 2016 at 08:18, ayan guha  wrote:

> Hi
>
> I would suggest you to look at sqoop as well. Essentially, you can provide
> a splitBy/partitionBy column using which data will be distributed among
> your stated number of mappers
>
> On Mon, Aug 15, 2016 at 5:07 PM, Madabhattula Rajesh Kumar <
> mrajaf...@gmail.com> wrote:
>
>> Hi Mich,
>>
>> I have a below question.
>>
>> I want to join two tables and return the result based on the input value.
>> In this case, how we need to specify lower bound and upper bound values ?
>>
>> select t1.id, t1.name, t2.course, t2.qualification from t1, t2 where
>> t1.transactionid=*1* and t1.id = t2.id
>>
>> *1 => dynamic input value.*
>>
>> Regards,
>> Rajesh
>>
>> On Mon, Aug 15, 2016 at 12:05 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> If you have your RDBMS table partitioned, then you need to consider how
>>> much data you want to extract in other words the result set returned by the
>>> JDBC call.
>>>
>>> If you want all the data, then the number of partitions specified in the
>>> JDBC call should be equal to the number of partitions in your RDBMS table.
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 14 August 2016 at 21:44, Ashok Kumar  wrote:
>>>
 Thank you very much sir.

 I forgot to mention that two of these Oracle tables are range
 partitioned. In that case what would be the 

Re: Accessing HBase through Spark with Security enabled

2016-08-15 Thread Aneela Saleem
Thanks Jacek!

I have already set the hbase.security.authentication property to kerberos,
since HBase with Kerberos is working fine.

I tested again after correcting the typo but got the same error. Following is
the code, please have a look:

System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
System.setProperty("java.security.auth.login.config",
"/etc/hbase/conf/zk-jaas.conf");
val hconf = HBaseConfiguration.create()
val tableName = "emp"
hconf.set("hbase.zookeeper.quorum", "hadoop-master")
hconf.set(TableInputFormat.INPUT_TABLE, tableName)
hconf.set("hbase.zookeeper.property.clientPort", "2181")
hconf.set("hbase.master", "hadoop-master:6")
hconf.set("hadoop.security.authentication", "kerberos")
hconf.addResource(new Path("/etc/hbase/conf/core-site.xml"))
hconf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))
val conf = new SparkConf()
conf.set("spark.yarn.security.tokens.hbase.enabled", "true")
conf.set("spark.authenticate", "true")
conf.set("spark.authenticate.secret","None")
val sc = new SparkContext(conf)
val hBaseRDD = sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])

val count = hBaseRDD.count()
print("HBase RDD count:" + count)

On Sat, Aug 13, 2016 at 8:36 PM, Jacek Laskowski  wrote:

> Hi Aneela,
>
> My (little to no) understanding of how to make it work is to use
> hbase.security.authentication property set to kerberos (see [1]).
>
> Spark on YARN uses it to get the tokens for Hive, HBase et al (see
> [2]). It happens when Client starts conversation to YARN RM (see [3]).
>
> You should not do that yourself (and BTW you've got a typo in
> spark.yarn.security.tokens.habse.enabled setting). I think that the
> entire code you pasted matches the code Spark's doing itself before
> requesting resources from YARN.
>
> Give it a shot and report back since I've never worked in such a
> configuration and would love improving in this (security) area.
> Thanks!
>
> [1] http://www.cloudera.com/documentation/enterprise/5-5-
> x/topics/cdh_sg_hbase_authentication.html#concept_
> zyz_vg5_nt__section_s1l_nwv_ls
> [2] https://github.com/apache/spark/blob/master/yarn/src/
> main/scala/org/apache/spark/deploy/yarn/security/
> HBaseCredentialProvider.scala#L58
> [3] https://github.com/apache/spark/blob/master/yarn/src/
> main/scala/org/apache/spark/deploy/yarn/Client.scala#L396
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Fri, Aug 12, 2016 at 11:30 PM, Aneela Saleem 
> wrote:
> > Thanks for your response Jacek!
> >
> > Here is the code, how spark accesses HBase:
> > System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
> > System.setProperty("java.security.auth.login.config",
> > "/etc/hbase/conf/zk-jaas.conf");
> > val hconf = HBaseConfiguration.create()
> > val tableName = "emp"
> > hconf.set("hbase.zookeeper.quorum", "hadoop-master")
> > hconf.set(TableInputFormat.INPUT_TABLE, tableName)
> > hconf.set("hbase.zookeeper.property.clientPort", "2181")
> > hconf.set("hbase.master", "hadoop-master:6")
> > hconf.set("hadoop.security.authentication", "kerberos")
> > hconf.set("hbase.security.authentication", "kerberos")
> > hconf.addResource(new Path("/etc/hbase/conf/core-site.xml"))
> > hconf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))
> > UserGroupInformation.setConfiguration(hconf)
> > UserGroupInformation.loginUserFromKeytab("spark@platalyticsrealm",
> > "/etc/hadoop/conf/sp.keytab")
> > conf.set("spark.yarn.security.tokens.habse.enabled", "true")
> > conf.set("hadoop.security.authentication", "true")
> > conf.set("hbase.security.authentication", "true")
> > conf.set("spark.authenticate", "true")
> > conf.set("spark.authenticate.secret","None")
> > val sc = new SparkContext(conf)
> > UserGroupInformation.setConfiguration(hconf)
> > val keyTab = "/etc/hadoop/conf/sp.keytab"
> > val ugi =
> > UserGroupInformation.loginUserFromKeytabAndReturnUG
> I("spark/hadoop-master@platalyticsrealm",
> > keyTab)
> > UserGroupInformation.setLoginUser(ugi)
> > HBaseAdmin.checkHBaseAvailable(hconf);
> > ugi.doAs(new PrivilegedExceptionAction[Void]() {
> > override def run(): Void = {
> > val conf = new SparkConf().set("spark.shuffle.consolidateFiles", "true")
> >
> > val sc = new SparkContext(conf)
> > val hbaseContext = new HBaseContext(sc, hconf)
> >
> > val scan = new Scan()
> > scan.addColumn(columnName, "column1")
> > scan.setTimeRange(0L, 141608330L)
> > val rdd = hbaseContext.hbaseRDD("emp", scan)
> > println(rdd.count)
> > rdd.saveAsTextFile("hdfs://hadoop-master:8020/hbaseTemp/")
> > sc.stop()
> > return null
> > }
> > })
> > I have tried it with both Spark versions, 20 and 1.5.3 but same exception
> > was thrown.
> >
> > I floated this email on HBase 

Re: parallel processing with JDBC

2016-08-15 Thread ayan guha
Hi

I would suggest you look at Sqoop as well. Essentially, you can provide
a splitBy/partitionBy column by which the data will be distributed among
your stated number of mappers.

On Mon, Aug 15, 2016 at 5:07 PM, Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi Mich,
>
> I have a below question.
>
> I want to join two tables and return the result based on the input value.
> In this case, how we need to specify lower bound and upper bound values ?
>
> select t1.id, t1.name, t2.course, t2.qualification from t1, t2 where
> t1.transactionid=*1* and t1.id = t2.id
>
> *1 => dynamic input value.*
>
> Regards,
> Rajesh
>
> On Mon, Aug 15, 2016 at 12:05 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> If you have your RDBMS table partitioned, then you need to consider how
>> much data you want to extract in other words the result set returned by the
>> JDBC call.
>>
>> If you want all the data, then the number of partitions specified in the
>> JDBC call should be equal to the number of partitions in your RDBMS table.
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 14 August 2016 at 21:44, Ashok Kumar  wrote:
>>
>>> Thank you very much sir.
>>>
>>> I forgot to mention that two of these Oracle tables are range
>>> partitioned. In that case what would be the optimum number of partitions if
>>> you can share?
>>>
>>> Warmest
>>>
>>>
>>> On Sunday, 14 August 2016, 21:37, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>
>>> If you have primary keys on these tables then you can parallelise the
>>> process reading data.
>>>
>>> You have to be careful not to set the number of partitions too many.
>>> Certainly there is a balance between the number of partitions supplied to
>>> JDBC and the load on the network and the source DB.
>>>
>>> Assuming that your underlying table has primary key ID, then this will
>>> create 20 parallel processes to Oracle DB
>>>
>>>  val d = HiveContext.read.format("jdbc").options(
>>>  Map("url" -> _ORACLEserver,
>>>  "dbtable" -> "(SELECT , , FROM )",
>>>  "partitionColumn" -> "ID",
>>>  "lowerBound" -> "1",
>>>  "upperBound" -> "maxID",
>>>  "numPartitions" -> "20",
>>>  "user" -> _username,
>>>  "password" -> _password)).load
>>>
>>> assuming your upper bound on ID is maxID
>>>
>>>
>>> This will open multiple connections to RDBMS, each getting a subset of
>>> data that you want.
>>>
>>> You need to test it to ensure that you get the numPartitions optimum and
>>> you don't overload any component.
>>>
>>> HTH
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>> On 14 August 2016 at 21:15, Ashok Kumar 
>>> wrote:
>>>
>>> Hi,
>>>
>>> There are 4 tables ranging from 10 million to 100 million rows but they
>>> all have primary keys.
>>>
>>> The network is fine but our Oracle is RAC and we can only connect to a
>>> designated Oracle node (where we have a DQ account only).
>>>
>>> We have a limited time window of few hours to get the required data out.
>>>
>>> Thanks
>>>
>>>
>>> On Sunday, 14 August 2016, 21:07, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>
>>> How big are your tables and is there any issue with the network between
>>> your Spark nodes and your Oracle DB that adds to issues?
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>> LinkedIn * https://www.linkedin.com/ profile/view?id=
>>> AAEWh2gBxianrbJd6zP6AcPCCd OABUrV8Pw
>>> *
>>>
>>> http://talebzadehmich. wordpress.com
>>> 
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is 

Re: parallel processing with JDBC

2016-08-15 Thread Madabhattula Rajesh Kumar
Hi Mich,

I have a question below.

I want to join two tables and return the result based on an input value.
In this case, how do we need to specify the lower bound and upper bound values?

select t1.id, t1.name, t2.course, t2.qualification from t1, t2 where
t1.transactionid=*1* and t1.id = t2.id

*1 => dynamic input value.*

Regards,
Rajesh
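One approach to try (an illustrative sketch; the bounds and connection values are
placeholders, assuming the spark-shell's sqlContext): push the whole filtered join down
as the dbtable subquery and partition on t1.id, so Spark only parallelises the rows that
match the dynamic transaction id.

val transactionId = 1  // the dynamic input value

val pushedDown =
  s"""(select t1.id, t1.name, t2.course, t2.qualification
     |   from t1, t2
     |  where t1.transactionid = $transactionId
     |    and t1.id = t2.id)""".stripMargin

val df = sqlContext.read.format("jdbc").options(Map(
  "url" -> "jdbc:oracle:thin:@//oracle-host:1521/MYDB",  // placeholder
  "dbtable" -> pushedDown,
  "partitionColumn" -> "ID",
  "lowerBound" -> "1",          // e.g. select min(t1.id) for this transaction id
  "upperBound" -> "1000000",    // e.g. select max(t1.id) for this transaction id
  "numPartitions" -> "8",
  "user" -> "scott",            // placeholder
  "password" -> "tiger")).load  // placeholder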

On Mon, Aug 15, 2016 at 12:05 PM, Mich Talebzadeh  wrote:

> If you have your RDBMS table partitioned, then you need to consider how
> much data you want to extract in other words the result set returned by the
> JDBC call.
>
> If you want all the data, then the number of partitions specified in the
> JDBC call should be equal to the number of partitions in your RDBMS table.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 14 August 2016 at 21:44, Ashok Kumar  wrote:
>
>> Thank you very much sir.
>>
>> I forgot to mention that two of these Oracle tables are range
>> partitioned. In that case what would be the optimum number of partitions if
>> you can share?
>>
>> Warmest
>>
>>
>> On Sunday, 14 August 2016, 21:37, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>
>> If you have primary keys on these tables then you can parallelise the
>> process reading data.
>>
>> You have to be careful not to set the number of partitions too many.
>> Certainly there is a balance between the number of partitions supplied to
>> JDBC and the load on the network and the source DB.
>>
>> Assuming that your underlying table has primary key ID, then this will
>> create 20 parallel processes to Oracle DB
>>
>>  val d = HiveContext.read.format("jdbc").options(
>>  Map("url" -> _ORACLEserver,
>>  "dbtable" -> "(SELECT , , FROM )",
>>  "partitionColumn" -> "ID",
>>  "lowerBound" -> "1",
>>  "upperBound" -> "maxID",
>>  "numPartitions" -> "20",
>>  "user" -> _username,
>>  "password" -> _password)).load
>>
>> assuming your upper bound on ID is maxID
>>
>>
>> This will open multiple connections to RDBMS, each getting a subset of
>> data that you want.
>>
>> You need to test it to ensure that you get the numPartitions optimum and
>> you don't overload any component.
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>> http://talebzadehmich.wordpress.com
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>> On 14 August 2016 at 21:15, Ashok Kumar 
>> wrote:
>>
>> Hi,
>>
>> There are 4 tables ranging from 10 million to 100 million rows but they
>> all have primary keys.
>>
>> The network is fine but our Oracle is RAC and we can only connect to a
>> designated Oracle node (where we have a DQ account only).
>>
>> We have a limited time window of few hours to get the required data out.
>>
>> Thanks
>>
>>
>> On Sunday, 14 August 2016, 21:07, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>
>> How big are your tables and is there any issue with the network between
>> your Spark nodes and your Oracle DB that adds to issues?
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn * https://www.linkedin.com/ profile/view?id=
>> AAEWh2gBxianrbJd6zP6AcPCCd OABUrV8Pw
>> *
>>
>> http://talebzadehmich. wordpress.com
>> 
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>> On 14 August 2016 at 20:50, Ashok Kumar 
>> wrote:
>>
>> Hi Gurus,
>>
>> I have few large tables in rdbms (ours is Oracle). We want to access
>> these tables through Spark JDBC
>>
>> What is the quickest way of getting data into Spark Dataframe say
>> 

Re: parallel processing with JDBC

2016-08-15 Thread Mich Talebzadeh
If you have your RDBMS table partitioned, then you need to consider how
much data you want to extract, in other words the result set returned by the
JDBC call.

If you want all the data, then the number of partitions specified in the
JDBC call should be equal to the number of partitions in your RDBMS table.

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 14 August 2016 at 21:44, Ashok Kumar  wrote:

> Thank you very much sir.
>
> I forgot to mention that two of these Oracle tables are range partitioned.
> In that case what would be the optimum number of partitions if you can
> share?
>
> Warmest
>
>
> On Sunday, 14 August 2016, 21:37, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>
> If you have primary keys on these tables then you can parallelise the
> process reading data.
>
> You have to be careful not to set the number of partitions too many.
> Certainly there is a balance between the number of partitions supplied to
> JDBC and the load on the network and the source DB.
>
> Assuming that your underlying table has primary key ID, then this will
> create 20 parallel processes to Oracle DB
>
>  val d = HiveContext.read.format("jdbc").options(
>  Map("url" -> _ORACLEserver,
>  "dbtable" -> "(SELECT , , FROM )",
>  "partitionColumn" -> "ID",
>  "lowerBound" -> "1",
>  "upperBound" -> "maxID",
>  "numPartitions" -> "20",
>  "user" -> _username,
>  "password" -> _password)).load
>
> assuming your upper bound on ID is maxID
>
>
> This will open multiple connections to RDBMS, each getting a subset of
> data that you want.
>
> You need to test it to ensure that you get the numPartitions optimum and
> you don't overload any component.
>
> HTH
>
>
> Dr Mich Talebzadeh
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
> http://talebzadehmich.wordpress.com
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
> On 14 August 2016 at 21:15, Ashok Kumar 
> wrote:
>
> Hi,
>
> There are 4 tables ranging from 10 million to 100 million rows but they
> all have primary keys.
>
> The network is fine but our Oracle is RAC and we can only connect to a
> designated Oracle node (where we have a DQ account only).
>
> We have a limited time window of few hours to get the required data out.
>
> Thanks
>
>
> On Sunday, 14 August 2016, 21:07, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>
> How big are your tables and is there any issue with the network between
> your Spark nodes and your Oracle DB that adds to issues?
>
> HTH
>
> Dr Mich Talebzadeh
>
> LinkedIn * https://www.linkedin.com/ profile/view?id=
> AAEWh2gBxianrbJd6zP6AcPCCd OABUrV8Pw
> *
>
> http://talebzadehmich. wordpress.com
> 
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
> On 14 August 2016 at 20:50, Ashok Kumar 
> wrote:
>
> Hi Gurus,
>
> I have few large tables in rdbms (ours is Oracle). We want to access these
> tables through Spark JDBC
>
> What is the quickest way of getting data into Spark Dataframe say multiple
> connections from Spark
>
> thanking you
>
>
>
>
>
>
>
>
>