Looking for a developer to help us with a small ETL project using Spark and Kubernetes
Hello,

We are looking for a developer to help us with a small ETL project using Spark and Kubernetes. Here are some of the requirements:

1. We need a REST API to run and schedule jobs. We would prefer this done in Node.js, but it can be done in Java. The REST API will not be available to the public.
2. We need an easy way to create new jobs in Java without redeploying the whole server.
3. We want jobs deployed/run using Kubernetes.
4. Must be able to scale to 1000s of ETL jobs.
5. The source for the data will be one REST API.
6. The destination for the data will be one Couchbase database cluster. (Couchbase also uses a REST API.)
7. I am not sure how many records will be processed per job.
8. The data is mostly sales-related.

I know there are commercial ETL solutions that do everything I want. We are looking for something simple and do not need a fancy UI to describe our ETL. We want to use Spark and Java to programmatically describe our ETL jobs.

Please let me know if you are interested.

Thanks,
Warren Bell
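The "new jobs without redeploying the server" requirement above can be sketched as a tiny job registry. This is shown in Python for brevity, although the posting asks for Java, and every name here (`EtlJob`, `JOB_REGISTRY`, `DailySalesJob`) is a hypothetical illustration, not part of the posting:

```python
# Sketch of a pluggable ETL job abstraction: each job is a small class
# registered under a name, so the scheduling server can look jobs up at
# runtime instead of being redeployed for every new job.
class EtlJob:
    name = "base"

    def extract(self):
        """Pull records from the source REST API (stubbed here)."""
        raise NotImplementedError

    def transform(self, records):
        """Default: pass records through unchanged."""
        return records

    def load(self, records):
        """Write records to the destination, e.g. Couchbase's REST API."""
        raise NotImplementedError


JOB_REGISTRY = {}

def register(job_cls):
    """Make a job discoverable by name without touching the server code."""
    JOB_REGISTRY[job_cls.name] = job_cls
    return job_cls


@register
class DailySalesJob(EtlJob):
    name = "daily_sales"

    def extract(self):
        return [{"sku": "A1", "qty": 3}]  # stand-in for the source REST call

    def load(self, records):
        return len(records)               # stand-in for a Couchbase upsert


job = JOB_REGISTRY["daily_sales"]()
loaded = job.load(job.transform(job.extract()))
print(loaded)  # 1 record "loaded"
```

In the Java setup the posting describes, the same idea would typically be job jars loaded from a directory or an object store, so adding a job means deploying one artifact rather than the whole REST server.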
Re: Usage of PyArrow in Spark
It would be possible to use Arrow on regular Python UDFs and avoid pandas, and there would probably be some performance improvement. The difficult part will be ensuring that the data remains consistent in the conversions between Arrow and Python; timestamps, for example, are a bit tricky. Given that we already have pandas_udfs, I'm not sure it would be worth the effort, but it might be a good experiment to see how much improvement it would bring.

Bryan

On Thu, Jul 18, 2019 at 12:02 AM Abdeali Kothari wrote:

> I was thinking of implementing that. But I quickly realized that doing a
> conversion of Spark -> Pandas -> Python causes errors.
>
> A quick example being "None" in numeric data types.
> Pandas supports only NaN. Spark supports NULL and NaN.
>
> This is just one of the issues I came across.
> I'm not sure about some of the more complex types like Array, Map, and
> struct, which are internally converted to pd.Series with type object.
>
> I think that avoiding pandas in between and doing something from Arrow to
> Python would be more efficient since, if I understand right, Arrow has a
> wider range of types and can handle this better.
>
> >>> from pyspark.sql import functions as F
> >>> sdf = spark.createDataFrame([ [None], [float('nan')], [1.1] ], ['val'])
>
> # Return the column with no change
> >>> udf = F.pandas_udf('double', F.PandasUDFType.SCALAR)(lambda col: col)
> >>> sdf.select(sdf['val'], udf(sdf['val'])).show()
> +----+-----+
> | val|(val)|
> +----+-----+
> |null| null|
> | NaN| null|
> | 1.1|  1.1|
> +----+-----+
>
> # isnull()
> >>> udf = F.pandas_udf('double', F.PandasUDFType.SCALAR)(lambda col: col.isnull())
> >>> sdf.select(sdf['val'], udf(sdf['val'])).show()
> +----+-----+
> | val|(val)|
> +----+-----+
> |null|  1.0|
> | NaN|  1.0|
> | 1.1|  0.0|
> +----+-----+
>
> # Check for "is None"
> >>> udf = F.pandas_udf('double', F.PandasUDFType.SCALAR)(lambda col: col.apply(lambda x: x is None))
> >>> sdf.select(sdf['val'], udf(sdf['val'])).show()
> +----+-----+
> | val|(val)|
> +----+-----+
> |null|  0.0|
> | NaN|  0.0|
> | 1.1|  0.0|
> +----+-----+
>
> On Wed, Jul 17, 2019 at 4:47 PM Hyukjin Kwon wrote:
>
>> Regular Python UDFs don't use PyArrow under the hood.
>> Yes, they can potentially benefit, but they can be easily worked around
>> via Pandas UDFs.
>>
>> For instance, both below are virtually identical.
>>
>> @udf(...)
>> def func(col):
>>     return col
>>
>> @pandas_udf(...)
>> def pandas_func(col):
>>     return col.apply(lambda x: x)
>>
>> If we only need some minimised change, I would be positive about adding
>> Arrow support into regular Python UDFs. Otherwise, I am not sure yet.
>>
>>
>> On Wed, Jul 17, 2019 at 1:19 PM, Abdeali Kothari wrote:
>>
>>> Hi,
>>> In Spark 2.3+ I saw that PyArrow was being used in a bunch of places in
>>> Spark, and I was trying to understand the benefit in terms of
>>> serialization / deserialization it provides.
>>>
>>> I understand that the new pandas_udf works only if PyArrow is installed.
>>> But what about the plain old PythonUDF, which can be used in map()-style
>>> operations?
>>> Are they also using PyArrow under the hood to reduce the cost of serde?
>>> Or do they remain as before, so that no performance gain should be
>>> expected in those?
>>>
>>> If I'm not mistaken, plain old PythonUDFs could also benefit from Arrow,
>>> as the data transfer cost to serialize/deserialize from Java to Python
>>> and back still exists and could potentially be reduced by using Arrow.
>>> Is my understanding correct? Are there any plans to implement this?
>>>
>>> Pointers to any notes or Jira about this would be appreciated.
>>>
>>
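The NULL-vs-NaN coercion discussed in this thread can be illustrated without pandas or Spark at all. A minimal pure-Python sketch (values are illustrative): a float column has no separate null value, so None is folded into NaN and the NULL/NaN distinction is lost:

```python
import math

data = [None, float('nan'), 1.1]  # Spark sees: NULL, NaN, 1.1

# A pandas float64 Series has no separate null value, so building one from
# this data coerces None -> NaN; mirrored here without pandas:
coerced = [float('nan') if v is None else v for v in data]

# After coercion, "x is None" is always False -- the original NULL is
# unrecoverable, matching the "is None" pandas_udf output in the thread:
assert not any(v is None for v in coerced)

# The original NULL and the original NaN are now indistinguishable:
assert math.isnan(coerced[0]) and math.isnan(coerced[1])
```

This is the kind of distinction an Arrow-to-Python path could preserve, since Arrow arrays carry an explicit validity (null) bitmap separate from the float values.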
Re: spark standalone mode problem about executor add and removed again and again!
I would also check firewall rules. Is communication allowed on all the required port ranges and hosts?

On Thu, Jul 18, 2019 at 3:56 AM Amit Sharma wrote:

> Do you have dynamic resource allocation enabled?
>
>
> On Wednesday, July 17, 2019, zenglong chen wrote:
>
>> Hi all,
>> My standalone cluster has two slaves. When I submit my job, the localhost
>> slave works fine, but the second slave keeps adding and removing executors.
>> The logs are below:
>>
>> 2019-07-17 10:51:38,889 INFO client.StandaloneAppClient$ClientEndpoint: Executor updated: app-20190717105135-0008/2 is now EXITED (Command exited with code 1)
>> 2019-07-17 10:51:38,890 INFO cluster.StandaloneSchedulerBackend: Executor app-20190717105135-0008/2 removed: Command exited with code 1
>> 2019-07-17 10:51:38,890 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 2 from BlockManagerMaster.
>> 2019-07-17 10:51:38,890 INFO storage.BlockManagerMaster: Removal of executor 2 requested
>> 2019-07-17 10:51:38,891 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Asked to remove non-existent executor 2
>> 2019-07-17 10:51:38,892 INFO client.StandaloneAppClient$ClientEndpoint: Executor added: app-20190717105135-0008/3 on worker-20190717093045-172.22.9.179-40573 (172.22.9.179:40573) with 8 core(s)
>> 2019-07-17 10:51:38,892 INFO cluster.StandaloneSchedulerBackend: Granted executor ID app-20190717105135-0008/3 on hostPort 172.22.9.179:40573 with 8 core(s), 12.0 GB RAM
>> 2019-07-17 10:51:38,893 INFO client.StandaloneAppClient$ClientEndpoint: Executor updated: app-20190717105135-0008/3 is now RUNNING
>> 2019-07-17 10:51:40,521 INFO client.StandaloneAppClient$ClientEndpoint: Executor updated: app-20190717105135-0008/3 is now EXITED (Command exited with code 1)
>> 2019-07-17 10:51:40,521 INFO cluster.StandaloneSchedulerBackend: Executor app-20190717105135-0008/3 removed: Command exited with code 1
>> 2019-07-17 10:51:40,521 INFO client.StandaloneAppClient$ClientEndpoint: Executor added: app-20190717105135-0008/4 on worker-20190717093045-172.22.9.179-40573 (172.22.9.179:40573) with 8 core(s)
>> 2019-07-17 10:51:40,521 INFO storage.BlockManagerMaster: Removal of executor 3 requested
>> 2019-07-17 10:51:40,521 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Asked to remove non-existent executor 3
>> 2019-07-17 10:51:40,521 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 3 from BlockManagerMaster.
>> 2019-07-17 10:51:40,521 INFO cluster.StandaloneSchedulerBackend: Granted executor ID app-20190717105135-0008/4 on hostPort 172.22.9.179:40573 with 8 core(s), 12.0 GB RAM
>> 2019-07-17 10:51:40,523 INFO client.StandaloneAppClient$ClientEndpoint: Executor updated: app-20190717105135-0008/4 is now RUNNING
>>
>> And the slave output is below:
>>
>> 19/07/17 10:47:12 INFO ExecutorRunner: Launch command: "/home/ubuntu/data/jdk/jre/bin/java" "-cp" "/home/ubuntu/spark-2.4.3/conf/:/home/ubuntu/spark-2.4.3/jars/*" "-Xmx12288M" "-Dspark.driver.port=40335" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@iZk1a7vdbutmi6eluaskecZ:40335" "--executor-id" "18" "--hostname" "172.22.9.179" "--cores" "8" "--app-id" "app-20190717104645-0007" "--worker-url" "spark://Worker@172.22.9.179:40573"
>> 19/07/17 10:47:13 INFO Worker: Executor app-20190717104645-0007/18 finished with state EXITED message Command exited with code 1 exitStatus 1
>> 19/07/17 10:47:13 INFO ExternalShuffleBlockResolver: Clean up non-shuffle files associated with the finished executor 18
>> 19/07/17 10:47:13 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20190717104645-0007, execId=18)
>> 19/07/17 10:47:13 INFO Worker: Asked to launch executor app-20190717104645-0007/19 for ph_user_pre_level
>> 19/07/17 10:47:13 INFO SecurityManager: Changing view acls to: ubuntu
>> 19/07/17 10:47:13 INFO SecurityManager: Changing modify acls to: ubuntu
>> 19/07/17 10:47:13 INFO SecurityManager: Changing view acls groups to:
>> 19/07/17 10:47:13 INFO SecurityManager: Changing modify acls groups to:
>> 19/07/17 10:47:13 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ubuntu); groups with view permissions: Set(); users with modify permissions: Set(ubuntu); groups with modify permissions: Set()
>> 19/07/17 10:47:14 INFO ExecutorRunner: Launch command: "/home/ubuntu/data/jdk/jre/bin/java" "-cp" "/home/ubuntu/spark-2.4.3/conf/:/home/ubuntu/spark-2.4.3/jars/*" "-Xmx12288M" "-Dspark.driver.port=40335" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@iZk1a7vdbutmi6eluaskecZ:40335" "--executor-id" "19" "--hostname" "172.22.9.179" "--cores" "8" "--app-id" "app-20190717104645-0007" "--worker-url" "spark://
>>
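One detail worth noting in the log above: the launch command points executors at `spark://CoarseGrainedScheduler@iZk1a7vdbutmi6eluaskecZ:40335`. An executor that exits with code 1 immediately after launch, on the remote worker only, often cannot resolve or reach that driver hostname. A sketch of settings to check on the driver machine (the address and ports below are placeholders, not values from the thread):

```
# spark-defaults.conf on the driver machine (placeholder values)
spark.driver.host        172.22.9.178   # an address every worker can resolve and reach
spark.driver.port        40335          # pin the port so firewall rules can allow it
spark.blockManager.port  40336          # likewise for block manager traffic
```

Checking `/etc/hosts` on the second slave for the driver's hostname, and confirming the firewall allows these ports in both directions, covers the two most common causes of this add/remove loop.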
Binding spark workers to a network interface
Hi all,

Is there a configuration to force Spark to use a specific network interface to communicate? The machines we are using have three network interfaces, and we would like to bind Spark to a specific one.

Best,
Supun
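For reference on the question above: Spark binds by IP address rather than by interface name, so the usual approach is to configure the IP of the desired interface. A sketch of the relevant settings (the address below is a placeholder):

```
# spark-env.sh on each machine: bind to the IP of the desired interface
SPARK_LOCAL_IP=10.0.1.5

# or per application, in spark-defaults.conf / --conf:
spark.driver.bindAddress  10.0.1.5
spark.driver.host         10.0.1.5
```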
unsubscribe
unsubscribe

At 2019-07-16 23:24:28, "Dongjoon Hyun" wrote:

Thank you for volunteering for 2.3.4 release manager, Kazuaki! It's great to see a new release manager in advance. :D

Thank you for the reply, Stavros. In addition to that issue, I'm also monitoring some other K8s issues and PRs. But I'm not sure we can have that, because some PRs seem to fail at building consensus (even for 3.0.0). In any case, could you ping the reviewers once more on those PRs you have concerns about? If it is merged into `branch-2.4`, it will be in Apache Spark 2.4.4, of course.

Bests,
Dongjoon.

On Tue, Jul 16, 2019 at 4:00 AM Kazuaki Ishizaki wrote:

Thank you Dongjoon for being a release manager. If the assumed dates are OK, I would like to volunteer as the 2.3.4 release manager.

Best Regards,
Kazuaki Ishizaki

From: Dongjoon Hyun
To: dev, "user @spark", Apache Spark PMC
Date: 2019/07/13 07:18
Subject: [EXTERNAL] Re: Release Apache Spark 2.4.4 before 3.0.0

Thank you, Jacek. BTW, I added `@private` since we need the PMC's help to make an Apache Spark release.

Can I get more feedback from the other PMC members? Please let me know if you have any concerns (e.g. release date or release manager).

As one of the community members, I assumed the following (if we are on schedule):
- 2.4.4 at the end of July
- 2.3.4 at the end of August (since 2.3.0 was released at the end of February 2018)
- 3.0.0 (possibly September?)
- 3.1.0 (January 2020?)

Bests,
Dongjoon.

On Thu, Jul 11, 2019 at 1:30 PM Jacek Laskowski wrote:

Hi,

Thanks Dongjoon Hyun for stepping up as a release manager! Much appreciated.

If there's a volunteer to cut a release, I'm always happy to support it. In addition, the more frequent the releases, the better for end users, so they have a choice to upgrade and have all the latest fixes, or wait. It's their call, not ours (if we kept them waiting).

My big two yes'es for the release!

Jacek

On Tue, 9 Jul 2019, 18:15 Dongjoon Hyun wrote:

Hi, All.

Spark 2.4.3 was released two months ago (8th May).
As of today (9th July), there are 45 fixes in `branch-2.4`, including the following correctness or blocker issues:

- SPARK-26038 Decimal toScalaBigInt/toJavaBigInteger not work for decimals not fitting in long
- SPARK-26045 Error in the spark 2.4 release package with the spark-avro_2.11 dependency
- SPARK-27798 from_avro can modify variables in other rows in local mode
- SPARK-27907 HiveUDAF should return NULL in case of 0 rows
- SPARK-28157 Make SHS clear KVStore LogInfo for the blacklist entries
- SPARK-28308 CalendarInterval sub-second part should be padded before parsing

It would be great if we could have Spark 2.4.4 before we get busier with 3.0.0. If it's okay, I'd like to volunteer as the 2.4.4 release manager and roll it next Monday (15th July). What do you think?

Bests,
Dongjoon.
Re: Usage of PyArrow in Spark
I was thinking of implementing that. But I quickly realized that doing a conversion of Spark -> Pandas -> Python causes errors.

A quick example being "None" in numeric data types. Pandas supports only NaN. Spark supports NULL and NaN.

This is just one of the issues I came across. I'm not sure about some of the more complex types like Array, Map, and struct, which are internally converted to pd.Series with type object.

I think that avoiding pandas in between and doing something from Arrow to Python would be more efficient since, if I understand right, Arrow has a wider range of types and can handle this better.

>>> from pyspark.sql import functions as F
>>> sdf = spark.createDataFrame([ [None], [float('nan')], [1.1] ], ['val'])

# Return the column with no change
>>> udf = F.pandas_udf('double', F.PandasUDFType.SCALAR)(lambda col: col)
>>> sdf.select(sdf['val'], udf(sdf['val'])).show()
+----+-----+
| val|(val)|
+----+-----+
|null| null|
| NaN| null|
| 1.1|  1.1|
+----+-----+

# isnull()
>>> udf = F.pandas_udf('double', F.PandasUDFType.SCALAR)(lambda col: col.isnull())
>>> sdf.select(sdf['val'], udf(sdf['val'])).show()
+----+-----+
| val|(val)|
+----+-----+
|null|  1.0|
| NaN|  1.0|
| 1.1|  0.0|
+----+-----+

# Check for "is None"
>>> udf = F.pandas_udf('double', F.PandasUDFType.SCALAR)(lambda col: col.apply(lambda x: x is None))
>>> sdf.select(sdf['val'], udf(sdf['val'])).show()
+----+-----+
| val|(val)|
+----+-----+
|null|  0.0|
| NaN|  0.0|
| 1.1|  0.0|
+----+-----+

On Wed, Jul 17, 2019 at 4:47 PM Hyukjin Kwon wrote:

> Regular Python UDFs don't use PyArrow under the hood.
> Yes, they can potentially benefit, but they can be easily worked around
> via Pandas UDFs.
>
> For instance, both below are virtually identical.
>
> @udf(...)
> def func(col):
>     return col
>
> @pandas_udf(...)
> def pandas_func(col):
>     return col.apply(lambda x: x)
>
> If we only need some minimised change, I would be positive about adding
> Arrow support into regular Python UDFs. Otherwise, I am not sure yet.
>
>
> On Wed, Jul 17, 2019 at 1:19 PM, Abdeali Kothari wrote:
>
>> Hi,
>> In Spark 2.3+ I saw that PyArrow was being used in a bunch of places in
>> Spark, and I was trying to understand the benefit in terms of
>> serialization / deserialization it provides.
>>
>> I understand that the new pandas_udf works only if PyArrow is installed.
>> But what about the plain old PythonUDF, which can be used in map()-style
>> operations?
>> Are they also using PyArrow under the hood to reduce the cost of serde?
>> Or do they remain as before, so that no performance gain should be
>> expected in those?
>>
>> If I'm not mistaken, plain old PythonUDFs could also benefit from Arrow,
>> as the data transfer cost to serialize/deserialize from Java to Python
>> and back still exists and could potentially be reduced by using Arrow.
>> Is my understanding correct? Are there any plans to implement this?
>>
>> Pointers to any notes or Jira about this would be appreciated.
>