WrappedArray to row of relational Db
I have a nested structure which I read from XML using spark-xml, and I want to use Spark SQL to convert it into separate relational tables. A sample row looks like

(WrappedArray([WrappedArray([[null,592006340,null],null,BA,M,1724]),N,2017-04-05T16:31:03,586257528),659925562)

and has this schema:

StructType(
  StructField(AirSegment, ArrayType(StructType(
    StructField(CodeshareDetails, ArrayType(StructType(
      StructField(Links, StructType(
        StructField(_VALUE, StringType, true),
        StructField(_mktSegmentID, LongType, true),
        StructField(_oprSegmentID, LongType, true)), true),
      StructField(_alphaSuffix, StringType, true),
      StructField(_carrierCode, StringType, true),
      StructField(_codeshareType, StringType, true),
      StructField(_flightNumber, StringType, true)), true), true),
    StructField(_adsIsDeleted, StringType, true),
    StructField(_adsLastUpdateTimestamp, StringType, true),
    StructField(_AirID, LongType, true)), true), true),
  StructField(flightId, LongType, true))

*Question: As you can see, CodeshareDetails is a WrappedArray inside a WrappedArray. How can I extract these nested rows along with the _AirID column, so that I can insert them into the CodeshareDetails table in SQLite (which has only the codeshare-related columns, plus _AirID as a foreign key for joining back)?*

*PS: I tried exploding, but when there are multiple rows in the AirSegment array it doesn't work properly.*

My table structure is mentioned below:

Flight: containing flightId and other details
AirSegment: containing _AirID (PK), flightId (FK), and the AirSegment details
CodeshareDetails: containing the codeshare details as well as _AirID (FK)

Let me know if you need any more information.
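A minimal sketch of the two-level explode, assuming the parsed DataFrame is named df and using the column names from the schema above (shown in pyspark; the same explode calls exist in the Scala DataFrame API):

from pyspark.sql.functions import col, explode

# Explode the outer AirSegment array first, keeping flightId alongside,
# so every segment becomes its own row carrying its own _AirID.
segments = df.select("flightId", explode(col("AirSegment")).alias("seg"))

# Rows for the AirSegment table, keyed by _AirID.
air_segment = segments.select(
    col("seg._AirID").alias("_AirID"), "flightId",
    col("seg._adsIsDeleted"), col("seg._adsLastUpdateTimestamp"))

# Explode the inner CodeshareDetails array, still carrying _AirID,
# giving one row per codeshare entry for the CodeshareDetails table.
codeshare = (segments
    .select(col("seg._AirID").alias("_AirID"),
            explode(col("seg.CodeshareDetails")).alias("cs"))
    .select("_AirID", col("cs._carrierCode"), col("cs._flightNumber"),
            col("cs._codeshareType"), col("cs._alphaSuffix")))

Exploding the outer array before the inner one is what makes multiple AirSegment entries work: each segment gets its own row (and its own _AirID) before the inner explode runs. Note that explode drops rows whose array is empty or null. Each resulting DataFrame can then be written to the SQLite tables, e.g. over JDBC.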
[ann] Release of TensorFrames 0.2.8
Hello all,

I would like to bring to your attention the (long overdue) release of a new version of TensorFrames. Thanks to everyone who reported packaging and installation issues. This release fixes a large number of performance and stability problems, and brings a few improvements. As an example, following this notebook [1], you can distribute the classification of images using Spark, TensorFlow and the Inception V3 model from Google. It is published as a Databricks notebook and has been tested on Jupyter as well.

What is TensorFrames? TensorFrames (TensorFlow on Spark DataFrames) lets you manipulate Spark's DataFrames with TensorFlow programs.

Spark package: https://spark-packages.org/package/databricks/tensorframes
Release notes: https://github.com/databricks/tensorframes/releases/tag/v0.2.8

Best regards

Tim Hunter

[1] https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/5181772898130619/2927463166045304/1282150081618649/latest.html
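For readers new to the project, a minimal sketch of the programming model, based on the usage example in the project README (assumes the 0.2.x Python API and an active SparkSession named spark; treat the exact signatures as unverified here):

import tensorflow as tf
import tensorframes as tfs
from pyspark.sql import Row

# A DataFrame with one numeric column "x".
df = spark.createDataFrame([Row(x=float(i)) for i in range(10)])

with tf.Graph().as_default():
    # A TensorFlow placeholder bound to the column "x"; its shape is
    # inferred from the DataFrame.
    x = tfs.block(df, "x")
    z = tf.add(x, 3, name="z")
    # Run the graph over the DataFrame on the workers; the result is a
    # new DataFrame with an extra column "z".
    df2 = tfs.map_blocks(z, df)

df2.show()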
weird error message
I’m having issues when I fire up pyspark on a fresh install. When I submit the same process via spark-submit it works. Here’s a dump of the trace:

    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:109)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: Failed to create database 'metastore_db', see the next exception for details.
    at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
    at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
    at org.apache.derby.impl.jdbc.Util.seeNextException(Unknown Source)
    at org.apache.derby.impl.jdbc.EmbedConnection.createDatabase(Unknown Source)
    at org.apache.derby.impl.jdbc.EmbedConnection.<init>(Unknown Source)
    at org.apache.derby.jdbc.InternalDriver$1.run(Unknown Source)
    at org.apache.derby.jdbc.InternalDriver$1.run(Unknown Source)
    at java.security.AccessController.doPrivileged(Native Method)
    at org.apache.derby.jdbc.InternalDriver.getNewEmbedConnection(Unknown Source)
    at org.apache.derby.jdbc.InternalDriver.connect(Unknown Source)
    at org.apache.derby.jdbc.InternalDriver.connect(Unknown Source)
    at org.apache.derby.jdbc.AutoloadedDriver.connect(Unknown Source)
    at java.sql.DriverManager.getConnection(DriverManager.java:664)
    at java.sql.DriverManager.getConnection(DriverManager.java:208)
    at com.jolbox.bonecp.BoneCP.obtainRawInternalConnection(BoneCP.java:361)
    at com.jolbox.bonecp.BoneCP.<init>(BoneCP.java:416)
    ... 92 more
Caused by: ERROR XJ041: Failed to create database 'metastore_db', see the next exception for details.
    at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)
    at org.apache.derby.impl.jdbc.SQLExceptionFactory.wrapArgsForTransportAcrossDRDA(Unknown Source)
    ... 108 more
Caused by: ERROR XBM0H: Directory /home/ubuntu/spark-2.1.0-bin-hadoop2.7/bin/metastore_db cannot be created.
    at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)
    at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)
    at org.apache.derby.impl.services.monitor.StorageFactoryService$10.run(Unknown Source)
    at java.security.AccessController.doPrivileged(Native Method)
    at org.apache.derby.impl.services.monitor.StorageFactoryService.createServiceRoot(Unknown Source)
    at org.apache.derby.impl.services.monitor.BaseMonitor.bootService(Unknown Source)
    at org.apache.derby.impl.services.monitor.BaseMonitor.createPersistentService(Unknown Source)
    at org.apache.derby.impl.services.monitor.FileMonitor.createPersistentService(Unknown Source)
    at org.apache.derby.iapi.services.monitor.Monitor.createPersistentService(Unknown Source)
    at org.apache.derby.impl.jdbc.EmbedConnection$5.run(Unknown Source)
    at java.security.AccessController.doPrivileged(Native Method)
    at org.apache.derby.impl.jdbc.EmbedConnection.createPersistentService(Unknown Source)
    ... 105 more

Traceback (most recent call last):
  File "/home/ubuntu/spark-2.1.0-bin-hadoop2.7/python/pyspark/shell.py", line 43, in <module>
    spark = SparkSession.builder\
  File "/home/ubuntu/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 179, in getOrCreate
    session._jsparkSession.sessionState().conf().setConfString(key, value)
  File "/home/ubuntu/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/home/ubuntu/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.py", line 79, in deco
    raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.IllegalArgumentException: u"Error while instantiating 'org.apache.spark.sql.hive.HiveSessionState':"
Re: Spark Testing Library Discussion
Hi all, whoever (Sam I think) was going to do some work on a template testing pipeline: I'd love to be involved. I have a current task in my day job (data engineer) to flesh out our testing how-to / best practices for Spark jobs, and I think I'll be doing something very similar for the next week or two. I'll scrape out what I have now in the next day or so and put it up in a gist that I can share too. G On 25 April 2017 at 13:04, Holden Karau wrote: > Urgh hangouts did something frustrating, updated link > https://hangouts.google.com/hangouts/_/ha6kusycp5fvzei2trhay4uhhqe > > On Mon, Apr 24, 2017 at 12:13 AM, Holden Karau > wrote: > >> The (tentative) link for those interested is https://hangouts.google.com >> /hangouts/_/oyjvcnffejcjhi6qazf3lysypue . >> >> On Mon, Apr 24, 2017 at 12:02 AM, Holden Karau >> wrote: >> >>> So 14 people have said they are available on Tuesday the 25th at 1PM >>> pacific so we will do this meeting then ( https://doodle.com/poll/69y6 >>> yab4pyf7u8bn ). >>> >>> Since hangouts tends to work ok on the Linux distro I'm running my >>> default is to host this as a "hangouts-on-air" unless there are alternative >>> ideas. >>> >>> I'll record the hangout and if it isn't terrible I'll post it for those >>> who weren't able to make it (and for next time I'll include more European >>> friendly time options - Doodle wouldn't let me update it once posted). >>> >>> On Fri, Apr 14, 2017 at 11:17 AM, Holden Karau >>> wrote: Hi Spark Users (+ Some Spark Testing Devs on BCC), Awhile back on one of the many threads about testing in Spark there was some interest in having a chat about the state of Spark testing and what people want/need. So if you are interested in joining an online (with maybe an IRL component if enough people are SF based) chat about Spark testing please fill out this doodle - https://doodle.com/poll/69y6yab4pyf7u8bn I think reasonable topics of discussion could be: 1) What is the state of the different Spark testing libraries in the different core (Scala, Python, R, Java) and extended languages (C#, Javascript, etc.)? 2) How do we make these more easily discovered by users? 3) What are people looking for in their testing libraries that we are missing? (can be functionality, documentation, etc.) 4) Are there any examples of well tested open source Spark projects and where are they? If you have other topics that's awesome. To clarify, this is about libraries and best practices for people testing their Spark applications, and less about testing Spark's internals (although as illustrated by some of the libraries there is some strong overlap in what is required to make that work). Cheers, Holden :) -- Cell : 425-233-8271 <(425)%20233-8271> Twitter: https://twitter.com/holdenkarau >>> >>> >>> >>> -- >>> Cell : 425-233-8271 <(425)%20233-8271> >>> Twitter: https://twitter.com/holdenkarau >>> >> >> >> >> -- >> Cell : 425-233-8271 <(425)%20233-8271> >> Twitter: https://twitter.com/holdenkarau >> > > > > -- > Cell : 425-233-8271 <(425)%20233-8271> > Twitter: https://twitter.com/holdenkarau >
Re: Spark Testing Library Discussion
Urgh hangouts did something frustrating, updated link https://hangouts.google.com/hangouts/_/ha6kusycp5fvzei2trhay4uhhqe On Mon, Apr 24, 2017 at 12:13 AM, Holden Karau wrote: > The (tentative) link for those interested is https://hangouts.google. > com/hangouts/_/oyjvcnffejcjhi6qazf3lysypue . > > On Mon, Apr 24, 2017 at 12:02 AM, Holden Karau > wrote: > >> So 14 people have said they are available on Tuesday the 25th at 1PM >> pacific so we will do this meeting then ( https://doodle.com/poll/69y6 >> yab4pyf7u8bn ). >> >> Since hangouts tends to work ok on the Linux distro I'm running my >> default is to host this as a "hangouts-on-air" unless there are alternative >> ideas. >> >> I'll record the hangout and if it isn't terrible I'll post it for those >> who weren't able to make it (and for next time I'll include more European >> friendly time options - Doodle wouldn't let me update it once posted). >> >> On Fri, Apr 14, 2017 at 11:17 AM, Holden Karau >> wrote: >> >>> Hi Spark Users (+ Some Spark Testing Devs on BCC), >>> >>> Awhile back on one of the many threads about testing in Spark there was >>> some interest in having a chat about the state of Spark testing and what >>> people want/need. >>> >>> So if you are interested in joining an online (with maybe an IRL >>> component if enough people are SF based) chat about Spark testing please >>> fill out this doodle - https://doodle.com/poll/69y6yab4pyf7u8bn >>> >>> I think reasonable topics of discussion could be: >>> >>> 1) What is the state of the different Spark testing libraries in the >>> different core (Scala, Python, R, Java) and extended languages (C#, >>> Javascript, etc.)? >>> 2) How do we make these more easily discovered by users? >>> 3) What are people looking for in their testing libraries that we are >>> missing? (can be functionality, documentation, etc.) >>> 4) Are there any examples of well tested open source Spark projects and >>> where are they? >>> >>> If you have other topics that's awesome. >>> >>> To clarify, this is about libraries and best practices for people testing >>> their Spark applications, and less about testing Spark's internals >>> (although as illustrated by some of the libraries there is some strong >>> overlap in what is required to make that work). >>> >>> Cheers, >>> >>> Holden :) >>> >>> -- >>> Cell : 425-233-8271 <(425)%20233-8271> >>> Twitter: https://twitter.com/holdenkarau >>> >> >> >> >> -- >> Cell : 425-233-8271 <(425)%20233-8271> >> Twitter: https://twitter.com/holdenkarau >> > > > > -- > Cell : 425-233-8271 <(425)%20233-8271> > Twitter: https://twitter.com/holdenkarau > -- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau
Spark Streaming 2.1 Kafka consumer - retrieving offset commits for each poll
Hi all,

Because the Spark Streaming direct Kafka consumer maps offsets for a given Kafka topic and partition internally while having enable.auto.commit set to false, how can I retrieve the offset of each of the consumer's poll calls using the offset ranges of an RDD? More precisely, the information I am after for each poll call is the topic, the partition, and the from/until offsets of that poll.

Thanks in advance,
Dominik
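Not an authoritative answer, but for reference: with the Scala/Java kafka-0-10 API this is usually done by casting each RDD to HasOffsetRanges inside foreachRDD. The integration guide shows the analogous pyspark pattern for the direct stream; a sketch with placeholder broker and topic names:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="offset-tracking")
ssc = StreamingContext(sc, 5)

# Placeholder topic and broker list.
stream = KafkaUtils.createDirectStream(
    ssc, ["mytopic"], {"metadata.broker.list": "broker1:9092"})

offset_ranges = []

def store_offset_ranges(rdd):
    # offsetRanges() is only defined on the RDDs produced directly by the
    # stream, so capture it before any other transformation runs.
    global offset_ranges
    offset_ranges = rdd.offsetRanges()
    return rdd

def print_offset_ranges(rdd):
    for o in offset_ranges:
        print(o.topic, o.partition, o.fromOffset, o.untilOffset)

stream.transform(store_offset_ranges).foreachRDD(print_offset_ranges)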
Re: how to find the nearest holiday
Still not working. Seems like there's some syntax error.

from pyspark.sql.functions import udf
start_date_test2.withColumn("diff", datediff(start_date_test2.start_date, start_date_test2.holiday.getItem[0])).show()

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
in <module>()
     26
     27 from pyspark.sql.functions import udf
---> 28 start_date_test2.withColumn("diff", datediff(start_date_test2.start_date, start_date_test2.holiday.getItem[0])).show()

TypeError: 'method' object is not subscriptable

On Tue, Apr 25, 2017 at 10:59 PM, Pushkar.Gujar wrote: > > You can use > - > start_date_test2.holiday.getItem[0] > > I would highly suggest you to look at latest documentation - > http://spark.apache.org/docs/latest/api/python/index.html > > > Thank you, > *Pushkar Gujar* > > > On Tue, Apr 25, 2017 at 8:50 AM, Zeming Yu wrote: > >> How could I access the first element of the holiday column? >> >> I tried the following code, but it doesn't work: >> start_date_test2.withColumn("diff", datediff(start_date_test2.start_date, >> >> >> start_date_test2.holiday*[0]*)).show() >> >> On Tue, Apr 25, 2017 at 10:20 PM, Zeming Yu wrote: >> >>> Got it working now! >>> >>> Does anyone have a pyspark example of how to calculate the numbers of >>> days from the nearest holiday based on an array column? >>> >>> I.e. from this table >>> >>> +--+---+ >>> |start_date|holiday| >>> +--+---+ >>> |2017-08-11|[2017-05-30,2017-10-01]| >>> >>> >>> calculate a column called "days_from_nearest_holiday" which calculates the >>> difference between 11 aug 2017 and 1 oct 2017? >>> >>> >>> >>> >>> >>> On Tue, Apr 25, 2017 at 6:00 PM, Wen Pei Yu wrote: TypeError: unorderable types: str() >= datetime.date() Should transfer string to Date type when compare. Yu Wenpei. - Original message - From: Zeming Yu To: user Cc: Subject: how to find the nearest holiday Date: Tue, Apr 25, 2017 3:39 PM I have a column of dates (date type), just trying to find the nearest holiday of the date. Anyone has any idea what went wrong below? start_date_test = flight3.select("start_date").distinct() start_date_test.show() holidays = ['2017-09-01', '2017-10-01'] +--+ |start_date| +--+ |2017-08-11| |2017-09-11| |2017-09-28| |2017-06-29| |2017-09-29| |2017-07-31| |2017-08-14| |2017-08-18| |2017-04-09| |2017-09-21| |2017-08-10| |2017-06-30| |2017-08-19| |2017-07-06| |2017-06-28| |2017-09-14| |2017-08-08| |2017-08-22| |2017-07-03| |2017-07-30| +--+ only showing top 20 rows index = spark.sparkContext.broadcast(sorted(holidays)) def nearest_holiday(date): last_holiday = index.value[0] for next_holiday in index.value: if next_holiday >= date: break last_holiday = next_holiday if last_holiday > date: last_holiday = None if next_holiday < date: next_holiday = None return (last_holiday, next_holiday) from pyspark.sql.types import * return_type = StructType([StructField('last_holiday', StringType()), StructField('next_holiday', StringType())]) from pyspark.sql.functions import udf nearest_holiday_udf = udf(nearest_holiday, return_type) start_date_test.withColumn('holiday', nearest_holiday_udf('start_date')).show(5, False) here's the error I got: --- Py4JJavaError Traceback (most recent call last) in () 24 nearest_holiday_udf = udf(nearest_holiday, return_type) 25 ---> 26 start_date_test.withColumn('holiday', nearest_holiday_udf( 'start_date')).show(5, False) C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pytho n\pyspark\sql\dataframe.py in show(self, n, truncate) 318 print(self._jdf.showString(n, 20)) 319 else: --> 320 print(self._jdf.showString(n, int(truncate))) 321 322 def __repr__(self): C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pytho n\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py in __call__(self, *args) 1131 answer = self.gateway_client.send_command(command) 1132 return_value = get_return_value( -> 1133 answer, self.gateway_client, self.target_id, self.name) 1134 1135 for temp_arg in temp_args: C:\spark-2.1.
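For reference, the TypeError above comes from subscripting getItem: it is a method on Column, so it needs a call rather than square brackets. Note also that "holiday" here is a struct with fields last_holiday/next_holiday (per the UDF's return_type in this thread), so its parts are extracted by field name. A hedged sketch using the names from the thread:

from pyspark.sql.functions import datediff

# getItem is a method: .getItem(0), not .getItem[0]. The integer form is
# for true array columns; for the struct produced by nearest_holiday_udf,
# pull the field out by name instead.
start_date_test2.withColumn(
    "diff",
    datediff(start_date_test2.start_date,
             start_date_test2.holiday.getField("last_holiday"))).show()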
spark streaming resiliency
Hi, I have a question regarding Spark Streaming resiliency, and the documentation is ambiguous: it says that the default configuration uses a replication factor of 2 for received data, but the recommendation is to use write-ahead logs to guarantee data resiliency with receivers. "Additionally, it is recommended that the replication of the received data within Spark be disabled when the write ahead log is enabled as the log is already stored in a replicated storage system." The doc says it is useless to duplicate the data when the WAL is on, but what is the benefit of using the WAL instead of the internal in-memory replication? I would assume it's better to replicate in memory than to write to a replicated FS, performance-wise. Can a streaming expert explain? BR
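For concreteness, the write-ahead log discussed here is controlled by a single configuration flag; a minimal sketch (the storage-level remark just restates the quoted recommendation, not tested here):

from pyspark import SparkConf

# Enable the receiver write-ahead log. With the WAL on, the programming
# guide suggests a non-replicated storage level for the input stream
# (e.g. MEMORY_AND_DISK_SER instead of the default MEMORY_AND_DISK_SER_2),
# since the WAL already lives on replicated storage.
conf = SparkConf().set("spark.streaming.receiver.writeAheadLog.enable", "true")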
Re: how to find the nearest holiday
You can use - start_date_test2.holiday.getItem[0] I would highly suggest you to look at latest documentation - http://spark.apache.org/docs/latest/api/python/index.html Thank you, *Pushkar Gujar* On Tue, Apr 25, 2017 at 8:50 AM, Zeming Yu wrote: > How could I access the first element of the holiday column? > > I tried the following code, but it doesn't work: > start_date_test2.withColumn("diff", datediff(start_date_test2.start_date, > > start_date_test2.holiday*[0]*)).show() > > On Tue, Apr 25, 2017 at 10:20 PM, Zeming Yu wrote: > >> Got it working now! >> >> Does anyone have a pyspark example of how to calculate the numbers of >> days from the nearest holiday based on an array column? >> >> I.e. from this table >> >> +--+---+ >> |start_date|holiday| >> +--+---+ >> |2017-08-11|[2017-05-30,2017-10-01]| >> >> >> calculate a column called "days_from_nearest_holiday" which calculates the >> difference between 11 aug 2017 and 1 oct 2017? >> >> >> >> >> >> On Tue, Apr 25, 2017 at 6:00 PM, Wen Pei Yu wrote: >> >>> TypeError: unorderable types: str() >= datetime.date() >>> >>> Should transfer string to Date type when compare. >>> >>> Yu Wenpei. >>> >>> >>> - Original message - >>> From: Zeming Yu >>> To: user >>> Cc: >>> Subject: how to find the nearest holiday >>> Date: Tue, Apr 25, 2017 3:39 PM >>> >>> I have a column of dates (date type), just trying to find the nearest >>> holiday of the date. Anyone has any idea what went wrong below? >>> >>> >>> >>> start_date_test = flight3.select("start_date").distinct() >>> start_date_test.show() >>> >>> holidays = ['2017-09-01', '2017-10-01'] >>> >>> +--+ >>> |start_date| >>> +--+ >>> |2017-08-11| >>> |2017-09-11| >>> |2017-09-28| >>> |2017-06-29| >>> |2017-09-29| >>> |2017-07-31| >>> |2017-08-14| >>> |2017-08-18| >>> |2017-04-09| >>> |2017-09-21| >>> |2017-08-10| >>> |2017-06-30| >>> |2017-08-19| >>> |2017-07-06| >>> |2017-06-28| >>> |2017-09-14| >>> |2017-08-08| >>> |2017-08-22| >>> |2017-07-03| >>> |2017-07-30| >>> +--+ >>> only showing top 20 rows >>> >>> >>> >>> index = spark.sparkContext.broadcast(sorted(holidays)) >>> >>> def nearest_holiday(date): >>> last_holiday = index.value[0] >>> for next_holiday in index.value: >>> if next_holiday >= date: >>> break >>> last_holiday = next_holiday >>> if last_holiday > date: >>> last_holiday = None >>> if next_holiday < date: >>> next_holiday = None >>> return (last_holiday, next_holiday) >>> >>> >>> from pyspark.sql.types import * >>> return_type = StructType([StructField('last_holiday', StringType()), >>> StructField('next_holiday', StringType())]) >>> >>> from pyspark.sql.functions import udf >>> nearest_holiday_udf = udf(nearest_holiday, return_type) >>> >>> start_date_test.withColumn('holiday', >>> nearest_holiday_udf('start_date')).show(5, >>> False) >>> >>> >>> here's the error I got: >>> >>> >>> --- >>> Py4JJavaError Traceback (most recent call >>> last) >>> in () >>> 24 nearest_holiday_udf = udf(nearest_holiday, return_type) >>> 25 >>> ---> 26 start_date_test.withColumn('holiday', nearest_holiday_udf( >>> 'start_date')).show(5, False) >>> >>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pytho >>> n\pyspark\sql\dataframe.py in show(self, n, truncate) >>> 318 print(self._jdf.showString(n, 20)) >>> 319 else: >>> --> 320 print(self._jdf.showString(n, int(truncate))) >>> 321 >>> 322 def __repr__(self): >>> >>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pytho >>> n\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py in __call__(self, *args) >>>1131 answer = 
self.gateway_client.send_command(command) >>>1132 return_value = get_return_value( >>> -> 1133 answer, self.gateway_client, self.target_id, >>> self.name) >>>1134 >>>1135 for temp_arg in temp_args: >>> >>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pytho >>> n\pyspark\sql\utils.py in deco(*a, **kw) >>> 61 def deco(*a, **kw): >>> 62 try: >>> ---> 63 return f(*a, **kw) >>> 64 except py4j.protocol.Py4JJavaError as e: >>> 65 s = e.java_exception.toString() >>> >>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pytho >>> n\lib\py4j-0.10.4-src.zip\py4j\protocol.py in get_return_value(answer, >>> gateway_client, target_id, name) >>> 317 raise Py4JJavaError( >>> 318 "An error occurred while calling >>> {0}{1}{2}.\n". >>> --> 319 format(target_id, ".", name), value) >>> 320 else: >>> 321 raise Py4JError( >>> >>> Py4JJavaError: An error occurred while calling o566.showString. >>> : org
Re: Authorizations in thriftserver
Does anyone have an answer? 2017-04-24 9:32 GMT+02:00 vincent gromakowski : > Hi, > Can someone confirm that authorizations aren't implemented in the Spark > thriftserver for SQL standard based Hive authorization? > https://cwiki.apache.org/confluence/display/Hive/SQL+Standard+Based+Hive+ > Authorization > If confirmed, any plan to implement it? > Thanks > >
Re: how to find the nearest holiday
How could I access the first element of the holiday column? I tried the following code, but it doesn't work: start_date_test2.withColumn("diff", datediff(start_date_test2.start_date, start_date_test2.holiday*[0]*)).show() On Tue, Apr 25, 2017 at 10:20 PM, Zeming Yu wrote: > Got it working now! > > Does anyone have a pyspark example of how to calculate the numbers of days > from the nearest holiday based on an array column? > > I.e. from this table > > +--+---+ > |start_date|holiday| > +--+---+ > |2017-08-11|[2017-05-30,2017-10-01]| > > > calculate a column called "days_from_nearest_holiday" which calculates the > difference between 11 aug 2017 and 1 oct 2017? > > > > > > On Tue, Apr 25, 2017 at 6:00 PM, Wen Pei Yu wrote: > >> TypeError: unorderable types: str() >= datetime.date() >> >> Should transfer string to Date type when compare. >> >> Yu Wenpei. >> >> >> - Original message - >> From: Zeming Yu >> To: user >> Cc: >> Subject: how to find the nearest holiday >> Date: Tue, Apr 25, 2017 3:39 PM >> >> I have a column of dates (date type), just trying to find the nearest >> holiday of the date. Anyone has any idea what went wrong below? >> >> >> >> start_date_test = flight3.select("start_date").distinct() >> start_date_test.show() >> >> holidays = ['2017-09-01', '2017-10-01'] >> >> +--+ >> |start_date| >> +--+ >> |2017-08-11| >> |2017-09-11| >> |2017-09-28| >> |2017-06-29| >> |2017-09-29| >> |2017-07-31| >> |2017-08-14| >> |2017-08-18| >> |2017-04-09| >> |2017-09-21| >> |2017-08-10| >> |2017-06-30| >> |2017-08-19| >> |2017-07-06| >> |2017-06-28| >> |2017-09-14| >> |2017-08-08| >> |2017-08-22| >> |2017-07-03| >> |2017-07-30| >> +--+ >> only showing top 20 rows >> >> >> >> index = spark.sparkContext.broadcast(sorted(holidays)) >> >> def nearest_holiday(date): >> last_holiday = index.value[0] >> for next_holiday in index.value: >> if next_holiday >= date: >> break >> last_holiday = next_holiday >> if last_holiday > date: >> last_holiday = None >> if next_holiday < date: >> next_holiday = None >> return (last_holiday, next_holiday) >> >> >> from pyspark.sql.types import * >> return_type = StructType([StructField('last_holiday', StringType()), >> StructField('next_holiday', StringType())]) >> >> from pyspark.sql.functions import udf >> nearest_holiday_udf = udf(nearest_holiday, return_type) >> >> start_date_test.withColumn('holiday', >> nearest_holiday_udf('start_date')).show(5, >> False) >> >> >> here's the error I got: >> >> >> --- >> Py4JJavaError Traceback (most recent call >> last) >> in () >> 24 nearest_holiday_udf = udf(nearest_holiday, return_type) >> 25 >> ---> 26 start_date_test.withColumn('holiday', nearest_holiday_udf( >> 'start_date')).show(5, False) >> >> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pytho >> n\pyspark\sql\dataframe.py in show(self, n, truncate) >> 318 print(self._jdf.showString(n, 20)) >> 319 else: >> --> 320 print(self._jdf.showString(n, int(truncate))) >> 321 >> 322 def __repr__(self): >> >> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pytho >> n\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py in __call__(self, *args) >>1131 answer = self.gateway_client.send_command(command) >>1132 return_value = get_return_value( >> -> 1133 answer, self.gateway_client, self.target_id, >> self.name) >>1134 >>1135 for temp_arg in temp_args: >> >> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pytho >> n\pyspark\sql\utils.py in deco(*a, **kw) >> 61 def deco(*a, **kw): >> 62 try: >> ---> 63 return f(*a, **kw) >> 64 except py4j.protocol.Py4JJavaError as e: >> 
65 s = e.java_exception.toString() >> >> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pytho >> n\lib\py4j-0.10.4-src.zip\py4j\protocol.py in get_return_value(answer, >> gateway_client, target_id, name) >> 317 raise Py4JJavaError( >> 318 "An error occurred while calling >> {0}{1}{2}.\n". >> --> 319 format(target_id, ".", name), value) >> 320 else: >> 321 raise Py4JError( >> >> Py4JJavaError: An error occurred while calling o566.showString. >> : org.apache.spark.SparkException: Job aborted due to stage failure: >> Task 0 in stage 98.0 failed 1 times, most recent failure: Lost task 0.0 in >> stage 98.0 (TID 521, localhost, executor driver): >> org.apache.spark.api.python.PythonException: Traceback (most recent call >> last): >> File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pyth >> on\lib\pyspark.zip\pyspark\worker.py", line 174, in main >> File "C:\
Re: how to find the nearest holiday
Got it working now!

Does anyone have a pyspark example of how to calculate the numbers of days from the nearest holiday based on an array column? I.e. from this table

+----------+-----------------------+
|start_date|holiday                |
+----------+-----------------------+
|2017-08-11|[2017-05-30,2017-10-01]|

calculate a column called "days_from_nearest_holiday" which calculates the difference between 11 aug 2017 and 1 oct 2017?

On Tue, Apr 25, 2017 at 6:00 PM, Wen Pei Yu wrote: > TypeError: unorderable types: str() >= datetime.date() > > Should transfer string to Date type when compare. > > Yu Wenpei. > > > - Original message - > From: Zeming Yu > To: user > Cc: > Subject: how to find the nearest holiday > Date: Tue, Apr 25, 2017 3:39 PM > > I have a column of dates (date type), just trying to find the nearest > holiday of the date. Anyone has any idea what went wrong below? > > > > start_date_test = flight3.select("start_date").distinct() > start_date_test.show() > > holidays = ['2017-09-01', '2017-10-01'] > > +--+ > |start_date| > +--+ > |2017-08-11| > |2017-09-11| > |2017-09-28| > |2017-06-29| > |2017-09-29| > |2017-07-31| > |2017-08-14| > |2017-08-18| > |2017-04-09| > |2017-09-21| > |2017-08-10| > |2017-06-30| > |2017-08-19| > |2017-07-06| > |2017-06-28| > |2017-09-14| > |2017-08-08| > |2017-08-22| > |2017-07-03| > |2017-07-30| > +--+ > only showing top 20 rows > > > > index = spark.sparkContext.broadcast(sorted(holidays)) > > def nearest_holiday(date): > last_holiday = index.value[0] > for next_holiday in index.value: > if next_holiday >= date: > break > last_holiday = next_holiday > if last_holiday > date: > last_holiday = None > if next_holiday < date: > next_holiday = None > return (last_holiday, next_holiday) > > > from pyspark.sql.types import * > return_type = StructType([StructField('last_holiday', StringType()), > StructField('next_holiday', StringType())]) > > from pyspark.sql.functions import udf > nearest_holiday_udf = udf(nearest_holiday, return_type) > > start_date_test.withColumn('holiday', > nearest_holiday_udf('start_date')).show(5, > False) > > > here's the error I got: > > > --- > Py4JJavaError Traceback (most recent call > last) > in () > 24 nearest_holiday_udf = udf(nearest_holiday, return_type) > 25 > ---> 26 start_date_test.withColumn('holiday', nearest_holiday_udf( > 'start_date')).show(5, False) > > C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pytho > n\pyspark\sql\dataframe.py in show(self, n, truncate) > 318 print(self._jdf.showString(n, 20)) > 319 else: > --> 320 print(self._jdf.showString(n, int(truncate))) > 321 > 322 def __repr__(self): > > C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pytho > n\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py in __call__(self, *args) >1131 answer = self.gateway_client.send_command(command) >1132 return_value = get_return_value( > -> 1133 answer, self.gateway_client, self.target_id, > self.name) >1134 >1135 for temp_arg in temp_args: > > C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pytho > n\pyspark\sql\utils.py in deco(*a, **kw) > 61 def deco(*a, **kw): > 62 try: > ---> 63 return f(*a, **kw) > 64 except py4j.protocol.Py4JJavaError as e: > 65 s = e.java_exception.toString() > > C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pytho > n\lib\py4j-0.10.4-src.zip\py4j\protocol.py in get_return_value(answer, > gateway_client, target_id, name) > 317 raise Py4JJavaError( > 318 "An error occurred while calling > {0}{1}{2}.\n".
> --> 319 format(target_id, ".", name), value) > 320 else: > 321 raise Py4JError( > > Py4JJavaError: An error occurred while calling o566.showString. > : org.apache.spark.SparkException: Job aborted due to stage failure: Task > 0 in stage 98.0 failed 1 times, most recent failure: Lost task 0.0 in stage > 98.0 (TID 521, localhost, executor driver): > org.apache.spark.api.python.PythonException: > Traceback (most recent call last): > File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\ > python\lib\pyspark.zip\pyspark\worker.py", line 174, in main > File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\ > python\lib\pyspark.zip\pyspark\worker.py", line 169, in process > File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\ > python\lib\pyspark.zip\pyspark\serializers.py", line 220, in dump_stream > self.serializer.dump_stream(self._batched(iterator), stream) > File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\ > python\lib\pyspark.zip\pyspark\serializers.py"
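A hedged sketch for the "days_from_nearest_holiday" question above, assuming the holiday struct produced by the thread's UDF and date-parseable strings (untested):

from pyspark.sql.functions import abs as abs_, datediff, least, to_date

# Distance in days to each of the two candidate holidays, then take the
# smaller one. least() skips nulls, which covers the case where the UDF
# returned None on one side.
result = start_date_test2.withColumn(
    "days_from_nearest_holiday",
    least(
        abs_(datediff("start_date",
                      to_date(start_date_test2.holiday.getField("last_holiday")))),
        abs_(datediff("start_date",
                      to_date(start_date_test2.holiday.getField("next_holiday"))))))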
Re: pyspark vector
Well the 3 in this case is the size of the sparse vector. This equates to the number of features, which for CountVectorizer (I assume that's what you're using) is also vocab size (number of unique terms). On Tue, 25 Apr 2017 at 04:06 Peyman Mohajerian wrote: > setVocabSize > > > On Mon, Apr 24, 2017 at 5:36 PM, Zeming Yu wrote: > >> Hi all, >> >> Beginner question: >> >> what does the 3 mean in the (3,[0,1,2],[1.0,1.0,1.0])? >> >> https://spark.apache.org/docs/2.1.0/ml-features.html >> >> id | texts | vector >> |-|--- >> 0 | Array("a", "b", "c")| (3,[0,1,2],[1.0,1.0,1.0]) >> 1 | Array("a", "b", "b", "c", "a") | (3,[0,1,2],[2.0,2.0,1.0]) >> >> >
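To make the encoding concrete, a small sketch using pyspark's vector type (pyspark.ml.linalg, Spark 2.x):

from pyspark.ml.linalg import SparseVector

# (3, [0, 1, 2], [1.0, 1.0, 1.0]) reads as: size 3, with value 1.0 at
# indices 0, 1 and 2 -- i.e. term counts over a 3-term vocabulary.
v = SparseVector(3, [0, 1, 2], [1.0, 1.0, 1.0])
print(v.size)       # 3
print(v.toArray())  # [1. 1. 1.]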
Re: how to find the nearest holiday
TypeError: unorderable types: str() >= datetime.date()

Should convert the string to a Date type when comparing.

Yu Wenpei.

----- Original message -----
From: Zeming Yu
To: user
Cc:
Subject: how to find the nearest holiday
Date: Tue, Apr 25, 2017 3:39 PM

I have a column of dates (date type), just trying to find the nearest holiday of the date. Anyone has any idea what went wrong below?

start_date_test = flight3.select("start_date").distinct()
start_date_test.show()

holidays = ['2017-09-01', '2017-10-01']

+----------+
|start_date|
+----------+
|2017-08-11|
|2017-09-11|
|2017-09-28|
|2017-06-29|
|2017-09-29|
|2017-07-31|
|2017-08-14|
|2017-08-18|
|2017-04-09|
|2017-09-21|
|2017-08-10|
|2017-06-30|
|2017-08-19|
|2017-07-06|
|2017-06-28|
|2017-09-14|
|2017-08-08|
|2017-08-22|
|2017-07-03|
|2017-07-30|
+----------+
only showing top 20 rows

index = spark.sparkContext.broadcast(sorted(holidays))

def nearest_holiday(date):
    last_holiday = index.value[0]
    for next_holiday in index.value:
        if next_holiday >= date:
            break
        last_holiday = next_holiday
    if last_holiday > date:
        last_holiday = None
    if next_holiday < date:
        next_holiday = None
    return (last_holiday, next_holiday)

from pyspark.sql.types import *
return_type = StructType([StructField('last_holiday', StringType()),
                          StructField('next_holiday', StringType())])

from pyspark.sql.functions import udf
nearest_holiday_udf = udf(nearest_holiday, return_type)

start_date_test.withColumn('holiday', nearest_holiday_udf('start_date')).show(5, False)

here's the error I got:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
in <module>()
     24 nearest_holiday_udf = udf(nearest_holiday, return_type)
     25
---> 26 start_date_test.withColumn('holiday', nearest_holiday_udf('start_date')).show(5, False)

C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\dataframe.py in show(self, n, truncate)
    318             print(self._jdf.showString(n, 20))
    319         else:
--> 320             print(self._jdf.showString(n, int(truncate)))
    321
    322     def __repr__(self):

C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134
   1135         for temp_arg in temp_args:

C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling o566.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 98.0 failed 1 times, most recent failure: Lost task 0.0 in stage 98.0 (TID 521, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 174, in main
  File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 169, in process
  File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 220, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 138, in dump_stream
    for obj in iterator:
  File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 209, in _batched
    for item in iterator:
  File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 92, in <lambda>
  File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 68, in <lambda>
  File "", line 10, in nearest_holiday
TypeError: unorderable types: str() >= datetime.date()
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$an
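A sketch of the fix described above: parse the holiday strings into datetime.date objects before broadcasting, so the comparison in the UDF is date-to-date (names taken from the thread):

import datetime

# Parse the holiday strings into date objects before broadcasting, so
# `next_holiday >= date` compares date to date instead of str to date.
holidays = [datetime.datetime.strptime(d, '%Y-%m-%d').date()
            for d in ['2017-09-01', '2017-10-01']]
index = spark.sparkContext.broadcast(sorted(holidays))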
how to find the nearest holiday
I have a column of dates (date type), just trying to find the nearest holiday of the date. Anyone has any idea what went wrong below?

start_date_test = flight3.select("start_date").distinct()
start_date_test.show()

holidays = ['2017-09-01', '2017-10-01']

+----------+
|start_date|
+----------+
|2017-08-11|
|2017-09-11|
|2017-09-28|
|2017-06-29|
|2017-09-29|
|2017-07-31|
|2017-08-14|
|2017-08-18|
|2017-04-09|
|2017-09-21|
|2017-08-10|
|2017-06-30|
|2017-08-19|
|2017-07-06|
|2017-06-28|
|2017-09-14|
|2017-08-08|
|2017-08-22|
|2017-07-03|
|2017-07-30|
+----------+
only showing top 20 rows

index = spark.sparkContext.broadcast(sorted(holidays))

def nearest_holiday(date):
    last_holiday = index.value[0]
    for next_holiday in index.value:
        if next_holiday >= date:
            break
        last_holiday = next_holiday
    if last_holiday > date:
        last_holiday = None
    if next_holiday < date:
        next_holiday = None
    return (last_holiday, next_holiday)

from pyspark.sql.types import *
return_type = StructType([StructField('last_holiday', StringType()),
                          StructField('next_holiday', StringType())])

from pyspark.sql.functions import udf
nearest_holiday_udf = udf(nearest_holiday, return_type)

start_date_test.withColumn('holiday', nearest_holiday_udf('start_date')).show(5, False)

here's the error I got:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
in <module>()
     24 nearest_holiday_udf = udf(nearest_holiday, return_type)
     25
---> 26 start_date_test.withColumn('holiday', nearest_holiday_udf('start_date')).show(5, False)

C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\dataframe.py in show(self, n, truncate)
    318             print(self._jdf.showString(n, 20))
    319         else:
--> 320             print(self._jdf.showString(n, int(truncate)))
    321
    322     def __repr__(self):

C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134
   1135         for temp_arg in temp_args:

C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling o566.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 98.0 failed 1 times, most recent failure: Lost task 0.0 in stage 98.0 (TID 521, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 174, in main
  File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 169, in process
  File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 220, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 138, in dump_stream
    for obj in iterator:
  File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 209, in _batched
    for item in iterator:
  File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 92, in <lambda>
  File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 68, in <lambda>
  File "", line 10, in nearest_holiday
TypeError: unorderable types: str() >= datetime.date()
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
    at org.apache.spark.rdd.RDD$$anonfun$mapPar