Re: pySpark - pandas UDF and binaryType

2019-05-03 Thread Gourav Sengupta
And also be aware that pandas UDFs do not always lead to better
performance, and sometimes even to massively slower performance.

With Grouped Map, don't you run the risk of random memory errors as well?

On Thu, May 2, 2019 at 9:32 PM Bryan Cutler  wrote:

> Hi,
>
> BinaryType support was not added until Spark 2.4.0, see
> https://issues.apache.org/jira/browse/SPARK-23555. Also, pyarrow 0.10.0
> or greater is required, as you saw in the docs.
>
> Bryan
>
> On Thu, May 2, 2019 at 4:26 AM Nicolas Paris 
> wrote:
>
>> Hi all
>>
>> I am using pySpark 2.3.0 and pyArrow 0.10.0
>>
>> I want to apply a pandas-udf on a dataframe with 
>> I have the below error:
>>
>> > Invalid returnType with grouped map Pandas UDFs:
>> >
>> StructType(List(StructField(filename,StringType,true),StructField(contents,BinaryType,true)))
>> > is not supported
>>
>>
>> Am I missing something?
>> the doc
>> https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#supported-sql-types
>> says pyArrow 0.10 is the minimum to handle BinaryType
>>
>> here is the code:
>>
>> > from pyspark.sql.functions import pandas_udf, PandasUDFType
>> >
>> > df = sql("select filename, contents from test_binary")
>> >
>> > @pandas_udf("filename String, contents binary", PandasUDFType.GROUPED_MAP)
>> > def transform_binary(pdf):
>> >     contents = pdf.contents
>> >     return pdf.assign(contents=contents)
>> >
>> > df.groupby("filename").apply(transform_binary).count()
>>
>> Thanks
>> --
>> nicolas
>>
>>
>>
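For reference, a minimal sketch of the same grouped-map pandas UDF under the
versions Bryan points to (Spark 2.4.0+ and pyarrow >= 0.10.0). The table and
column names come from Nicolas's snippet; everything else is assumed:

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()

# BinaryType in pandas UDFs needs Spark 2.4.0+ (SPARK-23555) and
# pyarrow >= 0.10.0 on both the driver and the executors.
df = spark.sql("select filename, contents from test_binary")

@pandas_udf("filename string, contents binary", PandasUDFType.GROUPED_MAP)
def transform_binary(pdf):
    # pdf is a pandas DataFrame holding one group; contents is a bytes column
    return pdf.assign(contents=pdf.contents)

df.groupby("filename").apply(transform_binary).count()

On Spark 2.3.x the same code fails with the "Invalid returnType" error quoted
above, whatever the pyarrow version, since BinaryType support is simply not
there yet.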


Re: Howto force spark to honor parquet partitioning

2019-05-03 Thread Gourav Sengupta
So you want data from one physical partition on disk to go to only one
executor?

On Fri, May 3, 2019 at 5:38 PM Tomas Bartalos 
wrote:

> Hello,
>
> I have partitioned parquet files based on "event_hour" column.
> After reading parquet files to spark:
> spark.read.format("parquet").load("...")
> Files from the same parquet partition are scattered in many spark
> partitions.
>
> Example of mapping spark partition -> parquet partition:
>
> Spark partition 1 -> 2019050101, 2019050102, 2019050103
> Spark partition 2 -> 2019050101, 2019050103, 2019050104
> ...
> Spark partition 20 -> 2019050101, ...
> Spark partition 21 -> 2019050101, ...
>
> As you can see, parquet partition 2019050101 is present in Spark partitions
> 1, 2, 20 and 21.
> As a result when I write out the dataFrame:
> df.write.partitionBy("event_hour").format("parquet").save("...")
>
> There are many files created in one parquet partition (in the case of our
> example it's 4 files, but in reality it's many more).
> To speed up queries, my goal is to write 1 file per parquet partition (1
> file per hour).
>
> So far my only solution is to use repartition:
> df.repartition(col("event_hour"))
>
> But there is a lot of overhead from the unnecessary shuffle. I'd like to force
> Spark to "pick up" the parquet partitioning.
>
> In my investigation I've found
> org.apache.spark.sql.execution.FileSourceScanExec#createNonBucketedReadRDD
> 
> where the initial partitioning happens based on file sizes. There is
> an explicit ordering which causes the parquet partitions to be shuffled.
>
> thank you for your help,
> Tomas
>
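For concreteness, a sketch of the repartition workaround Tomas describes
(paths are placeholders); it yields one file per event_hour directory, but at
the cost of the full shuffle he wants to avoid:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

df = spark.read.format("parquet").load("/data/events")  # placeholder path

# After the shuffle, all rows of a given event_hour sit in a single Spark
# partition, so the partitioned write produces one file per hour directory.
(df.repartition(col("event_hour"))
   .write
   .partitionBy("event_hour")
   .format("parquet")
   .save("/data/events_compacted"))  # placeholder path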


Re: Spark SQL JDBC teradata syntax error

2019-05-03 Thread Gourav Sengupta
What is the query?

On Fri, May 3, 2019 at 5:28 PM KhajaAsmath Mohammed 
wrote:

> Hi
>
> I have followed link
> https://community.teradata.com/t5/Connectivity/Teradata-JDBC-Driver-returns-the-wrong-schema-column-nullability/m-p/77824
>  to
> connect teradata from spark.
>
> I was able to print the schema if I give a table name instead of a SQL query.
>
> I am getting the below error if I give a query (code snippet from the above
> link). Any help is appreciated.
>
> Exception in thread "main" java.sql.SQLException: [Teradata Database]
> [TeraJDBC 16.20.00.10] [Error 3707] [SQLState 42000] Syntax error, expected
> something like an 'EXCEPT' keyword or an 'UNION' keyword or a 'MINUS'
> keyword between the word 'VEHP91_BOM' and '?'.
> at
> com.teradata.jdbc.jdbc_4.util.ErrorFactory.makeDatabaseSQLException(ErrorFactory.java:309)
> at
> com.teradata.jdbc.jdbc_4.statemachine.ReceiveInitSubState.action(ReceiveInitSubState.java:103)
> at
> com.teradata.jdbc.jdbc_4.statemachine.StatementReceiveState.subStateMachine(StatementReceiveState.java:311)
> at
> com.teradata.jdbc.jdbc_4.statemachine.StatementReceiveState.action(StatementReceiveState.java:200)
> at
> com.teradata.jdbc.jdbc_4.statemachine.StatementController.runBody(StatementController.java:137)
> at
> com.teradata.jdbc.jdbc_4.statemachine.StatementController.run(StatementController.java:128)
> at
> com.teradata.jdbc.jdbc_4.TDStatement.executeStatement(TDStatement.java:389)
> at
> com.teradata.jdbc.jdbc_4.TDStatement.prepareRequest(TDStatement.java:576)
> at
> com.teradata.jdbc.jdbc_4.TDPreparedStatement.(TDPreparedStatement.java:131)
> at
> com.teradata.jdbc.jdk6.JDK6_SQL_PreparedStatement.(JDK6_SQL_PreparedStatement.java:30)
> at
> com.teradata.jdbc.jdk6.JDK6_SQL_Connection.constructPreparedStatement(JDK6_SQL_Connection.java:82)
> at com.teradata.jdbc.jdbc_4.TDSession.prepareStatement(TDSession.java:1337)
> at com.teradata.jdbc.jdbc_4.TDSession.prepareStatement(TDSession.java:1381)
> at com.teradata.jdbc.jdbc_4.TDSession.prepareStatement(TDSession.java:1367)
>
>
> Thanks,
> Asmath
>
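One common cause of this particular error, offered only as a guess since the
actual query is not shown: passing a bare "SELECT ... FROM VEHP91_BOM" string
as the dbtable option. Spark's JDBC source substitutes the dbtable value into
SQL it generates itself, so a query has to be wrapped as a parenthesized
derived table with an alias. A sketch, with placeholder columns and connection
details:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# A derived table with an alias works where a bare SELECT string does not.
query = "(SELECT col1, col2 FROM VEHP91_BOM) t"   # columns are placeholders

df = (spark.read.format("jdbc")
      .option("url", "jdbc:teradata://PROD/DATABASE=101")   # placeholder
      .option("driver", "com.teradata.jdbc.TeraDriver")
      .option("user", "HDFS_TD")                            # placeholder
      .option("password", "***")
      .option("dbtable", query)
      .load())

df.printSchema()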


[MLlib][Beginner][Debug]: Logistic Regression model always predicts the same value

2019-05-03 Thread Josue Lopes
So this is my first time using Apache Spark and machine learning in general,
and I'm currently trying to create a small application to detect credit
card fraud.

Currently I have about 1 transaction objects I'm using for my data set,
with 70% of it going towards training the model and 30% for testing.

I'm using a Logistic Regression model with features being the amount spent,
types of merchants, the card number, total amount spent in the last 24
hours and the time since the last transaction. I have one label for the
fraud probability where 0 equals a valid transaction and 1 equals a
fraudulent one.

Currently the model never predicts fraud in any situation. I think the fact
that I have a very skewed data set might be affecting it as currently only
10% of my data represent fraudulent transactions. I tried to use the
classWeight column to give more weight to the minority class in order to
try and work around this but I haven't been successful so far. If I adjust
the classWeight too much, it eventually only starts predicting fraud which
is not correct either. Ideally I would set a higher threshold as well, and I
have tried both lower and higher regularization to see if it would make any
difference.
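For what it is worth, a sketch of the weighting approach described above,
assuming Spark ML's LogisticRegression (where the weight is supplied through
the weightCol parameter) and a binary "label" column. The path, column names
and weighting scheme are illustrative, not taken from the original code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.classification import LogisticRegression

spark = SparkSession.builder.getOrCreate()

# Assumed input: a DataFrame with a "features" vector column and a binary
# "label" column (1 = fraud, 0 = valid), already split into train/test.
train = spark.read.parquet("/data/fraud_train")  # placeholder path

# Weight each class by the other class's frequency (~10% fraud per the post),
# so the minority class contributes comparably to the loss.
fraud_ratio = train.filter(col("label") == 1).count() / float(train.count())
weighted = train.withColumn(
    "classWeight",
    when(col("label") == 1, 1.0 - fraud_ratio).otherwise(fraud_ratio))

# The decision threshold and regularization mentioned in the post are further
# knobs to tune together with the weights (setThreshold / setRegParam).
lr = LogisticRegression(featuresCol="features", labelCol="label",
                        weightCol="classWeight")
model = lr.fit(weighted)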


Howto force spark to honor parquet partitioning

2019-05-03 Thread Tomas Bartalos
Hello,

I have partitioned parquet files based on "event_hour" column.
After reading parquet files to spark:
spark.read.format("parquet").load("...")
Files from the same parquet partition are scattered in many spark
partitions.

Example of mapping spark partition -> parquet partition:

Spark partition 1 -> 2019050101, 2019050102, 2019050103
Spark partition 2 -> 2019050101, 2019050103, 2019050104
...
Spark partition 20 -> 2019050101, ...
Spark partition 21 -> 2019050101, ...

As you can see, parquet partition 2019050101 is present in Spark partitions
1, 2, 20 and 21.
As a result when I write out the dataFrame:
df.write.partitionBy("event_hour").format("parquet").save("...")

There are many files created in one parquet partition (in the case of our
example it's 4 files, but in reality it's many more).
To speed up queries, my goal is to write 1 file per parquet partition (1
file per hour).

So far my only solution is to use repartition:
df.repartition(col("event_hour"))

But there is a lot of overhead from the unnecessary shuffle. I'd like to force
Spark to "pick up" the parquet partitioning.

In my investigation I've found
org.apache.spark.sql.execution.FileSourceScanExec#createNonBucketedReadRDD

where the initial partitioning happens based on file sizes. There is
an explicit ordering which causes the parquet partitions to be shuffled.

thank you for your help,
Tomas
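As an aside, the Spark-partition-to-parquet-partition mapping shown above can
be produced directly, which helps when checking whether a read configuration
change improves the layout; a sketch with a placeholder path:

from pyspark.sql import SparkSession
from pyspark.sql.functions import (spark_partition_id, input_file_name,
                                   regexp_extract, collect_set)

spark = SparkSession.builder.getOrCreate()

df = spark.read.format("parquet").load("/data/events")  # placeholder path

# For every Spark partition, collect the event_hour directories its rows
# were read from.
mapping = (df
    .withColumn("spark_partition", spark_partition_id())
    .withColumn("parquet_partition",
                regexp_extract(input_file_name(), "event_hour=([0-9]+)", 1))
    .groupBy("spark_partition")
    .agg(collect_set("parquet_partition").alias("parquet_partitions")))

mapping.orderBy("spark_partition").show(truncate=False)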


Spark SQL JDBC teradata syntax error

2019-05-03 Thread KhajaAsmath Mohammed
Hi

I have followed link
https://community.teradata.com/t5/Connectivity/Teradata-JDBC-Driver-returns-the-wrong-schema-column-nullability/m-p/77824
to
connect teradata from spark.

I was able to print the schema if I give a table name instead of a SQL query.

I am getting the below error if I give a query (code snippet from the above
link). Any help is appreciated.

Exception in thread "main" java.sql.SQLException: [Teradata Database]
[TeraJDBC 16.20.00.10] [Error 3707] [SQLState 42000] Syntax error, expected
something like an 'EXCEPT' keyword or an 'UNION' keyword or a 'MINUS'
keyword between the word 'VEHP91_BOM' and '?'.
at
com.teradata.jdbc.jdbc_4.util.ErrorFactory.makeDatabaseSQLException(ErrorFactory.java:309)
at
com.teradata.jdbc.jdbc_4.statemachine.ReceiveInitSubState.action(ReceiveInitSubState.java:103)
at
com.teradata.jdbc.jdbc_4.statemachine.StatementReceiveState.subStateMachine(StatementReceiveState.java:311)
at
com.teradata.jdbc.jdbc_4.statemachine.StatementReceiveState.action(StatementReceiveState.java:200)
at
com.teradata.jdbc.jdbc_4.statemachine.StatementController.runBody(StatementController.java:137)
at
com.teradata.jdbc.jdbc_4.statemachine.StatementController.run(StatementController.java:128)
at
com.teradata.jdbc.jdbc_4.TDStatement.executeStatement(TDStatement.java:389)
at com.teradata.jdbc.jdbc_4.TDStatement.prepareRequest(TDStatement.java:576)
at
com.teradata.jdbc.jdbc_4.TDPreparedStatement.(TDPreparedStatement.java:131)
at
com.teradata.jdbc.jdk6.JDK6_SQL_PreparedStatement.(JDK6_SQL_PreparedStatement.java:30)
at
com.teradata.jdbc.jdk6.JDK6_SQL_Connection.constructPreparedStatement(JDK6_SQL_Connection.java:82)
at com.teradata.jdbc.jdbc_4.TDSession.prepareStatement(TDSession.java:1337)
at com.teradata.jdbc.jdbc_4.TDSession.prepareStatement(TDSession.java:1381)
at com.teradata.jdbc.jdbc_4.TDSession.prepareStatement(TDSession.java:1367)


Thanks,
Asmath


Re: Spark SQL Teradata load is very slow

2019-05-03 Thread Shyam P
Asmath,
Why is upperBound set to 300? How many cores do you have?
Check how the data is distributed in the Teradata DB table:
SELECT itm_bloon_seq_no, count(*) as cc FROM TABLE GROUP BY itm_bloon_seq_no
ORDER BY itm_bloon_seq_no DESC;

Is this column "itm_bloon_seq_no" already in the table, or did you derive it
on the Spark code side?

Thanks,
Shyam


On Thu, May 2, 2019 at 11:30 PM KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi,
>
> I have a Teradata table that has more than 2.5 billion records, and the data
> size is around 600 GB. I am not able to pull it efficiently using Spark SQL;
> it has been running for more than 11 hours. Here is my code:
>
>   val df2 = sparkSession.read.format("jdbc")
> .option("url", "jdbc:teradata://PROD/DATABASE=101")
> .option("user", "HDFS_TD")
> .option("password", "C")
> .option("dbtable", "")
> .option("numPartitions", partitions)
> .option("driver","com.teradata.jdbc.TeraDriver")
> .option("partitionColumn", "itm_bloon_seq_no")
> .option("lowerBound", config.getInt("lowerBound"))
> .option("upperBound", config.getInt("upperBound"))
>
> Lower bound is 0 and upper bound is 300. Spark is using multiple executors,
> but most of the executors run fast while a few executors take much longer,
> maybe due to shuffling or something else.
>
> I also tried repartitioning on a column, but no luck. Is there a better way
> to load this faster?
>
> The object in Teradata is a view, not a table.
>
> Thanks,
> Asmath
>
>
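On the bounds question: with partitionColumn/lowerBound/upperBound/
numPartitions, Spark generates numPartitions range predicates on the column,
and every row outside the bounds lands in the first or last range, so bounds
of 0 and 300 only spread the load if itm_bloon_seq_no really lives in that
range. A sketch of deriving the bounds from the actual min/max instead
(connection details and the view name are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

jdbc_opts = {
    "url": "jdbc:teradata://PROD/DATABASE=101",   # placeholder
    "driver": "com.teradata.jdbc.TeraDriver",
    "user": "HDFS_TD",                            # placeholder
    "password": "***",
}

# One small query for the real range of the partition column.
bounds = (spark.read.format("jdbc").options(**jdbc_opts)
          .option("dbtable", "(SELECT MIN(itm_bloon_seq_no) AS lo, "
                             "MAX(itm_bloon_seq_no) AS hi FROM MY_VIEW) t")
          .load()
          .collect()[0])

# The main read: numPartitions range predicates on itm_bloon_seq_no.
df = (spark.read.format("jdbc").options(**jdbc_opts)
      .option("dbtable", "MY_VIEW")                # placeholder view name
      .option("partitionColumn", "itm_bloon_seq_no")
      .option("lowerBound", int(bounds["lo"]))
      .option("upperBound", int(bounds["hi"]))
      .option("numPartitions", 200)                # tune to available cores
      .load())

If the column itself is heavily skewed (many rows sharing a few sequence
numbers), even correct bounds leave stragglers, and a more evenly distributed
partition column is the better fix.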


Re: Update / Delete records in Parquet

2019-05-03 Thread Chetan Khatri
Agreed with delta.io, I am exploring both options

On Wed, May 1, 2019 at 2:50 PM Vitaliy Pisarev 
wrote:

> Ankit, you should take a look at delta.io, which was recently open-sourced
> by Databricks.
>
> Full DML support is on the way.
>
>
>
> *From: *"Khare, Ankit" 
> *Date: *Tuesday, 23 April 2019 at 11:35
> *To: *Chetan Khatri , Jason Nerothin <
> jasonnerot...@gmail.com>
> *Cc: *user 
> *Subject: *Re: Update / Delete records in Parquet
>
>
>
> Hi Chetan,
>
>
>
> I also agree that for this use case Parquet would not be the best option.
> I had a similar use case:
>
>
>
> 50 different tables to be downloaded from MSSQL.
>
>
>
> Source: MSSQL
>
> Destination: Apache Kudu (since it supports change data capture use cases
> very well)
>
>
>
> We used the StreamSets CDC module to connect to MSSQL and then push the CDC
> data to Apache Kudu.
>
>
>
> Total records: 3 B
>
>
>
> Thanks
>
> Ankit
>
>
>
>
>
> *From: *Chetan Khatri 
> *Date: *Tuesday, 23. April 2019 at 05:58
> *To: *Jason Nerothin 
> *Cc: *user 
> *Subject: *Re: Update / Delete records in Parquet
>
>
>
> Hello Jason, thank you for the reply. My use case is that the first time I do
> a full load with transformations/aggregations/joins and write to parquet (as
> staging), but from the next time onwards my source is MSSQL Server and I want
> to pull only those records that got changed/updated, and I would like to
> update them in parquet as well, if possible without side effects.
>
>
> https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/work-with-change-tracking-sql-server?view=sql-server-2017
>
>
>
> On Tue, Apr 23, 2019 at 3:02 AM Jason Nerothin 
> wrote:
>
> Hi Chetan,
>
>
>
> Do you have to use Parquet?
>
>
>
> It just feels like it might be the wrong sink for a high-frequency change
> scenario.
>
>
>
> What are you trying to accomplish?
>
>
>
> Thanks,
> Jason
>
>
>
> On Mon, Apr 22, 2019 at 2:09 PM Chetan Khatri 
> wrote:
>
> Hello All,
>
>
>
> If I am doing an incremental / delta load and would like to update / delete
> records in parquet, I understand that parquet is immutable and can't
> be deleted / updated; theoretically only append / overwrite can be done. But
> I can see utility tools which claim to add value for that.
>
>
>
> https://github.com/Factual/parquet-rewriter
>
>
>
> Please throw some light on this.
>
>
>
> Thanks
>
>
>
>
> --
>
> Thanks,
>
> Jason
>
>
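Since plain parquet really only offers append and overwrite, the usual
workaround is to merge the change set against the staged data in Spark and
rewrite the result. A sketch under assumed paths, key column and schema (not
Chetan's actual pipeline):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

staging_path = "/staging/orders"                          # placeholder
existing = spark.read.parquet(staging_path)
changes = spark.read.parquet("/incoming/orders_changed")  # placeholder

# Keep every existing row whose key is absent from the change set, then add
# the new/updated rows; deletes can be modelled as changes flagged for
# removal and filtered out before the union.
merged = (existing
          .join(changes.select("order_id"), on="order_id", how="left_anti")
          .unionByName(changes))

# Write to a fresh location and switch readers over, rather than overwriting
# a directory that is still being read.
merged.write.mode("overwrite").parquet(staging_path + "_new")

Tools like delta.io and parquet-rewriter, mentioned above, aim to avoid
rewriting everything on every change.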


Re: Getting EOFFileException while reading from sequence file in spark

2019-05-03 Thread Prateek Rajput
Hi all,
Please share if anyone has faced the same problem. There are many similar
issues on the web, but I did not find any solution or a reason why this
happens. It would be really helpful.
Regards,
Prateek

On Mon, Apr 29, 2019 at 3:18 PM Prateek Rajput 
wrote:

> I checked and removed 0-sized files, but the error still comes. And sometimes
> it happens even when there is no 0-size file.
> I also checked whether the data is corrupted by directly opening the file and
> inspecting it. I traced the whole data set but did not find any issue. For
> Hadoop MapReduce no such issue comes up; it happens only in the case of Spark.
>
> On Mon, Apr 29, 2019 at 2:50 PM Deepak Sharma 
> wrote:
>
>> This can happen if the file size is 0
>>
>> On Mon, Apr 29, 2019 at 2:28 PM Prateek Rajput
>>  wrote:
>>
>>> Hi guys,
>>> I am getting this strange error again and again while reading from
>>> a sequence file in Spark.
>>> User class threw exception: org.apache.spark.SparkException: Job aborted.
>>> at
>>> org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:100)
>>> at
>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1096)
>>> at
>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
>>> at
>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>>> at
>>> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1094)
>>> at
>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1067)
>>> at
>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
>>> at
>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>>> at
>>> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1032)
>>> at
>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:958)
>>> at
>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)
>>> at
>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>>> at
>>> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:957)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1499)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1478)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1478)
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>>> at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1478)
>>> at
>>> org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:550)
>>> at
>>> org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
>>> at
>>> com.flipkart.prognos.spark.UniqueDroppedFSN.main(UniqueDroppedFSN.java:42)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
>>> Caused by: org.apache.spark.SparkException: Job aborted due to stage
>>> failure: Task 186 in stage 0.0 failed 4 times, most recent failure: Lost
>>> task 186.3 in stage 0.0 (TID 179, prod-fdphadoop-krios-dn-1039, executor
>>> 1): java.io.EOFException
>>> at java.io.DataInputStream.readFully(DataInputStream.java:197)
>>> at
>>> org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:70)
>>> at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:120)
>>> at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2436)
>>> at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2568)
>>> 
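One way to narrow this down, given that the data looks fine when opened
directly: read each sequence file on its own and record which paths reproduce
the EOFException. A debugging sketch (the paths are placeholders, and
keyClass/valueClass may need to be passed explicitly for custom Writables):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Hypothetical list of the individual part files making up the input,
# e.g. gathered beforehand with: hdfs dfs -ls /data/input/part-*
paths = [
    "hdfs:///data/input/part-00000",
    "hdfs:///data/input/part-00001",
]

bad = []
for p in paths:
    try:
        # Force a full scan of just this file; a truncated or corrupt file
        # should raise the same EOFException seen in the failing job.
        sc.sequenceFile(p).count()
    except Exception as e:
        bad.append((p, str(e)[:200]))

for p, err in bad:
    print("problem file:", p, err)

If no single file reproduces it, the difference between the MapReduce and
Spark runs is more likely in the reader configuration than in the data.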

Re: Spark 2.4.1 on Kubernetes - DNS resolution of driver fails

2019-05-03 Thread Olivier Girardot
Hi,
I did not try another vendor, so I can't say if it's only related to
GKE, and no, I did not notice anything on the kubelet or kube-dns
processes...

Regards

Le ven. 3 mai 2019 à 03:05, Li Gao  a écrit :

> hi Olivier,
>
> This seems like a GKE-specific issue? Have you tried other vendors? Also, on
> the kubelet nodes, did you notice any pressure on the DNS side?
>
> Li
>
>
> On Mon, Apr 29, 2019, 5:43 AM Olivier Girardot <
> o.girar...@lateral-thoughts.com> wrote:
>
>> Hi everyone,
>> I have ~300 spark job on Kubernetes (GKE) using the cluster auto-scaler,
>> and sometimes while running these jobs a pretty bad thing happens, the
>> driver (in cluster mode) gets scheduled on Kubernetes and launches many
>> executor pods.
>> So far so good, but the k8s "Service" associated to the driver does not
>> seem to be propagated in terms of DNS resolution so all the executor fails
>> with a "spark-application-..cluster.svc.local" does not exists.
>>
>> With all executors failing, the driver should be failing too, but it considers
>> this a "pending" initial allocation and stays stuck forever in a loop
>> of "Initial job has not accepted any resources, please check Cluster UI".
>>
>> Has anyone else observed this kind of behaviour?
>> We had it on 2.3.1 and I upgraded to 2.4.1 but this issue still seems to
>> exist even after the "big refactoring" in the kubernetes cluster scheduler
>> backend.
>>
>> I can work on a fix / workaround but I'd like to check with you the
>> proper way forward:
>>
>>- Some processes (like the airflow helm recipe) rely on a "sleep 30s"
>>before launching the dependent pods (that could be added to
>>/opt/entrypoint.sh used in the Kubernetes packaging)
>>- We can add a simple step to the init container trying to do the DNS
>>resolution and failing after 60s if it did not work
>>
>> But these steps won't change the fact that the driver will stay stuck
>> thinking we're still in the case of the Initial allocation delay.
>>
>> Thoughts ?
>>
>> --
>> *Olivier Girardot*
>> o.girar...@lateral-thoughts.com
>>
>
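A sketch of the second option above: a pre-flight check that retries DNS
resolution of the driver service and gives up after roughly 60s. This is
illustrative Python, not anything shipped in the Spark Kubernetes images, and
the hostname and port are placeholders:

import socket
import sys
import time

# Placeholder hostname of the driver service the executors try to resolve.
DRIVER_HOST = "spark-application-1234-driver-svc.default.svc.cluster.local"
DEADLINE_SECONDS = 60

start = time.time()
while True:
    try:
        # Only resolution matters here, so the port value is arbitrary.
        socket.getaddrinfo(DRIVER_HOST, 7078)
        print("driver service resolvable, proceeding")
        break
    except socket.gaierror:
        if time.time() - start > DEADLINE_SECONDS:
            print("driver service still not resolvable, failing fast")
            sys.exit(1)
        time.sleep(2)

As noted above, this makes the failure explicit but does not by itself stop
the driver from treating the missing executors as a pending initial
allocation.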