Challenges with Datasource V2 API

2019-06-25 Thread Sunita Arvind
Hello Spark Experts,

I am having challenges using the DataSource V2 API. I created a mock
implementation to reproduce the issue (code linked below).

The input partitions seem to be created correctly. The below output
confirms that:

19/06/23 16:00:21 INFO root: createInputPartitions
19/06/23 16:00:21 INFO root: Create a partition for abc

The InputPartitionReader appears to fetch the data correctly as well;
however, on the cluster the job seems to loop indefinitely between the
reader's next() and get() calls.
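
For comparison, here is a minimal sketch of the next()/get() contract Spark
expects from an InputPartitionReader (Spark 2.4 API); the class name and row
iterator below are made up for illustration, not taken from the linked repo.
A reader loops forever when next() never returns false, or when get() itself
advances the cursor.

```
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader

// Hypothetical reader over an in-memory iterator of rows.
class MockPartitionReader(rows: Iterator[InternalRow])
    extends InputPartitionReader[InternalRow] {

  private var current: InternalRow = _

  // Advance to the next row; must eventually return false,
  // otherwise Spark keeps alternating next()/get() indefinitely.
  override def next(): Boolean = {
    if (rows.hasNext) {
      current = rows.next()
      true
    } else {
      false
    }
  }

  // Return the row positioned by the last next(); must not advance.
  override def get(): InternalRow = current

  override def close(): Unit = ()
}
```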

I tried to reproduce this in a mock project; the code is here:
https://github.com/skopp002/SparkDatasourceV2.git

However, the issue does not surface in the mock project. One concern that
does show up is record duplication, which I had also noticed once in
production: there is only one record with a usage value of "1.2006451E7" in
mockdata.json, yet the load result contains multiple copies of it. Could
this duplication be what effectively produces unbounded data in production?
There, even for a few KB of input, I hit the repeated spilling below.
```
2019-06-23 16:07:29 INFO UnsafeExternalSorter:209 - Thread 47 spilling sort data of 1984.0 MB to disk (50 times so far)
2019-06-23 16:07:31 INFO UnsafeExternalSorter:209 - Thread 47 spilling sort data of 1984.0 MB to disk (51 times so far)
2019-06-23 16:07:33 INFO UnsafeExternalSorter:209 - Thread 47 spilling sort data of 1984.0 MB to disk (52 times so far)
```

I could not reproduce that exact behaviour in the mock project, though; the
data is probably too small to surface the problem.
Can someone review the code and tell me if I am doing something wrong?

regards
Sunita


Problem with the ML ALS algorithm

2019-06-25 Thread Steve Pruitt
I get an inexplicable exception when trying to build an ALSModel with
implicitPrefs set to true. I can't find any help online.

Thanks in advance.

My code is:

ALS als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setUserCol("customer")
.setItemCol("item")
.setImplicitPrefs(true)
.setRatingCol("rating");
ALSModel model = als.fit(training);

The exception is:
org.apache.spark.ml.optim.SingularMatrixException: LAPACK.dppsv returned 6 because A is not positive definite. Is A derived from a singular matrix (e.g. collinear column values)?
    at org.apache.spark.mllib.linalg.CholeskyDecomposition$.checkReturnValue(CholeskyDecomposition.scala:65) ~[spark-mllib_2.11-2.3.1.jar:2.3.1]
    at org.apache.spark.mllib.linalg.CholeskyDecomposition$.solve(CholeskyDecomposition.scala:41) ~[spark-mllib_2.11-2.3.1.jar:2.3.1]
    at org.apache.spark.ml.recommendation.ALS$CholeskySolver.solve(ALS.scala:747) ~[spark-mllib_2.11-2.3.1.jar:2.3.1]
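
Not a definitive answer, but a commonly suggested mitigation for this
Cholesky failure: ALS solves per-user/per-item normal equations of the form
(YᵀY + λI), so a very small regParam can leave that matrix effectively
singular when rating columns are collinear or a user/item has very few
interactions. A hedged sketch in Scala with a larger regParam (0.1 is only
an example value to tune; "training" is assumed to be the same Dataset as
above):

```
import org.apache.spark.ml.recommendation.ALS

// Same settings as the original snippet, only regParam raised; a larger
// regularization term keeps the normal-equation matrix better conditioned.
val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.1)
  .setUserCol("customer")
  .setItemCol("item")
  .setImplicitPrefs(true)
  .setRatingCol("rating")

val model = als.fit(training)
```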


Spark Structured Streaming Custom Sources confusion

2019-06-25 Thread Lars Francke
Hi,

I'm a bit confused about the current state and the future plans of custom
data sources in Structured Streaming.

So for DStreams we could write a Receiver as documented. Can this be used
with Structured Streaming?

Then we had the DataSource API with DefaultSource et al., which was (in my
opinion) never properly documented.

With Spark 2.3 we got a new DataSourceV2 (which also was a marker
interface), also not properly documented.

Now with Spark 3 this seems to change again
(https://issues.apache.org/jira/browse/SPARK-25390): at least the
DataSourceV2 interface is gone, there is still no documentation, but it is
apparently still called v2 somehow?

Can anyone shed some light on the current state of data sources & sinks for
batch & streaming in Spark 2.4 and 3.x?

Thank you!

Cheers,
Lars


Distinguishing between field missing and null in individual record?

2019-06-25 Thread Jeff Evans
Suppose we have the following JSON, which we parse into a DataFrame
(using the multiline option).

[{
  "id": 8541,
  "value": "8541 changed again value"
},{
  "id": 51109,
  "name": "newest bob",
  "value": "51109 changed again"
}]

Regardless of whether we explicitly define a schema, or allow it to be
inferred, the result of df.show(), after parsing this data, is similar
to the following:

+-----+----------+--------------------+
|   id|      name|               value|
+-----+----------+--------------------+
| 8541|      null|8541 changed agai...|
|51109|newest bob| 51109 changed again|
+-----+----------+--------------------+

Notice that the name column for the first row is null.  This JSON will
produce an identical DataFrame:

[{
  "id": 8541,
  "name": null,
  "value": "8541 changed again value"
},{
  "id": 51109,
  "name": "newest bob",
  "value": "51109 changed again"
}]

Is there a way to distinguish between these two cases in the DataFrame
(i.e. field is missing, but added as null due to inferred or explicit
schema, versus field is present but with null value)?
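
One sketch of a possible workaround (not from the original post, and worth
verifying on your Spark version): keep the raw JSON text and parse it a
second time as a string-to-string map, then check whether the key appears at
all. This assumes line-delimited JSON in a hypothetical records.jsonl file;
with the multiline array form you would first need to get one JSON object
per row. It also relies on the JSON parser keeping null-valued keys in the
map.

```
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{MapType, StringType}

// "spark" is an existing SparkSession; read the raw text so the original
// key set of each record is preserved.
val raw = spark.read.text("records.jsonl")   // column "value" holds the JSON string

val parsed = raw
  .withColumn("as_map", from_json(col("value"), MapType(StringType, StringType)))
  // true  -> the "name" key is present (possibly with a null value)
  // false -> the "name" key is missing from this record
  .withColumn("name_key_present", array_contains(map_keys(col("as_map")), "name"))
  .withColumn("name", col("as_map").getItem("name"))

parsed.select("name", "name_key_present").show(truncate = false)
```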


Implementing Upsert logic Through Streaming

2019-06-25 Thread Sachit Murarka
Hi All,

I will receive records continuously as text files (streaming). Each record
also contains a timestamp field.

The target is an Oracle database.

My goal is to keep only the latest record per key in Oracle. Could you
please suggest how this can be implemented efficiently?
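
One common pattern for this (a sketch only; the paths, column names, and
Oracle connection details below are placeholders, not from the original
question, and the Oracle JDBC driver must be on the classpath): deduplicate
each micro-batch so only the newest record per key survives, write it to a
staging table over JDBC, and let Oracle MERGE the staging table into the
target.

```
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("UpsertToOracle").getOrCreate()

// Assumed layout of the incoming text files: key, payload, event timestamp.
val records = spark.readStream
  .schema("key STRING, payload STRING, event_ts TIMESTAMP")
  .csv("/landing/dir")

records.writeStream
  .option("checkpointLocation", "/checkpoints/upsert")   // placeholder path
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // Keep only the latest record per key within this micro-batch.
    val latest = batch
      .withColumn("rn", row_number().over(
        Window.partitionBy("key").orderBy(col("event_ts").desc)))
      .where(col("rn") === 1)
      .drop("rn")

    // Load the deduplicated batch into a staging table; a MERGE from
    // STAGING_TABLE into the target table would then run inside Oracle
    // (e.g. issued over a plain JDBC connection, omitted here).
    latest.write
      .format("jdbc")
      .option("url", "jdbc:oracle:thin:@//dbhost:1521/service") // placeholder
      .option("dbtable", "STAGING_TABLE")                       // placeholder
      .option("user", "dbuser")
      .option("password", "dbpassword")
      .mode("overwrite")
      .save()
  }
  .start()
  .awaitTermination()
```

foreachBatch requires Spark 2.4 or later; on 2.3 a foreach writer or an
intermediate sink would be needed instead.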

Kind Regards,
Sachit Murarka


Potential Problem : Dropping malformed tables from CSV (PySpark)

2019-06-25 Thread Conor Begley
Hey,

I'm currently working with CSV data and I've come across an unusual case. In
Databricks, if I use the following code:

GA_pages = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true', comment='#', mode='DROPMALFORMED') \
    .load(ga_sessions_path)

display(GA_pages)

*OUTPUT:*
+------------------+--------+--------+--------------+---------+-----------+-------------+---------------------+------------+----------+--------------------------+
|User-Defined Value| Date|Sessions|% New Sessions|New Users|Bounce Rate|Pages/Session|Avg. Session Duration|Transactions| Revenue|E-commerce Conversion Rate|
+------------------+--------+--------+--------------+---------+-----------+-------------+---------------------+------------+----------+--------------------------+
| (not set)|20190508| 1,987| 60.09%| 1,194| 34.88%| 5.25| 00:03:34| 18| £2,205.22| 0.91%|
| (not set)|20190528| 1,910| 62.20%| 1,188| 36.07%| 4.53| 00:02:47| 9| £1,045.56| 0.47%|
| (not set)|20190509| 1,723| 62.45%| 1,076| 34.07%| 5.39| 00:03:49| 13| £2,264.27| 0.75%|
| (not set)|20190530| 1,682| 64.98%| 1,093| 31.87%| 5.39| 00:03:39| 23| £3,292.06| 1.37%|
| (not set)|20190527| 1,677| 63.33%| 1,062| 35.24%| 4.52| 00:03:02| 11| £2,261.22| 0.66%|
| (not set)|20190511| 1,643| 62.08%| 1,020| 39.81%| 5.16| 00:03:06| 14| £2,412.03| 0.85%|
| (not set)|20190529| 1,623| 61.24%| 994| 35.55%| 4.87| 00:03:05| 16| £3,213.60| 0.99%|
| (not set)|20190513| 1,622| 68.13%| 1,105| 34.77%| 4.81| 00:03:07| 8| £1,660.61| 0.49%|
| (not set)|20190505| 1,526| 59.90%| 914| 35.91%| 6.01| 00:03:48| 15| £3,022.73| 0.98%|
| (not set)|20190504| 1,515| 61.12%| 926| 38.81%| 5.58| 00:03:26| 16| £2,238.47| 1.06%|
| (not set)|20190514| 1,513| 59.35%| 898| 34.10%| 4.98| 00:03:12| 14| £3,055.11| 0.93%|
| (not set)|20190526| 1,509| 69.05%| 1,042| 32.94%| 5.01| 00:03:17| 9| £2,346.37| 0.60%|
| (not set)|20190501| 1,504| 53.46%| 804| 36.90%| 4.91| 00:03:13| 14| £2,606.60| 0.93%|
| (not set)|20190506| 1,502| 63.18%| 949| 36.55%| 4.97| 00:03:19| 16| £2,419.02| 1.07%|
| (not set)|20190517| 1,444| 58.86%| 850| 33.73%| 5.15| 00:03:13| 13| £3,650.09| 0.90%|
| (not set)|20190521| 1,428| 58.40%| 834| 38.24%| 4.99| 00:03:37| 12| £1,718.84| 0.84%|
| (not set)|20190502| 1,398| 58.44%| 817| 37.55%| 5.42| 00:03:40| 14| £2,835.89| 1.00%|
| (not set)|20190503| 1,385| 56.03%| 776| 37.83%| 4.98| 00:03:06| 4| £763.65| 0.29%|
| (not set)|20190507| 1,366| 59.66%| 815| 38.80%| 5.36| 00:03:30| 16| £2,094.68| 1.17%|
| (not set)|20190510| 1,357| 63.23%| 858| 34.49%| 5.3| 00:03:31| 10| £1,432.15| 0.74%|
| (not set)|20190601| 1,355| 56.75%| 769| 36.38%| 5.31| 00:03:18| 20| £5,260.78| 1.48%|
| (not set)|20190531| 1,332| 60.66%| 808| 36.86%| 5.09| 00:03:26| 13| £2,006.91| 0.98%|
| (not set)|20190602| 1,332| 60.96%| 812| 37.16%| 5.08| 00:03:29| 17| £3,419.28| 1.28%|
| (not set)|20190603| 1,329| 57.49%| 764| 36.19%| 5.11| 00:03:22| 11| £2,882.18| 0.83%|
| (not set)|20190515| 1,299| 59.12%| 768| 35.95%| 5.58| 00:03:57| 3| £118.00| 0.23%|
| (not set)|20190604| 1,291| 60.42%| 780| 36.48%| 4.78| 00:03:02| 9| £1,826.35| 0.70%|
| (not set)|20190520| 1,274| 61.54%| 784| 35.32%| 5.44| 00:03:33| 14| £2,845.98| 1.10%|
| (not set)|20190522| 1,256| 63.54%| 798| 37.18%| 4.71| 00:03:06| 13| £1,551.32| 1.04%|
| (not set)|20190605| 1,254| 58.21%| 730| 36.04%| 4.85| 00:03:14| 10| £1,676.51| 0.80%|
| (not set)|20190512| 1,244| 64.63%| 804| 36.82%| 5.44| 00:03:15| 12| £1,823.22| 0.96%|
| (not set)|20190518| 1,201| 62.03%| 745| 36.47%| 5.46| 00:03:32| 12| £1,765.69| 1.00%|
| (not set)|20190516| 1,177| 60.92%| 717| 33.98%| 5.39| 00:03:51| 16| £2,851.27| 1.36%|
| (not set)|20190524| 1,137| 61.39%| 698| 40.55%| 4.76| 00:02:52| 10| £1,562.80| 0.88%|
| (not set)|20190525| 1,110| 63.87%| 709| 34.14%| 5.83| 00:03:46| 15| £2,520.85| 1.35%|
| (not set)|20190523| 1,105| 61.27%| 677| 37.56%| 4.66| 00:03:22| 10| £2,472.89| 0.90%|
| (not set)|20190519| 1,081| 64.20%| 694| 32.01%| 5.93| 00:03:35| 10| £2,341.90| 0.93%|
| null| null| 51,091| 61.21%| 31,272| 35.99%| 5.15| 00:03:22| 460|£83,464.11| 0.90%|
+------------------+--------+--------+--------------+---------+-----------+-------------+---------------------+------------+----------+--------------------------+


The code works as expected and the malformed data is dropped.

However, if I carry out filters on the data before displaying it, the
malformed data reappears:

GA_pages = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true', comment='#', mode='DROPMALFORMED') \
    .load(ga_sessions_path)

GA_pages = GA_pages.withColumn('date', fn.col('Date')) \
    .withColumn('sessions', fn.regexp_replace("Sessions", r',', "")) \
    .withColumn('pages/session', fn.col('Pages/Session')) \
    .withColumn('pages', (fn.col('pages/session') * fn.col('sessions'))) \
    .select('date', 'sessions', 'pages', 'pages/session')

display(GA_pages)

*OUTPUT:*
+-----+---------+------+-------------+
| date| sessions| pages|pages/session|