Re: how to use cluster sparkSession like localSession

2018-11-01 Thread Arbab Khalil
Remove the master configuration from the code and then submit it to any cluster; it
should work.
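
For illustration, a minimal sketch of that suggestion: the SparkSession is built without a
hard-coded master, and the cluster manager is chosen at submit time. The class name, input
path, and spark-submit line below are illustrative, not from the thread.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkTest {
    public static void main(String[] args) {
        // No .master(...) here; the cluster manager is supplied at submit time, e.g.
        //   spark-submit --master yarn --deploy-mode cluster spark-test.jar
        SparkSession spark = SparkSession.builder()
                .appName("spark test")
                .getOrCreate();

        Dataset<Row> testData = spark.read().csv("hdfs:///data/no_schema_iris.csv");
        testData.printSchema();
        testData.show();

        spark.stop();
    }
}
```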

On Fri, Nov 2, 2018 at 10:52 AM 崔苗(数据与人工智能产品开发部) <0049003...@znv.com> wrote:

>
> Then how about Spark SQL and Spark MLlib? We use them most of the time.
> 0049003208
> 0049003...@znv.com
>
> Signature customized by NetEase Mail Master
> On 11/2/2018 11:58, Daniel de Oliveira Mantovani wrote:
>
> Please read about Spark Streaming or Spark Structured Streaming. Your web
> application can easily communicate through some API and you won't have the
> overhead of starting a new Spark job, which is pretty heavy.
>
> On Thu, Nov 1, 2018 at 23:01 崔苗(数据与人工智能产品开发部) <0049003...@znv.com> wrote:
>
>>
>> Hi,
>> we want to execute Spark code without submitting an application.jar, like
>> this code:
>>
>> public static void main(String args[]) throws Exception {
>>     SparkSession spark = SparkSession
>>         .builder()
>>         .master("local[*]")
>>         .appName("spark test")
>>         .getOrCreate();
>>
>>     Dataset<Row> testData =
>>         spark.read().csv(".\\src\\main\\java\\Resources\\no_schema_iris.scv");
>>     testData.printSchema();
>>     testData.show();
>> }
>>
>> The above code works well in IDEA without generating and submitting a jar
>> file, but if we replace master("local[*]") with master("yarn") it doesn't
>> work. Is there a way to use a cluster SparkSession like a local SparkSession?
>> We need to dynamically execute Spark code in a web server according to
>> different requests (for example, a filter request will call dataset.filter()),
>> so there is no application.jar to submit.
>>
>> 0049003208
>> 0049003...@znv.com
>>
>> Signature customized by NetEase Mail Master
>
> --
>
> --
> Daniel de Oliveira Mantovani
> Perl Evangelist/Data Hacker
> +1 786 459 1341
>
>

-- 
Regards,
Arbab Khalil
Software Design Engineer


Re: how to use cluster sparkSession like localSession

2018-11-01 Thread 数据与人工智能产品开发部

Then how about Spark SQL and Spark MLlib? We use them most of the time.

0049003208
0049003...@znv.com
Signature customized by NetEase Mail Master

On 11/2/2018 11:58, Daniel de Oliveira Mantovani wrote:

Please read about Spark Streaming or Spark Structured Streaming. Your web
application can easily communicate through some API and you won't have the
overhead of starting a new Spark job, which is pretty heavy.

On Thu, Nov 1, 2018 at 23:01 崔苗(数据与人工智能产品开发部) <0049003...@znv.com> wrote:

Hi,
we want to execute Spark code without submitting an application.jar, like this code:

public static void main(String args[]) throws Exception {
    SparkSession spark = SparkSession
        .builder()
        .master("local[*]")
        .appName("spark test")
        .getOrCreate();

    Dataset<Row> testData =
        spark.read().csv(".\\src\\main\\java\\Resources\\no_schema_iris.scv");
    testData.printSchema();
    testData.show();
}

The above code works well in IDEA without generating and submitting a jar file,
but if we replace master("local[*]") with master("yarn") it doesn't work. Is
there a way to use a cluster SparkSession like a local SparkSession? We need to
dynamically execute Spark code in a web server according to different requests
(for example, a filter request will call dataset.filter()), so there is no
application.jar to submit.

0049003208
0049003...@znv.com
Signature customized by NetEase Mail Master

--
Daniel de Oliveira Mantovani
Perl Evangelist/Data Hacker
+1 786 459 1341

Re: how to use cluster sparkSession like localSession

2018-11-01 Thread Daniel de Oliveira Mantovani
Please read about Spark Streaming or Spark Structured Streaming. Your web
application can easily communicate through some API and you won't have the
overhead of starting a new Spark job, which is pretty heavy.
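
As a rough illustration of this advice, a minimal Structured Streaming sketch that keeps one
long-running job consuming requests from a Kafka topic, so the web application only produces
messages instead of launching a new Spark job per request. The topic name and broker address
are placeholders.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class RequestStream {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("request stream")
                .getOrCreate();

        // Requests arrive as Kafka messages instead of triggering a new Spark job each time.
        Dataset<Row> requests = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "kafka-host:9092")
                .option("subscribe", "requests")
                .load();

        // Whatever per-request processing is needed would go here; this just echoes the payload.
        StreamingQuery query = requests
                .selectExpr("CAST(value AS STRING) AS request")
                .writeStream()
                .format("console")
                .start();

        query.awaitTermination();
    }
}
```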

On Thu, Nov 1, 2018 at 23:01 崔苗(数据与人工智能产品开发部) <0049003...@znv.com> wrote:

>
> Hi,
> we want to execute Spark code without submitting an application.jar, like
> this code:
>
> public static void main(String args[]) throws Exception {
>     SparkSession spark = SparkSession
>         .builder()
>         .master("local[*]")
>         .appName("spark test")
>         .getOrCreate();
>
>     Dataset<Row> testData =
>         spark.read().csv(".\\src\\main\\java\\Resources\\no_schema_iris.scv");
>     testData.printSchema();
>     testData.show();
> }
>
> The above code works well in IDEA without generating and submitting a jar
> file, but if we replace master("local[*]") with master("yarn") it doesn't
> work. Is there a way to use a cluster SparkSession like a local SparkSession?
> We need to dynamically execute Spark code in a web server according to
> different requests (for example, a filter request will call dataset.filter()),
> so there is no application.jar to submit.
>
> 0049003208
> 0049003...@znv.com
>
> Signature customized by NetEase Mail Master

-- 

--
Daniel de Oliveira Mantovani
Perl Evangelist/Data Hacker
+1 786 459 1341


how to use cluster sparkSession like localSession

2018-11-01 Thread 数据与人工智能产品开发部

Hi,
we want to execute Spark code without submitting an application.jar, like this code:

public static void main(String args[]) throws Exception {
    SparkSession spark = SparkSession
        .builder()
        .master("local[*]")
        .appName("spark test")
        .getOrCreate();

    Dataset<Row> testData =
        spark.read().csv(".\\src\\main\\java\\Resources\\no_schema_iris.scv");
    testData.printSchema();
    testData.show();
}

The above code works well in IDEA without generating and submitting a jar file,
but if we replace master("local[*]") with master("yarn") it doesn't work. Is
there a way to use a cluster SparkSession like a local SparkSession? We need to
dynamically execute Spark code in a web server according to different requests
(for example, a filter request will call dataset.filter()), so there is no
application.jar to submit.

0049003208
0049003...@znv.com
Signature customized by NetEase Mail Master


[PySpark Profiler]: Does empty profile mean no execution in Python Interpreter?

2018-11-01 Thread Alex

Hi,

I ran into an interesting scenario with no profile output today. I have 
a PySpark application that primarily uses the Spark SQL APIs. I 
understand that parts of the Spark SQL API may not generate data in the 
PySpark profile dumps, but I was surprised when I had code containing a 
UDF that did not generate any profile output. I had thought anytime I 
used a UDF with Spark SQL that code would have to execute in a Python 
interpreter on the executor. Is that not the case? This went against my 
mental model for how this works in Spark, so I'm trying to understand 
what is happening here to cause no profile output, which made me wonder 
if the UDF had run in the JVM.


I have created a GitHub repo with this code in main.py, and the example
code from ticket 3478 (https://github.com/apache/spark/pull/2556) in
py_profile.py, which does emit a profile dump.


https://github.com/AlexHagerman/pyspark-profiling

Thanks,

Alex


from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import ArrayType
from pyspark.sql.functions import broadcast, udf
from pyspark.ml.feature import Word2Vec, Word2VecModel
from pyspark.ml.linalg import Vector, VectorUDT

if __name__ == '__main__':

    spark = SparkSession.builder.appName("token_to_vec") \
        .config("spark.python.profile", "true") \
        .config("spark.python.profile.dump", "./main_dump/") \
        .config("spark.rdd.compress", "true") \
        .config("spark.dynamicAllocation.enabled", "true") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.kryoserializer", "64") \
        .getOrCreate()

    lines_df = spark.read.parquet("./data/token.parquet")

    vecs = Word2VecModel.load('./data/word_vectors')
    vecs_df = vecs.getVectors()
    vecs_dict = vecs_df.collect()

    vec_dict = spark.sparkContext.broadcast({wv[0]: wv[1] for wv in vecs_dict})
    missing_vec = spark.sparkContext.broadcast(vec_dict.value['MISSING_TOKEN'])

    token_to_vec = udf(lambda r: [vec_dict.value.get(w, missing_vec.value) for w in r],
                       ArrayType(VectorUDT()))

    tdf = lines_df.withColumn("ln_vec", token_to_vec("text"))

    tdf.write.mode("overwrite").parquet(path="./data/token_vecs.parquet",
                                        mode="overwrite", compression="snappy")

    spark.sparkContext.show_profiles()
    spark.stop()





Would Spark be able to read files from S3 which are Client-Side Encrypted with a KMS-Managed Customer Master Key (CMK)?

2018-11-01 Thread mytramesh


I am able to read S3 files that use Server-Side Encryption (SSE-KMS); I added the
KMS ID to the IAM role and can read them seamlessly.
Recently I started receiving S3 files that are Client-Side Encrypted (AWS
KMS-Managed Customer Master Key (CMK)); when I try to read these files I
see a count of 0.

To view the content of such a file manually I follow a two-step process:

1. Download with the KMS ID
2. Decrypt with the command below

Ex: aws-encryption-cli --decrypt
 






StackOverflowError for simple map (not to incubator mailing list)

2018-11-01 Thread Chris Olivier
(Sorry, the first one was sent to the incubator mailing list, which probably
doesn't come here.)

Hi, I have been stuck at this for a week.

I have a relatively simple dataframe like this:

+---------+---------+--------------------+-------------------+
|     item|  item_id|              target|              start|
+---------+---------+--------------------+-------------------+
|sensor123|sensor123|[0.005683, 0.0070...|2008-01-01 00:00:00|
|sensor249|sensor249|[0.009783, 0.0068...|2008-01-01 00:00:00|
|sensor379|sensor379|[0.001917, 0.0016...|2008-01-01 00:00:00|
| sensor52| sensor52|[0.016267, 0.0121...|2008-01-01 00:00:00|
+---------+---------+--------------------+-------------------+

target is a WrappedArray[Double]

This simple code runs on local Spark but hits a stack overflow error on
EMR Spark. I've tried playing with partitioning with no effect.

```scala
def transform(dataset: Dataset[_]): DataFrame = {
  val ds: Dataset[_] = dataset
  val df: DataFrame = ds.toDF

  val sparkSession = dataset.sparkSession
  import sparkSession.implicits._

  val itemIndex: Int =
    SparkJavaUtils.getColumnIndex(DataSources.FIELD_ITEM, df)
  val startIndex: Int =
    SparkJavaUtils.getColumnIndex(DataSources.FIELD_START, df)
  val targetIndex: Int =
    SparkJavaUtils.getColumnIndex(DataSources.FIELD_TARGET, df)

  val result = df.map(r => {
    val itemName = r.getAs[String](itemIndex)
    val start = r.getAs[Timestamp](startIndex)
    val targetArray = r.getAs[mutable.WrappedArray[Double]](targetIndex)
    (itemName, start, targetArray)
  })
  result.show

  // Get the schema ready
  val schema = StructType(
    Seq(
      StructField(DataSources.FIELD_ITEM, StringType, true, Metadata.empty),
      StructField(DataSources.FIELD_START, TimestampType, true, Metadata.empty),
      StructField(DataSources.FIELD_TARGET,
        DataTypes.createArrayType(DoubleType), true, Metadata.empty)
    )
  )

  val nn = dataset.sqlContext.createDataFrame(result.toDF.rdd, schema)
  nn.show
  nn
}

```

Error is like:

Exception in thread "main" java.lang.StackOverflowError
    at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:65)
    at scala.StringContext.standardInterpolator(StringContext.scala:123)
    at scala.StringContext.s(StringContext.scala:95)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.freshName(CodeGenerator.scala:565)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$4.apply(WholeStageCodegenExec.scala:153)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$4.apply(WholeStageCodegenExec.scala:152)
    at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418)
    at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1233)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1223)
    at scala.collection.immutable.Stream.drop(Stream.scala:858)
    at scala.collection.immutable.Stream.drop(Stream.scala:202)
    at scala.collection.LinearSeqOptimized$class.apply(LinearSeqOptimized.scala:64)
    at scala.collection.immutable.Stream.apply(Stream.scala:202)
    at org.apache.spark.sql.catalyst.expressions.BoundReference.doGenCode(BoundAttribute.scala:62)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$4.apply(WholeStageCodegenExec.scala:153)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$4.apply(WholeStageCodegenExec.scala:152)
    at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418)
    at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1233)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1223)
    at scala.collection.immutable.Stream.drop(Stream.scala:858)
    at scala.collection.immutable.Stream.drop(Stream.scala:202)
    at scala.collection.LinearSeqOptimized$class.apply(LinearSeqOptimized.scala:64)
    at scala.collection.immutable.Stream.apply(Stream.scala:202)
    at org.apache.spark.sql.catalyst.expressions.BoundReference.doGenCode(BoundAttribute.scala:62)

Re: Apache Spark orc read performance when reading large number of small files

2018-11-01 Thread gpatcham
When I run spark.read.orc("hdfs://test").filter("conv_date = 20181025").count
with "spark.sql.orc.filterPushdown=true" I see below in executors logs.
Predicate push down is happening

18/11/01 17:31:17 INFO OrcInputFormat: ORC pushdown predicate: leaf-0 =
(IS_NULL conv_date)
leaf-1 = (EQUALS conv_date 20181025)
expr = (and (not leaf-0) leaf-1)


But when I run the Hive query in Spark I see the logs below:

Hive table: Hive

spark.sql("select * from test where conv_date = 20181025").count

18/11/01 17:37:57 INFO HadoopRDD: Input split: hdfs://test/test1.orc:0+34568
18/11/01 17:37:57 INFO OrcRawRecordMerger: min key = null, max key = null
18/11/01 17:37:57 INFO ReaderImpl: Reading ORC rows from
hdfs://test/test1.orc with {include: [true, false, false, false, true,
false, false, false, false, false, false, false, false, false, false, false,
false, false, false, false, false, false, false, false, false, false, false,
false, false, false, false, false, false, false, false, false, false, false,
false, false, false, false, false, false, false, false, false, false, false,
false, false, false, false, false, false, false, false, false, false, false,
false, false, false], offset: 0, length: 9223372036854775807}
18/11/01 17:37:57 INFO Executor: Finished task 224.0 in stage 0.0 (TID 33).
1662 bytes result sent to driver
18/11/01 17:37:57 INFO CoarseGrainedExecutorBackend: Got assigned task 40
18/11/01 17:37:57 INFO Executor: Running task 956.0 in stage 0.0 (TID 40)








Re: use spark cluster in java web service

2018-11-01 Thread hemant singh
Why don't you explore Livy? You can use the REST API to submit the jobs:
https://community.hortonworks.com/articles/151164/how-to-submit-spark-application-through-livy-rest.html
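
As a sketch of what such a REST submission might look like from Java 11's HttpClient,
assuming a Livy server listening on livy-host:8998; the jar path and class name are
placeholders for your own application.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class LivySubmit {
    public static void main(String[] args) throws Exception {
        // JSON body for Livy's POST /batches endpoint; file and className are placeholders.
        String body = "{ \"file\": \"hdfs:///jars/my-spark-app.jar\", "
                + "\"className\": \"com.example.MyApp\" }";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://livy-host:8998/batches"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // Livy replies with the batch id and state, which can then be polled via GET /batches/{id}.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```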



On Thu, Nov 1, 2018 at 12:52 PM 崔苗(数据与人工智能产品开发部) <0049003...@znv.com> wrote:

> Hi,
> we want to use Spark in our Java web service to compute data in a Spark
> cluster according to requests. Now we have two problems:
> 1. How to get a SparkSession of a remote Spark cluster (Spark on YARN mode);
> we want to keep one SparkSession to execute all data computation.
> 2. How to submit to the remote Spark cluster in Java code instead of
> spark-submit, as we want to execute Spark code in the response server.
>
> Thanks for any replies
> 0049003208
> 0049003...@znv.com
>
> Signature customized by NetEase Mail Master
>


Fwd: use spark cluster in java web service

2018-11-01 Thread onmstester onmstester
Refer to: https://spark.apache.org/docs/latest/quick-start.html

1. Create a singleton SparkContext at initialization of your cluster; the spark-context or
spark-sql object would then be accessible through a static method anywhere in your
application. I recommend using fair scheduling on your context, to share resources among
all input requests:

SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();

2. From now on, with the sc or spark-sql object, something like
sparkSql.sql("select * from test").collectAsList() would run as a Spark job and return the
result to your application.

---- Forwarded message ----
From: 崔苗(数据与人工智能产品开发部) <0049003...@znv.com>
To: "user"
Date: Thu, 01 Nov 2018 10:52:15 +0330
Subject: use spark cluster in java web service

Hi,
we want to use Spark in our Java web service to compute data in a Spark cluster according
to requests. Now we have two problems:
1. How to get a SparkSession of a remote Spark cluster (Spark on YARN mode); we want to
keep one SparkSession to execute all data computation.
2. How to submit to the remote Spark cluster in Java code instead of spark-submit, as we
want to execute Spark code in the response server.

Thanks for any replies
0049003208
0049003...@znv.com
Signature customized by NetEase Mail Master
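
A minimal sketch of the singleton holder this reply describes, with fair scheduling enabled
so concurrent requests share the cluster; the class name and application name are
illustrative.

```java
import org.apache.spark.sql.SparkSession;

// One SparkSession created at startup and shared by all request handlers.
public final class SparkHolder {
    private static volatile SparkSession spark;

    private SparkHolder() {}

    public static SparkSession get() {
        if (spark == null) {
            synchronized (SparkHolder.class) {
                if (spark == null) {
                    spark = SparkSession.builder()
                            .appName("Simple Application")
                            // FAIR scheduling so concurrent requests share executors.
                            .config("spark.scheduler.mode", "FAIR")
                            .getOrCreate();
                }
            }
        }
        return spark;
    }
}
```

A request handler could then call, for example,
SparkHolder.get().sql("select * from test").collectAsList().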

Re: SIGBUS (0xa) when using DataFrameWriter.insertInto

2018-11-01 Thread alexzautke
Reported as SPARK-25907, if anyone is still interested.






use spark cluster in java web service

2018-11-01 Thread 数据与人工智能产品开发部

Hi,
we want to use Spark in our Java web service to compute data in a Spark cluster according
to requests. Now we have two problems:
1. How to get a SparkSession of a remote Spark cluster (Spark on YARN mode); we want to
keep one SparkSession to execute all data computation.
2. How to submit to the remote Spark cluster in Java code instead of spark-submit, as we
want to execute Spark code in the response server.

Thanks for any replies

0049003208
0049003...@znv.com
Signature customized by NetEase Mail Master

Re: Apache Spark orc read performance when reading large number of small files

2018-11-01 Thread Jörn Franke
A lot of small files is very inefficient in itself, and predicate pushdown will
not help you much there unless you merge them into one large file (one large
file can be processed much more efficiently).

How did you validate that predicate pushdown did not work in Hive? Your Hive
version is also very old - consider upgrading to at least Hive 2.x.
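
For illustration, a minimal sketch of the kind of compaction step suggested here: read the
many small ORC files once and rewrite them as a handful of larger files. The paths and the
partition count are made up; the count would be tuned to target a few hundred MB per file.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CompactOrc {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("compact orc").getOrCreate();

        // Read the directory of many small ORC files once.
        Dataset<Row> logs = spark.read().orc("hdfs://test/date=201810");

        // Rewrite into a small number of large files.
        logs.coalesce(8)
            .write()
            .mode("overwrite")
            .orc("hdfs://test_compacted/date=201810");

        spark.stop();
    }
}
```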

> On 31.10.2018 at 20:35, gpatcham wrote:
> 
> spark version 2.2.0
> Hive version 1.1.0
> 
> There are lot of small files
> 
> Spark code :
> 
> "spark.sql.orc.enabled": "true",
> "spark.sql.orc.filterPushdown": "true 
> 
> val logs
> =spark.read.schema(schema).orc("hdfs://test/date=201810").filter("date >
> 20181003")
> 
> Hive:
> 
> "spark.sql.orc.enabled": "true",
> "spark.sql.orc.filterPushdown": "true 
> 
> The test table in Hive is pointing to hdfs://test/ and is partitioned on date.
> 
> val sqlStr = s"select * from test where date > 20181001"
> val logs = spark.sql(sqlStr)
> 
> With the Hive query I don't see filter pushdown happening. I tried setting
> these configs in both hive-site.xml and also via spark.sqlContext.setConf:
> 
> "hive.optimize.ppd":"true",
> "hive.optimize.ppd.storage":"true" 
> 
> 
> 
> 




How to use Dataset foreachPartition and groupByKey together

2018-11-01 Thread Kuttaiah Robin
Hello all,

I am using spark-2.3.0 and hadoop-2.7.4.
I have a Spark Streaming application which listens to a Kafka topic, does some
transformation, and writes to an Oracle database using a JDBC client.


Step 1.
Read events from Kafka as shown below;
--
Dataset<Row> kafkaEvents = getSparkSession().readStream().format("kafka")
    .option("kafka.bootstrap.servers", strKafkaAddress)
    .option("assign", strSubscription)
    .option("maxOffsetsPerTrigger", "10")
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", false)
    .load()
    .filter(strFilter)
    .select(functions.from_json(functions.col("value").cast("string"), oSchema).alias("events"))
    .select("events.*");

I do groupByKey and then, for each group, use the set of events obtained for
that group to create a JDBC connection/PreparedStatement, insert, and then close
the connection. I am using Oracle JDBC along with flatMapGroupsWithState.


Step 2.
GroupByKey and flatMapGroupsWithState
-
Dataset<InsightEventUpdate> sessionUpdates = kafkaEvents
    .groupByKey(
        new MapFunction<Row, String>() {
            @Override public String call(Row event) {
                return event.getAs(m_InsightRawEvent.getPrimaryKey());
            }
        }, Encoders.STRING())
    .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
        Encoders.bean(InsightEventInfo.class), Encoders.bean(InsightEventUpdate.class),
        GroupStateTimeout.ProcessingTimeTimeout());


This has a drawback: it creates a connection and inserts into the DB for each
group.

I need to do it for each partition, so that only one connection and one
batch insert are needed for all the new events read from the partition.

Can somebody point me to how I can achieve this?

Basically I am looking at the below:
1. Read from Kafka as said above.
2. kafkaEvents.foreachPartition - create one connection here.
3. GroupByKey and flatMapGroupsWithState.

thanks
Robin Kuttaiah
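
For illustration only, a rough sketch (not from the thread) of the one-connection-per-partition,
batched-insert pattern described above, shown on a plain (non-streaming) Dataset<Row>; in the
streaming job it would have to run inside a sink such as a ForeachWriter or a per-batch write.
The JDBC URL, table, and column names are placeholders.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class PartitionWriter {
    public static void writePerPartition(Dataset<Row> events) {
        events.foreachPartition((ForeachPartitionFunction<Row>) rows -> {
            // One connection and one batched insert per partition.
            try (Connection conn = DriverManager.getConnection(
                         "jdbc:oracle:thin:@//dbhost:1521/SVC", "user", "pwd");
                 PreparedStatement ps = conn.prepareStatement(
                         "INSERT INTO insight_events (id, payload) VALUES (?, ?)")) {
                while (rows.hasNext()) {
                    Row r = rows.next();
                    ps.setString(1, r.getAs("id"));
                    ps.setString(2, r.getAs("payload"));
                    ps.addBatch();
                }
                ps.executeBatch();
            }
        });
    }
}
```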