Re: Specifying different version of pyspark.zip and py4j files on worker nodes with Spark pre-installed
Hi Marcelo, I see what you mean. Tried it but still got same error message. Error from python worker: > Traceback (most recent call last): > File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 183, in > _run_module_as_main > mod_name, mod_spec, code = _get_module_details(mod_name, _Error) > File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 109, in > _get_module_details > __import__(pkg_name) > File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/__init__.py", > line 46, in > File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/context.py", > line 29, in > ModuleNotFoundError: No module named 'py4j' > PYTHONPATH was: > > /usr/lib/spark-current/python/lib/pyspark.zip:/usr/lib/spark-current/python/lib/py4j-0.10.7-src.zip:/mnt/disk3/yarn/usercache/jianshi.huang/filecache/134/__spark_libs__8468485589501316413.zip/spark-core_2.11-2.3.2.jar > > On Fri, Oct 5, 2018 at 1:25 AM Marcelo Vanzin wrote: > Try "spark.executorEnv.SPARK_HOME=$PWD" (in quotes so it does not get > expanded by the shell). > > But it's really weird to be setting SPARK_HOME in the environment of > your node managers. YARN shouldn't need to know about that. > On Thu, Oct 4, 2018 at 10:22 AM Jianshi Huang > wrote: > > > > > https://github.com/apache/spark/blob/88e7e87bd5c052e10f52d4bb97a9d78f5b524128/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala#L31 > > > > The code shows Spark will try to find the path if SPARK_HOME is > specified. And on my worker node, SPARK_HOME is specified in .bashrc , for > the pre-installed 2.2.1 path. > > > > I don't want to make any changes to worker node configuration, so any > way to override the order? > > > > Jianshi > > > > On Fri, Oct 5, 2018 at 12:11 AM Marcelo Vanzin > wrote: > >> > >> Normally the version of Spark installed on the cluster does not > >> matter, since Spark is uploaded from your gateway machine to YARN by > >> default. > >> > >> You probably have some configuration (in spark-defaults.conf) that > >> tells YARN to use a cached copy. Get rid of that configuration, and > >> you can use whatever version you like. > >> On Thu, Oct 4, 2018 at 2:19 AM Jianshi Huang > wrote: > >> > > >> > Hi, > >> > > >> > I have a problem using multiple versions of Pyspark on YARN, the > driver and worker nodes are all preinstalled with Spark 2.2.1, for > production tasks. And I want to use 2.3.2 for my personal EDA. > >> > > >> > I've tried both 'pyFiles=' option and sparkContext.addPyFiles(), > however on the worker node, the PYTHONPATH still uses the system SPARK_HOME. > >> > > >> > Anyone knows how to override the PYTHONPATH on worker nodes? > >> > > >> > Here's the error message, > >> >> > >> >> > >> >> Py4JJavaError: An error occurred while calling o75.collectToPython. > >> >> : org.apache.spark.SparkException: Job aborted due to stage failure: > Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in > stage 0.0 (TID 3, emr-worker-8.cluster-68492, executor 2): > org.apache.spark.SparkException: > >> >> Error from python worker: > >> >> Traceback (most recent call last): > >> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 183, in > _run_module_as_main > >> >> mod_name, mod_spec, code = _get_module_details(mod_name, _Error) > >> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 109, in > _get_module_details > >> >> __import__(pkg_name) > >> >> File > "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/__init__.py", line > 46, in > >> >> File > "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/context.py", line > 29, in > >> >> ModuleNotFoundError: No module named 'py4j' > >> >> PYTHONPATH was: > >> >> > /usr/lib/spark-current/python/lib/pyspark.zip:/usr/lib/spark-current/python/lib/py4j-0.10.7-src.zip:/mnt/disk1/yarn/usercache/jianshi.huang/filecache/130/__spark_libs__5227988272944669714.zip/spark-core_2.11-2.3.2.jar > >> > > >> > > >> > And here's how I started Pyspark session in Jupyter. > >> >> > >> >> > >> >> %env SPARK_HOME=/opt/apps/ecm/service/spark/2.3.2-bin-hadoop2.7 > >> >> %env PYSPARK_PYTHON=/usr/bin/python3 > >> >> import findsp
Re: Specifying different version of pyspark.zip and py4j files on worker nodes with Spark pre-installed
Thanks Marcelo, But I don't want to install 2.3.2 on the worker nodes. I just want Spark to use the path of the files uploaded to YARN instead of the SPARK_HOME. On Fri, Oct 5, 2018 at 1:25 AM Marcelo Vanzin wrote: > Try "spark.executorEnv.SPARK_HOME=$PWD" (in quotes so it does not get > expanded by the shell). > > But it's really weird to be setting SPARK_HOME in the environment of > your node managers. YARN shouldn't need to know about that. > On Thu, Oct 4, 2018 at 10:22 AM Jianshi Huang > wrote: > > > > > https://github.com/apache/spark/blob/88e7e87bd5c052e10f52d4bb97a9d78f5b524128/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala#L31 > > > > The code shows Spark will try to find the path if SPARK_HOME is > specified. And on my worker node, SPARK_HOME is specified in .bashrc , for > the pre-installed 2.2.1 path. > > > > I don't want to make any changes to worker node configuration, so any > way to override the order? > > > > Jianshi > > > > On Fri, Oct 5, 2018 at 12:11 AM Marcelo Vanzin > wrote: > >> > >> Normally the version of Spark installed on the cluster does not > >> matter, since Spark is uploaded from your gateway machine to YARN by > >> default. > >> > >> You probably have some configuration (in spark-defaults.conf) that > >> tells YARN to use a cached copy. Get rid of that configuration, and > >> you can use whatever version you like. > >> On Thu, Oct 4, 2018 at 2:19 AM Jianshi Huang > wrote: > >> > > >> > Hi, > >> > > >> > I have a problem using multiple versions of Pyspark on YARN, the > driver and worker nodes are all preinstalled with Spark 2.2.1, for > production tasks. And I want to use 2.3.2 for my personal EDA. > >> > > >> > I've tried both 'pyFiles=' option and sparkContext.addPyFiles(), > however on the worker node, the PYTHONPATH still uses the system SPARK_HOME. > >> > > >> > Anyone knows how to override the PYTHONPATH on worker nodes? > >> > > >> > Here's the error message, > >> >> > >> >> > >> >> Py4JJavaError: An error occurred while calling o75.collectToPython. > >> >> : org.apache.spark.SparkException: Job aborted due to stage failure: > Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in > stage 0.0 (TID 3, emr-worker-8.cluster-68492, executor 2): > org.apache.spark.SparkException: > >> >> Error from python worker: > >> >> Traceback (most recent call last): > >> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 183, in > _run_module_as_main > >> >> mod_name, mod_spec, code = _get_module_details(mod_name, _Error) > >> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 109, in > _get_module_details > >> >> __import__(pkg_name) > >> >> File > "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/__init__.py", line > 46, in > >> >> File > "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/context.py", line > 29, in > >> >> ModuleNotFoundError: No module named 'py4j' > >> >> PYTHONPATH was: > >> >> > /usr/lib/spark-current/python/lib/pyspark.zip:/usr/lib/spark-current/python/lib/py4j-0.10.7-src.zip:/mnt/disk1/yarn/usercache/jianshi.huang/filecache/130/__spark_libs__5227988272944669714.zip/spark-core_2.11-2.3.2.jar > >> > > >> > > >> > And here's how I started Pyspark session in Jupyter. > >> >> > >> >> > >> >> %env SPARK_HOME=/opt/apps/ecm/service/spark/2.3.2-bin-hadoop2.7 > >> >> %env PYSPARK_PYTHON=/usr/bin/python3 > >> >> import findspark > >> >> findspark.init() > >> >> import pyspark > >> >> sparkConf = pyspark.SparkConf() > >> >> sparkConf.setAll([ > >> >> ('spark.cores.max', '96') > >> >> ,('spark.driver.memory', '2g') > >> >> ,('spark.executor.cores', '4') > >> >> ,('spark.executor.instances', '2') > >> >> ,('spark.executor.memory', '4g') > >> >> ,('spark.network.timeout', '800') > >> >> ,('spark.scheduler.mode', 'FAIR') > >> >> ,('spark.shuffle.service.enabled', 'true') > >> >> ,('spark.dynamicAllocation.enabled', 'true') > >> >> ]) > >> >> py_files = > ['hdfs://emr-header-1.cluster-68492:9000/lib/py4j-0.10.7-src.zip'] > >> >> sc = pyspark.SparkContext(appName="Jianshi", master="yarn-client", > conf=sparkConf, pyFiles=py_files) > >> >> > >> > > >> > > >> > Thanks, > >> > -- > >> > Jianshi Huang > >> > > >> > >> > >> -- > >> Marcelo > > > > > > > > -- > > Jianshi Huang > > > > LinkedIn: jianshi > > Twitter: @jshuang > > Github & Blog: http://huangjs.github.com/ > > > > -- > Marcelo > -- Jianshi Huang
Re: Specifying different version of pyspark.zip and py4j files on worker nodes with Spark pre-installed
Yes, that's right. On Fri, Oct 5, 2018 at 3:35 AM Gourav Sengupta wrote: > Hi Marcelo, > it will be great if you illustrate what you mean, I will be interested to > know. > > Hi Jianshi, > so just to be sure you want to work on SPARK 2.3 while having SPARK 2.1 > installed in your cluster? > > Regards, > Gourav Sengupta > > On Thu, Oct 4, 2018 at 6:26 PM Marcelo Vanzin > wrote: > >> Try "spark.executorEnv.SPARK_HOME=$PWD" (in quotes so it does not get >> expanded by the shell). >> >> But it's really weird to be setting SPARK_HOME in the environment of >> your node managers. YARN shouldn't need to know about that. >> On Thu, Oct 4, 2018 at 10:22 AM Jianshi Huang >> wrote: >> > >> > >> https://github.com/apache/spark/blob/88e7e87bd5c052e10f52d4bb97a9d78f5b524128/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala#L31 >> > >> > The code shows Spark will try to find the path if SPARK_HOME is >> specified. And on my worker node, SPARK_HOME is specified in .bashrc , for >> the pre-installed 2.2.1 path. >> > >> > I don't want to make any changes to worker node configuration, so any >> way to override the order? >> > >> > Jianshi >> > >> > On Fri, Oct 5, 2018 at 12:11 AM Marcelo Vanzin >> wrote: >> >> >> >> Normally the version of Spark installed on the cluster does not >> >> matter, since Spark is uploaded from your gateway machine to YARN by >> >> default. >> >> >> >> You probably have some configuration (in spark-defaults.conf) that >> >> tells YARN to use a cached copy. Get rid of that configuration, and >> >> you can use whatever version you like. >> >> On Thu, Oct 4, 2018 at 2:19 AM Jianshi Huang >> wrote: >> >> > >> >> > Hi, >> >> > >> >> > I have a problem using multiple versions of Pyspark on YARN, the >> driver and worker nodes are all preinstalled with Spark 2.2.1, for >> production tasks. And I want to use 2.3.2 for my personal EDA. >> >> > >> >> > I've tried both 'pyFiles=' option and sparkContext.addPyFiles(), >> however on the worker node, the PYTHONPATH still uses the system SPARK_HOME. >> >> > >> >> > Anyone knows how to override the PYTHONPATH on worker nodes? >> >> > >> >> > Here's the error message, >> >> >> >> >> >> >> >> >> Py4JJavaError: An error occurred while calling o75.collectToPython. >> >> >> : org.apache.spark.SparkException: Job aborted due to stage >> failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task >> 0.3 in stage 0.0 (TID 3, emr-worker-8.cluster-68492, executor 2): >> org.apache.spark.SparkException: >> >> >> Error from python worker: >> >> >> Traceback (most recent call last): >> >> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 183, in >> _run_module_as_main >> >> >> mod_name, mod_spec, code = _get_module_details(mod_name, _Error) >> >> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 109, in >> _get_module_details >> >> >> __import__(pkg_name) >> >> >> File >> "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/__init__.py", line >> 46, in >> >> >> File >> "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/context.py", line >> 29, in >> >> >> ModuleNotFoundError: No module named 'py4j' >> >> >> PYTHONPATH was: >> >> >> >> /usr/lib/spark-current/python/lib/pyspark.zip:/usr/lib/spark-current/python/lib/py4j-0.10.7-src.zip:/mnt/disk1/yarn/usercache/jianshi.huang/filecache/130/__spark_libs__5227988272944669714.zip/spark-core_2.11-2.3.2.jar >> >> > >> >> > >> >> > And here's how I started Pyspark session in Jupyter. >> >> >> >> >> >> >> >> >> %env SPARK_HOME=/opt/apps/ecm/service/spark/2.3.2-bin-hadoop2.7 >> >> >> %env PYSPARK_PYTHON=/usr/bin/python3 >> >> >> import findspark >> >> >> findspark.init() >> >> >> import pyspark >> >> >> sparkConf = pyspark.SparkConf() >> >> >> sparkConf.setAll([ >> >> >> ('spark.cores.max', '96') >> >> >> ,('spark.driver.memory', '2g') >> >> >> ,('spark.executor.cores', '4') >> >> >> ,('spark.executor.instances', '2') >> >> >> ,('spark.executor.memory', '4g') >> >> >> ,('spark.network.timeout', '800') >> >> >> ,('spark.scheduler.mode', 'FAIR') >> >> >> ,('spark.shuffle.service.enabled', 'true') >> >> >> ,('spark.dynamicAllocation.enabled', 'true') >> >> >> ]) >> >> >> py_files = >> ['hdfs://emr-header-1.cluster-68492:9000/lib/py4j-0.10.7-src.zip'] >> >> >> sc = pyspark.SparkContext(appName="Jianshi", master="yarn-client", >> conf=sparkConf, pyFiles=py_files) >> >> >> >> >> > >> >> > >> >> > Thanks, >> >> > -- >> >> > Jianshi Huang >> >> > >> >> >> >> >> >> -- >> >> Marcelo >> > >> > >> > >> > -- >> > Jianshi Huang >> > >> > LinkedIn: jianshi >> > Twitter: @jshuang >> > Github & Blog: http://huangjs.github.com/ >> >> >> >> -- >> Marcelo >> >> - >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >> >> -- Jianshi Huang
Re: Specifying different version of pyspark.zip and py4j files on worker nodes with Spark pre-installed
https://github.com/apache/spark/blob/88e7e87bd5c052e10f52d4bb97a9d78f5b524128/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala#L31 The code shows Spark will try to find the path if SPARK_HOME is specified. And on my worker node, SPARK_HOME is specified in .bashrc , for the pre-installed 2.2.1 path. I don't want to make any changes to worker node configuration, so any way to override the order? Jianshi On Fri, Oct 5, 2018 at 12:11 AM Marcelo Vanzin wrote: > Normally the version of Spark installed on the cluster does not > matter, since Spark is uploaded from your gateway machine to YARN by > default. > > You probably have some configuration (in spark-defaults.conf) that > tells YARN to use a cached copy. Get rid of that configuration, and > you can use whatever version you like. > On Thu, Oct 4, 2018 at 2:19 AM Jianshi Huang > wrote: > > > > Hi, > > > > I have a problem using multiple versions of Pyspark on YARN, the driver > and worker nodes are all preinstalled with Spark 2.2.1, for production > tasks. And I want to use 2.3.2 for my personal EDA. > > > > I've tried both 'pyFiles=' option and sparkContext.addPyFiles(), however > on the worker node, the PYTHONPATH still uses the system SPARK_HOME. > > > > Anyone knows how to override the PYTHONPATH on worker nodes? > > > > Here's the error message, > >> > >> > >> Py4JJavaError: An error occurred while calling o75.collectToPython. > >> : org.apache.spark.SparkException: Job aborted due to stage failure: > Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in > stage 0.0 (TID 3, emr-worker-8.cluster-68492, executor 2): > org.apache.spark.SparkException: > >> Error from python worker: > >> Traceback (most recent call last): > >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 183, in > _run_module_as_main > >> mod_name, mod_spec, code = _get_module_details(mod_name, _Error) > >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 109, in > _get_module_details > >> __import__(pkg_name) > >> File > "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/__init__.py", line > 46, in > >> File > "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/context.py", line > 29, in > >> ModuleNotFoundError: No module named 'py4j' > >> PYTHONPATH was: > >> > /usr/lib/spark-current/python/lib/pyspark.zip:/usr/lib/spark-current/python/lib/py4j-0.10.7-src.zip:/mnt/disk1/yarn/usercache/jianshi.huang/filecache/130/__spark_libs__5227988272944669714.zip/spark-core_2.11-2.3.2.jar > > > > > > And here's how I started Pyspark session in Jupyter. > >> > >> > >> %env SPARK_HOME=/opt/apps/ecm/service/spark/2.3.2-bin-hadoop2.7 > >> %env PYSPARK_PYTHON=/usr/bin/python3 > >> import findspark > >> findspark.init() > >> import pyspark > >> sparkConf = pyspark.SparkConf() > >> sparkConf.setAll([ > >> ('spark.cores.max', '96') > >> ,('spark.driver.memory', '2g') > >> ,('spark.executor.cores', '4') > >> ,('spark.executor.instances', '2') > >> ,('spark.executor.memory', '4g') > >> ,('spark.network.timeout', '800') > >> ,('spark.scheduler.mode', 'FAIR') > >> ,('spark.shuffle.service.enabled', 'true') > >> ,('spark.dynamicAllocation.enabled', 'true') > >> ]) > >> py_files = > ['hdfs://emr-header-1.cluster-68492:9000/lib/py4j-0.10.7-src.zip'] > >> sc = pyspark.SparkContext(appName="Jianshi", master="yarn-client", > conf=sparkConf, pyFiles=py_files) > >> > > > > > > Thanks, > > -- > > Jianshi Huang > > > > > -- > Marcelo > -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github & Blog: http://huangjs.github.com/
Specifying different version of pyspark.zip and py4j files on worker nodes with Spark pre-installed
Hi, I have a problem using multiple versions of Pyspark on YARN, the driver and worker nodes are all preinstalled with Spark 2.2.1, for production tasks. And I want to use 2.3.2 for my personal EDA. I've tried both 'pyFiles=' option and sparkContext.addPyFiles(), however on the worker node, the PYTHONPATH still uses the system SPARK_HOME. Anyone knows how to override the PYTHONPATH on worker nodes? Here's the error message, > > Py4JJavaError: An error occurred while calling o75.collectToPython. > : org.apache.spark.SparkException: Job aborted due to stage failure: Task > 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage > 0.0 (TID 3, emr-worker-8.cluster-68492, executor 2): > org.apache.spark.SparkException: > Error from python worker: > Traceback (most recent call last): > File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 183, in > _run_module_as_main > mod_name, mod_spec, code = _get_module_details(mod_name, _Error) > File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 109, in > _get_module_details > __import__(pkg_name) > File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/__init__.py", > line 46, in > File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/context.py", > line 29, in > ModuleNotFoundError: No module named 'py4j' > PYTHONPATH was: > > /usr/lib/spark-current/python/lib/pyspark.zip:/usr/lib/spark-current/python/lib/py4j-0.10.7-src.zip:/mnt/disk1/yarn/usercache/jianshi.huang/filecache/130/__spark_libs__5227988272944669714.zip/spark-core_2.11-2.3.2.jar And here's how I started Pyspark session in Jupyter. > > %env SPARK_HOME=/opt/apps/ecm/service/spark/2.3.2-bin-hadoop2.7 > %env PYSPARK_PYTHON=/usr/bin/python3 > import findspark > findspark.init() > import pyspark > sparkConf = pyspark.SparkConf() > sparkConf.setAll([ > ('spark.cores.max', '96') > ,('spark.driver.memory', '2g') > ,('spark.executor.cores', '4') > ,('spark.executor.instances', '2') > ,('spark.executor.memory', '4g') > ,('spark.network.timeout', '800') > ,('spark.scheduler.mode', 'FAIR') > ,('spark.shuffle.service.enabled', 'true') > ,('spark.dynamicAllocation.enabled', 'true') > ]) > py_files = > ['hdfs://emr-header-1.cluster-68492:9000/lib/py4j-0.10.7-src.zip'] > sc = pyspark.SparkContext(appName="Jianshi", master="yarn-client", > conf=sparkConf, pyFiles=py_files) > > Thanks, -- Jianshi Huang
Re: View all user's application logs in history server
No one using History server? :) Am I the only one need to see all user's logs? Jianshi On Thu, May 21, 2015 at 1:29 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I'm using Spark 1.4.0-rc1 and I'm using default settings for history server. But I can only see my own logs. Is it possible to view all user's logs? The permission is fine for the user group. -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: View all user's application logs in history server
Yes, all written to the same directory on HDFS. Jianshi On Wed, May 27, 2015 at 11:57 PM, Marcelo Vanzin van...@cloudera.com wrote: You may be the only one not seeing all the logs. Are you sure all the users are writing to the same log directory? The HS can only read from a single log directory. On Wed, May 27, 2015 at 5:33 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: No one using History server? :) Am I the only one need to see all user's logs? Jianshi On Thu, May 21, 2015 at 1:29 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I'm using Spark 1.4.0-rc1 and I'm using default settings for history server. But I can only see my own logs. Is it possible to view all user's logs? The permission is fine for the user group. -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Marcelo -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
View all user's application logs in history server
Hi, I'm using Spark 1.4.0-rc1 and I'm using default settings for history server. But I can only see my own logs. Is it possible to view all user's logs? The permission is fine for the user group. -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Why so slow
Hi Olivier, Here it is. == Physical Plan == Aggregate false, [PartialGroup#155], [PartialGroup#155 AS is_bad#108,Coalesce(SUM(PartialCount#152L),0) AS count#109L,(CAST(SUM(PartialSum#153), DoubleType) / CAST(SUM(PartialCount#154L), DoubleType)) AS avg#110] Exchange (HashPartitioning [PartialGroup#155], 200), [] Aggregate true, [meta#143[is_bad]], [meta#143[is_bad] AS PartialGroup#155,COUNT(1) AS PartialCount#152L,COUNT(nvar#145[var1]) AS PartialCount#154L,SUM(nvar#145[var1]) AS PartialSum#153] Project [meta#143,nvar#145] Filter ((date#147 = 2014-04-01) (date#147 = 2014-04-30)) PhysicalRDD [meta#143,nvar#145,date#147], MapPartitionsRDD[6] at explain at console:32 Jianshi On Tue, May 12, 2015 at 10:34 PM, Olivier Girardot ssab...@gmail.com wrote: can you post the explain too ? Le mar. 12 mai 2015 à 12:11, Jianshi Huang jianshi.hu...@gmail.com a écrit : Hi, I have a SQL query on tables containing big Map columns (thousands of keys). I found it to be very slow. select meta['is_bad'] as is_bad, count(*) as count, avg(nvar['var1']) as avg from test where date between '2014-04-01' and '2014-04-30' group by meta['is_bad'] = +-+---+---+ | is_bad | count | avg | +-+---+---+ | 0 | 17024396 | 0.16257395850742645 | | 1 | 179729| -0.37626256661125485 | | 2 | 28128 | 0.11674427263203344 | | 3 | 116327| -0.6398689187187386 | | 4 | 87715 | -0.5349632960030563 | | 5 | 169771| 0.40812641191854626 | | 6 | 542447| 0.5238256418341465| | 7 | 160324| 0.29442847034840386 | | 8 | 2099 | -0.9165701665162977 | | 9 | 3104 | 0.3845685004598235| +-+---+---+ 10 rows selected (130.5 seconds) The total number of rows is less than 20M. Why so slow? I'm running on Spark 1.4.0-SNAPSHOT with 100 executors each having 4GB ram and 2 CPU core. Looks like https://issues.apache.org/jira/browse/SPARK-5446 is still open, when can we have it fixed? :) -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Why so slow
Hi, I have a SQL query on tables containing big Map columns (thousands of keys). I found it to be very slow. select meta['is_bad'] as is_bad, count(*) as count, avg(nvar['var1']) as avg from test where date between '2014-04-01' and '2014-04-30' group by meta['is_bad'] = +-+---+---+ | is_bad | count | avg | +-+---+---+ | 0 | 17024396 | 0.16257395850742645 | | 1 | 179729| -0.37626256661125485 | | 2 | 28128 | 0.11674427263203344 | | 3 | 116327| -0.6398689187187386 | | 4 | 87715 | -0.5349632960030563 | | 5 | 169771| 0.40812641191854626 | | 6 | 542447| 0.5238256418341465| | 7 | 160324| 0.29442847034840386 | | 8 | 2099 | -0.9165701665162977 | | 9 | 3104 | 0.3845685004598235| +-+---+---+ 10 rows selected (130.5 seconds) The total number of rows is less than 20M. Why so slow? I'm running on Spark 1.4.0-SNAPSHOT with 100 executors each having 4GB ram and 2 CPU core. Looks like https://issues.apache.org/jira/browse/SPARK-5446 is still open, when can we have it fixed? :) -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: FAILED_TO_UNCOMPRESS(5) errors when fetching shuffle data with sort-based shuffle
I'm using the default settings. Jianshi On Wed, May 6, 2015 at 7:05 PM, twinkle sachdeva twinkle.sachd...@gmail.com wrote: Hi, Can you please share your compression etc settings, which you are using. Thanks, Twinkle On Wed, May 6, 2015 at 4:15 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I'm facing this error in Spark 1.3.1 https://issues.apache.org/jira/browse/SPARK-4105 Anyone knows what's the workaround? Change the compression codec for shuffle output? -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Parquet error reading data that contains array of structs
Version of SPARK-4520 is not set. I assume it was fixed in 1.3.0 Cheers Fix Version On Fri, Apr 24, 2015 at 11:00 AM, Yin Huai yh...@databricks.com wrote: The exception looks like the one mentioned in https://issues.apache.org/jira/browse/SPARK-4520. What is the version of Spark? On Fri, Apr 24, 2015 at 2:40 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, My data looks like this: +---++--+ | col_name | data_type | comment | +---++--+ | cust_id | string | | | part_num | int| | | ip_list | arraystructip:string | | | vid_list | arraystructvid:string | | | fso_list | arraystructfso:string | | | src | string | | | date | int| | +---++--+ And I did select *, it reports ParquetDecodingException. Is this type not supported in SparkSQL? Detailed error message here: Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 27.0 failed 4 times, most recent failure: Lost task 0.3 in stage 27.0 (TID 510, lvshdc5dn0542.lvs.paypal.com): parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://xxx/part-m-0.gz.parquet at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213) at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:724) Caused by: java.lang.ArrayIndexOutOfBoundsException: -1 at java.util.ArrayList.elementData(ArrayList.java:400) at java.util.ArrayList.get(ArrayList.java:413) at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95) at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95) at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80) at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74) at parquet.io.RecordReaderImplementation.init(RecordReaderImplementation.java:290) at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131) at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96) at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136) at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96) at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126) at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193) -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com
Re: Parquet error reading data that contains array of structs
Hi Huai, I'm using Spark 1.3.1. You're right. The dataset is not generated by Spark. It's generated by Pig using Parquet 1.6.0rc7 jars. Let me see if I can send a testing dataset to you... Jianshi On Sat, Apr 25, 2015 at 2:22 AM, Yin Huai yh...@databricks.com wrote: oh, I missed that. It is fixed in 1.3.0. Also, Jianshi, the dataset was not generated by Spark SQL, right? On Fri, Apr 24, 2015 at 11:09 AM, Ted Yu yuzhih...@gmail.com wrote: Yin: Fix Version of SPARK-4520 is not set. I assume it was fixed in 1.3.0 Cheers Fix Version On Fri, Apr 24, 2015 at 11:00 AM, Yin Huai yh...@databricks.com wrote: The exception looks like the one mentioned in https://issues.apache.org/jira/browse/SPARK-4520. What is the version of Spark? On Fri, Apr 24, 2015 at 2:40 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, My data looks like this: +---++--+ | col_name | data_type | comment | +---++--+ | cust_id | string | | | part_num | int| | | ip_list | arraystructip:string | | | vid_list | arraystructvid:string | | | fso_list | arraystructfso:string | | | src | string | | | date | int| | +---++--+ And I did select *, it reports ParquetDecodingException. Is this type not supported in SparkSQL? Detailed error message here: Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 27.0 failed 4 times, most recent failure: Lost task 0.3 in stage 27.0 (TID 510, lvshdc5dn0542.lvs.paypal.com): parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://xxx/part-m-0.gz.parquet at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213) at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:724) Caused by: java.lang.ArrayIndexOutOfBoundsException: -1 at java.util.ArrayList.elementData(ArrayList.java:400) at java.util.ArrayList.get(ArrayList.java:413) at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95) at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95) at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80) at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74) at parquet.io.RecordReaderImplementation.init(RecordReaderImplementation.java:290) at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131) at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96) at parquet.filter2
Parquet error reading data that contains array of structs
Hi, My data looks like this: +---++--+ | col_name | data_type | comment | +---++--+ | cust_id | string | | | part_num | int| | | ip_list | arraystructip:string | | | vid_list | arraystructvid:string | | | fso_list | arraystructfso:string | | | src | string | | | date | int| | +---++--+ And I did select *, it reports ParquetDecodingException. Is this type not supported in SparkSQL? Detailed error message here: Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 27.0 failed 4 times, most recent failure: Lost task 0.3 in stage 27.0 (TID 510, lvshdc5dn0542.lvs.paypal.com): parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://xxx/part-m-0.gz.parquet at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213) at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:724) Caused by: java.lang.ArrayIndexOutOfBoundsException: -1 at java.util.ArrayList.elementData(ArrayList.java:400) at java.util.ArrayList.get(ArrayList.java:413) at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95) at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95) at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80) at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74) at parquet.io.RecordReaderImplementation.init(RecordReaderImplementation.java:290) at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131) at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96) at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136) at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96) at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126) at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193) -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
How to write Hive's map(key, value, ...) in Spark SQL DSL
Hi, I want to write this in Spark SQL DSL: select map('c1', c1, 'c2', c2) as m from table Is there a way? -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: How to do dispatching in Streaming?
Thanks everyone for the reply. Looks like foreachRDD + filtering is the way to go. I'll have 4 independent Spark streaming applications so the overhead seems acceptable. Jianshi On Fri, Apr 17, 2015 at 5:17 PM, Evo Eftimov evo.efti...@isecc.com wrote: Good use of analogies J Yep friction (or entropy in general) exists in everything – but hey by adding and doing “more work” at the same time (aka more powerful rockets) some people have overcome the friction of the air and even got as far as the moon and beyond It is all about the bottom lime / the big picture – in some models, friction can be a huge factor in the equations in some other it is just part of the landscape *From:* Gerard Maas [mailto:gerard.m...@gmail.com] *Sent:* Friday, April 17, 2015 10:12 AM *To:* Evo Eftimov *Cc:* Tathagata Das; Jianshi Huang; user; Shao, Saisai; Huang Jie *Subject:* Re: How to do dispatching in Streaming? Evo, In Spark there's a fixed scheduling cost for each task, so more tasks mean an increased bottom line for the same amount of work being done. The number of tasks per batch interval should relate to the CPU resources available for the job following the same 'rule of thumbs' than for Spark, being 2-3 times the #of cores. In that physical model presented before, I think we could consider this scheduling cost as a form of friction. -kr, Gerard. On Thu, Apr 16, 2015 at 11:47 AM, Evo Eftimov evo.efti...@isecc.com wrote: Ooops – what does “more work” mean in a Parallel Programming paradigm and does it always translate in “inefficiency” Here are a few laws of physics in this space: 1. More Work if done AT THE SAME time AND fully utilizes the cluster resources is a GOOD thing 2. More Work which can not be done at the same time and has to be processed sequentially is a BAD thing So the key is whether it is about 1 or 2 and if it is about 1, whether it leads to e.g. Higher Throughput and Lower Latency or not Regards, Evo Eftimov *From:* Gerard Maas [mailto:gerard.m...@gmail.com] *Sent:* Thursday, April 16, 2015 10:41 AM *To:* Evo Eftimov *Cc:* Tathagata Das; Jianshi Huang; user; Shao, Saisai; Huang Jie *Subject:* Re: How to do dispatching in Streaming? From experience, I'd recommend using the dstream.foreachRDD method and doing the filtering within that context. Extending the example of TD, something like this: dstream.foreachRDD { rdd = rdd.cache() messageType.foreach (msgTyp = val selection = rdd.filter(msgTyp.match(_)) selection.foreach { ... } } rdd.unpersist() } I would discourage the use of: MessageType1DStream = MainDStream.filter(message type1) MessageType2DStream = MainDStream.filter(message type2) MessageType3DStream = MainDStream.filter(message type3) Because it will be a lot more work to process on the spark side. Each DSteam will schedule tasks for each partition, resulting in #dstream x #partitions x #stages tasks instead of the #partitions x #stages with the approach presented above. -kr, Gerard. On Thu, Apr 16, 2015 at 10:57 AM, Evo Eftimov evo.efti...@isecc.com wrote: And yet another way is to demultiplex at one point which will yield separate DStreams for each message type which you can then process in independent DAG pipelines in the following way: MessageType1DStream = MainDStream.filter(message type1) MessageType2DStream = MainDStream.filter(message type2) MessageType3DStream = MainDStream.filter(message type3) Then proceed your processing independently with MessageType1DStream, MessageType2DStream and MessageType3DStream ie each of them is a starting point of a new DAG pipeline running in parallel *From:* Tathagata Das [mailto:t...@databricks.com] *Sent:* Thursday, April 16, 2015 12:52 AM *To:* Jianshi Huang *Cc:* user; Shao, Saisai; Huang Jie *Subject:* Re: How to do dispatching in Streaming? It may be worthwhile to do architect the computation in a different way. dstream.foreachRDD { rdd = rdd.foreach { record = // do different things for each record based on filters } } TD On Sun, Apr 12, 2015 at 7:52 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I have a Kafka topic that contains dozens of different types of messages. And for each one I'll need to create a DStream for it. Currently I have to filter the Kafka stream over and over, which is very inefficient. So what's the best way to do dispatching in Spark Streaming? (one DStream - multiple DStreams) Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
How to do dispatching in Streaming?
Hi, I have a Kafka topic that contains dozens of different types of messages. And for each one I'll need to create a DStream for it. Currently I have to filter the Kafka stream over and over, which is very inefficient. So what's the best way to do dispatching in Spark Streaming? (one DStream - multiple DStreams) Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Add partition support in saveAsParquet
Hi, Anyone has similar request? https://issues.apache.org/jira/browse/SPARK-6561 When we save a DataFrame into Parquet files, we also want to have it partitioned. The proposed API looks like this: def saveAsParquet(path: String, partitionColumns: Seq[String]) -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Handling fatal errors of executors and decommission datanodes
I created a JIRA: https://issues.apache.org/jira/browse/SPARK-6353 On Mon, Mar 16, 2015 at 5:36 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, We're facing No space left on device errors lately from time to time. The job will fail after retries. Obvious in such case, retry won't be helpful. Sure it's the problem in the datanodes but I'm wondering if Spark Driver can handle it and decommission the problematic datanode before retrying it. And maybe dynamically allocate another datanode if dynamic allocation is enabled. I think there needs to be a class of fatal errors that can't be recovered with retries. And it's best Spark can handle it nicely. Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Handling fatal errors of executors and decommission datanodes
Thanks Shixiong! Very strange that our tasks were retried on the same executor again and again. I'll check spark.scheduler.executorTaskBlacklistTime. Jianshi On Mon, Mar 16, 2015 at 6:02 PM, Shixiong Zhu zsxw...@gmail.com wrote: There are 2 cases for No space left on device: 1. Some tasks which use large temp space cannot run in any node. 2. The free space of datanodes is not balance. Some tasks which use large temp space can not run in several nodes, but they can run in other nodes successfully. Because most of our cases are the second one, we set spark.scheduler.executorTaskBlacklistTime to 3 to solve such No space left on device errors. So if a task runs unsuccessfully in some executor, it won't be scheduled to the same executor in 30 seconds. Best Regards, Shixiong Zhu 2015-03-16 17:40 GMT+08:00 Jianshi Huang jianshi.hu...@gmail.com: I created a JIRA: https://issues.apache.org/jira/browse/SPARK-6353 On Mon, Mar 16, 2015 at 5:36 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, We're facing No space left on device errors lately from time to time. The job will fail after retries. Obvious in such case, retry won't be helpful. Sure it's the problem in the datanodes but I'm wondering if Spark Driver can handle it and decommission the problematic datanode before retrying it. And maybe dynamically allocate another datanode if dynamic allocation is enabled. I think there needs to be a class of fatal errors that can't be recovered with retries. And it's best Spark can handle it nicely. Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Handling fatal errors of executors and decommission datanodes
Oh, by default it's set to 0L. I'll try setting it to 3 immediately. Thanks for the help! Jianshi On Mon, Mar 16, 2015 at 11:32 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Thanks Shixiong! Very strange that our tasks were retried on the same executor again and again. I'll check spark.scheduler.executorTaskBlacklistTime. Jianshi On Mon, Mar 16, 2015 at 6:02 PM, Shixiong Zhu zsxw...@gmail.com wrote: There are 2 cases for No space left on device: 1. Some tasks which use large temp space cannot run in any node. 2. The free space of datanodes is not balance. Some tasks which use large temp space can not run in several nodes, but they can run in other nodes successfully. Because most of our cases are the second one, we set spark.scheduler.executorTaskBlacklistTime to 3 to solve such No space left on device errors. So if a task runs unsuccessfully in some executor, it won't be scheduled to the same executor in 30 seconds. Best Regards, Shixiong Zhu 2015-03-16 17:40 GMT+08:00 Jianshi Huang jianshi.hu...@gmail.com: I created a JIRA: https://issues.apache.org/jira/browse/SPARK-6353 On Mon, Mar 16, 2015 at 5:36 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, We're facing No space left on device errors lately from time to time. The job will fail after retries. Obvious in such case, retry won't be helpful. Sure it's the problem in the datanodes but I'm wondering if Spark Driver can handle it and decommission the problematic datanode before retrying it. And maybe dynamically allocate another datanode if dynamic allocation is enabled. I think there needs to be a class of fatal errors that can't be recovered with retries. And it's best Spark can handle it nicely. Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class
Forget about my last message. I was confused. Spark 1.2.1 + Scala 2.10.4 started by SBT console command also failed with this error. However running from a standard spark shell works. Jianshi On Fri, Mar 13, 2015 at 2:46 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hmm... look like the console command still starts a Spark 1.3.0 with Scala 2.11.6 even I changed them in build.sbt. So the test with 1.2.1 is not valid. Jianshi On Fri, Mar 13, 2015 at 2:34 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I've confirmed it only failed in console started by SBT. I'm using sbt-spark-package plugin, and the initialCommands look like this (I added implicit sqlContext to it): show console::initialCommands [info] println(Welcome to\n + [info] __\n + [info] / __/__ ___ _/ /__\n + [info] _\\ \\/ _ \\/ _ `/ __/ '_/\n + [info]/___/ .__/\\_,_/_/ /_/\\_\\ version \1.3.0-rc2\\n + [info] /_/\n + [info] Using Scala \2.11.6\\n) [info] [info] import org.apache.spark.SparkContext._ [info] [info] val sc = { [info] val conf = new org.apache.spark.SparkConf() [info] .setMaster(local) [info] .setAppName(Sbt console + Spark!) [info] new org.apache.spark.SparkContext(conf) [info] } [info] println(Created spark context as sc.) [info] [info] def time[T](f: = T): T = { [info] import System.{currentTimeMillis = now} [info] val start = now [info] try { f } finally { println(Elapsed: + (now - start)/1000.0 + s) } [info] } [info] [info] @transient val sqlc = new org.apache.spark.sql.SQLContext(sc) [info] implicit def sqlContext = sqlc [info] import sqlc._ Jianshi On Fri, Mar 13, 2015 at 3:10 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: BTW, I was running tests from SBT when I get the errors. One test turn a Seq of case class to DataFrame. I also tried run similar code in the console, but failed with same error. I tested both Spark 1.3.0-rc2 and 1.2.1 with Scala 2.11.6 and 2.10.4 Any idea? Jianshi On Fri, Mar 13, 2015 at 2:23 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Same issue here. But the classloader in my exception is somehow different. scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with java.net.URLClassLoader@53298398 of type class java.net.URLClassLoader with classpath Jianshi On Sun, Mar 1, 2015 at 9:32 AM, Michael Armbrust mich...@databricks.com wrote: I think its possible that the problem is that the scala compiler is not being loaded by the primordial classloader (but instead by some child classloader) and thus the scala reflection mirror is failing to initialize when it can't find it. Unfortunately, the only solution that I know of is to load all required jars when the JVM starts. On Sat, Feb 28, 2015 at 5:26 PM, Ashish Nigam ashnigamt...@gmail.com wrote: Also, can scala version play any role here? I am using scala 2.11.5 but all spark packages have dependency to scala 2.11.2 Just wanted to make sure that scala version is not an issue here. On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com wrote: Hi, I wrote a very simple program in scala to convert an existing RDD to SchemaRDD. But createSchemaRDD function is throwing exception Exception in thread main scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial classloader with boot classpath [.] not found Here's more info on the versions I am using - scala.binary.version2.11/scala.binary.version spark.version1.2.1/spark.version scala.version2.11.5/scala.version Please let me know how can I resolve this problem. Thanks Ashish -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class
Hmm... look like the console command still starts a Spark 1.3.0 with Scala 2.11.6 even I changed them in build.sbt. So the test with 1.2.1 is not valid. Jianshi On Fri, Mar 13, 2015 at 2:34 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I've confirmed it only failed in console started by SBT. I'm using sbt-spark-package plugin, and the initialCommands look like this (I added implicit sqlContext to it): show console::initialCommands [info] println(Welcome to\n + [info] __\n + [info] / __/__ ___ _/ /__\n + [info] _\\ \\/ _ \\/ _ `/ __/ '_/\n + [info]/___/ .__/\\_,_/_/ /_/\\_\\ version \1.3.0-rc2\\n + [info] /_/\n + [info] Using Scala \2.11.6\\n) [info] [info] import org.apache.spark.SparkContext._ [info] [info] val sc = { [info] val conf = new org.apache.spark.SparkConf() [info] .setMaster(local) [info] .setAppName(Sbt console + Spark!) [info] new org.apache.spark.SparkContext(conf) [info] } [info] println(Created spark context as sc.) [info] [info] def time[T](f: = T): T = { [info] import System.{currentTimeMillis = now} [info] val start = now [info] try { f } finally { println(Elapsed: + (now - start)/1000.0 + s) } [info] } [info] [info] @transient val sqlc = new org.apache.spark.sql.SQLContext(sc) [info] implicit def sqlContext = sqlc [info] import sqlc._ Jianshi On Fri, Mar 13, 2015 at 3:10 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: BTW, I was running tests from SBT when I get the errors. One test turn a Seq of case class to DataFrame. I also tried run similar code in the console, but failed with same error. I tested both Spark 1.3.0-rc2 and 1.2.1 with Scala 2.11.6 and 2.10.4 Any idea? Jianshi On Fri, Mar 13, 2015 at 2:23 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Same issue here. But the classloader in my exception is somehow different. scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with java.net.URLClassLoader@53298398 of type class java.net.URLClassLoader with classpath Jianshi On Sun, Mar 1, 2015 at 9:32 AM, Michael Armbrust mich...@databricks.com wrote: I think its possible that the problem is that the scala compiler is not being loaded by the primordial classloader (but instead by some child classloader) and thus the scala reflection mirror is failing to initialize when it can't find it. Unfortunately, the only solution that I know of is to load all required jars when the JVM starts. On Sat, Feb 28, 2015 at 5:26 PM, Ashish Nigam ashnigamt...@gmail.com wrote: Also, can scala version play any role here? I am using scala 2.11.5 but all spark packages have dependency to scala 2.11.2 Just wanted to make sure that scala version is not an issue here. On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com wrote: Hi, I wrote a very simple program in scala to convert an existing RDD to SchemaRDD. But createSchemaRDD function is throwing exception Exception in thread main scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial classloader with boot classpath [.] not found Here's more info on the versions I am using - scala.binary.version2.11/scala.binary.version spark.version1.2.1/spark.version scala.version2.11.5/scala.version Please let me know how can I resolve this problem. Thanks Ashish -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class
I've confirmed it only failed in console started by SBT. I'm using sbt-spark-package plugin, and the initialCommands look like this (I added implicit sqlContext to it): show console::initialCommands [info] println(Welcome to\n + [info] __\n + [info] / __/__ ___ _/ /__\n + [info] _\\ \\/ _ \\/ _ `/ __/ '_/\n + [info]/___/ .__/\\_,_/_/ /_/\\_\\ version \1.3.0-rc2\\n + [info] /_/\n + [info] Using Scala \2.11.6\\n) [info] [info] import org.apache.spark.SparkContext._ [info] [info] val sc = { [info] val conf = new org.apache.spark.SparkConf() [info] .setMaster(local) [info] .setAppName(Sbt console + Spark!) [info] new org.apache.spark.SparkContext(conf) [info] } [info] println(Created spark context as sc.) [info] [info] def time[T](f: = T): T = { [info] import System.{currentTimeMillis = now} [info] val start = now [info] try { f } finally { println(Elapsed: + (now - start)/1000.0 + s) } [info] } [info] [info] @transient val sqlc = new org.apache.spark.sql.SQLContext(sc) [info] implicit def sqlContext = sqlc [info] import sqlc._ Jianshi On Fri, Mar 13, 2015 at 3:10 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: BTW, I was running tests from SBT when I get the errors. One test turn a Seq of case class to DataFrame. I also tried run similar code in the console, but failed with same error. I tested both Spark 1.3.0-rc2 and 1.2.1 with Scala 2.11.6 and 2.10.4 Any idea? Jianshi On Fri, Mar 13, 2015 at 2:23 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Same issue here. But the classloader in my exception is somehow different. scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with java.net.URLClassLoader@53298398 of type class java.net.URLClassLoader with classpath Jianshi On Sun, Mar 1, 2015 at 9:32 AM, Michael Armbrust mich...@databricks.com wrote: I think its possible that the problem is that the scala compiler is not being loaded by the primordial classloader (but instead by some child classloader) and thus the scala reflection mirror is failing to initialize when it can't find it. Unfortunately, the only solution that I know of is to load all required jars when the JVM starts. On Sat, Feb 28, 2015 at 5:26 PM, Ashish Nigam ashnigamt...@gmail.com wrote: Also, can scala version play any role here? I am using scala 2.11.5 but all spark packages have dependency to scala 2.11.2 Just wanted to make sure that scala version is not an issue here. On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com wrote: Hi, I wrote a very simple program in scala to convert an existing RDD to SchemaRDD. But createSchemaRDD function is throwing exception Exception in thread main scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial classloader with boot classpath [.] not found Here's more info on the versions I am using - scala.binary.version2.11/scala.binary.version spark.version1.2.1/spark.version scala.version2.11.5/scala.version Please let me know how can I resolve this problem. Thanks Ashish -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class
Liancheng also found out that the Spark jars are not included in the classpath of URLClassLoader. Hmm... we're very close to the truth now. Jianshi On Fri, Mar 13, 2015 at 6:03 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I'm almost certain the problem is the ClassLoader. So adding fork := true solves problems for test and run. The problem is how can I fork a JVM for sbt console? fork in console := true seems not working... Jianshi On Fri, Mar 13, 2015 at 4:35 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I guess it's a ClassLoader issue. But I have no idea how to debug it. Any hints? Jianshi On Fri, Mar 13, 2015 at 3:00 PM, Eric Charles e...@apache.org wrote: i have the same issue running spark sql code from eclipse workspace. If you run your code from the command line (with a packaged jar) or from Intellij, I bet it should work. IMHO This is some how related to eclipse env, but would love to know how to fix it (whether via eclipse conf, or via a patch in spark). On 03/01/2015 02:32 AM, Michael Armbrust wrote: I think its possible that the problem is that the scala compiler is not being loaded by the primordial classloader (but instead by some child classloader) and thus the scala reflection mirror is failing to initialize when it can't find it. Unfortunately, the only solution that I know of is to load all required jars when the JVM starts. On Sat, Feb 28, 2015 at 5:26 PM, Ashish Nigam ashnigamt...@gmail.com wrote: Also, can scala version play any role here? I am using scala 2.11.5 but all spark packages have dependency to scala 2.11.2 Just wanted to make sure that scala version is not an issue here. On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com wrote: Hi, I wrote a very simple program in scala to convert an existing RDD to SchemaRDD. But createSchemaRDD function is throwing exception Exception in thread main scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial classloader with boot classpath [.] not found Here's more info on the versions I am using - scala.binary.version2.11/scala.binary.version spark.version1.2.1/spark.version scala.version2.11.5/scala.version Please let me know how can I resolve this problem. Thanks Ashish -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class
I guess it's a ClassLoader issue. But I have no idea how to debug it. Any hints? Jianshi On Fri, Mar 13, 2015 at 3:00 PM, Eric Charles e...@apache.org wrote: i have the same issue running spark sql code from eclipse workspace. If you run your code from the command line (with a packaged jar) or from Intellij, I bet it should work. IMHO This is some how related to eclipse env, but would love to know how to fix it (whether via eclipse conf, or via a patch in spark). On 03/01/2015 02:32 AM, Michael Armbrust wrote: I think its possible that the problem is that the scala compiler is not being loaded by the primordial classloader (but instead by some child classloader) and thus the scala reflection mirror is failing to initialize when it can't find it. Unfortunately, the only solution that I know of is to load all required jars when the JVM starts. On Sat, Feb 28, 2015 at 5:26 PM, Ashish Nigam ashnigamt...@gmail.com wrote: Also, can scala version play any role here? I am using scala 2.11.5 but all spark packages have dependency to scala 2.11.2 Just wanted to make sure that scala version is not an issue here. On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com wrote: Hi, I wrote a very simple program in scala to convert an existing RDD to SchemaRDD. But createSchemaRDD function is throwing exception Exception in thread main scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial classloader with boot classpath [.] not found Here's more info on the versions I am using - scala.binary.version2.11/scala.binary.version spark.version1.2.1/spark.version scala.version2.11.5/scala.version Please let me know how can I resolve this problem. Thanks Ashish -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class
I'm almost certain the problem is the ClassLoader. So adding fork := true solves problems for test and run. The problem is how can I fork a JVM for sbt console? fork in console := true seems not working... Jianshi On Fri, Mar 13, 2015 at 4:35 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I guess it's a ClassLoader issue. But I have no idea how to debug it. Any hints? Jianshi On Fri, Mar 13, 2015 at 3:00 PM, Eric Charles e...@apache.org wrote: i have the same issue running spark sql code from eclipse workspace. If you run your code from the command line (with a packaged jar) or from Intellij, I bet it should work. IMHO This is some how related to eclipse env, but would love to know how to fix it (whether via eclipse conf, or via a patch in spark). On 03/01/2015 02:32 AM, Michael Armbrust wrote: I think its possible that the problem is that the scala compiler is not being loaded by the primordial classloader (but instead by some child classloader) and thus the scala reflection mirror is failing to initialize when it can't find it. Unfortunately, the only solution that I know of is to load all required jars when the JVM starts. On Sat, Feb 28, 2015 at 5:26 PM, Ashish Nigam ashnigamt...@gmail.com wrote: Also, can scala version play any role here? I am using scala 2.11.5 but all spark packages have dependency to scala 2.11.2 Just wanted to make sure that scala version is not an issue here. On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com wrote: Hi, I wrote a very simple program in scala to convert an existing RDD to SchemaRDD. But createSchemaRDD function is throwing exception Exception in thread main scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial classloader with boot classpath [.] not found Here's more info on the versions I am using - scala.binary.version2.11/scala.binary.version spark.version1.2.1/spark.version scala.version2.11.5/scala.version Please let me know how can I resolve this problem. Thanks Ashish -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class
BTW, I was running tests from SBT when I get the errors. One test turn a Seq of case class to DataFrame. I also tried run similar code in the console, but failed with same error. I tested both Spark 1.3.0-rc2 and 1.2.1 with Scala 2.11.6 and 2.10.4 Any idea? Jianshi On Fri, Mar 13, 2015 at 2:23 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Same issue here. But the classloader in my exception is somehow different. scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with java.net.URLClassLoader@53298398 of type class java.net.URLClassLoader with classpath Jianshi On Sun, Mar 1, 2015 at 9:32 AM, Michael Armbrust mich...@databricks.com wrote: I think its possible that the problem is that the scala compiler is not being loaded by the primordial classloader (but instead by some child classloader) and thus the scala reflection mirror is failing to initialize when it can't find it. Unfortunately, the only solution that I know of is to load all required jars when the JVM starts. On Sat, Feb 28, 2015 at 5:26 PM, Ashish Nigam ashnigamt...@gmail.com wrote: Also, can scala version play any role here? I am using scala 2.11.5 but all spark packages have dependency to scala 2.11.2 Just wanted to make sure that scala version is not an issue here. On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com wrote: Hi, I wrote a very simple program in scala to convert an existing RDD to SchemaRDD. But createSchemaRDD function is throwing exception Exception in thread main scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial classloader with boot classpath [.] not found Here's more info on the versions I am using - scala.binary.version2.11/scala.binary.version spark.version1.2.1/spark.version scala.version2.11.5/scala.version Please let me know how can I resolve this problem. Thanks Ashish -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class
Same issue here. But the classloader in my exception is somehow different. scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with java.net.URLClassLoader@53298398 of type class java.net.URLClassLoader with classpath Jianshi On Sun, Mar 1, 2015 at 9:32 AM, Michael Armbrust mich...@databricks.com wrote: I think its possible that the problem is that the scala compiler is not being loaded by the primordial classloader (but instead by some child classloader) and thus the scala reflection mirror is failing to initialize when it can't find it. Unfortunately, the only solution that I know of is to load all required jars when the JVM starts. On Sat, Feb 28, 2015 at 5:26 PM, Ashish Nigam ashnigamt...@gmail.com wrote: Also, can scala version play any role here? I am using scala 2.11.5 but all spark packages have dependency to scala 2.11.2 Just wanted to make sure that scala version is not an issue here. On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com wrote: Hi, I wrote a very simple program in scala to convert an existing RDD to SchemaRDD. But createSchemaRDD function is throwing exception Exception in thread main scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial classloader with boot classpath [.] not found Here's more info on the versions I am using - scala.binary.version2.11/scala.binary.version spark.version1.2.1/spark.version scala.version2.11.5/scala.version Please let me know how can I resolve this problem. Thanks Ashish -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: How to set per-user spark.local.dir?
Unfortunately /tmp mount is really small in our environment. I need to provide a per-user setting as the default value. I hacked bin/spark-class for the similar effect. And spark-defaults.conf can override it. :) Jianshi On Wed, Mar 11, 2015 at 3:28 PM, Patrick Wendell pwend...@gmail.com wrote: We don't support expressions or wildcards in that configuration. For each application, the local directories need to be constant. If you have users submitting different Spark applications, those can each set spark.local.dirs. - Patrick On Wed, Mar 11, 2015 at 12:14 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I need to set per-user spark.local.dir, how can I do that? I tried both /x/home/${user.name}/spark/tmp and /x/home/${USER}/spark/tmp And neither worked. Looks like it has to be a constant setting in spark-defaults.conf. Right? Any ideas how to do that? Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: How to set per-user spark.local.dir?
Thanks Sean. I'll ask our Hadoop admin. Actually I didn't find hadoop.tmp.dir in the Hadoop settings...using user home is suggested by other users. Jianshi On Wed, Mar 11, 2015 at 3:51 PM, Sean Owen so...@cloudera.com wrote: You shouldn't use /tmp, but it doesn't mean you should use user home directories either. Typically, like in YARN, you would a number of directories (on different disks) mounted and configured for local storage for jobs. On Wed, Mar 11, 2015 at 7:42 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Unfortunately /tmp mount is really small in our environment. I need to provide a per-user setting as the default value. I hacked bin/spark-class for the similar effect. And spark-defaults.conf can override it. :) Jianshi On Wed, Mar 11, 2015 at 3:28 PM, Patrick Wendell pwend...@gmail.com wrote: We don't support expressions or wildcards in that configuration. For each application, the local directories need to be constant. If you have users submitting different Spark applications, those can each set spark.local.dirs. - Patrick On Wed, Mar 11, 2015 at 12:14 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I need to set per-user spark.local.dir, how can I do that? I tried both /x/home/${user.name}/spark/tmp and /x/home/${USER}/spark/tmp And neither worked. Looks like it has to be a constant setting in spark-defaults.conf. Right? Any ideas how to do that? Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
How to set per-user spark.local.dir?
Hi, I need to set per-user spark.local.dir, how can I do that? I tried both /x/home/${user.name}/spark/tmp and /x/home/${USER}/spark/tmp And neither worked. Looks like it has to be a constant setting in spark-defaults.conf. Right? Any ideas how to do that? Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Having lots of FetchFailedException in join
Thanks. I was about to submit a ticket for this :) Also there's a ticket for sort-merge based groupbykey https://issues.apache.org/jira/browse/SPARK-3461 BTW, any idea why run with netty didn't output OOM error messages? It's very confusing in troubleshooting. Jianshi On Thu, Mar 5, 2015 at 4:01 PM, Shao, Saisai saisai.s...@intel.com wrote: I think there’s a lot of JIRA trying to solve this problem ( https://issues.apache.org/jira/browse/SPARK-5763). Basically sort merge join is a good choice. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 3:55 PM *To:* Shao, Saisai *Cc:* Cheng, Hao; user *Subject:* Re: Having lots of FetchFailedException in join There're some skew. 64 6164 0 SUCCESS PROCESS_LOCAL 200 / 2015/03/04 23:45:47 1.1 min 6 s 198.6 MB 21.1 GB 240.8 MB 59 6159 0 SUCCESS PROCESS_LOCAL 30 / 2015/03/04 23:45:47 44 s 5 s 200.7 MB 4.8 GB 154.0 MB But I expect this kind of skewness to be quite common. Jianshi On Thu, Mar 5, 2015 at 3:48 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I see. I'm using core's join. The data might have some skewness (checking). I understand shuffle can spill data to disk but when consuming it, say in cogroup or groupByKey, it still needs to read the whole group elements, right? I guess OOM happened there when reading very large groups. Jianshi On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai saisai.s...@intel.com wrote: I think what you could do is to monitor through web UI to see if there’s any skew or other symptoms in shuffle write and read. For GC you could use the below configuration as you mentioned. From Spark core side, all the shuffle related operations can spill the data into disk and no need to read the whole partition into memory. But if you uses SparkSQL, it depends on how SparkSQL uses this operators. CC @hao if he has some thoughts on it. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 3:28 PM *To:* Shao, Saisai *Cc:* user *Subject:* Re: Having lots of FetchFailedException in join Hi Saisai, What's your suggested settings on monitoring shuffle? I've enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging. I found SPARK-3461 (Support external groupByKey using repartitionAndSortWithinPartitions) want to make groupByKey using external storage. It's still open status. Does that mean now groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read the group as a whole during consuming? How can I deal with the key skewness in joins? Is there a skew-join implementation? Jianshi On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Jianshi, From my understanding, it may not be the problem of NIO or Netty, looking at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap), theoretically EAOM can spill the data into disk if memory is not enough, but there might some issues when join key is skewed or key number is smaller, so you will meet OOM. Maybe you could monitor each stage or task’s shuffle and GC status also system status to identify the problem. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 2:32 PM *To:* Aaron Davidson *Cc:* user *Subject:* Re: Having lots of FetchFailedException in join One really interesting is that when I'm using the netty-based spark.shuffle.blockTransferService, there's no OOM error messages (java.lang.OutOfMemoryError: Java heap space). Any idea why it's not here? I'm using Spark 1.2.1. Jianshi On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors, I'm doing a big join operation. 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32
Re: Having lots of FetchFailedException in join
One really interesting is that when I'm using the netty-based spark.shuffle.blockTransferService, there's no OOM error messages (java.lang.OutOfMemoryError: Java heap space). Any idea why it's not here? I'm using Spark 1.2.1. Jianshi On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors, I'm doing a big join operation. 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) Is join/cogroup still memory bound? Jianshi On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hmm... ok, previous errors are still block fetch errors. 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning fetch of 11 outstanding blocks java.io.IOException: Failed to connect to host-/:55597 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87) at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:289) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext
Re: Having lots of FetchFailedException in join
There're some skew. 6461640SUCCESSPROCESS_LOCAL200 / 2015/03/04 23:45:471.1 min6 s198.6 MB21.1 GB240.8 MB5961590SUCCESSPROCESS_LOCAL30 / 2015/03/04 23:45:4744 s5 s200.7 MB4.8 GB154.0 MB But I expect this kind of skewness to be quite common. Jianshi On Thu, Mar 5, 2015 at 3:48 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I see. I'm using core's join. The data might have some skewness (checking). I understand shuffle can spill data to disk but when consuming it, say in cogroup or groupByKey, it still needs to read the whole group elements, right? I guess OOM happened there when reading very large groups. Jianshi On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai saisai.s...@intel.com wrote: I think what you could do is to monitor through web UI to see if there’s any skew or other symptoms in shuffle write and read. For GC you could use the below configuration as you mentioned. From Spark core side, all the shuffle related operations can spill the data into disk and no need to read the whole partition into memory. But if you uses SparkSQL, it depends on how SparkSQL uses this operators. CC @hao if he has some thoughts on it. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 3:28 PM *To:* Shao, Saisai *Cc:* user *Subject:* Re: Having lots of FetchFailedException in join Hi Saisai, What's your suggested settings on monitoring shuffle? I've enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging. I found SPARK-3461 (Support external groupByKey using repartitionAndSortWithinPartitions) want to make groupByKey using external storage. It's still open status. Does that mean now groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read the group as a whole during consuming? How can I deal with the key skewness in joins? Is there a skew-join implementation? Jianshi On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Jianshi, From my understanding, it may not be the problem of NIO or Netty, looking at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap), theoretically EAOM can spill the data into disk if memory is not enough, but there might some issues when join key is skewed or key number is smaller, so you will meet OOM. Maybe you could monitor each stage or task’s shuffle and GC status also system status to identify the problem. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 2:32 PM *To:* Aaron Davidson *Cc:* user *Subject:* Re: Having lots of FetchFailedException in join One really interesting is that when I'm using the netty-based spark.shuffle.blockTransferService, there's no OOM error messages (java.lang.OutOfMemoryError: Java heap space). Any idea why it's not here? I'm using Spark 1.2.1. Jianshi On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors, I'm doing a big join operation. 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator
Re: Having lots of FetchFailedException in join
Hi Saisai, What's your suggested settings on monitoring shuffle? I've enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging. I found SPARK-3461 (Support external groupByKey using repartitionAndSortWithinPartitions) want to make groupByKey using external storage. It's still open status. Does that mean now groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read the group as a whole during consuming? How can I deal with the key skewness in joins? Is there a skew-join implementation? Jianshi On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Jianshi, From my understanding, it may not be the problem of NIO or Netty, looking at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap), theoretically EAOM can spill the data into disk if memory is not enough, but there might some issues when join key is skewed or key number is smaller, so you will meet OOM. Maybe you could monitor each stage or task’s shuffle and GC status also system status to identify the problem. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 2:32 PM *To:* Aaron Davidson *Cc:* user *Subject:* Re: Having lots of FetchFailedException in join One really interesting is that when I'm using the netty-based spark.shuffle.blockTransferService, there's no OOM error messages (java.lang.OutOfMemoryError: Java heap space). Any idea why it's not here? I'm using Spark 1.2.1. Jianshi On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors, I'm doing a big join operation. 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) Is join/cogroup still memory bound? Jianshi On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hmm... ok, previous errors are still block fetch errors. 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning fetch of 11 outstanding blocks java.io.IOException: Failed to connect to host-/:55597
Re: Having lots of FetchFailedException in join
I see. I'm using core's join. The data might have some skewness (checking). I understand shuffle can spill data to disk but when consuming it, say in cogroup or groupByKey, it still needs to read the whole group elements, right? I guess OOM happened there when reading very large groups. Jianshi On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai saisai.s...@intel.com wrote: I think what you could do is to monitor through web UI to see if there’s any skew or other symptoms in shuffle write and read. For GC you could use the below configuration as you mentioned. From Spark core side, all the shuffle related operations can spill the data into disk and no need to read the whole partition into memory. But if you uses SparkSQL, it depends on how SparkSQL uses this operators. CC @hao if he has some thoughts on it. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 3:28 PM *To:* Shao, Saisai *Cc:* user *Subject:* Re: Having lots of FetchFailedException in join Hi Saisai, What's your suggested settings on monitoring shuffle? I've enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging. I found SPARK-3461 (Support external groupByKey using repartitionAndSortWithinPartitions) want to make groupByKey using external storage. It's still open status. Does that mean now groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read the group as a whole during consuming? How can I deal with the key skewness in joins? Is there a skew-join implementation? Jianshi On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Jianshi, From my understanding, it may not be the problem of NIO or Netty, looking at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap), theoretically EAOM can spill the data into disk if memory is not enough, but there might some issues when join key is skewed or key number is smaller, so you will meet OOM. Maybe you could monitor each stage or task’s shuffle and GC status also system status to identify the problem. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 2:32 PM *To:* Aaron Davidson *Cc:* user *Subject:* Re: Having lots of FetchFailedException in join One really interesting is that when I'm using the netty-based spark.shuffle.blockTransferService, there's no OOM error messages (java.lang.OutOfMemoryError: Java heap space). Any idea why it's not here? I'm using Spark 1.2.1. Jianshi On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors, I'm doing a big join operation. 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31
Re: Having lots of FetchFailedException in join
I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors, I'm doing a big join operation. 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) Is join/cogroup still memory bound? Jianshi On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hmm... ok, previous errors are still block fetch errors. 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning fetch of 11 outstanding blocks java.io.IOException: Failed to connect to host-/:55597 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87) at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:289) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply
[no subject]
Hi, I got this error message: 15/03/03 10:22:41 ERROR OneForOneBlockFetcher: Failed while starting block fetches java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.init(FileInputStream.java:146) at org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) And then for the same index file and executor, I got the following errors multiple times 15/03/03 10:22:41 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from host-:39534 java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) 15/03/03 10:22:41 ERROR RetryingBlockFetcher: Failed to fetch block shuffle_0_13_1228, and will not retry (0 retries) java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) ... Caused by: java.net.ConnectException: Connection refused: host- What's the problem? BTW, I'm using Spark 1.2.1-SNAPSHOT I built around Dec. 20. Is there any bug fixes related to shuffle block fetching or index files after that? Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Having lots of FetchFailedException in join
Sorry that I forgot the subject. And in the driver, I got many FetchFailedException. The error messages are 15/03/03 10:34:32 WARN TaskSetManager: Lost task 31.0 in stage 2.2 (TID 7943, ): FetchFailed(BlockManagerId(86, , 43070), shuffleId=0, mapId=24, reduceId=1220, message= org.apache.spark.shuffle.FetchFailedException: Failed to connect to /:43070 at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) Jianshi On Wed, Mar 4, 2015 at 2:55 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I got this error message: 15/03/03 10:22:41 ERROR OneForOneBlockFetcher: Failed while starting block fetches java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.init(FileInputStream.java:146) at org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) And then for the same index file and executor, I got the following errors multiple times 15/03/03 10:22:41 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from host-:39534 java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) 15/03/03 10:22:41 ERROR RetryingBlockFetcher: Failed to fetch block shuffle_0_13_1228, and will not retry (0 retries) java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) ... Caused by: java.net.ConnectException: Connection refused: host- What's the problem? BTW, I'm using Spark 1.2.1-SNAPSHOT I built around Dec. 20. Is there any bug fixes related to shuffle block fetching or index files after that? Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Having lots of FetchFailedException in join
Hmm... ok, previous errors are still block fetch errors. 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning fetch of 11 outstanding blocks java.io.IOException: Failed to connect to host-/:55597 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87) at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:289) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:724) Caused by: java.net.ConnectException: Connection refused: lvshdc5dn0518.lvs.paypal.com/10.196.244.48:55597 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735) at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) And I checked executor on container host-, everything is good. Jianshi On Wed, Mar 4, 2015 at 12:28 PM, Aaron Davidson ilike...@gmail.com wrote: Drat! That doesn't help. Could you scan from the top to see if there were any fatal errors preceding these? Sometimes a OOM will cause this type of issue further down. On Tue, Mar 3, 2015 at 8:16 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: The failed executor has the following error messages. Any hints? 15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 5711039715419258699
Re: Having lots of FetchFailedException in join
(ByteToMessageDecoder.java:163) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at java.lang.Thread.run(Thread.java:724) Jianshi On Wed, Mar 4, 2015 at 3:25 AM, Aaron Davidson ilike...@gmail.com wrote: Failed to connect implies that the executor at that host died, please check its logs as well. On Tue, Mar 3, 2015 at 11:03 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Sorry that I forgot the subject. And in the driver, I got many FetchFailedException. The error messages are 15/03/03 10:34:32 WARN TaskSetManager: Lost task 31.0 in stage 2.2 (TID 7943, ): FetchFailed(BlockManagerId(86, , 43070), shuffleId=0, mapId=24, reduceId=1220, message= org.apache.spark.shuffle.FetchFailedException: Failed to connect to /:43070 at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) Jianshi On Wed, Mar 4, 2015 at 2:55 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I got this error message: 15/03/03 10:22:41 ERROR OneForOneBlockFetcher: Failed while starting block fetches java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.init(FileInputStream.java:146) at org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) And then for the same index file and executor, I got the following errors multiple times 15/03/03 10:22:41 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from host-:39534 java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) 15/03/03 10:22:41 ERROR RetryingBlockFetcher: Failed to fetch block shuffle_0_13_1228, and will not retry (0 retries) java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) ... Caused by: java.net.ConnectException: Connection refused: host- What's the problem? BTW, I'm using Spark 1.2.1-SNAPSHOT I built around Dec. 20. Is there any bug fixes related to shuffle block fetching or index files after that? Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Dynamic partition pattern support
Hi, HCatalog allows you to specify the pattern of paths for partitions, which will be used by dynamic partition loading. https://cwiki.apache.org/confluence/display/Hive/HCatalog+DynamicPartitions#HCatalogDynamicPartitions-ExternalTables Can we have similar feature in SparkSQL? Jira is here: https://issues.apache.org/jira/browse/SPARK-5828 Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Which version to use for shuffle service if I'm going to run multiple versions of Spark
Get it. Thanks Reynold and Andrew! Jianshi On Thu, Feb 12, 2015 at 12:25 AM, Andrew Or and...@databricks.com wrote: Hi Jianshi, For YARN, there may be an issue with how a recently patch changes the accessibility of the shuffle files by the external shuffle service: https://issues.apache.org/jira/browse/SPARK-5655. It is likely that you will hit this with 1.2.1, actually. For this reason I would have to recommend that you use 1.2.2 when it is released, but for now you should use 1.2.0 for this specific use case. -Andrew 2015-02-10 23:38 GMT-08:00 Reynold Xin r...@databricks.com: I think we made the binary protocol compatible across all versions, so you should be fine with using any one of them. 1.2.1 is probably the best since it is the most recent stable release. On Tue, Feb 10, 2015 at 8:43 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I need to use branch-1.2 and sometimes master builds of Spark for my project. However the officially supported Spark version by our Hadoop admin is only 1.2.0. So, my question is which version/build of spark-yarn-shuffle.jar should I use that works for all four versions? (1.2.0, 1.2.1, 1.2,2, 1.3.0) Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Which version to use for shuffle service if I'm going to run multiple versions of Spark
Hi, I need to use branch-1.2 and sometimes master builds of Spark for my project. However the officially supported Spark version by our Hadoop admin is only 1.2.0. So, my question is which version/build of spark-yarn-shuffle.jar should I use that works for all four versions? (1.2.0, 1.2.1, 1.2,2, 1.3.0) Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Pig loader in Spark
Hi, Anyone has implemented the default Pig Loader in Spark? (loading delimited text files with .pig_schema) Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Hive UDAF percentile_approx says This UDAF does not support the deprecated getEvaluator() method.
Hi, The following SQL query select percentile_approx(variables.var1, 0.95) p95 from model will throw ERROR SparkSqlInterpreter: Error org.apache.hadoop.hive.ql.parse.SemanticException: This UDAF does not support the deprecated getEvaluator() method. at org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver.getEvaluator(AbstractGenericUDAFResolver.java:53) at org.apache.spark.sql.hive.HiveGenericUdaf.objectInspector$lzycompute(hiveUdfs.scala:196) at org.apache.spark.sql.hive.HiveGenericUdaf.objectInspector(hiveUdfs.scala:195) at org.apache.spark.sql.hive.HiveGenericUdaf.dataType(hiveUdfs.scala:203) at org.apache.spark.sql.catalyst.expressions.Alias.toAttribute(namedExpressions.scala:105) at org.apache.spark.sql.catalyst.plans.logical.Aggregate$$anonfun$output$6.apply(basicOperators.scala:143) at org.apache.spark.sql.catalyst.plans.logical.Aggregate$$anonfun$output$6.apply(basicOperators.scala:143) I'm using latest branch-1.2 I found in PR that percentile and percentile_approx are supported. A bug? Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Hive UDAF percentile_approx says This UDAF does not support the deprecated getEvaluator() method.
Ah, thx Ted and Yin! I'll build a new version. :) Jianshi On Wed, Jan 14, 2015 at 7:24 AM, Yin Huai yh...@databricks.com wrote: Yeah, it's a bug. It has been fixed by https://issues.apache.org/jira/browse/SPARK-3891 in master. On Tue, Jan 13, 2015 at 2:41 PM, Ted Yu yuzhih...@gmail.com wrote: Looking at the source code for AbstractGenericUDAFResolver, the following (non-deprecated) method should be called: public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) It is called by hiveUdfs.scala (master branch): val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors.toArray, false, false) resolver.getEvaluator(parameterInfo) FYI On Tue, Jan 13, 2015 at 1:51 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, The following SQL query select percentile_approx(variables.var1, 0.95) p95 from model will throw ERROR SparkSqlInterpreter: Error org.apache.hadoop.hive.ql.parse.SemanticException: This UDAF does not support the deprecated getEvaluator() method. at org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver.getEvaluator(AbstractGenericUDAFResolver.java:53) at org.apache.spark.sql.hive.HiveGenericUdaf.objectInspector$lzycompute(hiveUdfs.scala:196) at org.apache.spark.sql.hive.HiveGenericUdaf.objectInspector(hiveUdfs.scala:195) at org.apache.spark.sql.hive.HiveGenericUdaf.dataType(hiveUdfs.scala:203) at org.apache.spark.sql.catalyst.expressions.Alias.toAttribute(namedExpressions.scala:105) at org.apache.spark.sql.catalyst.plans.logical.Aggregate$$anonfun$output$6.apply(basicOperators.scala:143) at org.apache.spark.sql.catalyst.plans.logical.Aggregate$$anonfun$output$6.apply(basicOperators.scala:143) I'm using latest branch-1.2 I found in PR that percentile and percentile_approx are supported. A bug? Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Hive Problem in Pig generated Parquet file schema in CREATE EXTERNAL TABLE (e.g. bag::col1)
FYI, Latest hive 0.14/parquet will have column renaming support. Jianshi On Wed, Dec 10, 2014 at 3:37 AM, Michael Armbrust mich...@databricks.com wrote: You might also try out the recently added support for views. On Mon, Dec 8, 2014 at 9:31 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Ah... I see. Thanks for pointing it out. Then it means we cannot mount external table using customized column names. hmm... Then the only option left is to use a subquery to add a bunch of column alias. I'll try it later. Thanks, Jianshi On Tue, Dec 9, 2014 at 3:34 AM, Michael Armbrust mich...@databricks.com wrote: This is by hive's design. From the Hive documentation: The column change command will only modify Hive's metadata, and will not modify data. Users should make sure the actual data layout of the table/partition conforms with the metadata definition. On Sat, Dec 6, 2014 at 8:28 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Ok, found another possible bug in Hive. My current solution is to use ALTER TABLE CHANGE to rename the column names. The problem is after renaming the column names, the value of the columns became all NULL. Before renaming: scala sql(select `sorted::cre_ts` from pmt limit 1).collect res12: Array[org.apache.spark.sql.Row] = Array([12/02/2014 07:38:54]) Execute renaming: scala sql(alter table pmt change `sorted::cre_ts` cre_ts string) res13: org.apache.spark.sql.SchemaRDD = SchemaRDD[972] at RDD at SchemaRDD.scala:108 == Query Plan == Native command: executed by Hive After renaming: scala sql(select cre_ts from pmt limit 1).collect res16: Array[org.apache.spark.sql.Row] = Array([null]) I created a JIRA for it: https://issues.apache.org/jira/browse/SPARK-4781 Jianshi On Sun, Dec 7, 2014 at 1:06 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hmm... another issue I found doing this approach is that ANALYZE TABLE ... COMPUTE STATISTICS will fail to attach the metadata to the table, and later broadcast join and such will fail... Any idea how to fix this issue? Jianshi On Sat, Dec 6, 2014 at 9:10 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Very interesting, the line doing drop table will throws an exception. After removing it all works. Jianshi On Sat, Dec 6, 2014 at 9:11 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Here's the solution I got after talking with Liancheng: 1) using backquote `..` to wrap up all illegal characters val rdd = parquetFile(file) val schema = rdd.schema.fields.map(f = s`${f.name}` ${HiveMetastoreTypes.toMetastoreType(f.dataType)}).mkString(,\n) val ddl_13 = s |CREATE EXTERNAL TABLE $name ( | $schema |) |STORED AS PARQUET |LOCATION '$file' .stripMargin sql(ddl_13) 2) create a new Schema and do applySchema to generate a new SchemaRDD, had to drop and register table val t = table(name) val newSchema = StructType(t.schema.fields.map(s = s.copy(name = s.name.replaceAll(.*?::, sql(sdrop table $name) applySchema(t, newSchema).registerTempTable(name) I'm testing it for now. Thanks for the help! Jianshi On Sat, Dec 6, 2014 at 8:41 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I had to use Pig for some preprocessing and to generate Parquet files for Spark to consume. However, due to Pig's limitation, the generated schema contains Pig's identifier e.g. sorted::id, sorted::cre_ts, ... I tried to put the schema inside CREATE EXTERNAL TABLE, e.g. create external table pmt ( sorted::id bigint ) stored as parquet location '...' Obviously it didn't work, I also tried removing the identifier sorted::, but the resulting rows contain only nulls. Any idea how to create a table in HiveContext from these Parquet files? Thanks, Jianshi -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Hive Problem in Pig generated Parquet file schema in CREATE EXTERNAL TABLE (e.g. bag::col1)
Ah... I see. Thanks for pointing it out. Then it means we cannot mount external table using customized column names. hmm... Then the only option left is to use a subquery to add a bunch of column alias. I'll try it later. Thanks, Jianshi On Tue, Dec 9, 2014 at 3:34 AM, Michael Armbrust mich...@databricks.com wrote: This is by hive's design. From the Hive documentation: The column change command will only modify Hive's metadata, and will not modify data. Users should make sure the actual data layout of the table/partition conforms with the metadata definition. On Sat, Dec 6, 2014 at 8:28 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Ok, found another possible bug in Hive. My current solution is to use ALTER TABLE CHANGE to rename the column names. The problem is after renaming the column names, the value of the columns became all NULL. Before renaming: scala sql(select `sorted::cre_ts` from pmt limit 1).collect res12: Array[org.apache.spark.sql.Row] = Array([12/02/2014 07:38:54]) Execute renaming: scala sql(alter table pmt change `sorted::cre_ts` cre_ts string) res13: org.apache.spark.sql.SchemaRDD = SchemaRDD[972] at RDD at SchemaRDD.scala:108 == Query Plan == Native command: executed by Hive After renaming: scala sql(select cre_ts from pmt limit 1).collect res16: Array[org.apache.spark.sql.Row] = Array([null]) I created a JIRA for it: https://issues.apache.org/jira/browse/SPARK-4781 Jianshi On Sun, Dec 7, 2014 at 1:06 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hmm... another issue I found doing this approach is that ANALYZE TABLE ... COMPUTE STATISTICS will fail to attach the metadata to the table, and later broadcast join and such will fail... Any idea how to fix this issue? Jianshi On Sat, Dec 6, 2014 at 9:10 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Very interesting, the line doing drop table will throws an exception. After removing it all works. Jianshi On Sat, Dec 6, 2014 at 9:11 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Here's the solution I got after talking with Liancheng: 1) using backquote `..` to wrap up all illegal characters val rdd = parquetFile(file) val schema = rdd.schema.fields.map(f = s`${f.name}` ${HiveMetastoreTypes.toMetastoreType(f.dataType)}).mkString(,\n) val ddl_13 = s |CREATE EXTERNAL TABLE $name ( | $schema |) |STORED AS PARQUET |LOCATION '$file' .stripMargin sql(ddl_13) 2) create a new Schema and do applySchema to generate a new SchemaRDD, had to drop and register table val t = table(name) val newSchema = StructType(t.schema.fields.map(s = s.copy(name = s.name.replaceAll(.*?::, sql(sdrop table $name) applySchema(t, newSchema).registerTempTable(name) I'm testing it for now. Thanks for the help! Jianshi On Sat, Dec 6, 2014 at 8:41 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I had to use Pig for some preprocessing and to generate Parquet files for Spark to consume. However, due to Pig's limitation, the generated schema contains Pig's identifier e.g. sorted::id, sorted::cre_ts, ... I tried to put the schema inside CREATE EXTERNAL TABLE, e.g. create external table pmt ( sorted::id bigint ) stored as parquet location '...' Obviously it didn't work, I also tried removing the identifier sorted::, but the resulting rows contain only nulls. Any idea how to create a table in HiveContext from these Parquet files? Thanks, Jianshi -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Hive Problem in Pig generated Parquet file schema in CREATE EXTERNAL TABLE (e.g. bag::col1)
Very interesting, the line doing drop table will throws an exception. After removing it all works. Jianshi On Sat, Dec 6, 2014 at 9:11 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Here's the solution I got after talking with Liancheng: 1) using backquote `..` to wrap up all illegal characters val rdd = parquetFile(file) val schema = rdd.schema.fields.map(f = s`${f.name}` ${HiveMetastoreTypes.toMetastoreType(f.dataType)}).mkString(,\n) val ddl_13 = s |CREATE EXTERNAL TABLE $name ( | $schema |) |STORED AS PARQUET |LOCATION '$file' .stripMargin sql(ddl_13) 2) create a new Schema and do applySchema to generate a new SchemaRDD, had to drop and register table val t = table(name) val newSchema = StructType(t.schema.fields.map(s = s.copy(name = s.name.replaceAll(.*?::, sql(sdrop table $name) applySchema(t, newSchema).registerTempTable(name) I'm testing it for now. Thanks for the help! Jianshi On Sat, Dec 6, 2014 at 8:41 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I had to use Pig for some preprocessing and to generate Parquet files for Spark to consume. However, due to Pig's limitation, the generated schema contains Pig's identifier e.g. sorted::id, sorted::cre_ts, ... I tried to put the schema inside CREATE EXTERNAL TABLE, e.g. create external table pmt ( sorted::id bigint ) stored as parquet location '...' Obviously it didn't work, I also tried removing the identifier sorted::, but the resulting rows contain only nulls. Any idea how to create a table in HiveContext from these Parquet files? Thanks, Jianshi -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Hive Problem in Pig generated Parquet file schema in CREATE EXTERNAL TABLE (e.g. bag::col1)
Hmm... another issue I found doing this approach is that ANALYZE TABLE ... COMPUTE STATISTICS will fail to attach the metadata to the table, and later broadcast join and such will fail... Any idea how to fix this issue? Jianshi On Sat, Dec 6, 2014 at 9:10 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Very interesting, the line doing drop table will throws an exception. After removing it all works. Jianshi On Sat, Dec 6, 2014 at 9:11 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Here's the solution I got after talking with Liancheng: 1) using backquote `..` to wrap up all illegal characters val rdd = parquetFile(file) val schema = rdd.schema.fields.map(f = s`${f.name}` ${HiveMetastoreTypes.toMetastoreType(f.dataType)}).mkString(,\n) val ddl_13 = s |CREATE EXTERNAL TABLE $name ( | $schema |) |STORED AS PARQUET |LOCATION '$file' .stripMargin sql(ddl_13) 2) create a new Schema and do applySchema to generate a new SchemaRDD, had to drop and register table val t = table(name) val newSchema = StructType(t.schema.fields.map(s = s.copy(name = s.name.replaceAll(.*?::, sql(sdrop table $name) applySchema(t, newSchema).registerTempTable(name) I'm testing it for now. Thanks for the help! Jianshi On Sat, Dec 6, 2014 at 8:41 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I had to use Pig for some preprocessing and to generate Parquet files for Spark to consume. However, due to Pig's limitation, the generated schema contains Pig's identifier e.g. sorted::id, sorted::cre_ts, ... I tried to put the schema inside CREATE EXTERNAL TABLE, e.g. create external table pmt ( sorted::id bigint ) stored as parquet location '...' Obviously it didn't work, I also tried removing the identifier sorted::, but the resulting rows contain only nulls. Any idea how to create a table in HiveContext from these Parquet files? Thanks, Jianshi -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Hive Problem in Pig generated Parquet file schema in CREATE EXTERNAL TABLE (e.g. bag::col1)
Ok, found another possible bug in Hive. My current solution is to use ALTER TABLE CHANGE to rename the column names. The problem is after renaming the column names, the value of the columns became all NULL. Before renaming: scala sql(select `sorted::cre_ts` from pmt limit 1).collect res12: Array[org.apache.spark.sql.Row] = Array([12/02/2014 07:38:54]) Execute renaming: scala sql(alter table pmt change `sorted::cre_ts` cre_ts string) res13: org.apache.spark.sql.SchemaRDD = SchemaRDD[972] at RDD at SchemaRDD.scala:108 == Query Plan == Native command: executed by Hive After renaming: scala sql(select cre_ts from pmt limit 1).collect res16: Array[org.apache.spark.sql.Row] = Array([null]) I created a JIRA for it: https://issues.apache.org/jira/browse/SPARK-4781 Jianshi On Sun, Dec 7, 2014 at 1:06 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hmm... another issue I found doing this approach is that ANALYZE TABLE ... COMPUTE STATISTICS will fail to attach the metadata to the table, and later broadcast join and such will fail... Any idea how to fix this issue? Jianshi On Sat, Dec 6, 2014 at 9:10 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Very interesting, the line doing drop table will throws an exception. After removing it all works. Jianshi On Sat, Dec 6, 2014 at 9:11 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Here's the solution I got after talking with Liancheng: 1) using backquote `..` to wrap up all illegal characters val rdd = parquetFile(file) val schema = rdd.schema.fields.map(f = s`${f.name}` ${HiveMetastoreTypes.toMetastoreType(f.dataType)}).mkString(,\n) val ddl_13 = s |CREATE EXTERNAL TABLE $name ( | $schema |) |STORED AS PARQUET |LOCATION '$file' .stripMargin sql(ddl_13) 2) create a new Schema and do applySchema to generate a new SchemaRDD, had to drop and register table val t = table(name) val newSchema = StructType(t.schema.fields.map(s = s.copy(name = s.name.replaceAll(.*?::, sql(sdrop table $name) applySchema(t, newSchema).registerTempTable(name) I'm testing it for now. Thanks for the help! Jianshi On Sat, Dec 6, 2014 at 8:41 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I had to use Pig for some preprocessing and to generate Parquet files for Spark to consume. However, due to Pig's limitation, the generated schema contains Pig's identifier e.g. sorted::id, sorted::cre_ts, ... I tried to put the schema inside CREATE EXTERNAL TABLE, e.g. create external table pmt ( sorted::id bigint ) stored as parquet location '...' Obviously it didn't work, I also tried removing the identifier sorted::, but the resulting rows contain only nulls. Any idea how to create a table in HiveContext from these Parquet files? Thanks, Jianshi -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Convert RDD[Map[String, Any]] to SchemaRDD
Hmm.. I've created a JIRA: https://issues.apache.org/jira/browse/SPARK-4782 Jianshi On Sun, Dec 7, 2014 at 2:32 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, What's the best way to convert RDD[Map[String, Any]] to a SchemaRDD? I'm currently converting each Map to a JSON String and do JsonRDD.inferSchema. How about adding inferSchema support to Map[String, Any] directly? It would be very useful. Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: drop table if exists throws exception
I see. The resulting SchemaRDD is returned so like Michael said, the exception does not propogate to user code. However printing out the following log is confusing :) scala sql(drop table if exists abc) 14/12/05 16:27:02 INFO ParseDriver: Parsing command: drop table if exists abc 14/12/05 16:27:02 INFO ParseDriver: Parse Completed 14/12/05 16:27:02 INFO PerfLogger: PERFLOG method=Driver.run from=org.apache.hadoop.hive.ql.Driver 14/12/05 16:27:02 INFO PerfLogger: PERFLOG method=TimeToSubmit from=org.apache.hadoop.hive.ql.Driver 14/12/05 16:27:02 INFO Driver: Concurrency mode is disabled, not creating a lock manager 14/12/05 16:27:02 INFO PerfLogger: PERFLOG method=compile from=org.apache.hadoop.hive.ql.Driver 14/12/05 16:27:02 INFO PerfLogger: PERFLOG method=parse from=org.apache.hadoop.hive.ql.Driver 14/12/05 16:27:02 INFO ParseDriver: Parsing command: DROP TABLE IF EXISTS abc 14/12/05 16:27:02 INFO ParseDriver: Parse Completed 14/12/05 16:27:02 INFO PerfLogger: /PERFLOG method=parse start=1417825622650 end=1417825622650 duration=0 from=org.apache.hadoop.hive.ql.Driver 14/12/05 16:27:02 INFO PerfLogger: PERFLOG method=semanticAnalyze from=org.apache.hadoop.hive.ql.Driver 14/12/05 16:27:02 INFO HiveMetaStore: 0: get_table : db=default tbl=abc 14/12/05 16:27:02 INFO audit: ugi=jianshuangip=unknown-ip-addr cmd=get_table : db=default tbl=abc 14/12/05 16:27:02 INFO Driver: Semantic Analysis Completed 14/12/05 16:27:02 INFO PerfLogger: /PERFLOG method=semanticAnalyze start=1417825622650 end=1417825622653 duration=3 from=org.apache.hadoop.hive.ql.Driver 14/12/05 16:27:02 INFO Driver: Returning Hive schema: Schema(fieldSchemas:null, properties:null) 14/12/05 16:27:02 INFO PerfLogger: /PERFLOG method=compile start=1417825622650 end=1417825622654 duration=4 from=org.apache.hadoop.hive.ql.Driver 14/12/05 16:27:02 INFO PerfLogger: PERFLOG method=Driver.execute from=org.apache.hadoop.hive.ql.Driver 14/12/05 16:27:02 INFO Driver: Starting command: DROP TABLE IF EXISTS abc 14/12/05 16:27:02 INFO PerfLogger: /PERFLOG method=TimeToSubmit start=1417825622650 end=1417825622654 duration=4 from=org.apache.hadoop.hive.ql.Driver 14/12/05 16:27:02 INFO PerfLogger: PERFLOG method=runTasks from=org.apache.hadoop.hive.ql.Driver 14/12/05 16:27:02 INFO PerfLogger: PERFLOG method=task.DDL.Stage-0 from=org.apache.hadoop.hive.ql.Driver 14/12/05 16:27:02 INFO HiveMetaStore: 0: get_table : db=default tbl=abc 14/12/05 16:27:02 INFO audit: ugi=jianshuangip=unknown-ip-addr cmd=get_table : db=default tbl=abc 14/12/05 16:27:02 ERROR Hive: NoSuchObjectException(message:default.abc table not found) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1560) at sun.reflect.GeneratedMethodAccessor57.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105) On Sat, Dec 6, 2014 at 3:45 AM, Mark Hamstra m...@clearstorydata.com wrote: And that is no different from how Hive has worked for a long time. On Fri, Dec 5, 2014 at 11:42 AM, Michael Armbrust mich...@databricks.com wrote: The command run fine for me on master. Note that Hive does print an exception in the logs, but that exception does not propogate to user code. On Thu, Dec 4, 2014 at 11:31 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I got exception saying Hive: NoSuchObjectException(message:table table not found) when running DROP TABLE IF EXISTS table Looks like a new regression in Hive module. Anyone can confirm this? Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Hive Problem in Pig generated Parquet file schema in CREATE EXTERNAL TABLE (e.g. bag::col1)
Hi, I had to use Pig for some preprocessing and to generate Parquet files for Spark to consume. However, due to Pig's limitation, the generated schema contains Pig's identifier e.g. sorted::id, sorted::cre_ts, ... I tried to put the schema inside CREATE EXTERNAL TABLE, e.g. create external table pmt ( sorted::id bigint ) stored as parquet location '...' Obviously it didn't work, I also tried removing the identifier sorted::, but the resulting rows contain only nulls. Any idea how to create a table in HiveContext from these Parquet files? Thanks, Jianshi -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Hive Problem in Pig generated Parquet file schema in CREATE EXTERNAL TABLE (e.g. bag::col1)
Here's the solution I got after talking with Liancheng: 1) using backquote `..` to wrap up all illegal characters val rdd = parquetFile(file) val schema = rdd.schema.fields.map(f = s`${f.name}` ${HiveMetastoreTypes.toMetastoreType(f.dataType)}).mkString(,\n) val ddl_13 = s |CREATE EXTERNAL TABLE $name ( | $schema |) |STORED AS PARQUET |LOCATION '$file' .stripMargin sql(ddl_13) 2) create a new Schema and do applySchema to generate a new SchemaRDD, had to drop and register table val t = table(name) val newSchema = StructType(t.schema.fields.map(s = s.copy(name = s.name.replaceAll(.*?::, sql(sdrop table $name) applySchema(t, newSchema).registerTempTable(name) I'm testing it for now. Thanks for the help! Jianshi On Sat, Dec 6, 2014 at 8:41 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I had to use Pig for some preprocessing and to generate Parquet files for Spark to consume. However, due to Pig's limitation, the generated schema contains Pig's identifier e.g. sorted::id, sorted::cre_ts, ... I tried to put the schema inside CREATE EXTERNAL TABLE, e.g. create external table pmt ( sorted::id bigint ) stored as parquet location '...' Obviously it didn't work, I also tried removing the identifier sorted::, but the resulting rows contain only nulls. Any idea how to create a table in HiveContext from these Parquet files? Thanks, Jianshi -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Exception adding resource files in latest Spark
I got the following error during Spark startup (Yarn-client mode): 14/12/04 19:33:58 INFO Client: Uploading resource file:/x/home/jianshuang/spark/spark-latest/lib/datanucleus-api-jdo-3.2.6.jar - hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar java.lang.IllegalArgumentException: Wrong FS: hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar, expected: file:/// at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643) at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:79) at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:506) at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724) at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501) at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397) at org.apache.spark.deploy.yarn.ClientDistributedCacheManager.addResource(ClientDistributedCacheManager.scala:67) at org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:257) at org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:242) at scala.Option.foreach(Option.scala:236) at org.apache.spark.deploy.yarn.ClientBase$class.prepareLocalResources(ClientBase.scala:242) at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:35) at org.apache.spark.deploy.yarn.ClientBase$class.createContainerLaunchContext(ClientBase.scala:350) at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:35) at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:80) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57) at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140) at org.apache.spark.SparkContext.init(SparkContext.scala:335) at org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:986) at $iwC$$iwC.init(console:9) at $iwC.init(console:18) at init(console:20) at .init(console:24) I'm using latest Spark built from master HEAD yesterday. Is this a bug? -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Exception adding resource files in latest Spark
Looks like somehow Spark failed to find the core-site.xml in /et/hadoop/conf I've already set the following env variables: export YARN_CONF_DIR=/etc/hadoop/conf export HADOOP_CONF_DIR=/etc/hadoop/conf export HBASE_CONF_DIR=/etc/hbase/conf Should I put $HADOOP_CONF_DIR/* to HADOOP_CLASSPATH? Jianshi On Fri, Dec 5, 2014 at 11:37 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: I got the following error during Spark startup (Yarn-client mode): 14/12/04 19:33:58 INFO Client: Uploading resource file:/x/home/jianshuang/spark/spark-latest/lib/datanucleus-api-jdo-3.2.6.jar - hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar java.lang.IllegalArgumentException: Wrong FS: hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar, expected: file:/// at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643) at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:79) at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:506) at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724) at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501) at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397) at org.apache.spark.deploy.yarn.ClientDistributedCacheManager.addResource(ClientDistributedCacheManager.scala:67) at org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:257) at org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:242) at scala.Option.foreach(Option.scala:236) at org.apache.spark.deploy.yarn.ClientBase$class.prepareLocalResources(ClientBase.scala:242) at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:35) at org.apache.spark.deploy.yarn.ClientBase$class.createContainerLaunchContext(ClientBase.scala:350) at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:35) at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:80) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57) at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140) at org.apache.spark.SparkContext.init(SparkContext.scala:335) at org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:986) at $iwC$$iwC.init(console:9) at $iwC.init(console:18) at init(console:20) at .init(console:24) I'm using latest Spark built from master HEAD yesterday. Is this a bug? -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Exception adding resource files in latest Spark
Actually my HADOOP_CLASSPATH has already been set to include /etc/hadoop/conf/* export HADOOP_CLASSPATH=/etc/hbase/conf/hbase-site.xml:/usr/lib/hbase/lib/hbase-protocol.jar:$(hbase classpath) Jianshi On Fri, Dec 5, 2014 at 11:54 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Looks like somehow Spark failed to find the core-site.xml in /et/hadoop/conf I've already set the following env variables: export YARN_CONF_DIR=/etc/hadoop/conf export HADOOP_CONF_DIR=/etc/hadoop/conf export HBASE_CONF_DIR=/etc/hbase/conf Should I put $HADOOP_CONF_DIR/* to HADOOP_CLASSPATH? Jianshi On Fri, Dec 5, 2014 at 11:37 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: I got the following error during Spark startup (Yarn-client mode): 14/12/04 19:33:58 INFO Client: Uploading resource file:/x/home/jianshuang/spark/spark-latest/lib/datanucleus-api-jdo-3.2.6.jar - hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar java.lang.IllegalArgumentException: Wrong FS: hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar, expected: file:/// at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643) at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:79) at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:506) at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724) at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501) at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397) at org.apache.spark.deploy.yarn.ClientDistributedCacheManager.addResource(ClientDistributedCacheManager.scala:67) at org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:257) at org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:242) at scala.Option.foreach(Option.scala:236) at org.apache.spark.deploy.yarn.ClientBase$class.prepareLocalResources(ClientBase.scala:242) at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:35) at org.apache.spark.deploy.yarn.ClientBase$class.createContainerLaunchContext(ClientBase.scala:350) at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:35) at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:80) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57) at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140) at org.apache.spark.SparkContext.init(SparkContext.scala:335) at org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:986) at $iwC$$iwC.init(console:9) at $iwC.init(console:18) at init(console:20) at .init(console:24) I'm using latest Spark built from master HEAD yesterday. Is this a bug? -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Exception adding resource files in latest Spark
Looks like the datanucleus*.jar shouldn't appear in the hdfs path in Yarn-client mode. Maybe this patch broke yarn-client. https://github.com/apache/spark/commit/a975dc32799bb8a14f9e1c76defaaa7cfbaf8b53 Jianshi On Fri, Dec 5, 2014 at 12:02 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Actually my HADOOP_CLASSPATH has already been set to include /etc/hadoop/conf/* export HADOOP_CLASSPATH=/etc/hbase/conf/hbase-site.xml:/usr/lib/hbase/lib/hbase-protocol.jar:$(hbase classpath) Jianshi On Fri, Dec 5, 2014 at 11:54 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Looks like somehow Spark failed to find the core-site.xml in /et/hadoop/conf I've already set the following env variables: export YARN_CONF_DIR=/etc/hadoop/conf export HADOOP_CONF_DIR=/etc/hadoop/conf export HBASE_CONF_DIR=/etc/hbase/conf Should I put $HADOOP_CONF_DIR/* to HADOOP_CLASSPATH? Jianshi On Fri, Dec 5, 2014 at 11:37 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: I got the following error during Spark startup (Yarn-client mode): 14/12/04 19:33:58 INFO Client: Uploading resource file:/x/home/jianshuang/spark/spark-latest/lib/datanucleus-api-jdo-3.2.6.jar - hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar java.lang.IllegalArgumentException: Wrong FS: hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar, expected: file:/// at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643) at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:79) at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:506) at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724) at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501) at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397) at org.apache.spark.deploy.yarn.ClientDistributedCacheManager.addResource(ClientDistributedCacheManager.scala:67) at org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:257) at org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:242) at scala.Option.foreach(Option.scala:236) at org.apache.spark.deploy.yarn.ClientBase$class.prepareLocalResources(ClientBase.scala:242) at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:35) at org.apache.spark.deploy.yarn.ClientBase$class.createContainerLaunchContext(ClientBase.scala:350) at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:35) at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:80) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57) at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140) at org.apache.spark.SparkContext.init(SparkContext.scala:335) at org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:986) at $iwC$$iwC.init(console:9) at $iwC.init(console:18) at init(console:20) at .init(console:24) I'm using latest Spark built from master HEAD yesterday. Is this a bug? -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Exception adding resource files in latest Spark
Correction: According to Liancheng, this hotfix might be the root cause: https://github.com/apache/spark/commit/38cb2c3a36a5c9ead4494cbc3dde008c2f0698ce Jianshi On Fri, Dec 5, 2014 at 12:45 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Looks like the datanucleus*.jar shouldn't appear in the hdfs path in Yarn-client mode. Maybe this patch broke yarn-client. https://github.com/apache/spark/commit/a975dc32799bb8a14f9e1c76defaaa7cfbaf8b53 Jianshi On Fri, Dec 5, 2014 at 12:02 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Actually my HADOOP_CLASSPATH has already been set to include /etc/hadoop/conf/* export HADOOP_CLASSPATH=/etc/hbase/conf/hbase-site.xml:/usr/lib/hbase/lib/hbase-protocol.jar:$(hbase classpath) Jianshi On Fri, Dec 5, 2014 at 11:54 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Looks like somehow Spark failed to find the core-site.xml in /et/hadoop/conf I've already set the following env variables: export YARN_CONF_DIR=/etc/hadoop/conf export HADOOP_CONF_DIR=/etc/hadoop/conf export HBASE_CONF_DIR=/etc/hbase/conf Should I put $HADOOP_CONF_DIR/* to HADOOP_CLASSPATH? Jianshi On Fri, Dec 5, 2014 at 11:37 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: I got the following error during Spark startup (Yarn-client mode): 14/12/04 19:33:58 INFO Client: Uploading resource file:/x/home/jianshuang/spark/spark-latest/lib/datanucleus-api-jdo-3.2.6.jar - hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar java.lang.IllegalArgumentException: Wrong FS: hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar, expected: file:/// at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643) at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:79) at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:506) at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724) at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501) at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397) at org.apache.spark.deploy.yarn.ClientDistributedCacheManager.addResource(ClientDistributedCacheManager.scala:67) at org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:257) at org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:242) at scala.Option.foreach(Option.scala:236) at org.apache.spark.deploy.yarn.ClientBase$class.prepareLocalResources(ClientBase.scala:242) at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:35) at org.apache.spark.deploy.yarn.ClientBase$class.createContainerLaunchContext(ClientBase.scala:350) at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:35) at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:80) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57) at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140) at org.apache.spark.SparkContext.init(SparkContext.scala:335) at org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:986) at $iwC$$iwC.init(console:9) at $iwC.init(console:18) at init(console:20) at .init(console:24) I'm using latest Spark built from master HEAD yesterday. Is this a bug? -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Auto BroadcastJoin optimization failed in latest Spark
Sorry for the late of follow-up. I used Hao's DESC EXTENDED command and found some clue: new (broadcast broken Spark build): parameters:{numFiles=0, EXTERNAL=TRUE, transient_lastDdlTime=1417763892, COLUMN_STATS_ACCURATE=false, totalSize=0, numRows=-1, rawDataSize=-1} old (broadcast working Spark build): parameters:{EXTERNAL=TRUE, transient_lastDdlTime=1417763591, totalSize=56166} Looks like the table size computation failed in the latest version. I've run the analyze command: ANALYZE TABLE $table COMPUTE STATISTICS noscan And the tables are created from Parquet files: e.g. CREATE EXTERNAL TABLE table1 ( code int, desc string ) STORED AS PARQUET LOCATION '/user/jianshuang/data/dim_tables/table1.parquet' Anyone knows what went wrong? Thanks, Jianshi On Fri, Nov 28, 2014 at 1:24 PM, Cheng, Hao hao.ch...@intel.com wrote: Hi Jianshi, I couldn’t reproduce that with latest MASTER, and I can always get the BroadcastHashJoin for managed tables (in .csv file) in my testing, are there any external tables in your case? In general probably couple of things you can try first (with HiveContext): 1) ANALYZE TABLE xxx COMPUTE STATISTICS NOSCAN; (apply that to all of the tables); 2) SET spark.sql.autoBroadcastJoinThreshold=xxx; (Set the threshold as a greater value, it is 1024*1024*10 by default, just make sure the maximum dimension tables size (in bytes) is less than this) 3) Always put the main table(the biggest table) in the left-most among the inner joins; DESC EXTENDED tablename; -- this will print the detail information for the statistic table size (the field “totalSize”) EXPLAIN EXTENDED query; -- this will print the detail physical plan. Let me know if you still have problem. Hao *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, November 27, 2014 10:24 PM *To:* Cheng, Hao *Cc:* user *Subject:* Re: Auto BroadcastJoin optimization failed in latest Spark Hi Hao, I'm using inner join as Broadcast join didn't work for left joins (thanks for the links for the latest improvements). And I'm using HiveConext and it worked in a previous build (10/12) when joining 15 dimension tables. Jianshi On Thu, Nov 27, 2014 at 8:35 AM, Cheng, Hao hao.ch...@intel.com wrote: Are all of your join keys the same? and I guess the join type are all “Left” join, https://github.com/apache/spark/pull/3362 probably is what you need. And, SparkSQL doesn’t support the multiway-join (and multiway-broadcast join) currently, https://github.com/apache/spark/pull/3270 should be another optimization for this. *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Wednesday, November 26, 2014 4:36 PM *To:* user *Subject:* Auto BroadcastJoin optimization failed in latest Spark Hi, I've confirmed that the latest Spark with either Hive 0.12 or 0.13.1 fails optimizing auto broadcast join in my query. I have a query that joins a huge fact table with 15 tiny dimension tables. I'm currently using an older version of Spark which was built on Oct. 12. Anyone else has met similar situation? -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Auto BroadcastJoin optimization failed in latest Spark
If I run ANALYZE without NOSCAN, then Hive can successfully get the size: parameters:{numFiles=0, EXTERNAL=TRUE, transient_lastDdlTime=1417764589, COLUMN_STATS_ACCURATE=true, totalSize=0, numRows=1156, rawDataSize=76296} Is Hive's PARQUET support broken? Jianshi On Fri, Dec 5, 2014 at 3:30 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Sorry for the late of follow-up. I used Hao's DESC EXTENDED command and found some clue: new (broadcast broken Spark build): parameters:{numFiles=0, EXTERNAL=TRUE, transient_lastDdlTime=1417763892, COLUMN_STATS_ACCURATE=false, totalSize=0, numRows=-1, rawDataSize=-1} old (broadcast working Spark build): parameters:{EXTERNAL=TRUE, transient_lastDdlTime=1417763591, totalSize=56166} Looks like the table size computation failed in the latest version. I've run the analyze command: ANALYZE TABLE $table COMPUTE STATISTICS noscan And the tables are created from Parquet files: e.g. CREATE EXTERNAL TABLE table1 ( code int, desc string ) STORED AS PARQUET LOCATION '/user/jianshuang/data/dim_tables/table1.parquet' Anyone knows what went wrong? Thanks, Jianshi On Fri, Nov 28, 2014 at 1:24 PM, Cheng, Hao hao.ch...@intel.com wrote: Hi Jianshi, I couldn’t reproduce that with latest MASTER, and I can always get the BroadcastHashJoin for managed tables (in .csv file) in my testing, are there any external tables in your case? In general probably couple of things you can try first (with HiveContext): 1) ANALYZE TABLE xxx COMPUTE STATISTICS NOSCAN; (apply that to all of the tables); 2) SET spark.sql.autoBroadcastJoinThreshold=xxx; (Set the threshold as a greater value, it is 1024*1024*10 by default, just make sure the maximum dimension tables size (in bytes) is less than this) 3) Always put the main table(the biggest table) in the left-most among the inner joins; DESC EXTENDED tablename; -- this will print the detail information for the statistic table size (the field “totalSize”) EXPLAIN EXTENDED query; -- this will print the detail physical plan. Let me know if you still have problem. Hao *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, November 27, 2014 10:24 PM *To:* Cheng, Hao *Cc:* user *Subject:* Re: Auto BroadcastJoin optimization failed in latest Spark Hi Hao, I'm using inner join as Broadcast join didn't work for left joins (thanks for the links for the latest improvements). And I'm using HiveConext and it worked in a previous build (10/12) when joining 15 dimension tables. Jianshi On Thu, Nov 27, 2014 at 8:35 AM, Cheng, Hao hao.ch...@intel.com wrote: Are all of your join keys the same? and I guess the join type are all “Left” join, https://github.com/apache/spark/pull/3362 probably is what you need. And, SparkSQL doesn’t support the multiway-join (and multiway-broadcast join) currently, https://github.com/apache/spark/pull/3270 should be another optimization for this. *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Wednesday, November 26, 2014 4:36 PM *To:* user *Subject:* Auto BroadcastJoin optimization failed in latest Spark Hi, I've confirmed that the latest Spark with either Hive 0.12 or 0.13.1 fails optimizing auto broadcast join in my query. I have a query that joins a huge fact table with 15 tiny dimension tables. I'm currently using an older version of Spark which was built on Oct. 12. Anyone else has met similar situation? -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Auto BroadcastJoin optimization failed in latest Spark
With Liancheng's suggestion, I've tried setting spark.sql.hive.convertMetastoreParquet false but still analyze noscan return -1 in rawDataSize Jianshi On Fri, Dec 5, 2014 at 3:33 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: If I run ANALYZE without NOSCAN, then Hive can successfully get the size: parameters:{numFiles=0, EXTERNAL=TRUE, transient_lastDdlTime=1417764589, COLUMN_STATS_ACCURATE=true, totalSize=0, numRows=1156, rawDataSize=76296} Is Hive's PARQUET support broken? Jianshi On Fri, Dec 5, 2014 at 3:30 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Sorry for the late of follow-up. I used Hao's DESC EXTENDED command and found some clue: new (broadcast broken Spark build): parameters:{numFiles=0, EXTERNAL=TRUE, transient_lastDdlTime=1417763892, COLUMN_STATS_ACCURATE=false, totalSize=0, numRows=-1, rawDataSize=-1} old (broadcast working Spark build): parameters:{EXTERNAL=TRUE, transient_lastDdlTime=1417763591, totalSize=56166} Looks like the table size computation failed in the latest version. I've run the analyze command: ANALYZE TABLE $table COMPUTE STATISTICS noscan And the tables are created from Parquet files: e.g. CREATE EXTERNAL TABLE table1 ( code int, desc string ) STORED AS PARQUET LOCATION '/user/jianshuang/data/dim_tables/table1.parquet' Anyone knows what went wrong? Thanks, Jianshi On Fri, Nov 28, 2014 at 1:24 PM, Cheng, Hao hao.ch...@intel.com wrote: Hi Jianshi, I couldn’t reproduce that with latest MASTER, and I can always get the BroadcastHashJoin for managed tables (in .csv file) in my testing, are there any external tables in your case? In general probably couple of things you can try first (with HiveContext): 1) ANALYZE TABLE xxx COMPUTE STATISTICS NOSCAN; (apply that to all of the tables); 2) SET spark.sql.autoBroadcastJoinThreshold=xxx; (Set the threshold as a greater value, it is 1024*1024*10 by default, just make sure the maximum dimension tables size (in bytes) is less than this) 3) Always put the main table(the biggest table) in the left-most among the inner joins; DESC EXTENDED tablename; -- this will print the detail information for the statistic table size (the field “totalSize”) EXPLAIN EXTENDED query; -- this will print the detail physical plan. Let me know if you still have problem. Hao *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, November 27, 2014 10:24 PM *To:* Cheng, Hao *Cc:* user *Subject:* Re: Auto BroadcastJoin optimization failed in latest Spark Hi Hao, I'm using inner join as Broadcast join didn't work for left joins (thanks for the links for the latest improvements). And I'm using HiveConext and it worked in a previous build (10/12) when joining 15 dimension tables. Jianshi On Thu, Nov 27, 2014 at 8:35 AM, Cheng, Hao hao.ch...@intel.com wrote: Are all of your join keys the same? and I guess the join type are all “Left” join, https://github.com/apache/spark/pull/3362 probably is what you need. And, SparkSQL doesn’t support the multiway-join (and multiway-broadcast join) currently, https://github.com/apache/spark/pull/3270 should be another optimization for this. *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Wednesday, November 26, 2014 4:36 PM *To:* user *Subject:* Auto BroadcastJoin optimization failed in latest Spark Hi, I've confirmed that the latest Spark with either Hive 0.12 or 0.13.1 fails optimizing auto broadcast join in my query. I have a query that joins a huge fact table with 15 tiny dimension tables. I'm currently using an older version of Spark which was built on Oct. 12. Anyone else has met similar situation? -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Auto BroadcastJoin optimization failed in latest Spark
Hi Hao, I'm using inner join as Broadcast join didn't work for left joins (thanks for the links for the latest improvements). And I'm using HiveConext and it worked in a previous build (10/12) when joining 15 dimension tables. Jianshi On Thu, Nov 27, 2014 at 8:35 AM, Cheng, Hao hao.ch...@intel.com wrote: Are all of your join keys the same? and I guess the join type are all “Left” join, https://github.com/apache/spark/pull/3362 probably is what you need. And, SparkSQL doesn’t support the multiway-join (and multiway-broadcast join) currently, https://github.com/apache/spark/pull/3270 should be another optimization for this. *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Wednesday, November 26, 2014 4:36 PM *To:* user *Subject:* Auto BroadcastJoin optimization failed in latest Spark Hi, I've confirmed that the latest Spark with either Hive 0.12 or 0.13.1 fails optimizing auto broadcast join in my query. I have a query that joins a huge fact table with 15 tiny dimension tables. I'm currently using an older version of Spark which was built on Oct. 12. Anyone else has met similar situation? -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Auto BroadcastJoin optimization failed in latest Spark
Hi, I've confirmed that the latest Spark with either Hive 0.12 or 0.13.1 fails optimizing auto broadcast join in my query. I have a query that joins a huge fact table with 15 tiny dimension tables. I'm currently using an older version of Spark which was built on Oct. 12. Anyone else has met similar situation? -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: How to do broadcast join in SparkSQL
Hi, Looks like the latest SparkSQL with Hive 0.12 has a bug in Parquet support. I got the following exceptions: org.apache.hadoop.hive.ql.parse.SemanticException: Output Format must implement HiveOutputFormat, otherwise it should be either IgnoreKeyTextOutputFormat or SequenceFileOutputFormat at org.apache.hadoop.hive.ql.plan.CreateTableDesc.validate(CreateTableDesc.java:431) at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:9964) at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:9180) at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:327) Using the same DDL and Analyze script above. Jianshi On Sat, Oct 11, 2014 at 2:18 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: It works fine, thanks for the help Michael. Liancheng also told me a trick, using a subquery with LIMIT n. It works in latest 1.2.0 BTW, looks like the broadcast optimization won't be recognized if I do a left join instead of a inner join. Is that true? How can I make it work for left joins? Cheers, Jianshi On Thu, Oct 9, 2014 at 3:10 AM, Michael Armbrust mich...@databricks.com wrote: Thanks for the input. We purposefully made sure that the config option did not make it into a release as it is not something that we are willing to support long term. That said we'll try and make this easier in the future either through hints or better support for statistics. In this particular case you can get what you want by registering the tables as external tables and setting an flag. Here's a helper function to do what you need. /** * Sugar for creating a Hive external table from a parquet path. */ def createParquetTable(name: String, file: String): Unit = { import org.apache.spark.sql.hive.HiveMetastoreTypes val rdd = parquetFile(file) val schema = rdd.schema.fields.map(f = s${f.name} ${HiveMetastoreTypes.toMetastoreType(f.dataType)}).mkString(,\n) val ddl = s |CREATE EXTERNAL TABLE $name ( | $schema |) |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe' |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat' |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat' |LOCATION '$file'.stripMargin sql(ddl) setConf(spark.sql.hive.convertMetastoreParquet, true) } You'll also need to run this to populate the statistics: ANALYZE TABLE tableName COMPUTE STATISTICS noscan; On Wed, Oct 8, 2014 at 1:44 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Ok, currently there's cost-based optimization however Parquet statistics is not implemented... What's the good way if I want to join a big fact table with several tiny dimension tables in Spark SQL (1.1)? I wish we can allow user hint for the join. Jianshi On Wed, Oct 8, 2014 at 2:18 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not merged into master? I cannot find spark.sql.hints.broadcastTables in latest master, but it's in the following patch. https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5 Jianshi On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Yes, looks like it can only be controlled by the parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird to me. How am I suppose to know the exact bytes of a table? Let me specify the join algorithm is preferred I think. Jianshi On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu yuzhih...@gmail.com wrote: Have you looked at SPARK-1800 ? e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala Cheers On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: I cannot find it in the documentation. And I have a dozen dimension tables to (left) join... Cheers, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: How to do broadcast join in SparkSQL
Oh, I found a explanation from http://cmenguy.github.io/blog/2013/10/30/using-hive-with-parquet-format-in-cdh-4-dot-3/ The error here is a bit misleading, what it really means is that the class parquet.hive.DeprecatedParquetOutputFormat isn’t in the classpath for Hive. Sure enough, doing a ls /usr/lib/hive/lib doesn’t show any of the parquet jars, but ls /usr/lib/impala/lib shows the jar we’re looking for as parquet-hive-1.0.jar Is it removed from latest Spark? Jianshi On Wed, Nov 26, 2014 at 2:13 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, Looks like the latest SparkSQL with Hive 0.12 has a bug in Parquet support. I got the following exceptions: org.apache.hadoop.hive.ql.parse.SemanticException: Output Format must implement HiveOutputFormat, otherwise it should be either IgnoreKeyTextOutputFormat or SequenceFileOutputFormat at org.apache.hadoop.hive.ql.plan.CreateTableDesc.validate(CreateTableDesc.java:431) at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:9964) at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:9180) at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:327) Using the same DDL and Analyze script above. Jianshi On Sat, Oct 11, 2014 at 2:18 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: It works fine, thanks for the help Michael. Liancheng also told me a trick, using a subquery with LIMIT n. It works in latest 1.2.0 BTW, looks like the broadcast optimization won't be recognized if I do a left join instead of a inner join. Is that true? How can I make it work for left joins? Cheers, Jianshi On Thu, Oct 9, 2014 at 3:10 AM, Michael Armbrust mich...@databricks.com wrote: Thanks for the input. We purposefully made sure that the config option did not make it into a release as it is not something that we are willing to support long term. That said we'll try and make this easier in the future either through hints or better support for statistics. In this particular case you can get what you want by registering the tables as external tables and setting an flag. Here's a helper function to do what you need. /** * Sugar for creating a Hive external table from a parquet path. */ def createParquetTable(name: String, file: String): Unit = { import org.apache.spark.sql.hive.HiveMetastoreTypes val rdd = parquetFile(file) val schema = rdd.schema.fields.map(f = s${f.name} ${HiveMetastoreTypes.toMetastoreType(f.dataType)}).mkString(,\n) val ddl = s |CREATE EXTERNAL TABLE $name ( | $schema |) |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe' |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat' |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat' |LOCATION '$file'.stripMargin sql(ddl) setConf(spark.sql.hive.convertMetastoreParquet, true) } You'll also need to run this to populate the statistics: ANALYZE TABLE tableName COMPUTE STATISTICS noscan; On Wed, Oct 8, 2014 at 1:44 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Ok, currently there's cost-based optimization however Parquet statistics is not implemented... What's the good way if I want to join a big fact table with several tiny dimension tables in Spark SQL (1.1)? I wish we can allow user hint for the join. Jianshi On Wed, Oct 8, 2014 at 2:18 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not merged into master? I cannot find spark.sql.hints.broadcastTables in latest master, but it's in the following patch. https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5 Jianshi On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Yes, looks like it can only be controlled by the parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird to me. How am I suppose to know the exact bytes of a table? Let me specify the join algorithm is preferred I think. Jianshi On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu yuzhih...@gmail.com wrote: Have you looked at SPARK-1800 ? e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala Cheers On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: I cannot find it in the documentation. And I have a dozen dimension tables to (left) join... Cheers, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter
How to deal with BigInt in my case class for RDD = SchemaRDD convertion
Hi, I got an error during rdd.registerTempTable(...) saying scala.MatchError: scala.BigInt Looks like BigInt cannot be used in SchemaRDD, is that correct? So what would you recommend to deal with it? Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: How to deal with BigInt in my case class for RDD = SchemaRDD convertion
Ah yes. I found it too in the manual. Thanks for the help anyway! Since BigDecimal is just a wrapper around BigInt, let's also convert to BigInt to Decimal. I created a ticket. https://issues.apache.org/jira/browse/SPARK-4549 Jianshi On Fri, Nov 21, 2014 at 11:30 PM, Yin Huai huaiyin@gmail.com wrote: Hello Jianshi, The reason of that error is that we do not have a Spark SQL data type for Scala BigInt. You can use Decimal for your case. Thanks, Yin On Fri, Nov 21, 2014 at 5:11 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I got an error during rdd.registerTempTable(...) saying scala.MatchError: scala.BigInt Looks like BigInt cannot be used in SchemaRDD, is that correct? So what would you recommend to deal with it? Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Is it safe to use Scala 2.11 for Spark build?
$1.apply(Ivy.scala:147) at sbt.IvySbt$$anonfun$withIvy$1.apply(Ivy.scala:124) at sbt.IvySbt.sbt$IvySbt$$action$1(Ivy.scala:56) at sbt.IvySbt$$anon$3.call(Ivy.scala:64) at xsbt.boot.Locks$GlobalLock.withChannel$1(Locks.scala:93) at xsbt.boot.Locks$GlobalLock.xsbt$boot$Locks$GlobalLock$$withChannelRetries$1(Locks.scala:78) at xsbt.boot.Locks$GlobalLock$$anonfun$withFileLock$1.apply(Locks.scala:97) at xsbt.boot.Using$.withResource(Using.scala:10) at xsbt.boot.Using$.apply(Using.scala:9) at xsbt.boot.Locks$GlobalLock.ignoringDeadlockAvoided(Locks.scala:58) at xsbt.boot.Locks$GlobalLock.withLock(Locks.scala:48) commit c6e0c2ab1c29c184a9302d23ad75e4ccd8060242 at xsbt.boot.Locks$.apply0(Locks.scala:31) at xsbt.boot.Locks$.apply(Locks.scala:28) at sbt.IvySbt.withDefaultLogger(Ivy.scala:64) at sbt.IvySbt.withIvy(Ivy.scala:119) at sbt.IvySbt.withIvy(Ivy.scala:116) at sbt.IvySbt$Module.withModule(Ivy.scala:147) at sbt.IvyActions$.updateEither(IvyActions.scala:156) at sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala:1282) at sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala:1279) at sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$84.apply(Defaults.scala:1309) at sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$84.apply(Defaults.scala:1307) at sbt.Tracked$$anonfun$lastOutput$1.apply(Tracked.scala:35) at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:1312) at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:1306) at sbt.Tracked$$anonfun$inputChanged$1.apply(Tracked.scala:45) at sbt.Classpaths$.cachedUpdate(Defaults.scala:1324) at sbt.Classpaths$$anonfun$updateTask$1.apply(Defaults.scala:1264) at sbt.Classpaths$$anonfun$updateTask$1.apply(Defaults.scala:1242) at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47) at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40) at sbt.std.Transform$$anon$4.work(System.scala:63) at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226) at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226) at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17) at sbt.Execute.work(Execute.scala:235) at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226) at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226) at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159) at sbt.CompletionService$$anon$2.call(CompletionService.scala:28) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) [error] (streaming-kafka/*:update) sbt.ResolveException: unresolved dependency: org.apache.kafka#kafka_2.11;0.8.0: not found [error] (catalyst/*:update) sbt.ResolveException: unresolved dependency: org.scalamacros#quasiquotes_2.11;2.0.1: not found -- Ye Xianjin Sent with Sparrow http://www.sparrowmailapp.com/?sig On Tuesday, November 18, 2014 at 3:27 PM, Prashant Sharma wrote: It is safe in the sense we would help you with the fix if you run into issues. I have used it, but since I worked on the patch the opinion can be biased. I am using scala 2.11 for day to day development. You should checkout the build instructions here : https://github.com/ScrapCodes/spark-1/blob/patch-3/docs/building-spark.md Prashant Sharma On Tue, Nov 18, 2014 at 12:19 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Any notable issues for using Scala 2.11? Is it stable now? Or can I use Scala 2.11 in my spark application and use Spark dist build with 2.10 ? I'm looking forward to migrate to 2.11 for some quasiquote features. Couldn't make it run in 2.10... Cheers, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Is there setup and cleanup function in spark?
I see. Agree that lazy eval is not suitable for proper setup and teardown. We also abandoned it due to inherent incompatibility between implicit and lazy. It was fun to come up this trick though. Jianshi On Tue, Nov 18, 2014 at 10:28 AM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Fri, Nov 14, 2014 at 2:49 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Ok, then we need another trick. let's have an *implicit lazy var connection/context* around our code. And setup() will trigger the eval and initialization. Due to lazy evaluation, I think having setup/teardown is a bit tricky. In particular teardown, because it is not easy to execute code after all computation is done. You can check http://apache-spark-user-list.1001560.n3.nabble.com/Keep-state-inside-map-function-tp10968p11009.html for an example of what worked for me. Tobias -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Is it safe to use Scala 2.11 for Spark build?
Any notable issues for using Scala 2.11? Is it stable now? Or can I use Scala 2.11 in my spark application and use Spark dist build with 2.10 ? I'm looking forward to migrate to 2.11 for some quasiquote features. Couldn't make it run in 2.10... Cheers, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Compiling Spark master HEAD failed.
The mvn build command is mvn clean install -Pyarn -Phive -Phive-0.13.1 -Phadoop-2.4 -Djava.version=1.7 -DskipTests I'm getting this error message: [ERROR] Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.0:compile (scala-compile-first) on project spark-hive_2.10: wrap: scala.r eflect.internal.MissingRequirementError: object scala.runtime in compiler mirror not found. - [Help 1] Anyone knows what's the problem? I'm building it on OSX. I didn't had this problem one month ago. -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Is there setup and cleanup function in spark?
So can I write it like this? rdd.mapPartition(i = setup(); i).map(...).mapPartition(i = cleanup(); i) So I don't need to mess up the logic and still can use map, filter and other transformations for RDD. Jianshi On Fri, Nov 14, 2014 at 12:20 PM, Cheng Lian lian.cs@gmail.com wrote: If you’re looking for executor side setup and cleanup functions, there ain’t any yet, but you can achieve the same semantics via RDD.mapPartitions. Please check the “setup() and cleanup” section of this blog from Cloudera for details: http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/ On 11/14/14 10:44 AM, Dai, Kevin wrote: HI, all Is there setup and cleanup function as in hadoop mapreduce in spark which does some initialization and cleanup work? Best Regards, Kevin. -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Is there setup and cleanup function in spark?
Ok, then we need another trick. let's have an *implicit lazy var connection/context* around our code. And setup() will trigger the eval and initialization. The implicit lazy val/var trick is actually invented by Kevin. :) Jianshi On Fri, Nov 14, 2014 at 1:41 PM, Cheng Lian lian.cs@gmail.com wrote: If you’re just relying on the side effect of setup() and cleanup() then I think this trick is OK and pretty cleaner. But if setup() returns, say, a DB connection, then the map(...) part and cleanup() can’t get the connection object. On 11/14/14 1:20 PM, Jianshi Huang wrote: So can I write it like this? rdd.mapPartition(i = setup(); i).map(...).mapPartition(i = cleanup(); i) So I don't need to mess up the logic and still can use map, filter and other transformations for RDD. Jianshi On Fri, Nov 14, 2014 at 12:20 PM, Cheng Lian lian.cs@gmail.com wrote: If you’re looking for executor side setup and cleanup functions, there ain’t any yet, but you can achieve the same semantics via RDD.mapPartitions. Please check the “setup() and cleanup” section of this blog from Cloudera for details: http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/ On 11/14/14 10:44 AM, Dai, Kevin wrote: HI, all Is there setup and cleanup function as in hadoop mapreduce in spark which does some initialization and cleanup work? Best Regards, Kevin. -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: RDD to DStream
I also discussed with Liancheng two weeks ago. And he suggested to use toLocalIterator to collect partitions of RDD to driver (same order if RDD is sorted), and then turn each partition to a RDD and put them in the queue. So: To turn RDD[(timestamp, value)] to DStream 1) Group by timestamp/windowSize 2) Sort RDD by (group, timestamp) 3) Use toLocalIterator to collect each group/partition 4) Turn each group/partition to RDD and put them in a Queue 5) Use SparkStreamingContext.queueStream to consume the Queue[RDD] as DStream Looks good to me, will try it today. The downside is all data needs to be collect to driver, is there a way to avoid doing this? Thanks Jianshi On Mon, Oct 27, 2014 at 4:57 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Sure, let's still focus on the streaming simulation use case. It's a very useful problem to solve. If we're going to use the same Spark-streaming core for the simulation, the most simple way is to have a globally sorted RDDs and use ssc.queueStream. Thus collecting the Key part to driver is probably necessary. I've done offline simulation in Pig and it's absolutely non-trivial and error prone, also I had to have multiple copies of data due to overlaps of window. Therefore I would prefer using the existing streaming implementation for the simulation rather than a special DStream. Jianshi On Mon, Oct 27, 2014 at 4:44 PM, Shao, Saisai saisai.s...@intel.com wrote: Yes, I understand what you want, but maybe hard to achieve without collecting back to driver node. Besides, can we just think of another way to do it. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Monday, October 27, 2014 4:07 PM *To:* Shao, Saisai *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com) *Subject:* Re: RDD to DStream Yeah, you're absolutely right Saisai. My point is we should allow this kind of logic in RDD, let's say transforming type RDD[(Key, Iterable[T])] to Seq[(Key, RDD[T])]. Make sense? Jianshi On Mon, Oct 27, 2014 at 3:56 PM, Shao, Saisai saisai.s...@intel.com wrote: I think what you want is to make each bucket as a new RDD as what you mentioned in Pig syntax. gs = ORDER g BY group ASC, g.timestamp ASC // 'group' is the rounded timestamp for each bucket From my understanding, currently in Spark there’s no such kind of API to achieve this, maybe you have to create a customized RDD by yourself. For the code why cannot executed, .map(sc.parallelize(_._2.sortBy(_._1))) // nested RDD, hmm... This “sc.parallelize(_._2.sortBy(_._1))”will be serialized as a closure to execute in remote side, which obviously do not has SparkContext, I think Spark cannot support nested RDD in closure. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Monday, October 27, 2014 3:30 PM *To:* Shao, Saisai *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com) *Subject:* Re: RDD to DStream Ok, back to Scala code, I'm wondering why I cannot do this: data.groupBy(timestamp / window) .sortByKey() // no sort method available here .map(sc.parallelize(_._2.sortBy(_._1))) // nested RDD, hmm... .collect() // returns Seq[RDD[(Timestamp, T)]] Jianshi On Mon, Oct 27, 2014 at 3:24 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: You're absolutely right, it's not 'scalable' as I'm using collect(). However, it's important to have the RDDs ordered by the timestamp of the time window (groupBy puts data to corresponding timewindow). It's fairly easy to do in Pig, but somehow I have no idea how to express it in RDD... Something like (in Pig, pseudo code :): g = GROUP data BY (timestamp / windowSize) // group data into buckets in the same time window gs = ORDER g BY group ASC, g.timestamp ASC // 'group' is the rounded timestamp for each bucket stream = FOREACH gs GENERATE toRDD(g) No idea how to do the order by part in RDD. Jianshi On Mon, Oct 27, 2014 at 3:07 PM, Shao, Saisai saisai.s...@intel.com wrote: I think you solution may not be extendable if the data size is increasing, since you have to collect all your data back to driver node, so the memory usage of driver will be a problem. why not filter out specific time-range data as a rdd, after filtering the whole time-range, you will get a series of RDD with timestamp divided, and then feed into queue. Still it is not an efficient way, but it is not limited by driver memory. Also there may have some other solutions like shuffle to arrange data, but you cannot avoid scanning the whole data. Basically we need to avoid fetching large amount of data back to driver. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Monday, October 27, 2014 2:39 PM *To:* Shao, Saisai *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com) *Subject:* Re: RDD
Re: Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext
Hi Srinivas, Here's the versions I'm using. spark.version1.2.0-SNAPSHOT/spark.version spray.version1.3.2/spray.version spray.json.version1.3.0/spray.json.version akka.grouporg.spark-project.akka/akka.group akka.version2.3.4-spark/akka.version I'm using Spark built from master. so it's 1.2.0-SNAPSHOT. Jianshi On Tue, Nov 11, 2014 at 4:06 AM, Srinivas Chamarthi srinivas.chamar...@gmail.com wrote: I am trying to use spark with spray and I have the dependency problem with quasiquotes. The issue comes up only when I include spark dependencies. I am not sure how this one can be excluded. Jianshi: can you let me know what version of spray + akka + spark are you using ? [error]org.scalamacros:quasiquotes _2.10, _2.10.3 [trace] Stack trace suppressed: run last *:update for the full output. [error] (*:update) Conflicting cross-version suffixes in: org.scalamacros:quasiq uotes On Thu, Oct 30, 2014 at 9:50 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi Preshant, Chester, Mohammed, I switched to Spark's Akka and now it works well. Thanks for the help! (Need to exclude Akka from Spray dependencies, or specify it as provided) Jianshi On Thu, Oct 30, 2014 at 3:17 AM, Mohammed Guller moham...@glassbeam.com wrote: I am not sure about that. Can you try a Spray version built with 2.2.x along with Spark 1.1 and include the Akka dependencies in your project’s sbt file? Mohammed *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Tuesday, October 28, 2014 8:58 PM *To:* Mohammed Guller *Cc:* user *Subject:* Re: Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext I'm using Spark built from HEAD, I think it uses modified Akka 2.3.4, right? Jianshi On Wed, Oct 29, 2014 at 5:53 AM, Mohammed Guller moham...@glassbeam.com wrote: Try a version built with Akka 2.2.x Mohammed *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Tuesday, October 28, 2014 3:03 AM *To:* user *Subject:* Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext Hi, I got the following exceptions when using Spray client to write to OpenTSDB using its REST API. Exception in thread pool-10-thread-2 java.lang.NoSuchMethodError: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext; It worked locally in my Intellij but failed when I launch it from Spark-submit. Google suggested it's a compatibility issue in Akka. And I'm using latest Spark built from the HEAD, so the Akka used in Spark-submit is 2.3.4-spark. I tried both Spray 1.3.2 (built for Akka 2.3.6) and 1.3.1 (built for 2.3.4). Both failed with the same exception. Anyone has idea what went wrong? Need help! -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext
Hi Preshant, Chester, Mohammed, I switched to Spark's Akka and now it works well. Thanks for the help! (Need to exclude Akka from Spray dependencies, or specify it as provided) Jianshi On Thu, Oct 30, 2014 at 3:17 AM, Mohammed Guller moham...@glassbeam.com wrote: I am not sure about that. Can you try a Spray version built with 2.2.x along with Spark 1.1 and include the Akka dependencies in your project’s sbt file? Mohammed *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Tuesday, October 28, 2014 8:58 PM *To:* Mohammed Guller *Cc:* user *Subject:* Re: Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext I'm using Spark built from HEAD, I think it uses modified Akka 2.3.4, right? Jianshi On Wed, Oct 29, 2014 at 5:53 AM, Mohammed Guller moham...@glassbeam.com wrote: Try a version built with Akka 2.2.x Mohammed *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Tuesday, October 28, 2014 3:03 AM *To:* user *Subject:* Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext Hi, I got the following exceptions when using Spray client to write to OpenTSDB using its REST API. Exception in thread pool-10-thread-2 java.lang.NoSuchMethodError: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext; It worked locally in my Intellij but failed when I launch it from Spark-submit. Google suggested it's a compatibility issue in Akka. And I'm using latest Spark built from the HEAD, so the Akka used in Spark-submit is 2.3.4-spark. I tried both Spray 1.3.2 (built for Akka 2.3.6) and 1.3.1 (built for 2.3.4). Both failed with the same exception. Anyone has idea what went wrong? Need help! -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext
Hi, I got the following exceptions when using Spray client to write to OpenTSDB using its REST API. Exception in thread pool-10-thread-2 java.lang.NoSuchMethodError: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext; It worked locally in my Intellij but failed when I launch it from Spark-submit. Google suggested it's a compatibility issue in Akka. And I'm using latest Spark built from the HEAD, so the Akka used in Spark-submit is 2.3.4-spark. I tried both Spray 1.3.2 (built for Akka 2.3.6) and 1.3.1 (built for 2.3.4). Both failed with the same exception. Anyone has idea what went wrong? Need help! -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext
Here's the answer I got from Akka's user ML. This looks like a binary incompatibility issue. As far as I know Spark is using a custom built Akka and Scala for various reasons. You should ask this on the Spark mailing list, Akka is binary compatible between major versions (2.3.6 is compatible with 2.3.4) but Spark uses its own build which I guess does not guarantee binary compatiblity. *Anyone have used Spray or other Akka dependent libraries? Any binary compatibility issues?* Does that mean if I build Spray with Spark's Akka version akka.grouporg.spark-project.akka/akka.group akka.version2.3.4-spark/akka.version it should solve problem. Makes sense? I'll give it a shot when I have time, now probably I'll just not using Spray client... Cheers, Jianshi On Tue, Oct 28, 2014 at 6:02 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I got the following exceptions when using Spray client to write to OpenTSDB using its REST API. Exception in thread pool-10-thread-2 java.lang.NoSuchMethodError: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext; It worked locally in my Intellij but failed when I launch it from Spark-submit. Google suggested it's a compatibility issue in Akka. And I'm using latest Spark built from the HEAD, so the Akka used in Spark-submit is 2.3.4-spark. I tried both Spray 1.3.2 (built for Akka 2.3.6) and 1.3.1 (built for 2.3.4). Both failed with the same exception. Anyone has idea what went wrong? Need help! -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext
I'm using Spark built from HEAD, I think it uses modified Akka 2.3.4, right? Jianshi On Wed, Oct 29, 2014 at 5:53 AM, Mohammed Guller moham...@glassbeam.com wrote: Try a version built with Akka 2.2.x Mohammed *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Tuesday, October 28, 2014 3:03 AM *To:* user *Subject:* Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext Hi, I got the following exceptions when using Spray client to write to OpenTSDB using its REST API. Exception in thread pool-10-thread-2 java.lang.NoSuchMethodError: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext; It worked locally in my Intellij but failed when I launch it from Spark-submit. Google suggested it's a compatibility issue in Akka. And I'm using latest Spark built from the HEAD, so the Akka used in Spark-submit is 2.3.4-spark. I tried both Spray 1.3.2 (built for Akka 2.3.6) and 1.3.1 (built for 2.3.4). Both failed with the same exception. Anyone has idea what went wrong? Need help! -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Spark streaming update/restart gracefully
Looks like currently solution to update spark-stream jars/configurations is to 1) save current Kafka offsets somewhere (say zookeeper) 2) shutdown the cluster and restart 3) connect to Kafka with previously saved offset Assuming we're reading from Kafka which provides nice persistence and read-from-offset feature. Is there a better solution? Is it possible to reload jars dynamically? Resize cluster size dynamically (SPARK-3174)? A reasonable ask is to have a strategy to update or restart executors gracefully. Something like https://issues.apache.org/jira/browse/STORM-167 Any suggestion? -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: RDD to DStream
Hi Saisai, I understand it's non-trivial, but the requirement of simulating offline data as stream is also fair. :) I just wrote a prototype, however, I need to do a collect and a bunch of parallelize... // RDD of (timestamp, value) def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long, ssc: StreamingContext): DStream[T] = { val sc = ssc.sparkContext val d = data.groupBy(_._1 / timeWindow) .map(e = (e._1, e._2.toSeq.sortBy(_._1).map(_._2))) .collect() .map(e = (e._1, sc.parallelize(e._2))) .sortBy(_._1) val queue = new mutable.SynchronizedQueue[RDD[T]] queue ++= d.map(_._2) ssc.queueStream(queue) } Any way to get a list of RDDs sorted by group key just after groupBy? Jianshi On Mon, Oct 27, 2014 at 2:00 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Jianshi, For simulation purpose, I think you can try ConstantInputDStream and QueueInputDStream to convert one RDD or series of RDD into DStream, the first one output the same RDD in each batch duration, and the second one just output a RDD in a queue in each batch duration. You can take a look at it. For your case, I think TD’s comment are quite meaningful, it’s not trivial to do so, often requires a job to scan all the records, it’s also not the design purpose of Spark Streaming, I guess it’s hard to achieve what you want. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Monday, October 27, 2014 1:42 PM *To:* Tathagata Das *Cc:* Aniket Bhatnagar; user@spark.apache.org *Subject:* Re: RDD to DStream I have a similar requirement. But instead of grouping it by chunkSize, I would have the timeStamp be part of the data. So the function I want has the following signature: // RDD of (timestamp, value) def rddToDStream[T](data: RDD[(Long, T)], timeWindow: Long)(implicit ssc: StreamingContext): DStream[T] And DStream should respect the timestamp part. This is important for simulation, right? Do you have any good solution for this? Jianshi On Thu, Aug 7, 2014 at 9:32 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Hey Aniket, Great thoughts! I understand the usecase. But as you have realized yourself it is not trivial to cleanly stream a RDD as a DStream. Since RDD operations are defined to be scan based, it is not efficient to define RDD based on slices of data within a partition of another RDD, using pure RDD transformations. What you have done is a decent, and probably the only feasible solution, with its limitations. Also the requirements of converting a batch of data to a stream of data can be pretty diverse. What rate, what # of events per batch, how many batches, is it efficient? Hence, it is not trivial to define a good, clean public API for that. If any one has any thoughts, ideas, etc on this, you are more than welcome to share them. TD On Mon, Aug 4, 2014 at 12:43 AM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: The use case for converting RDD into DStream is that I want to simulate a stream from an already persisted data for testing analytics. It is trivial to create a RDD from any persisted data but not so much for DStream. Therefore, my idea to create DStream from RDD. For example, lets say you are trying to implement analytics on time series data using Lambda architecture. This means you would have to implement the same analytics on streaming data (in streaming mode) as well as persisted data (in batch mode). The workflow for implementing the anlytics would be to first implement it in batch mode using RDD operations and then simulate stream to test the analytics in stream mode. The simulated stream should produce the elements at a specified rate. So the solution maybe to read data in a RDD, split (chunk) it into multiple RDDs with each RDD having the size of elements that need to be streamed per time unit and then finally stream each RDD using the compute function. The problem with using QueueInputDStream is that it will stream data as per the batch duration specified in the streaming context and one cannot specify a custom slide duration. Moreover, the class QueueInputDStream is private to streaming package, so I can't really use it/extend it from an external package. Also, I could not find a good solution split a RDD into equal sized smaller RDDs that can be fed into an extended version of QueueInputDStream. Finally, here is what I came up with: class RDDExtension[T: ClassTag](rdd: RDD[T]) { def toStream(streamingContext: StreamingContext, chunkSize: Int, slideDurationMilli: Option[Long] = None): DStream[T] = { new InputDStream[T](streamingContext) { private val iterator = rdd.toLocalIterator // WARNING: each partition much fit in RAM of local machine. private val grouped = iterator.grouped(chunkSize) override def start(): Unit
Re: RDD to DStream
You're absolutely right, it's not 'scalable' as I'm using collect(). However, it's important to have the RDDs ordered by the timestamp of the time window (groupBy puts data to corresponding timewindow). It's fairly easy to do in Pig, but somehow I have no idea how to express it in RDD... Something like (in Pig, pseudo code :): g = GROUP data BY (timestamp / windowSize) // group data into buckets in the same time window gs = ORDER g BY group ASC, g.timestamp ASC // 'group' is the rounded timestamp for each bucket stream = FOREACH gs GENERATE toRDD(g) No idea how to do the order by part in RDD. Jianshi On Mon, Oct 27, 2014 at 3:07 PM, Shao, Saisai saisai.s...@intel.com wrote: I think you solution may not be extendable if the data size is increasing, since you have to collect all your data back to driver node, so the memory usage of driver will be a problem. why not filter out specific time-range data as a rdd, after filtering the whole time-range, you will get a series of RDD with timestamp divided, and then feed into queue. Still it is not an efficient way, but it is not limited by driver memory. Also there may have some other solutions like shuffle to arrange data, but you cannot avoid scanning the whole data. Basically we need to avoid fetching large amount of data back to driver. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Monday, October 27, 2014 2:39 PM *To:* Shao, Saisai *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com) *Subject:* Re: RDD to DStream Hi Saisai, I understand it's non-trivial, but the requirement of simulating offline data as stream is also fair. :) I just wrote a prototype, however, I need to do a collect and a bunch of parallelize... // RDD of (timestamp, value) def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long, ssc: StreamingContext): DStream[T] = { val sc = ssc.sparkContext val d = data.groupBy(_._1 / timeWindow) .map(e = (e._1, e._2.toSeq.sortBy(_._1).map(_._2))) .collect() .map(e = (e._1, sc.parallelize(e._2))) .sortBy(_._1) val queue = new mutable.SynchronizedQueue[RDD[T]] queue ++= d.map(_._2) ssc.queueStream(queue) } Any way to get a list of RDDs sorted by group key just after groupBy? Jianshi On Mon, Oct 27, 2014 at 2:00 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Jianshi, For simulation purpose, I think you can try ConstantInputDStream and QueueInputDStream to convert one RDD or series of RDD into DStream, the first one output the same RDD in each batch duration, and the second one just output a RDD in a queue in each batch duration. You can take a look at it. For your case, I think TD’s comment are quite meaningful, it’s not trivial to do so, often requires a job to scan all the records, it’s also not the design purpose of Spark Streaming, I guess it’s hard to achieve what you want. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Monday, October 27, 2014 1:42 PM *To:* Tathagata Das *Cc:* Aniket Bhatnagar; user@spark.apache.org *Subject:* Re: RDD to DStream I have a similar requirement. But instead of grouping it by chunkSize, I would have the timeStamp be part of the data. So the function I want has the following signature: // RDD of (timestamp, value) def rddToDStream[T](data: RDD[(Long, T)], timeWindow: Long)(implicit ssc: StreamingContext): DStream[T] And DStream should respect the timestamp part. This is important for simulation, right? Do you have any good solution for this? Jianshi On Thu, Aug 7, 2014 at 9:32 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Hey Aniket, Great thoughts! I understand the usecase. But as you have realized yourself it is not trivial to cleanly stream a RDD as a DStream. Since RDD operations are defined to be scan based, it is not efficient to define RDD based on slices of data within a partition of another RDD, using pure RDD transformations. What you have done is a decent, and probably the only feasible solution, with its limitations. Also the requirements of converting a batch of data to a stream of data can be pretty diverse. What rate, what # of events per batch, how many batches, is it efficient? Hence, it is not trivial to define a good, clean public API for that. If any one has any thoughts, ideas, etc on this, you are more than welcome to share them. TD On Mon, Aug 4, 2014 at 12:43 AM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: The use case for converting RDD into DStream is that I want to simulate a stream from an already persisted data for testing analytics. It is trivial to create a RDD from any persisted data but not so much for DStream. Therefore, my idea to create DStream from RDD. For example, lets say you are trying
Re: RDD to DStream
Ok, back to Scala code, I'm wondering why I cannot do this: data.groupBy(timestamp / window) .sortByKey() // no sort method available here .map(sc.parallelize(_._2.sortBy(_._1))) // nested RDD, hmm... .collect() // returns Seq[RDD[(Timestamp, T)]] Jianshi On Mon, Oct 27, 2014 at 3:24 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: You're absolutely right, it's not 'scalable' as I'm using collect(). However, it's important to have the RDDs ordered by the timestamp of the time window (groupBy puts data to corresponding timewindow). It's fairly easy to do in Pig, but somehow I have no idea how to express it in RDD... Something like (in Pig, pseudo code :): g = GROUP data BY (timestamp / windowSize) // group data into buckets in the same time window gs = ORDER g BY group ASC, g.timestamp ASC // 'group' is the rounded timestamp for each bucket stream = FOREACH gs GENERATE toRDD(g) No idea how to do the order by part in RDD. Jianshi On Mon, Oct 27, 2014 at 3:07 PM, Shao, Saisai saisai.s...@intel.com wrote: I think you solution may not be extendable if the data size is increasing, since you have to collect all your data back to driver node, so the memory usage of driver will be a problem. why not filter out specific time-range data as a rdd, after filtering the whole time-range, you will get a series of RDD with timestamp divided, and then feed into queue. Still it is not an efficient way, but it is not limited by driver memory. Also there may have some other solutions like shuffle to arrange data, but you cannot avoid scanning the whole data. Basically we need to avoid fetching large amount of data back to driver. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Monday, October 27, 2014 2:39 PM *To:* Shao, Saisai *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com) *Subject:* Re: RDD to DStream Hi Saisai, I understand it's non-trivial, but the requirement of simulating offline data as stream is also fair. :) I just wrote a prototype, however, I need to do a collect and a bunch of parallelize... // RDD of (timestamp, value) def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long, ssc: StreamingContext): DStream[T] = { val sc = ssc.sparkContext val d = data.groupBy(_._1 / timeWindow) .map(e = (e._1, e._2.toSeq.sortBy(_._1).map(_._2))) .collect() .map(e = (e._1, sc.parallelize(e._2))) .sortBy(_._1) val queue = new mutable.SynchronizedQueue[RDD[T]] queue ++= d.map(_._2) ssc.queueStream(queue) } Any way to get a list of RDDs sorted by group key just after groupBy? Jianshi On Mon, Oct 27, 2014 at 2:00 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Jianshi, For simulation purpose, I think you can try ConstantInputDStream and QueueInputDStream to convert one RDD or series of RDD into DStream, the first one output the same RDD in each batch duration, and the second one just output a RDD in a queue in each batch duration. You can take a look at it. For your case, I think TD’s comment are quite meaningful, it’s not trivial to do so, often requires a job to scan all the records, it’s also not the design purpose of Spark Streaming, I guess it’s hard to achieve what you want. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Monday, October 27, 2014 1:42 PM *To:* Tathagata Das *Cc:* Aniket Bhatnagar; user@spark.apache.org *Subject:* Re: RDD to DStream I have a similar requirement. But instead of grouping it by chunkSize, I would have the timeStamp be part of the data. So the function I want has the following signature: // RDD of (timestamp, value) def rddToDStream[T](data: RDD[(Long, T)], timeWindow: Long)(implicit ssc: StreamingContext): DStream[T] And DStream should respect the timestamp part. This is important for simulation, right? Do you have any good solution for this? Jianshi On Thu, Aug 7, 2014 at 9:32 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Hey Aniket, Great thoughts! I understand the usecase. But as you have realized yourself it is not trivial to cleanly stream a RDD as a DStream. Since RDD operations are defined to be scan based, it is not efficient to define RDD based on slices of data within a partition of another RDD, using pure RDD transformations. What you have done is a decent, and probably the only feasible solution, with its limitations. Also the requirements of converting a batch of data to a stream of data can be pretty diverse. What rate, what # of events per batch, how many batches, is it efficient? Hence, it is not trivial to define a good, clean public API for that. If any one has any thoughts, ideas, etc on this, you are more than welcome to share them. TD On Mon, Aug 4, 2014 at 12:43 AM
Re: RDD to DStream
Yeah, you're absolutely right Saisai. My point is we should allow this kind of logic in RDD, let's say transforming type RDD[(Key, Iterable[T])] to Seq[(Key, RDD[T])]. Make sense? Jianshi On Mon, Oct 27, 2014 at 3:56 PM, Shao, Saisai saisai.s...@intel.com wrote: I think what you want is to make each bucket as a new RDD as what you mentioned in Pig syntax. gs = ORDER g BY group ASC, g.timestamp ASC // 'group' is the rounded timestamp for each bucket From my understanding, currently in Spark there’s no such kind of API to achieve this, maybe you have to create a customized RDD by yourself. For the code why cannot executed, .map(sc.parallelize(_._2.sortBy(_._1))) // nested RDD, hmm... This “sc.parallelize(_._2.sortBy(_._1))”will be serialized as a closure to execute in remote side, which obviously do not has SparkContext, I think Spark cannot support nested RDD in closure. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Monday, October 27, 2014 3:30 PM *To:* Shao, Saisai *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com) *Subject:* Re: RDD to DStream Ok, back to Scala code, I'm wondering why I cannot do this: data.groupBy(timestamp / window) .sortByKey() // no sort method available here .map(sc.parallelize(_._2.sortBy(_._1))) // nested RDD, hmm... .collect() // returns Seq[RDD[(Timestamp, T)]] Jianshi On Mon, Oct 27, 2014 at 3:24 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: You're absolutely right, it's not 'scalable' as I'm using collect(). However, it's important to have the RDDs ordered by the timestamp of the time window (groupBy puts data to corresponding timewindow). It's fairly easy to do in Pig, but somehow I have no idea how to express it in RDD... Something like (in Pig, pseudo code :): g = GROUP data BY (timestamp / windowSize) // group data into buckets in the same time window gs = ORDER g BY group ASC, g.timestamp ASC // 'group' is the rounded timestamp for each bucket stream = FOREACH gs GENERATE toRDD(g) No idea how to do the order by part in RDD. Jianshi On Mon, Oct 27, 2014 at 3:07 PM, Shao, Saisai saisai.s...@intel.com wrote: I think you solution may not be extendable if the data size is increasing, since you have to collect all your data back to driver node, so the memory usage of driver will be a problem. why not filter out specific time-range data as a rdd, after filtering the whole time-range, you will get a series of RDD with timestamp divided, and then feed into queue. Still it is not an efficient way, but it is not limited by driver memory. Also there may have some other solutions like shuffle to arrange data, but you cannot avoid scanning the whole data. Basically we need to avoid fetching large amount of data back to driver. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Monday, October 27, 2014 2:39 PM *To:* Shao, Saisai *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com) *Subject:* Re: RDD to DStream Hi Saisai, I understand it's non-trivial, but the requirement of simulating offline data as stream is also fair. :) I just wrote a prototype, however, I need to do a collect and a bunch of parallelize... // RDD of (timestamp, value) def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long, ssc: StreamingContext): DStream[T] = { val sc = ssc.sparkContext val d = data.groupBy(_._1 / timeWindow) .map(e = (e._1, e._2.toSeq.sortBy(_._1).map(_._2))) .collect() .map(e = (e._1, sc.parallelize(e._2))) .sortBy(_._1) val queue = new mutable.SynchronizedQueue[RDD[T]] queue ++= d.map(_._2) ssc.queueStream(queue) } Any way to get a list of RDDs sorted by group key just after groupBy? Jianshi On Mon, Oct 27, 2014 at 2:00 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Jianshi, For simulation purpose, I think you can try ConstantInputDStream and QueueInputDStream to convert one RDD or series of RDD into DStream, the first one output the same RDD in each batch duration, and the second one just output a RDD in a queue in each batch duration. You can take a look at it. For your case, I think TD’s comment are quite meaningful, it’s not trivial to do so, often requires a job to scan all the records, it’s also not the design purpose of Spark Streaming, I guess it’s hard to achieve what you want. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Monday, October 27, 2014 1:42 PM *To:* Tathagata Das *Cc:* Aniket Bhatnagar; user@spark.apache.org *Subject:* Re: RDD to DStream I have a similar requirement. But instead of grouping it by chunkSize, I would have the timeStamp be part of the data. So the function I want has the following
Ephemeral Hive metastore for HiveContext?
There's an annoying small usability issue in HiveContext. By default, it creates a local metastore which forbids other processes using HiveContext to be launched from the same directory. How can I make the metastore local to each HiveContext? Is there an in-memory metastore configuration? /tmp/ temp folders is one solution, but it's not elegant and I still need to clean up the files... I can add hive-site.xml and use a shared metastore, however they'll still operate in the same catalog space. (Simple) SQLContext by default uses in-memory catalog which is bound to each context. Since HiveContext is a subclass, we should make the same semantics as default. Make sense? Spark is very much functional and shared nothing, these are wonderful features. Let's not have something global as a dependency. Cheers, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: RDD to DStream
Sure, let's still focus on the streaming simulation use case. It's a very useful problem to solve. If we're going to use the same Spark-streaming core for the simulation, the most simple way is to have a globally sorted RDDs and use ssc.queueStream. Thus collecting the Key part to driver is probably necessary. I've done offline simulation in Pig and it's absolutely non-trivial and error prone, also I had to have multiple copies of data due to overlaps of window. Therefore I would prefer using the existing streaming implementation for the simulation rather than a special DStream. Jianshi On Mon, Oct 27, 2014 at 4:44 PM, Shao, Saisai saisai.s...@intel.com wrote: Yes, I understand what you want, but maybe hard to achieve without collecting back to driver node. Besides, can we just think of another way to do it. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Monday, October 27, 2014 4:07 PM *To:* Shao, Saisai *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com) *Subject:* Re: RDD to DStream Yeah, you're absolutely right Saisai. My point is we should allow this kind of logic in RDD, let's say transforming type RDD[(Key, Iterable[T])] to Seq[(Key, RDD[T])]. Make sense? Jianshi On Mon, Oct 27, 2014 at 3:56 PM, Shao, Saisai saisai.s...@intel.com wrote: I think what you want is to make each bucket as a new RDD as what you mentioned in Pig syntax. gs = ORDER g BY group ASC, g.timestamp ASC // 'group' is the rounded timestamp for each bucket From my understanding, currently in Spark there’s no such kind of API to achieve this, maybe you have to create a customized RDD by yourself. For the code why cannot executed, .map(sc.parallelize(_._2.sortBy(_._1))) // nested RDD, hmm... This “sc.parallelize(_._2.sortBy(_._1))”will be serialized as a closure to execute in remote side, which obviously do not has SparkContext, I think Spark cannot support nested RDD in closure. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Monday, October 27, 2014 3:30 PM *To:* Shao, Saisai *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com) *Subject:* Re: RDD to DStream Ok, back to Scala code, I'm wondering why I cannot do this: data.groupBy(timestamp / window) .sortByKey() // no sort method available here .map(sc.parallelize(_._2.sortBy(_._1))) // nested RDD, hmm... .collect() // returns Seq[RDD[(Timestamp, T)]] Jianshi On Mon, Oct 27, 2014 at 3:24 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: You're absolutely right, it's not 'scalable' as I'm using collect(). However, it's important to have the RDDs ordered by the timestamp of the time window (groupBy puts data to corresponding timewindow). It's fairly easy to do in Pig, but somehow I have no idea how to express it in RDD... Something like (in Pig, pseudo code :): g = GROUP data BY (timestamp / windowSize) // group data into buckets in the same time window gs = ORDER g BY group ASC, g.timestamp ASC // 'group' is the rounded timestamp for each bucket stream = FOREACH gs GENERATE toRDD(g) No idea how to do the order by part in RDD. Jianshi On Mon, Oct 27, 2014 at 3:07 PM, Shao, Saisai saisai.s...@intel.com wrote: I think you solution may not be extendable if the data size is increasing, since you have to collect all your data back to driver node, so the memory usage of driver will be a problem. why not filter out specific time-range data as a rdd, after filtering the whole time-range, you will get a series of RDD with timestamp divided, and then feed into queue. Still it is not an efficient way, but it is not limited by driver memory. Also there may have some other solutions like shuffle to arrange data, but you cannot avoid scanning the whole data. Basically we need to avoid fetching large amount of data back to driver. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Monday, October 27, 2014 2:39 PM *To:* Shao, Saisai *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com) *Subject:* Re: RDD to DStream Hi Saisai, I understand it's non-trivial, but the requirement of simulating offline data as stream is also fair. :) I just wrote a prototype, however, I need to do a collect and a bunch of parallelize... // RDD of (timestamp, value) def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long, ssc: StreamingContext): DStream[T] = { val sc = ssc.sparkContext val d = data.groupBy(_._1 / timeWindow) .map(e = (e._1, e._2.toSeq.sortBy(_._1).map(_._2))) .collect() .map(e = (e._1, sc.parallelize(e._2))) .sortBy(_._1) val queue = new mutable.SynchronizedQueue[RDD[T]] queue ++= d.map(_._2) ssc.queueStream(queue) } Any way
Re: Which is better? One spark app listening to 10 topics vs. 10 spark apps each listening to 1 topic
Any suggestion? :) Jianshi On Thu, Oct 23, 2014 at 3:49 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: The Kafka stream has 10 topics and the data rate is quite high (~ 100K/s per topic). Which configuration do you recommend? - 1 Spark app consuming all Kafka topics - 10 separate Spark app each consuming one topic Assuming they have the same resource pool. Cheers, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Ephemeral Hive metastore for HiveContext?
Thanks Ted and Cheng for the in memory derby solution. I'll check it out. :) And to me, using in-mem by default makes sense, if user wants a shared metastore, it needs to be specified. An 'embedded' local metastore in the working directory barely has a use case. Jianshi On Mon, Oct 27, 2014 at 9:57 PM, Cheng Lian lian.cs@gmail.com wrote: Thanks Ted, this is exactly what Spark SQL LocalHiveContext does. To make an embedded metastore local to a single HiveContext, we must allocate different Derby database directories for each HiveContext, and Jianshi is also trying to avoid that. On 10/27/14 9:44 PM, Ted Yu wrote: Please see https://cwiki.apache.org/confluence/display/Hive/AdminManual+MetastoreAdmin#AdminManualMetastoreAdmin-EmbeddedMetastore Cheers On Oct 27, 2014, at 6:20 AM, Cheng Lian lian.cs@gmail.com wrote: I have never tried this yet, but maybe you can use an in-memory Derby database as metastore https://db.apache.org/derby/docs/10.7/devguide/cdevdvlpinmemdb.html I'll investigate this when free, guess we can use this for Spark SQL Hive support testing. On 10/27/14 4:38 PM, Jianshi Huang wrote: There's an annoying small usability issue in HiveContext. By default, it creates a local metastore which forbids other processes using HiveContext to be launched from the same directory. How can I make the metastore local to each HiveContext? Is there an in-memory metastore configuration? /tmp/ temp folders is one solution, but it's not elegant and I still need to clean up the files... I can add hive-site.xml and use a shared metastore, however they'll still operate in the same catalog space. (Simple) SQLContext by default uses in-memory catalog which is bound to each context. Since HiveContext is a subclass, we should make the same semantics as default. Make sense? Spark is very much functional and shared nothing, these are wonderful features. Let's not have something global as a dependency. Cheers, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: RDD to DStream
2014 13:55, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: Sometimes it is useful to convert a RDD into a DStream for testing purposes (generating DStreams from historical data, etc). Is there an easy way to do this? I could come up with the following inefficient way but no sure if there is a better way to achieve this. Thoughts? class RDDExtension[T](rdd: RDD[T]) { def chunked(chunkSize: Int): RDD[Seq[T]] = { rdd.mapPartitions(partitionItr = partitionItr.grouped(chunkSize)) } def skipFirst(): RDD[T] = { rdd.zipWithIndex().filter(tuple = tuple._2 0).map(_._1) } def toStream(streamingContext: StreamingContext, chunkSize: Int, slideDurationMilli: Option[Long] = None): DStream[T] = { new InputDStream[T](streamingContext) { @volatile private var currentRDD: RDD[Seq[T]] = rdd.chunked(chunkSize) override def start(): Unit = {} override def stop(): Unit = {} override def compute(validTime: Time): Option[RDD[T]] = { val chunk = currentRDD.take(1) currentRDD = currentRDD.skipFirst() Some(rdd.sparkContext.parallelize(chunk)) } override def slideDuration = { slideDurationMilli.map(duration = new Duration(duration)). getOrElse(super.slideDuration) } } } -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Dynamically loaded Spark-stream consumer
I have a use case that I need to continuously ingest data from Kafka stream. However apart from ingestion (to HBase), I also need to compute some metrics (i.e. avg for last min, etc.). The problem is that it's very likely I'll continuously add more metrics and I don't want to restart my spark program from time to time. Is there a mechanism that Spark stream can load and plugin code in runtime without restarting? Any solutions or suggestions? Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Which is better? One spark app listening to 10 topics vs. 10 spark apps each listening to 1 topic
The Kafka stream has 10 topics and the data rate is quite high (~ 100K/s per topic). Which configuration do you recommend? - 1 Spark app consuming all Kafka topics - 10 separate Spark app each consuming one topic Assuming they have the same resource pool. Cheers, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Multitenancy in Spark - within/across spark context
Upvote for the multitanency requirement. I'm also building a data analytic platform and there'll be multiple users running queries and computations simultaneously. One of the paint point is control of resource size. Users don't really know how much nodes they need, they always use as much as possible... The result is lots of wasted resource in our Yarn cluster. A way to 1) allow multiple spark context to share the same resource or 2) add dynamic resource management for Yarn mode is very much wanted. Jianshi On Thu, Oct 23, 2014 at 5:36 AM, Marcelo Vanzin van...@cloudera.com wrote: On Wed, Oct 22, 2014 at 2:17 PM, Ashwin Shankar ashwinshanka...@gmail.com wrote: That's not something you might want to do usually. In general, a SparkContext maps to a user application My question was basically this. In this page in the official doc, under Scheduling within an application section, it talks about multiuser and fair sharing within an app. How does multiuser within an application work(how users connect to an app,run their stuff) ? When would I want to use this ? I see. The way I read that page is that Spark supports all those scheduling options; but Spark doesn't give you the means to actually be able to submit jobs from different users to a running SparkContext hosted on a different process. For that, you'll need something like the job server that I referenced before, or write your own framework for supporting that. Personally, I'd use the information on that page when dealing with concurrent jobs in the same SparkContext, but still restricted to the same user. I'd avoid trying to create any application where a single SparkContext is trying to be shared by multiple users in any way. As far as I understand, this will cause executors to be killed, which means that Spark will start retrying tasks to rebuild the data that was held by those executors when needed. I basically wanted to find out if there were any gotchas related to preemption on Spark. Things like say half of an application's executors got preempted say while doing reduceByKey, will the application progress with the remaining resources/fair share ? Jobs should still make progress as long as at least one executor is available. The gotcha would be the one I mentioned, where Spark will fail your job after x executors failed, which might be a common occurrence when preemption is enabled. That being said, it's a configurable option, so you can set x to a very large value and your job should keep on chugging along. The options you'd want to take a look at are: spark.task.maxFailures and spark.yarn.max.executor.failures -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
SPARK-3106 fixed?
https://issues.apache.org/jira/browse/SPARK-3106 I'm having the saming errors described in SPARK-3106 (no other types of errors confirmed), running a bunch sql queries on spark 1.2.0 built from latest master HEAD. Any updates to this issue? My main task is to join a huge fact table with a dozen dim tables (using HiveContext) and then map it to my class object. It failed a couple of times and now I cached the intermediate table and currently it seems working fine... no idea why until I found SPARK-3106 Cheers, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: SPARK-3106 fixed?
Hmm... it failed again, just lasted a little bit longer. Jianshi On Mon, Oct 13, 2014 at 4:15 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: https://issues.apache.org/jira/browse/SPARK-3106 I'm having the saming errors described in SPARK-3106 (no other types of errors confirmed), running a bunch sql queries on spark 1.2.0 built from latest master HEAD. Any updates to this issue? My main task is to join a huge fact table with a dozen dim tables (using HiveContext) and then map it to my class object. It failed a couple of times and now I cached the intermediate table and currently it seems working fine... no idea why until I found SPARK-3106 Cheers, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: SPARK-3106 fixed?
Turned out it was caused by this issue: https://issues.apache.org/jira/browse/SPARK-3923 Set spark.akka.heartbeat.interval to 100 solved it. Jianshi On Mon, Oct 13, 2014 at 4:24 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hmm... it failed again, just lasted a little bit longer. Jianshi On Mon, Oct 13, 2014 at 4:15 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: https://issues.apache.org/jira/browse/SPARK-3106 I'm having the saming errors described in SPARK-3106 (no other types of errors confirmed), running a bunch sql queries on spark 1.2.0 built from latest master HEAD. Any updates to this issue? My main task is to join a huge fact table with a dozen dim tables (using HiveContext) and then map it to my class object. It failed a couple of times and now I cached the intermediate table and currently it seems working fine... no idea why until I found SPARK-3106 Cheers, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: SPARK-3106 fixed?
One thing made me very confused during debuggin is the error message. The important one WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@xxx:50278] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. is of Log Level WARN. Jianshi On Tue, Oct 14, 2014 at 4:36 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Turned out it was caused by this issue: https://issues.apache.org/jira/browse/SPARK-3923 Set spark.akka.heartbeat.interval to 100 solved it. Jianshi On Mon, Oct 13, 2014 at 4:24 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hmm... it failed again, just lasted a little bit longer. Jianshi On Mon, Oct 13, 2014 at 4:15 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: https://issues.apache.org/jira/browse/SPARK-3106 I'm having the saming errors described in SPARK-3106 (no other types of errors confirmed), running a bunch sql queries on spark 1.2.0 built from latest master HEAD. Any updates to this issue? My main task is to join a huge fact table with a dozen dim tables (using HiveContext) and then map it to my class object. It failed a couple of times and now I cached the intermediate table and currently it seems working fine... no idea why until I found SPARK-3106 Cheers, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/