Challenges with Datasource V2 API
Hello Spark Experts,

I am having challenges using the DataSource V2 API, so I created a mock project to reproduce them. The input partitions seem to be created correctly; the output below confirms that:

```
19/06/23 16:00:21 INFO root: createInputPartitions
19/06/23 16:00:21 INFO root: Create a partition for abc
```

The InputPartitionReader seems to have fetched the data correctly as well; however, on the cluster it keeps cycling indefinitely between the next() and get() operations of the InputPartitionReader. The code for the mockup is here:

https://github.com/skopp002/SparkDatasourceV2.git

However, the infinite loop does not surface in the mock project. One concern that does show up is duplication of records, which I had noticed once in production as well. There is only one record with a usage value of "1.2006451E7" in mockdata.json, yet there are multiple copies of it in the load result. Could this duplication be what produces the effectively infinite data in production? In production, even for a few KBs of input I hit the error below:

```
2019-06-23 16:07:29 INFO UnsafeExternalSorter:209 - Thread 47 spilling sort data of 1984.0 MB to disk (50 times so far)
2019-06-23 16:07:31 INFO UnsafeExternalSorter:209 - Thread 47 spilling sort data of 1984.0 MB to disk (51 times so far)
2019-06-23 16:07:33 INFO UnsafeExternalSorter:209 - Thread 47 spilling sort data of 1984.0 MB to disk (52 times so far)
```

I could not reproduce this exact error in the mock project; the data is probably too small to surface the problem. Can someone review the code and tell me if I am doing something wrong?

Regards,
Sunita
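For what it's worth, an endless next()/get() cycle in a V2 reader is usually caused by a next() implementation that never returns false (or that restarts the fetch on every call), and handing out overlapping partitions produces exactly the kind of record duplication described above. Below is a minimal sketch of the contract against the Spark 2.4 InputPartitionReader interface; the MockPartitionReader class and its in-memory records are hypothetical illustrations, not code from the linked repo:

```
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical reader over a pre-fetched, in-memory batch of (key, usage) pairs.
class MockPartitionReader(records: Array[(String, Double)])
    extends InputPartitionReader[InternalRow] {

  private var index = -1

  // next() must advance exactly one position per call and eventually
  // return false; returning true unconditionally, or re-running the
  // fetch here, makes Spark loop between next() and get() forever.
  override def next(): Boolean = {
    index += 1
    index < records.length
  }

  // get() must be a pure accessor for the row selected by the last
  // successful next(); it must not advance the cursor or refetch.
  override def get(): InternalRow = {
    val (key, usage) = records(index)
    new GenericInternalRow(Array[Any](UTF8String.fromString(key), usage))
  }

  override def close(): Unit = ()
}
```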
Problem with the ML ALS algorithm
I get an inexplicable exception when trying to build an ALSModel with implicit preferences set to true, and I can't find any help online. Thanks in advance. My code is:

```
ALS als = new ALS()
        .setMaxIter(5)
        .setRegParam(0.01)
        .setUserCol("customer")
        .setItemCol("item")
        .setImplicitPrefs(true)
        .setRatingCol("rating");
ALSModel model = als.fit(training);
```

The exception is:

```
org.apache.spark.ml.optim.SingularMatrixException: LAPACK.dppsv returned 6 because A is not positive definite. Is A derived from a singular matrix (e.g. collinear column values)?
    at org.apache.spark.mllib.linalg.CholeskyDecomposition$.checkReturnValue(CholeskyDecomposition.scala:65) ~[spark-mllib_2.11-2.3.1.jar:2.3.1]
    at org.apache.spark.mllib.linalg.CholeskyDecomposition$.solve(CholeskyDecomposition.scala:41) ~[spark-mllib_2.11-2.3.1.jar:2.3.1]
    at org.apache.spark.ml.recommendation.ALS$CholeskySolver.solve(ALS.scala:747) ~[spark-mllib_2.11-2.3.1.jar:2.3.1]
```
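Two commonly reported triggers for this failure are duplicate (user, item) pairs in the ratings and a regularization value too small to keep the Cholesky-factored normal equations positive definite. Below is a sketch of both mitigations, in Scala; the column names match the code above, but whether either change fixes this particular dataset is an assumption:

```
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.functions.{sum => sqlSum}

// Collapse duplicate (customer, item) pairs, which can make the
// normal equations singular, before fitting.
val deduped = training
  .groupBy("customer", "item")
  .agg(sqlSum("rating").as("rating"))

val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.1) // larger regularization than 0.01
  .setUserCol("customer")
  .setItemCol("item")
  .setImplicitPrefs(true)
  .setRatingCol("rating")

val model = als.fit(deduped)
```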
Spark Structured Streaming Custom Sources confusion
Hi,

I'm a bit confused about the current state and the future plans for custom data sources in Structured Streaming. For DStreams we could write a Receiver, as documented. Can a Receiver be used with Structured Streaming? Then we had the DataSource API with DefaultSource et al., which was (in my opinion) never properly documented. With Spark 2.3 we got a new DataSourceV2 (which was also just a marker interface), likewise not properly documented. Now with Spark 3 this seems to change again (https://issues.apache.org/jira/browse/SPARK-25390): at least the DataSourceV2 interface is gone, there is still no documentation, yet it is somehow still called "v2"? Can anyone shed some light on the current state of data sources & sinks for batch & streaming in Spark 2.4 and 3.x?

Thank you!

Cheers,
Lars
Distinguishing between field missing and null in an individual record?
Suppose we have the following JSON, which we parse into a DataFrame (using the multiline option):

```
[{
  "id": 8541,
  "value": "8541 changed again value"
},{
  "id": 51109,
  "name": "newest bob",
  "value": "51109 changed again"
}]
```

Regardless of whether we explicitly define a schema or allow it to be inferred, the result of df.show() after parsing this data is similar to the following:

```
+-----+----------+--------------------+
|   id|      name|               value|
+-----+----------+--------------------+
| 8541|      null|8541 changed agai...|
|51109|newest bob| 51109 changed again|
+-----+----------+--------------------+
```

Notice that the name column for the first row is null. This JSON will produce an identical DataFrame:

```
[{
  "id": 8541,
  "name": null,
  "value": "8541 changed again value"
},{
  "id": 51109,
  "name": "newest bob",
  "value": "51109 changed again"
}]
```

Is there a way to distinguish between these two cases in the DataFrame (i.e. a field that is missing but filled with null by the inferred or explicit schema, versus a field that is present with a null value)?
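Once the rows are parsed against a schema, the two cases collapse into the same null, so the distinction has to be captured before or alongside parsing. One workaround is to keep the raw JSON text next to the parsed struct and test for the key itself. A minimal sketch in Scala, assuming line-delimited input (one object per line) rather than the multiline array above, and using a deliberately naive regex that ignores corner cases such as "name" occurring inside string values:

```
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType),
  StructField("value", StringType)))

// Read as plain text so the raw record stays available next to the parse.
val raw = spark.read.text("records.jsonl") // hypothetical path
val df = raw
  .withColumn("rec", from_json(col("value"), schema))
  .withColumn("name_present", col("value").rlike("\"name\"\\s*:"))
  .select(col("rec.id"), col("rec.name"),
    col("rec.value").as("value_field"), col("name_present"))
```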
Implementing Upsert logic Through Streaming
Hi All,

I receive records continuously as text files (streaming), and each record carries a timestamp field. The target is an Oracle database. My goal is to maintain the latest record per key in Oracle. Could you please suggest how this can be implemented efficiently?

Kind Regards,
Sachit Murarka
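One common shape for this (a sketch, not a confirmed recommendation): read the files with Structured Streaming, reduce each micro-batch to the newest record per key, and upsert via a JDBC MERGE inside foreachBatch (Spark 2.4+). Everything below that names the data is hypothetical: the input directory, the "key,timestamp,payload" line layout, and the Oracle table, columns, and credentials.

```
import java.sql.DriverManager
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number, split}

val lines = spark.readStream.text("/data/incoming") // hypothetical directory

// Hypothetical layout: "key,timestamp,payload" per line.
val parsed = lines.select(
  split(col("value"), ",").getItem(0).as("key"),
  split(col("value"), ",").getItem(1).cast("timestamp").as("ts"),
  split(col("value"), ",").getItem(2).as("payload"))

parsed.writeStream.foreachBatch { (batch: DataFrame, batchId: Long) =>
  // Keep only the newest record per key within this micro-batch.
  val w = Window.partitionBy("key").orderBy(col("ts").desc)
  val latest = batch.withColumn("rn", row_number().over(w))
    .filter(col("rn") === 1).drop("rn")

  // MERGE guards on the timestamp so late-arriving older records never
  // overwrite a newer row already in Oracle.
  latest.foreachPartition { (rows: Iterator[Row]) =>
    val conn = DriverManager.getConnection(
      "jdbc:oracle:thin:@//dbhost:1521/svc", "user", "password")
    val stmt = conn.prepareStatement(
      """MERGE INTO target t
        |USING (SELECT ? AS key, ? AS ts, ? AS payload FROM dual) s
        |ON (t.key = s.key)
        |WHEN MATCHED THEN UPDATE SET t.ts = s.ts, t.payload = s.payload
        |  WHERE s.ts > t.ts
        |WHEN NOT MATCHED THEN INSERT (key, ts, payload)
        |  VALUES (s.key, s.ts, s.payload)""".stripMargin)
    try {
      rows.foreach { r =>
        stmt.setString(1, r.getAs[String]("key"))
        stmt.setTimestamp(2, r.getAs[java.sql.Timestamp]("ts"))
        stmt.setString(3, r.getAs[String]("payload"))
        stmt.executeUpdate()
      }
    } finally { stmt.close(); conn.close() }
  }
}.start()
```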
Potential Problem: Dropping malformed rows from CSV (PySpark)
Hey,

I'm currently working with CSV data and I've come across an unusual case. In Databricks, if I use the following code:

```
GA_pages = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true', comment='#', mode="DROPMALFORMED") \
    .load(ga_sessions_path)
display(GA_pages)
```

OUTPUT:

```
+------------------+--------+--------+--------------+---------+-----------+-------------+---------------------+------------+----------+--------------------------+
|User-Defined Value|    Date|Sessions|% New Sessions|New Users|Bounce Rate|Pages/Session|Avg. Session Duration|Transactions|   Revenue|E-commerce Conversion Rate|
+------------------+--------+--------+--------------+---------+-----------+-------------+---------------------+------------+----------+--------------------------+
| (not set)|20190508| 1,987| 60.09%| 1,194| 34.88%| 5.25| 00:03:34| 18| £2,205.22| 0.91%|
| (not set)|20190528| 1,910| 62.20%| 1,188| 36.07%| 4.53| 00:02:47| 9| £1,045.56| 0.47%|
| (not set)|20190509| 1,723| 62.45%| 1,076| 34.07%| 5.39| 00:03:49| 13| £2,264.27| 0.75%|
| (not set)|20190530| 1,682| 64.98%| 1,093| 31.87%| 5.39| 00:03:39| 23| £3,292.06| 1.37%|
| (not set)|20190527| 1,677| 63.33%| 1,062| 35.24%| 4.52| 00:03:02| 11| £2,261.22| 0.66%|
| (not set)|20190511| 1,643| 62.08%| 1,020| 39.81%| 5.16| 00:03:06| 14| £2,412.03| 0.85%|
| (not set)|20190529| 1,623| 61.24%| 994| 35.55%| 4.87| 00:03:05| 16| £3,213.60| 0.99%|
| (not set)|20190513| 1,622| 68.13%| 1,105| 34.77%| 4.81| 00:03:07| 8| £1,660.61| 0.49%|
| (not set)|20190505| 1,526| 59.90%| 914| 35.91%| 6.01| 00:03:48| 15| £3,022.73| 0.98%|
| (not set)|20190504| 1,515| 61.12%| 926| 38.81%| 5.58| 00:03:26| 16| £2,238.47| 1.06%|
| (not set)|20190514| 1,513| 59.35%| 898| 34.10%| 4.98| 00:03:12| 14| £3,055.11| 0.93%|
| (not set)|20190526| 1,509| 69.05%| 1,042| 32.94%| 5.01| 00:03:17| 9| £2,346.37| 0.60%|
| (not set)|20190501| 1,504| 53.46%| 804| 36.90%| 4.91| 00:03:13| 14| £2,606.60| 0.93%|
| (not set)|20190506| 1,502| 63.18%| 949| 36.55%| 4.97| 00:03:19| 16| £2,419.02| 1.07%|
| (not set)|20190517| 1,444| 58.86%| 850| 33.73%| 5.15| 00:03:13| 13| £3,650.09| 0.90%|
| (not set)|20190521| 1,428| 58.40%| 834| 38.24%| 4.99| 00:03:37| 12| £1,718.84| 0.84%|
| (not set)|20190502| 1,398| 58.44%| 817| 37.55%| 5.42| 00:03:40| 14| £2,835.89| 1.00%|
| (not set)|20190503| 1,385| 56.03%| 776| 37.83%| 4.98| 00:03:06| 4| £763.65| 0.29%|
| (not set)|20190507| 1,366| 59.66%| 815| 38.80%| 5.36| 00:03:30| 16| £2,094.68| 1.17%|
| (not set)|20190510| 1,357| 63.23%| 858| 34.49%| 5.3| 00:03:31| 10| £1,432.15| 0.74%|
| (not set)|20190601| 1,355| 56.75%| 769| 36.38%| 5.31| 00:03:18| 20| £5,260.78| 1.48%|
| (not set)|20190531| 1,332| 60.66%| 808| 36.86%| 5.09| 00:03:26| 13| £2,006.91| 0.98%|
| (not set)|20190602| 1,332| 60.96%| 812| 37.16%| 5.08| 00:03:29| 17| £3,419.28| 1.28%|
| (not set)|20190603| 1,329| 57.49%| 764| 36.19%| 5.11| 00:03:22| 11| £2,882.18| 0.83%|
| (not set)|20190515| 1,299| 59.12%| 768| 35.95%| 5.58| 00:03:57| 3| £118.00| 0.23%|
| (not set)|20190604| 1,291| 60.42%| 780| 36.48%| 4.78| 00:03:02| 9| £1,826.35| 0.70%|
| (not set)|20190520| 1,274| 61.54%| 784| 35.32%| 5.44| 00:03:33| 14| £2,845.98| 1.10%|
| (not set)|20190522| 1,256| 63.54%| 798| 37.18%| 4.71| 00:03:06| 13| £1,551.32| 1.04%|
| (not set)|20190605| 1,254| 58.21%| 730| 36.04%| 4.85| 00:03:14| 10| £1,676.51| 0.80%|
| (not set)|20190512| 1,244| 64.63%| 804| 36.82%| 5.44| 00:03:15| 12| £1,823.22| 0.96%|
| (not set)|20190518| 1,201| 62.03%| 745| 36.47%| 5.46| 00:03:32| 12| £1,765.69| 1.00%|
| (not set)|20190516| 1,177| 60.92%| 717| 33.98%| 5.39| 00:03:51| 16| £2,851.27| 1.36%|
| (not set)|20190524| 1,137| 61.39%| 698| 40.55%| 4.76| 00:02:52| 10| £1,562.80| 0.88%|
| (not set)|20190525| 1,110| 63.87%| 709| 34.14%| 5.83| 00:03:46| 15| £2,520.85| 1.35%|
| (not set)|20190523| 1,105| 61.27%| 677| 37.56%| 4.66| 00:03:22| 10| £2,472.89| 0.90%|
| (not set)|20190519| 1,081| 64.20%| 694| 32.01%| 5.93| 00:03:35| 10| £2,341.90| 0.93%|
| null| null| 51,091| 61.21%| 31,272| 35.99%| 5.15| 00:03:22| 460|£83,464.11| 0.90%|
+------------------+--------+--------+--------------+---------+-----------+-------------+---------------------+------------+----------+--------------------------+
```

The code does as expected and the malformed data is dropped. However, if I carry out filters on the data before displaying, the malformed data reappears:

```
from pyspark.sql import functions as fn

GA_pages = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true', comment='#', mode="DROPMALFORMED") \
    .load(ga_sessions_path)

GA_pages = GA_pages.withColumn('date', fn.col('Date')) \
    .withColumn('sessions', fn.regexp_replace("Sessions", r',', "")) \
    .withColumn('pages/session', fn.col('Pages/Session')) \
    .withColumn('pages', (fn.col('pages/session') * fn.col('sessions'))) \
    .select('date', 'sessions', 'pages', 'pages/session')

display(GA_pages)
```

OUTPUT:

```
+-----+---------+------+-------------+
| date| sessions| pages|pages/session|
```
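This looks like it could be the CSV column-pruning behaviour introduced in Spark 2.4: the parser only fully parses the columns a query actually needs, so once a narrower projection is applied, a row counts as malformed only if one of the *selected* columns is malformed, and previously dropped rows can reappear. Two ways to investigate, sketched in Scala (the equivalent options work from PySpark; the file path and the all-string schema are assumptions):

```
// Restore the pre-2.4 behaviour, where a row is judged malformed
// on every column regardless of the projection (default is true).
spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "false")

// Alternatively, read in PERMISSIVE mode with an explicit schema that
// carries a corrupt-record column, so the offending raw lines are visible.
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{StringType, StructType}

val columns = Seq("User-Defined Value", "Date", "Sessions", "% New Sessions",
  "New Users", "Bounce Rate", "Pages/Session", "Avg. Session Duration",
  "Transactions", "Revenue", "E-commerce Conversion Rate")
val schema = columns.foldLeft(new StructType())((s, c) => s.add(c, StringType))
  .add("_corrupt_record", StringType)

val df = spark.read
  .option("header", "true")
  .option("comment", "#")
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema(schema)
  .csv("/path/to/ga_sessions.csv") // hypothetical path

df.filter(col("_corrupt_record").isNotNull).show(false)
```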