Re: Specifying different version of pyspark.zip and py4j files on worker nodes with Spark pre-installed

2018-10-04 Thread Jianshi Huang
Hi Marcelo,

I see what you mean. I tried it, but still got the same error message.

Error from python worker:
>   Traceback (most recent call last):
> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 183, in 
> _run_module_as_main
>   mod_name, mod_spec, code = _get_module_details(mod_name, _Error)
> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 109, in 
> _get_module_details
>   __import__(pkg_name)
> File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/__init__.py", 
> line 46, in 
> File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/context.py", 
> line 29, in 
>   ModuleNotFoundError: No module named 'py4j'
> PYTHONPATH was:
>   
> /usr/lib/spark-current/python/lib/pyspark.zip:/usr/lib/spark-current/python/lib/py4j-0.10.7-src.zip:/mnt/disk3/yarn/usercache/jianshi.huang/filecache/134/__spark_libs__8468485589501316413.zip/spark-core_2.11-2.3.2.jar
>
>
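For reference, a minimal hedged sketch (not from this thread) of how Marcelo's suggestion below can be expressed from the PySpark side. spark.executorEnv.<NAME> and spark.yarn.appMasterEnv.<NAME> are documented Spark-on-YARN properties; the app name, master, and the idea of leaving "$PWD" for YARN to expand come from this thread, everything else is illustrative:

# Hedged sketch: forward the SPARK_HOME override to the YARN containers via the
# same SparkConf used in the Jupyter session quoted later in this thread.
import pyspark

conf = pyspark.SparkConf()
# Per Marcelo's suggestion, keep "$PWD" literal so it is expanded when YARN
# launches the container (i.e. it resolves to the container's working directory).
conf.set("spark.executorEnv.SPARK_HOME", "$PWD")
conf.set("spark.yarn.appMasterEnv.SPARK_HOME", "$PWD")

sc = pyspark.SparkContext(appName="Jianshi", master="yarn-client", conf=conf)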
On Fri, Oct 5, 2018 at 1:25 AM Marcelo Vanzin  wrote:

> Try "spark.executorEnv.SPARK_HOME=$PWD" (in quotes so it does not get
> expanded by the shell).
>
> But it's really weird to be setting SPARK_HOME in the environment of
> your node managers. YARN shouldn't need to know about that.
> On Thu, Oct 4, 2018 at 10:22 AM Jianshi Huang 
> wrote:
> >
> >
> https://github.com/apache/spark/blob/88e7e87bd5c052e10f52d4bb97a9d78f5b524128/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala#L31
> >
> > The code shows Spark will try to find the path if SPARK_HOME is
> specified. And on my worker node, SPARK_HOME is specified in .bashrc , for
> the pre-installed 2.2.1 path.
> >
> > I don't want to make any changes to worker node configuration, so any
> way to override the order?
> >
> > Jianshi
> >
> > On Fri, Oct 5, 2018 at 12:11 AM Marcelo Vanzin 
> wrote:
> >>
> >> Normally the version of Spark installed on the cluster does not
> >> matter, since Spark is uploaded from your gateway machine to YARN by
> >> default.
> >>
> >> You probably have some configuration (in spark-defaults.conf) that
> >> tells YARN to use a cached copy. Get rid of that configuration, and
> >> you can use whatever version you like.
> >> On Thu, Oct 4, 2018 at 2:19 AM Jianshi Huang 
> wrote:
> >> >
> >> > Hi,
> >> >
> >> > I have a problem using multiple versions of Pyspark on YARN, the
> driver and worker nodes are all preinstalled with Spark 2.2.1, for
> production tasks. And I want to use 2.3.2 for my personal EDA.
> >> >
> >> > I've tried both 'pyFiles=' option and sparkContext.addPyFiles(),
> however on the worker node, the PYTHONPATH still uses the system SPARK_HOME.
> >> >
> >> > Anyone knows how to override the PYTHONPATH on worker nodes?
> >> >
> >> > Here's the error message,
> >> >>
> >> >>
> >> >> Py4JJavaError: An error occurred while calling o75.collectToPython.
> >> >> : org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in
> stage 0.0 (TID 3, emr-worker-8.cluster-68492, executor 2):
> org.apache.spark.SparkException:
> >> >> Error from python worker:
> >> >> Traceback (most recent call last):
> >> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 183, in
> _run_module_as_main
> >> >> mod_name, mod_spec, code = _get_module_details(mod_name, _Error)
> >> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 109, in
> _get_module_details
> >> >> __import__(pkg_name)
> >> >> File
> "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/__init__.py", line
> 46, in 
> >> >> File
> "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/context.py", line
> 29, in 
> >> >> ModuleNotFoundError: No module named 'py4j'
> >> >> PYTHONPATH was:
> >> >>
> /usr/lib/spark-current/python/lib/pyspark.zip:/usr/lib/spark-current/python/lib/py4j-0.10.7-src.zip:/mnt/disk1/yarn/usercache/jianshi.huang/filecache/130/__spark_libs__5227988272944669714.zip/spark-core_2.11-2.3.2.jar
> >> >
> >> >
> >> > And here's how I started Pyspark session in Jupyter.
> >> >>
> >> >>
> >> >> %env SPARK_HOME=/opt/apps/ecm/service/spark/2.3.2-bin-hadoop2.7
> >> >> %env PYSPARK_PYTHON=/usr/bin/python3
> >> >> import findsp

Re: Specifying different version of pyspark.zip and py4j files on worker nodes with Spark pre-installed

2018-10-04 Thread Jianshi Huang
Thanks Marcelo,

But I don't want to install 2.3.2 on the worker nodes. I just want Spark to
use the paths of the files uploaded to YARN instead of SPARK_HOME.
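For illustration only, a hedged sketch of one way this is commonly done: ship the 2.3.2 Python files as a YARN archive and point the executors' PYTHONPATH at the localized copy. spark.yarn.dist.archives and spark.executorEnv.PYTHONPATH are real Spark properties; the HDFS path and the "pyspark232" alias are placeholders, and this is not necessarily what the thread settled on:

import pyspark

conf = pyspark.SparkConf()
# Ship a zip containing the Spark 2.3.2 pyspark.zip and py4j zip; YARN localizes
# it into each container's working directory under the alias after '#'.
conf.set("spark.yarn.dist.archives",
         "hdfs:///user/jianshi/spark-2.3.2-python-lib.zip#pyspark232")
# Make the executors' Python workers import from the localized copy.
conf.set("spark.executorEnv.PYTHONPATH",
         "pyspark232/pyspark.zip:pyspark232/py4j-0.10.7-src.zip")

sc = pyspark.SparkContext(appName="Jianshi", master="yarn-client", conf=conf)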

On Fri, Oct 5, 2018 at 1:25 AM Marcelo Vanzin  wrote:

> Try "spark.executorEnv.SPARK_HOME=$PWD" (in quotes so it does not get
> expanded by the shell).
>
> But it's really weird to be setting SPARK_HOME in the environment of
> your node managers. YARN shouldn't need to know about that.
> On Thu, Oct 4, 2018 at 10:22 AM Jianshi Huang 
> wrote:
> >
> >
> https://github.com/apache/spark/blob/88e7e87bd5c052e10f52d4bb97a9d78f5b524128/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala#L31
> >
> > The code shows Spark will try to find the path if SPARK_HOME is
> specified. And on my worker node, SPARK_HOME is specified in .bashrc , for
> the pre-installed 2.2.1 path.
> >
> > I don't want to make any changes to worker node configuration, so any
> way to override the order?
> >
> > Jianshi
> >
> > On Fri, Oct 5, 2018 at 12:11 AM Marcelo Vanzin 
> wrote:
> >>
> >> Normally the version of Spark installed on the cluster does not
> >> matter, since Spark is uploaded from your gateway machine to YARN by
> >> default.
> >>
> >> You probably have some configuration (in spark-defaults.conf) that
> >> tells YARN to use a cached copy. Get rid of that configuration, and
> >> you can use whatever version you like.
> >> On Thu, Oct 4, 2018 at 2:19 AM Jianshi Huang 
> wrote:
> >> >
> >> > Hi,
> >> >
> >> > I have a problem using multiple versions of Pyspark on YARN, the
> driver and worker nodes are all preinstalled with Spark 2.2.1, for
> production tasks. And I want to use 2.3.2 for my personal EDA.
> >> >
> >> > I've tried both 'pyFiles=' option and sparkContext.addPyFiles(),
> however on the worker node, the PYTHONPATH still uses the system SPARK_HOME.
> >> >
> >> > Anyone knows how to override the PYTHONPATH on worker nodes?
> >> >
> >> > Here's the error message,
> >> >>
> >> >>
> >> >> Py4JJavaError: An error occurred while calling o75.collectToPython.
> >> >> : org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in
> stage 0.0 (TID 3, emr-worker-8.cluster-68492, executor 2):
> org.apache.spark.SparkException:
> >> >> Error from python worker:
> >> >> Traceback (most recent call last):
> >> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 183, in
> _run_module_as_main
> >> >> mod_name, mod_spec, code = _get_module_details(mod_name, _Error)
> >> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 109, in
> _get_module_details
> >> >> __import__(pkg_name)
> >> >> File
> "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/__init__.py", line
> 46, in 
> >> >> File
> "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/context.py", line
> 29, in 
> >> >> ModuleNotFoundError: No module named 'py4j'
> >> >> PYTHONPATH was:
> >> >>
> /usr/lib/spark-current/python/lib/pyspark.zip:/usr/lib/spark-current/python/lib/py4j-0.10.7-src.zip:/mnt/disk1/yarn/usercache/jianshi.huang/filecache/130/__spark_libs__5227988272944669714.zip/spark-core_2.11-2.3.2.jar
> >> >
> >> >
> >> > And here's how I started Pyspark session in Jupyter.
> >> >>
> >> >>
> >> >> %env SPARK_HOME=/opt/apps/ecm/service/spark/2.3.2-bin-hadoop2.7
> >> >> %env PYSPARK_PYTHON=/usr/bin/python3
> >> >> import findspark
> >> >> findspark.init()
> >> >> import pyspark
> >> >> sparkConf = pyspark.SparkConf()
> >> >> sparkConf.setAll([
> >> >> ('spark.cores.max', '96')
> >> >> ,('spark.driver.memory', '2g')
> >> >> ,('spark.executor.cores', '4')
> >> >> ,('spark.executor.instances', '2')
> >> >> ,('spark.executor.memory', '4g')
> >> >> ,('spark.network.timeout', '800')
> >> >> ,('spark.scheduler.mode', 'FAIR')
> >> >> ,('spark.shuffle.service.enabled', 'true')
> >> >> ,('spark.dynamicAllocation.enabled', 'true')
> >> >> ])
> >> >> py_files =
> ['hdfs://emr-header-1.cluster-68492:9000/lib/py4j-0.10.7-src.zip']
> >> >> sc = pyspark.SparkContext(appName="Jianshi", master="yarn-client",
> conf=sparkConf, pyFiles=py_files)
> >> >>
> >> >
> >> >
> >> > Thanks,
> >> > --
> >> > Jianshi Huang
> >> >
> >>
> >>
> >> --
> >> Marcelo
> >
> >
> >
> > --
> > Jianshi Huang
> >
> > LinkedIn: jianshi
> > Twitter: @jshuang
> > Github & Blog: http://huangjs.github.com/
>
>
>
> --
> Marcelo
>


-- 
Jianshi Huang


Re: Specifying different version of pyspark.zip and py4j files on worker nodes with Spark pre-installed

2018-10-04 Thread Jianshi Huang
Yes, that's right.

On Fri, Oct 5, 2018 at 3:35 AM Gourav Sengupta 
wrote:

> Hi Marcelo,
> it will be great if you illustrate what you mean, I will be interested to
> know.
>
> Hi Jianshi,
> so just to be sure you want to work on SPARK 2.3 while having SPARK 2.1
> installed in your cluster?
>
> Regards,
> Gourav Sengupta
>
> On Thu, Oct 4, 2018 at 6:26 PM Marcelo Vanzin 
> wrote:
>
>> Try "spark.executorEnv.SPARK_HOME=$PWD" (in quotes so it does not get
>> expanded by the shell).
>>
>> But it's really weird to be setting SPARK_HOME in the environment of
>> your node managers. YARN shouldn't need to know about that.
>> On Thu, Oct 4, 2018 at 10:22 AM Jianshi Huang 
>> wrote:
>> >
>> >
>> https://github.com/apache/spark/blob/88e7e87bd5c052e10f52d4bb97a9d78f5b524128/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala#L31
>> >
>> > The code shows Spark will try to find the path if SPARK_HOME is
>> specified. And on my worker node, SPARK_HOME is specified in .bashrc , for
>> the pre-installed 2.2.1 path.
>> >
>> > I don't want to make any changes to worker node configuration, so any
>> way to override the order?
>> >
>> > Jianshi
>> >
>> > On Fri, Oct 5, 2018 at 12:11 AM Marcelo Vanzin 
>> wrote:
>> >>
>> >> Normally the version of Spark installed on the cluster does not
>> >> matter, since Spark is uploaded from your gateway machine to YARN by
>> >> default.
>> >>
>> >> You probably have some configuration (in spark-defaults.conf) that
>> >> tells YARN to use a cached copy. Get rid of that configuration, and
>> >> you can use whatever version you like.
>> >> On Thu, Oct 4, 2018 at 2:19 AM Jianshi Huang 
>> wrote:
>> >> >
>> >> > Hi,
>> >> >
>> >> > I have a problem using multiple versions of Pyspark on YARN, the
>> driver and worker nodes are all preinstalled with Spark 2.2.1, for
>> production tasks. And I want to use 2.3.2 for my personal EDA.
>> >> >
>> >> > I've tried both 'pyFiles=' option and sparkContext.addPyFiles(),
>> however on the worker node, the PYTHONPATH still uses the system SPARK_HOME.
>> >> >
>> >> > Anyone knows how to override the PYTHONPATH on worker nodes?
>> >> >
>> >> > Here's the error message,
>> >> >>
>> >> >>
>> >> >> Py4JJavaError: An error occurred while calling o75.collectToPython.
>> >> >> : org.apache.spark.SparkException: Job aborted due to stage
>> failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task
>> 0.3 in stage 0.0 (TID 3, emr-worker-8.cluster-68492, executor 2):
>> org.apache.spark.SparkException:
>> >> >> Error from python worker:
>> >> >> Traceback (most recent call last):
>> >> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 183, in
>> _run_module_as_main
>> >> >> mod_name, mod_spec, code = _get_module_details(mod_name, _Error)
>> >> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 109, in
>> _get_module_details
>> >> >> __import__(pkg_name)
>> >> >> File
>> "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/__init__.py", line
>> 46, in 
>> >> >> File
>> "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/context.py", line
>> 29, in 
>> >> >> ModuleNotFoundError: No module named 'py4j'
>> >> >> PYTHONPATH was:
>> >> >>
>> /usr/lib/spark-current/python/lib/pyspark.zip:/usr/lib/spark-current/python/lib/py4j-0.10.7-src.zip:/mnt/disk1/yarn/usercache/jianshi.huang/filecache/130/__spark_libs__5227988272944669714.zip/spark-core_2.11-2.3.2.jar
>> >> >
>> >> >
>> >> > And here's how I started Pyspark session in Jupyter.
>> >> >>
>> >> >>
>> >> >> %env SPARK_HOME=/opt/apps/ecm/service/spark/2.3.2-bin-hadoop2.7
>> >> >> %env PYSPARK_PYTHON=/usr/bin/python3
>> >> >> import findspark
>> >> >> findspark.init()
>> >> >> import pyspark
>> >> >> sparkConf = pyspark.SparkConf()
>> >> >> sparkConf.setAll([
>> >> >> ('spark.cores.max', '96')
>> >> >> ,('spark.driver.memory', '2g')
>> >> >> ,('spark.executor.cores', '4')
>> >> >> ,('spark.executor.instances', '2')
>> >> >> ,('spark.executor.memory', '4g')
>> >> >> ,('spark.network.timeout', '800')
>> >> >> ,('spark.scheduler.mode', 'FAIR')
>> >> >> ,('spark.shuffle.service.enabled', 'true')
>> >> >> ,('spark.dynamicAllocation.enabled', 'true')
>> >> >> ])
>> >> >> py_files =
>> ['hdfs://emr-header-1.cluster-68492:9000/lib/py4j-0.10.7-src.zip']
>> >> >> sc = pyspark.SparkContext(appName="Jianshi", master="yarn-client",
>> conf=sparkConf, pyFiles=py_files)
>> >> >>
>> >> >
>> >> >
>> >> > Thanks,
>> >> > --
>> >> > Jianshi Huang
>> >> >
>> >>
>> >>
>> >> --
>> >> Marcelo
>> >
>> >
>> >
>> > --
>> > Jianshi Huang
>> >
>> > LinkedIn: jianshi
>> > Twitter: @jshuang
>> > Github & Blog: http://huangjs.github.com/
>>
>>
>>
>> --
>> Marcelo
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>

-- 
Jianshi Huang


Re: Specifying different version of pyspark.zip and py4j files on worker nodes with Spark pre-installed

2018-10-04 Thread Jianshi Huang
https://github.com/apache/spark/blob/88e7e87bd5c052e10f52d4bb97a9d78f5b524128/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala#L31

The code shows that Spark will try to find the path if SPARK_HOME is specified.
And on my worker node, SPARK_HOME is set in .bashrc to the pre-installed 2.2.1
path.

I don't want to make any changes to the worker node configuration, so is there
any way to override the order?

Jianshi

On Fri, Oct 5, 2018 at 12:11 AM Marcelo Vanzin  wrote:

> Normally the version of Spark installed on the cluster does not
> matter, since Spark is uploaded from your gateway machine to YARN by
> default.
>
> You probably have some configuration (in spark-defaults.conf) that
> tells YARN to use a cached copy. Get rid of that configuration, and
> you can use whatever version you like.
> On Thu, Oct 4, 2018 at 2:19 AM Jianshi Huang 
> wrote:
> >
> > Hi,
> >
> > I have a problem using multiple versions of Pyspark on YARN, the driver
> and worker nodes are all preinstalled with Spark 2.2.1, for production
> tasks. And I want to use 2.3.2 for my personal EDA.
> >
> > I've tried both 'pyFiles=' option and sparkContext.addPyFiles(), however
> on the worker node, the PYTHONPATH still uses the system SPARK_HOME.
> >
> > Anyone knows how to override the PYTHONPATH on worker nodes?
> >
> > Here's the error message,
> >>
> >>
> >> Py4JJavaError: An error occurred while calling o75.collectToPython.
> >> : org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in
> stage 0.0 (TID 3, emr-worker-8.cluster-68492, executor 2):
> org.apache.spark.SparkException:
> >> Error from python worker:
> >> Traceback (most recent call last):
> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 183, in
> _run_module_as_main
> >> mod_name, mod_spec, code = _get_module_details(mod_name, _Error)
> >> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 109, in
> _get_module_details
> >> __import__(pkg_name)
> >> File
> "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/__init__.py", line
> 46, in 
> >> File
> "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/context.py", line
> 29, in 
> >> ModuleNotFoundError: No module named 'py4j'
> >> PYTHONPATH was:
> >>
> /usr/lib/spark-current/python/lib/pyspark.zip:/usr/lib/spark-current/python/lib/py4j-0.10.7-src.zip:/mnt/disk1/yarn/usercache/jianshi.huang/filecache/130/__spark_libs__5227988272944669714.zip/spark-core_2.11-2.3.2.jar
> >
> >
> > And here's how I started Pyspark session in Jupyter.
> >>
> >>
> >> %env SPARK_HOME=/opt/apps/ecm/service/spark/2.3.2-bin-hadoop2.7
> >> %env PYSPARK_PYTHON=/usr/bin/python3
> >> import findspark
> >> findspark.init()
> >> import pyspark
> >> sparkConf = pyspark.SparkConf()
> >> sparkConf.setAll([
> >> ('spark.cores.max', '96')
> >> ,('spark.driver.memory', '2g')
> >> ,('spark.executor.cores', '4')
> >> ,('spark.executor.instances', '2')
> >> ,('spark.executor.memory', '4g')
> >> ,('spark.network.timeout', '800')
> >> ,('spark.scheduler.mode', 'FAIR')
> >> ,('spark.shuffle.service.enabled', 'true')
> >> ,('spark.dynamicAllocation.enabled', 'true')
> >> ])
> >> py_files =
> ['hdfs://emr-header-1.cluster-68492:9000/lib/py4j-0.10.7-src.zip']
> >> sc = pyspark.SparkContext(appName="Jianshi", master="yarn-client",
> conf=sparkConf, pyFiles=py_files)
> >>
> >
> >
> > Thanks,
> > --
> > Jianshi Huang
> >
>
>
> --
> Marcelo
>


-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Specifying different version of pyspark.zip and py4j files on worker nodes with Spark pre-installed

2018-10-04 Thread Jianshi Huang
Hi,

I have a problem using multiple versions of PySpark on YARN: the driver and
worker nodes all come with Spark 2.2.1 preinstalled for production tasks,
and I want to use 2.3.2 for my personal EDA.

I've tried both the 'pyFiles=' option and sparkContext.addPyFiles(); however, on
the worker nodes the PYTHONPATH still uses the system SPARK_HOME.

Does anyone know how to override the PYTHONPATH on worker nodes?

Here's the error message,

>
> Py4JJavaError: An error occurred while calling o75.collectToPython.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 0.0 (TID 3, emr-worker-8.cluster-68492, executor 2):
> org.apache.spark.SparkException:
> Error from python worker:
> Traceback (most recent call last):
> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 183, in
> _run_module_as_main
> mod_name, mod_spec, code = _get_module_details(mod_name, _Error)
> File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 109, in
> _get_module_details
> __import__(pkg_name)
> File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/__init__.py",
> line 46, in 
> File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/context.py",
> line 29, in 
> ModuleNotFoundError: No module named 'py4j'
> PYTHONPATH was:
>
> /usr/lib/spark-current/python/lib/pyspark.zip:/usr/lib/spark-current/python/lib/py4j-0.10.7-src.zip:/mnt/disk1/yarn/usercache/jianshi.huang/filecache/130/__spark_libs__5227988272944669714.zip/spark-core_2.11-2.3.2.jar


And here's how I started the PySpark session in Jupyter:

>
> %env SPARK_HOME=/opt/apps/ecm/service/spark/2.3.2-bin-hadoop2.7
> %env PYSPARK_PYTHON=/usr/bin/python3
> import findspark
> findspark.init()
> import pyspark
> sparkConf = pyspark.SparkConf()
> sparkConf.setAll([
> ('spark.cores.max', '96')
> ,('spark.driver.memory', '2g')
> ,('spark.executor.cores', '4')
> ,('spark.executor.instances', '2')
> ,('spark.executor.memory', '4g')
> ,('spark.network.timeout', '800')
> ,('spark.scheduler.mode', 'FAIR')
> ,('spark.shuffle.service.enabled', 'true')
> ,('spark.dynamicAllocation.enabled', 'true')
> ])
> py_files =
> ['hdfs://emr-header-1.cluster-68492:9000/lib/py4j-0.10.7-src.zip']
> sc = pyspark.SparkContext(appName="Jianshi", master="yarn-client",
> conf=sparkConf, pyFiles=py_files)
>
>

Thanks,
-- 
Jianshi Huang


Re: View all user's application logs in history server

2015-05-27 Thread Jianshi Huang
Is no one using the history server? :)

Am I the only one who needs to see all users' logs?

Jianshi

On Thu, May 21, 2015 at 1:29 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Hi,

 I'm using Spark 1.4.0-rc1 and I'm using default settings for history
 server.

 But I can only see my own logs. Is it possible to view all user's logs?
 The permission is fine for the user group.

 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: View all user's application logs in history server

2015-05-27 Thread Jianshi Huang
Yes, all written to the same directory on HDFS.

Jianshi

On Wed, May 27, 2015 at 11:57 PM, Marcelo Vanzin van...@cloudera.com
wrote:

 You may be the only one not seeing all the logs. Are you sure all the
 users are writing to the same log directory? The HS can only read from a
 single log directory.

 On Wed, May 27, 2015 at 5:33 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 No one using History server? :)

 Am I the only one need to see all user's logs?

 Jianshi

 On Thu, May 21, 2015 at 1:29 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi,

 I'm using Spark 1.4.0-rc1 and I'm using default settings for history
 server.

 But I can only see my own logs. Is it possible to view all user's logs?
 The permission is fine for the user group.

 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Marcelo




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


View all user's application logs in history server

2015-05-20 Thread Jianshi Huang
Hi,

I'm using Spark 1.4.0-rc1 with the default settings for the history server.

But I can only see my own logs. Is it possible to view all users' logs? The
permissions are fine for the user group.
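For reference, a hedged sketch of the two settings involved: every application writes its event log to one shared directory, and the history server reads from that same directory. The property names are standard; the HDFS path is a placeholder.

# Per-application side: write event logs to the shared directory.
import pyspark

conf = pyspark.SparkConf()
conf.set("spark.eventLog.enabled", "true")
conf.set("spark.eventLog.dir", "hdfs:///spark/event-logs")  # placeholder path
sc = pyspark.SparkContext(appName="example", conf=conf)

# History-server side (daemon configuration, shown as a comment because it is
# not application code):
#   spark.history.fs.logDirectory   hdfs:///spark/event-logs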

-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Why so slow

2015-05-12 Thread Jianshi Huang
Hi Olivier,

Here it is.

== Physical Plan ==
Aggregate false, [PartialGroup#155], [PartialGroup#155 AS
is_bad#108,Coalesce(SUM(PartialCount#152L),0) AS
count#109L,(CAST(SUM(PartialSum#153), DoubleType) /
CAST(SUM(PartialCount#154L), DoubleType)) AS avg#110]
 Exchange (HashPartitioning [PartialGroup#155], 200), []
  Aggregate true, [meta#143[is_bad]], [meta#143[is_bad] AS
PartialGroup#155,COUNT(1) AS PartialCount#152L,COUNT(nvar#145[var1]) AS
PartialCount#154L,SUM(nvar#145[var1]) AS PartialSum#153]
   Project [meta#143,nvar#145]
    Filter ((date#147 >= 2014-04-01) && (date#147 <= 2014-04-30))
     PhysicalRDD [meta#143,nvar#145,date#147], MapPartitionsRDD[6] at
explain at <console>:32


Jianshi

On Tue, May 12, 2015 at 10:34 PM, Olivier Girardot ssab...@gmail.com
wrote:

 can you post the explain too ?

 Le mar. 12 mai 2015 à 12:11, Jianshi Huang jianshi.hu...@gmail.com a
 écrit :

 Hi,

 I have a SQL query on tables containing big Map columns (thousands of
 keys). I found it to be very slow.

 select meta['is_bad'] as is_bad, count(*) as count, avg(nvar['var1']) as
 avg
 from test
 where date between '2014-04-01' and '2014-04-30'
 group by meta['is_bad']

 =

 +-+---+---+
 | is_bad  |   count   |  avg  |
 +-+---+---+
 | 0   | 17024396  | 0.16257395850742645   |
 | 1   | 179729| -0.37626256661125485  |
 | 2   | 28128 | 0.11674427263203344   |
 | 3   | 116327| -0.6398689187187386   |
 | 4   | 87715 | -0.5349632960030563   |
 | 5   | 169771| 0.40812641191854626   |
 | 6   | 542447| 0.5238256418341465|
 | 7   | 160324| 0.29442847034840386   |
 | 8   | 2099  | -0.9165701665162977   |
 | 9   | 3104  | 0.3845685004598235|
 +-+---+---+
 10 rows selected (130.5 seconds)


 The total number of rows is less than 20M. Why so slow?

 I'm running on Spark 1.4.0-SNAPSHOT with 100 executors each having 4GB
 ram and 2 CPU core.

 Looks like https://issues.apache.org/jira/browse/SPARK-5446 is still
 open, when can we have it fixed? :)

 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Why so slow

2015-05-12 Thread Jianshi Huang
Hi,

I have a SQL query on tables containing big Map columns (thousands of
keys). I found it to be very slow.

select meta['is_bad'] as is_bad, count(*) as count, avg(nvar['var1']) as
avg
from test
where date between '2014-04-01' and '2014-04-30'
group by meta['is_bad']

=

+---------+-----------+-----------------------+
| is_bad  | count     | avg                   |
+---------+-----------+-----------------------+
| 0       | 17024396  | 0.16257395850742645   |
| 1       | 179729    | -0.37626256661125485  |
| 2       | 28128     | 0.11674427263203344   |
| 3       | 116327    | -0.6398689187187386   |
| 4       | 87715     | -0.5349632960030563   |
| 5       | 169771    | 0.40812641191854626   |
| 6       | 542447    | 0.5238256418341465    |
| 7       | 160324    | 0.29442847034840386   |
| 8       | 2099      | -0.9165701665162977   |
| 9       | 3104      | 0.3845685004598235    |
+---------+-----------+-----------------------+
10 rows selected (130.5 seconds)


The total number of rows is less than 20M. Why so slow?

I'm running on Spark 1.4.0-SNAPSHOT with 100 executors, each having 4 GB of RAM
and 2 CPU cores.

Looks like https://issues.apache.org/jira/browse/SPARK-5446 is still open,
when can we have it fixed? :)

-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: FAILED_TO_UNCOMPRESS(5) errors when fetching shuffle data with sort-based shuffle

2015-05-06 Thread Jianshi Huang
I'm using the default settings.

Jianshi
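For context, a hedged sketch of the codec change asked about below; spark.io.compression.codec governs shuffle and spill compression, and lz4 is shown only as an illustrative alternative to the default:

import pyspark

conf = pyspark.SparkConf()
conf.set("spark.io.compression.codec", "lz4")  # illustrative alternative codec
sc = pyspark.SparkContext(appName="example", conf=conf)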

On Wed, May 6, 2015 at 7:05 PM, twinkle sachdeva twinkle.sachd...@gmail.com
 wrote:

 Hi,

 Can you please share your compression etc settings, which you are using.

 Thanks,
 Twinkle

 On Wed, May 6, 2015 at 4:15 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 I'm facing this error in Spark 1.3.1

   https://issues.apache.org/jira/browse/SPARK-4105

 Anyone knows what's the workaround? Change the compression codec for
 shuffle output?

 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/





-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Parquet error reading data that contains array of structs

2015-04-27 Thread Jianshi Huang
 Fix Version of SPARK-4520 is not set.
 I assume it was fixed in 1.3.0

 Cheers

 On Fri, Apr 24, 2015 at 11:00 AM, Yin Huai yh...@databricks.com wrote:

 The exception looks like the one mentioned in
 https://issues.apache.org/jira/browse/SPARK-4520. What is the version
 of Spark?

 On Fri, Apr 24, 2015 at 2:40 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi,

  My data looks like this:

  +---++--+
 | col_name  | data_type  | comment  |
 +---++--+
 | cust_id   | string |  |
 | part_num  | int|  |
 | ip_list   | arraystructip:string   |  |
 | vid_list  | arraystructvid:string  |  |
 | fso_list  | arraystructfso:string  |  |
 | src   | string |  |
 | date  | int|  |
 +---++--+
 And I did select *, it reports ParquetDecodingException.
 Is this type not supported in SparkSQL?
 Detailed error message here:

 Error: org.apache.spark.SparkException: Job aborted due to stage failure: 
 Task 0 in stage 27.0 failed 4 times, most recent failure: Lost task 0.3 in 
 stage 27.0 (TID 510, lvshdc5dn0542.lvs.paypal.com): 
 parquet.io.ParquetDecodingException:
 Can not read value at 0 in block -1 in file 
 hdfs://xxx/part-m-0.gz.parquet
 at 
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
 at 
 parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
 at 
 org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
 at 
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at 
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at 
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at 
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 at 
 scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
 at scala.collection.AbstractIterator.to(Iterator.scala:1157)
 at 
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 at 
 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 at 
 org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122)
 at 
 org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122)
 at 
 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
 at 
 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
 at 
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at 
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:724)
 Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
 at java.util.ArrayList.elementData(ArrayList.java:400)
 at java.util.ArrayList.get(ArrayList.java:413)
 at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
 at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
 at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
 at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
 at 
 parquet.io.RecordReaderImplementation.init(RecordReaderImplementation.java:290)
 at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
 at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
 at 
 parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
 at 
 parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
 at 
 parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
 at 
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)


  --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com

Re: Parquet error reading data that contains array of structs

2015-04-26 Thread Jianshi Huang
Hi Huai,

I'm using Spark 1.3.1.

You're right. The dataset is not generated by Spark. It's generated by Pig
using Parquet 1.6.0rc7 jars.

Let me see if I can send a testing dataset to you...


Jianshi


On Sat, Apr 25, 2015 at 2:22 AM, Yin Huai yh...@databricks.com wrote:

 oh, I missed that. It is fixed in 1.3.0.

 Also, Jianshi, the dataset was not generated by Spark SQL, right?

 On Fri, Apr 24, 2015 at 11:09 AM, Ted Yu yuzhih...@gmail.com wrote:

 Yin:
 Fix Version of SPARK-4520 is not set.
 I assume it was fixed in 1.3.0

 Cheers
 Fix Version

 On Fri, Apr 24, 2015 at 11:00 AM, Yin Huai yh...@databricks.com wrote:

 The exception looks like the one mentioned in
 https://issues.apache.org/jira/browse/SPARK-4520. What is the version
 of Spark?

 On Fri, Apr 24, 2015 at 2:40 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi,

 My data looks like this:

 +---++--+
 | col_name  | data_type  | comment  |
 +---++--+
 | cust_id   | string |  |
 | part_num  | int|  |
 | ip_list   | arraystructip:string   |  |
 | vid_list  | arraystructvid:string  |  |
 | fso_list  | arraystructfso:string  |  |
 | src   | string |  |
 | date  | int|  |
 +---++--+

 And I did select *, it reports ParquetDecodingException.

 Is this type not supported in SparkSQL?

 Detailed error message here:


 Error: org.apache.spark.SparkException: Job aborted due to stage failure: 
 Task 0 in stage 27.0 failed 4 times, most recent failure: Lost task 0.3 in 
 stage 27.0 (TID 510, lvshdc5dn0542.lvs.paypal.com): 
 parquet.io.ParquetDecodingException:
 Can not read value at 0 in block -1 in file 
 hdfs://xxx/part-m-0.gz.parquet
 at 
 parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
 at 
 parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
 at 
 org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
 at 
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at 
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at 
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at 
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 at 
 scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
 at scala.collection.AbstractIterator.to(Iterator.scala:1157)
 at 
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 at 
 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 at 
 org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122)
 at 
 org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122)
 at 
 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
 at 
 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
 at 
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at 
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:724)
 Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
 at java.util.ArrayList.elementData(ArrayList.java:400)
 at java.util.ArrayList.get(ArrayList.java:413)
 at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
 at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
 at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
 at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
 at 
 parquet.io.RecordReaderImplementation.init(RecordReaderImplementation.java:290)
 at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
 at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
 at 
 parquet.filter2

Parquet error reading data that contains array of structs

2015-04-24 Thread Jianshi Huang
Hi,

My data looks like this:

+-----------+---------------------------+----------+
| col_name  | data_type                 | comment  |
+-----------+---------------------------+----------+
| cust_id   | string                    |          |
| part_num  | int                       |          |
| ip_list   | array<struct<ip:string>>  |          |
| vid_list  | array<struct<vid:string>> |          |
| fso_list  | array<struct<fso:string>> |          |
| src       | string                    |          |
| date      | int                       |          |
+-----------+---------------------------+----------+

And when I do select *, it reports a ParquetDecodingException.

Is this type not supported in Spark SQL?

Detailed error message here:


Error: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 0 in stage 27.0 failed 4 times, most recent failure:
Lost task 0.3 in stage 27.0 (TID 510, lvshdc5dn0542.lvs.paypal.com):
parquet.io.ParquetDecodingException:
Can not read value at 0 in block -1 in file hdfs://xxx/part-m-0.gz.parquet
at 
parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)
at 
parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
at 
org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
at java.util.ArrayList.elementData(ArrayList.java:400)
at java.util.ArrayList.get(ArrayList.java:413)
at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
at 
parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:290)
at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
at 
parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
at 
parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
at 
parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


How to write Hive's map(key, value, ...) in Spark SQL DSL

2015-04-22 Thread Jianshi Huang
Hi,

I want to write this in Spark SQL DSL:

select map('c1', c1, 'c2', c2) as m
from table

Is there a way?
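For reference, a hedged sketch with the map constructor that later Spark releases (2.0+) added to the DSL, shown with the PySpark API; the table name and column names follow the SQL above:

from pyspark.sql import SparkSession
from pyspark.sql.functions import create_map, lit, col

spark = SparkSession.builder.getOrCreate()
df = spark.table("table")  # placeholder table name from the SQL above

# create_map takes alternating key/value columns, mirroring Hive's
# map('c1', c1, 'c2', c2).
m = df.select(create_map(lit("c1"), col("c1"),
                         lit("c2"), col("c2")).alias("m"))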

-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: How to do dispatching in Streaming?

2015-04-17 Thread Jianshi Huang
Thanks everyone for the reply.

Looks like foreachRDD + filtering is the way to go. I'll have 4 independent
Spark streaming applications so the overhead seems acceptable.

Jianshi
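As a concrete reference for the pattern discussed below (the Scala snippets in the quoted thread are pseudocode), a hedged PySpark sketch of foreachRDD + filtering; the socket source, the message-type list, and the per-type handling are placeholders standing in for the real Kafka stream and sinks:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="dispatch-sketch")
ssc = StreamingContext(sc, batchDuration=10)

stream = ssc.socketTextStream("localhost", 9999)  # stand-in for the Kafka DStream
message_types = ["type1", "type2", "type3"]       # placeholder message types

def dispatch(rdd):
    rdd.cache()                                   # reuse the batch across filters
    for msg_type in message_types:
        selection = rdd.filter(lambda line, t=msg_type: line.startswith(t))
        selection.foreach(lambda record: None)    # placeholder per-type handling
    rdd.unpersist()

stream.foreachRDD(dispatch)
ssc.start()
ssc.awaitTermination()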


On Fri, Apr 17, 2015 at 5:17 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 Good use of analogies :)



 Yep friction (or entropy in general) exists in everything – but hey by
 adding and doing “more work” at the same time (aka more powerful rockets)
 some people have overcome the friction of the air and even got as far as
 the moon and beyond



 It is all about the bottom lime / the big picture – in some models,
 friction can be a huge factor in the equations in some other it is just
 part of the landscape



 *From:* Gerard Maas [mailto:gerard.m...@gmail.com]
 *Sent:* Friday, April 17, 2015 10:12 AM

 *To:* Evo Eftimov
 *Cc:* Tathagata Das; Jianshi Huang; user; Shao, Saisai; Huang Jie
 *Subject:* Re: How to do dispatching in Streaming?



 Evo,



 In Spark there's a fixed scheduling cost for each task, so more tasks mean
 an increased bottom line for the same amount of work being done. The number
 of tasks per batch interval should relate to the CPU resources available
 for the job following the same 'rule of thumbs' than for Spark, being 2-3
 times the #of cores.



 In that physical model presented before, I think we could consider this
 scheduling cost as a form of friction.



 -kr, Gerard.



 On Thu, Apr 16, 2015 at 11:47 AM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 Ooops – what does “more work” mean in a Parallel Programming paradigm and
 does it always translate in “inefficiency”



 Here are a few laws of physics in this space:



 1.   More Work if done AT THE SAME time AND fully utilizes the
 cluster resources is a GOOD thing

 2.   More Work which can not be done at the same time and has to be
 processed sequentially is a BAD thing



 So the key is whether it is about 1 or 2 and if it is about 1, whether it
 leads to e.g. Higher Throughput and Lower Latency or not



 Regards,

 Evo Eftimov



 *From:* Gerard Maas [mailto:gerard.m...@gmail.com]
 *Sent:* Thursday, April 16, 2015 10:41 AM
 *To:* Evo Eftimov
 *Cc:* Tathagata Das; Jianshi Huang; user; Shao, Saisai; Huang Jie


 *Subject:* Re: How to do dispatching in Streaming?



 From experience, I'd recommend using the  dstream.foreachRDD method and
 doing the filtering within that context. Extending the example of TD,
 something like this:



 dstream.foreachRDD { rdd =>
   rdd.cache()
   messageType.foreach { msgTyp =>
     val selection = rdd.filter(msgTyp.match(_))
     selection.foreach { ... }
   }
   rdd.unpersist()
 }



 I would discourage the use of:

 MessageType1DStream = MainDStream.filter(message type1)

 MessageType2DStream = MainDStream.filter(message type2)

 MessageType3DStream = MainDStream.filter(message type3)



 Because it will be a lot more work to process on the spark side.

 Each DStream will schedule tasks for each partition, resulting in #dstream
 x #partitions x #stages tasks instead of the #partitions x #stages with the
 approach presented above.





 -kr, Gerard.



 On Thu, Apr 16, 2015 at 10:57 AM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 And yet another way is to demultiplex at one point which will yield
 separate DStreams for each message type which you can then process in
 independent DAG pipelines in the following way:



 MessageType1DStream = MainDStream.filter(message type1)

 MessageType2DStream = MainDStream.filter(message type2)

 MessageType3DStream = MainDStream.filter(message type3)



 Then proceed your processing independently with MessageType1DStream,
 MessageType2DStream and MessageType3DStream ie each of them is a starting
 point of a new DAG pipeline running in parallel



 *From:* Tathagata Das [mailto:t...@databricks.com]
 *Sent:* Thursday, April 16, 2015 12:52 AM
 *To:* Jianshi Huang
 *Cc:* user; Shao, Saisai; Huang Jie
 *Subject:* Re: How to do dispatching in Streaming?



 It may be worthwhile to do architect the computation in a different way.



 dstream.foreachRDD { rdd =>
    rdd.foreach { record =>
       // do different things for each record based on filters
    }
 }



 TD



 On Sun, Apr 12, 2015 at 7:52 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi,



 I have a Kafka topic that contains dozens of different types of messages.
 And for each one I'll need to create a DStream for it.



 Currently I have to filter the Kafka stream over and over, which is very
 inefficient.



 So what's the best way to do dispatching in Spark Streaming? (one DStream
 - multiple DStreams)




 Thanks,

 --

 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/










-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


How to do dispatching in Streaming?

2015-04-12 Thread Jianshi Huang
Hi,

I have a Kafka topic that contains dozens of different types of messages.
And for each one I'll need to create a DStream for it.

Currently I have to filter the Kafka stream over and over, which is very
inefficient.

So what's the best way to do dispatching in Spark Streaming? (one DStream
-> multiple DStreams)


Thanks,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Add partition support in saveAsParquet

2015-03-27 Thread Jianshi Huang
Hi,

Anyone has similar request?

https://issues.apache.org/jira/browse/SPARK-6561

When we save a DataFrame into Parquet files, we also want to have it
partitioned.

The proposed API looks like this:

def saveAsParquet(path: String, partitionColumns: Seq[String])



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Handling fatal errors of executors and decommission datanodes

2015-03-16 Thread Jianshi Huang
I created a JIRA: https://issues.apache.org/jira/browse/SPARK-6353


On Mon, Mar 16, 2015 at 5:36 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Hi,

 We're facing "No space left on device" errors from time to time lately.
 The job fails after retries; obviously, in such a case retrying won't be
 helpful.

 Sure, it's a problem in the datanodes, but I'm wondering whether the Spark
 driver can handle it and decommission the problematic datanode before
 retrying. And maybe dynamically allocate another datanode if dynamic
 allocation is enabled.

 I think there needs to be a class of fatal errors that can't be recovered
 with retries, and it's best if Spark can handle them nicely.

 Thanks,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Handling fatal errors of executors and decommission datanodes

2015-03-16 Thread Jianshi Huang
Thanks Shixiong!

Very strange that our tasks were retried on the same executor again and
again. I'll check spark.scheduler.executorTaskBlacklistTime.

Jianshi
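For reference, a hedged sketch of the setting in question; the property name comes from Shixiong's reply below, and the 30000 ms value mirrors the 30 seconds he mentions:

import pyspark

conf = pyspark.SparkConf()
# Keep a failed task off the same executor for 30 s so the retry lands on a
# different executor (and, hopefully, a node with free disk).
conf.set("spark.scheduler.executorTaskBlacklistTime", "30000")
sc = pyspark.SparkContext(appName="example", conf=conf)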

On Mon, Mar 16, 2015 at 6:02 PM, Shixiong Zhu zsxw...@gmail.com wrote:

 There are 2 cases for "No space left on device":

 1. Some tasks which use large temp space cannot run on any node.
 2. The free space of the datanodes is not balanced. Some tasks which use large
 temp space cannot run on several nodes, but they can run on other nodes
 successfully.

 Because most of our cases are the second one, we set
 spark.scheduler.executorTaskBlacklistTime to 30000 to solve such "No
 space left on device" errors. So if a task runs unsuccessfully on some
 executor, it won't be scheduled to the same executor within 30 seconds.


 Best Regards,
 Shixiong Zhu

 2015-03-16 17:40 GMT+08:00 Jianshi Huang jianshi.hu...@gmail.com:

 I created a JIRA: https://issues.apache.org/jira/browse/SPARK-6353


 On Mon, Mar 16, 2015 at 5:36 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi,

 We're facing No space left on device errors lately from time to time.
 The job will fail after retries. Obvious in such case, retry won't be
 helpful.

 Sure it's the problem in the datanodes but I'm wondering if Spark Driver
 can handle it and decommission the problematic datanode before retrying it.
 And maybe dynamically allocate another datanode if dynamic allocation is
 enabled.

 I think there needs to be a class of fatal errors that can't be
 recovered with retries. And it's best Spark can handle it nicely.

 Thanks,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/





-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Handling fatal errors of executors and decommission datanodes

2015-03-16 Thread Jianshi Huang
Oh, by default it's set to 0L.

I'll try setting it to 30000 immediately. Thanks for the help!

Jianshi

On Mon, Mar 16, 2015 at 11:32 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Thanks Shixiong!

 Very strange that our tasks were retried on the same executor again and
 again. I'll check spark.scheduler.executorTaskBlacklistTime.

 Jianshi

 On Mon, Mar 16, 2015 at 6:02 PM, Shixiong Zhu zsxw...@gmail.com wrote:

 There are 2 cases for No space left on device:

 1. Some tasks which use large temp space cannot run in any node.
 2. The free space of datanodes is not balance. Some tasks which use large
 temp space can not run in several nodes, but they can run in other nodes
 successfully.

 Because most of our cases are the second one, we set
 spark.scheduler.executorTaskBlacklistTime to 3 to solve such No
 space left on device errors. So if a task runs unsuccessfully in some
 executor, it won't be scheduled to the same executor in 30 seconds.


 Best Regards,
 Shixiong Zhu

 2015-03-16 17:40 GMT+08:00 Jianshi Huang jianshi.hu...@gmail.com:

 I created a JIRA: https://issues.apache.org/jira/browse/SPARK-6353


 On Mon, Mar 16, 2015 at 5:36 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi,

 We're facing No space left on device errors lately from time to time.
 The job will fail after retries. Obvious in such case, retry won't be
 helpful.

 Sure it's the problem in the datanodes but I'm wondering if Spark
 Driver can handle it and decommission the problematic datanode before
 retrying it. And maybe dynamically allocate another datanode if dynamic
 allocation is enabled.

 I think there needs to be a class of fatal errors that can't be
 recovered with retries. And it's best Spark can handle it nicely.

 Thanks,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/





 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class

2015-03-13 Thread Jianshi Huang
Forget about my last message; I was confused. Spark 1.2.1 + Scala 2.10.4
started by the SBT console command also fails with this error. However, running
from a standard spark-shell works.

Jianshi

On Fri, Mar 13, 2015 at 2:46 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Hmm... look like the console command still starts a Spark 1.3.0 with Scala
 2.11.6 even I changed them in build.sbt.

 So the test with 1.2.1 is not valid.

 Jianshi

 On Fri, Mar 13, 2015 at 2:34 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 I've confirmed it only failed in console started by SBT.

 I'm using sbt-spark-package plugin, and the initialCommands look like
 this (I added implicit sqlContext to it):

  show console::initialCommands
 [info]  println(Welcome to\n +
 [info]     __\n +
 [info]  / __/__  ___ _/ /__\n +
 [info] _\\ \\/ _ \\/ _ `/ __/  '_/\n +
 [info]/___/ .__/\\_,_/_/ /_/\\_\\   version \1.3.0-rc2\\n +
 [info]   /_/\n +
 [info] Using Scala \2.11.6\\n)
 [info]
 [info] import org.apache.spark.SparkContext._
 [info]
 [info] val sc = {
 [info]   val conf = new  org.apache.spark.SparkConf()
 [info] .setMaster(local)
 [info] .setAppName(Sbt console + Spark!)
 [info]   new org.apache.spark.SparkContext(conf)
 [info] }
 [info] println(Created spark context as sc.)
 [info]
 [info] def time[T](f: = T): T = {
 [info]   import System.{currentTimeMillis = now}
 [info]   val start = now
 [info]   try { f } finally { println(Elapsed:  + (now - start)/1000.0 +
  s) }
 [info] }
 [info]
 [info] @transient val sqlc = new org.apache.spark.sql.SQLContext(sc)
 [info] implicit def sqlContext = sqlc
 [info] import sqlc._


 Jianshi


 On Fri, Mar 13, 2015 at 3:10 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 BTW, I was running tests from SBT when I get the errors. One test turn a
 Seq of case class to DataFrame.

 I also tried run similar code in the console, but failed with same error.

 I tested both Spark 1.3.0-rc2 and 1.2.1 with Scala 2.11.6 and 2.10.4

 Any idea?

 Jianshi

 On Fri, Mar 13, 2015 at 2:23 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Same issue here. But the classloader in my exception is somehow
 different.

 scala.ScalaReflectionException: class
 org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with
 java.net.URLClassLoader@53298398 of type class java.net.URLClassLoader
 with classpath


 Jianshi

 On Sun, Mar 1, 2015 at 9:32 AM, Michael Armbrust 
 mich...@databricks.com wrote:

 I think its possible that the problem is that the scala compiler is
 not being loaded by the primordial classloader (but instead by some child
 classloader) and thus the scala reflection mirror is failing to initialize
 when it can't find it. Unfortunately, the only solution that I know of is
 to load all required jars when the JVM starts.

 On Sat, Feb 28, 2015 at 5:26 PM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

 Also, can scala version play any role here?
 I am using scala 2.11.5 but all spark packages have dependency to
 scala 2.11.2
 Just wanted to make sure that scala version is not an issue here.

 On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com
  wrote:

 Hi,
 I wrote a very simple program in scala to convert an existing RDD to
 SchemaRDD.
 But createSchemaRDD function is throwing exception

 Exception in thread main scala.ScalaReflectionException: class
 org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with 
 primordial
 classloader with boot classpath [.] not found


 Here's more info on the versions I am using -

 scala.binary.version2.11/scala.binary.version
 spark.version1.2.1/spark.version
 scala.version2.11.5/scala.version

 Please let me know how can I resolve this problem.

 Thanks
 Ashish






 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class

2015-03-13 Thread Jianshi Huang
Hmm... it looks like the console command still starts Spark 1.3.0 with Scala
2.11.6 even though I changed them in build.sbt.

So the test with 1.2.1 is not valid.

Jianshi

On Fri, Mar 13, 2015 at 2:34 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 I've confirmed it only failed in console started by SBT.

 I'm using sbt-spark-package plugin, and the initialCommands look like this
 (I added implicit sqlContext to it):

  show console::initialCommands
 [info]  println(Welcome to\n +
 [info]     __\n +
 [info]  / __/__  ___ _/ /__\n +
 [info] _\\ \\/ _ \\/ _ `/ __/  '_/\n +
 [info]/___/ .__/\\_,_/_/ /_/\\_\\   version \1.3.0-rc2\\n +
 [info]   /_/\n +
 [info] Using Scala \2.11.6\\n)
 [info]
 [info] import org.apache.spark.SparkContext._
 [info]
 [info] val sc = {
 [info]   val conf = new  org.apache.spark.SparkConf()
 [info] .setMaster(local)
 [info] .setAppName(Sbt console + Spark!)
 [info]   new org.apache.spark.SparkContext(conf)
 [info] }
 [info] println(Created spark context as sc.)
 [info]
 [info] def time[T](f: = T): T = {
 [info]   import System.{currentTimeMillis = now}
 [info]   val start = now
 [info]   try { f } finally { println(Elapsed:  + (now - start)/1000.0 +
  s) }
 [info] }
 [info]
 [info] @transient val sqlc = new org.apache.spark.sql.SQLContext(sc)
 [info] implicit def sqlContext = sqlc
 [info] import sqlc._


 Jianshi


 On Fri, Mar 13, 2015 at 3:10 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 BTW, I was running tests from SBT when I get the errors. One test turn a
 Seq of case class to DataFrame.

 I also tried run similar code in the console, but failed with same error.

 I tested both Spark 1.3.0-rc2 and 1.2.1 with Scala 2.11.6 and 2.10.4

 Any idea?

 Jianshi

 On Fri, Mar 13, 2015 at 2:23 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Same issue here. But the classloader in my exception is somehow
 different.

 scala.ScalaReflectionException: class
 org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with
 java.net.URLClassLoader@53298398 of type class java.net.URLClassLoader
 with classpath


 Jianshi

 On Sun, Mar 1, 2015 at 9:32 AM, Michael Armbrust mich...@databricks.com
  wrote:

 I think it's possible that the problem is that the scala compiler is not
 being loaded by the primordial classloader (but instead by some child
 classloader) and thus the scala reflection mirror is failing to initialize
 when it can't find it. Unfortunately, the only solution that I know of is
 to load all required jars when the JVM starts.

 On Sat, Feb 28, 2015 at 5:26 PM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

 Also, can scala version play any role here?
 I am using Scala 2.11.5 but all Spark packages have a dependency on
 Scala 2.11.2.
 Just wanted to make sure that scala version is not an issue here.

 On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

 Hi,
 I wrote a very simple program in Scala to convert an existing RDD to a
 SchemaRDD, but the createSchemaRDD function is throwing an exception:

 Exception in thread "main" scala.ScalaReflectionException: class
 org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial
 classloader with boot classpath [.] not found


 Here's more info on the versions I am using -

 <scala.binary.version>2.11</scala.binary.version>
 <spark.version>1.2.1</spark.version>
 <scala.version>2.11.5</scala.version>

 Please let me know how I can resolve this problem.

 Thanks
 Ashish







-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class

2015-03-13 Thread Jianshi Huang
I've confirmed it only failed in the console started by SBT.

I'm using the sbt-spark-package plugin, and the initialCommands look like this
(I added an implicit sqlContext to it):

> show console::initialCommands
[info] println("Welcome to\n" +
[info]   "      ____              __\n" +
[info]   "     / __/__  ___ _____/ /__\n" +
[info]   "    _\\ \\/ _ \\/ _ `/ __/  '_/\n" +
[info]   "   /___/ .__/\\_,_/_/ /_/\\_\\   version \"1.3.0-rc2\"\n" +
[info]   "      /_/\n" +
[info]   "Using Scala \"2.11.6\"\n")
[info]
[info] import org.apache.spark.SparkContext._
[info]
[info] val sc = {
[info]   val conf = new org.apache.spark.SparkConf()
[info]     .setMaster("local")
[info]     .setAppName("Sbt console + Spark!")
[info]   new org.apache.spark.SparkContext(conf)
[info] }
[info] println("Created spark context as sc.")
[info]
[info] def time[T](f: => T): T = {
[info]   import System.{currentTimeMillis => now}
[info]   val start = now
[info]   try { f } finally { println("Elapsed: " + (now - start)/1000.0 + " s") }
[info] }
[info]
[info] @transient val sqlc = new org.apache.spark.sql.SQLContext(sc)
[info] implicit def sqlContext = sqlc
[info] import sqlc._


Jianshi


On Fri, Mar 13, 2015 at 3:10 AM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 BTW, I was running tests from SBT when I got the errors. One test turns a
 Seq of case classes into a DataFrame.

 I also tried running similar code in the console, but it failed with the same
 error.

 I tested both Spark 1.3.0-rc2 and 1.2.1, with Scala 2.11.6 and 2.10.4.

 Any idea?

 Jianshi

 On Fri, Mar 13, 2015 at 2:23 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Same issue here. But the classloader in my exception is somehow different.

 scala.ScalaReflectionException: class
 org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with
 java.net.URLClassLoader@53298398 of type class java.net.URLClassLoader
 with classpath


 Jianshi

 On Sun, Mar 1, 2015 at 9:32 AM, Michael Armbrust mich...@databricks.com
 wrote:

 I think it's possible that the problem is that the scala compiler is not
 being loaded by the primordial classloader (but instead by some child
 classloader) and thus the scala reflection mirror is failing to initialize
 when it can't find it. Unfortunately, the only solution that I know of is
 to load all required jars when the JVM starts.

 On Sat, Feb 28, 2015 at 5:26 PM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

 Also, can scala version play any role here?
 I am using Scala 2.11.5 but all Spark packages have a dependency on
 Scala 2.11.2.
 Just wanted to make sure that scala version is not an issue here.

 On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

 Hi,
 I wrote a very simple program in Scala to convert an existing RDD to a
 SchemaRDD, but the createSchemaRDD function is throwing an exception:

 Exception in thread "main" scala.ScalaReflectionException: class
 org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial
 classloader with boot classpath [.] not found


 Here's more info on the versions I am using -

 <scala.binary.version>2.11</scala.binary.version>
 <spark.version>1.2.1</spark.version>
 <scala.version>2.11.5</scala.version>

 Please let me know how I can resolve this problem.

 Thanks
 Ashish







-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class

2015-03-13 Thread Jianshi Huang
Liancheng also found out that the Spark jars are not included in the
classpath of the URLClassLoader.

Hmm... we're very close to the truth now.

Jianshi
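
To check that directly, here is a small debugging sketch (plain Scala, nothing
thread-specific assumed): it walks the classloader chain from the context
classloader and prints the classpath entries of any URLClassLoader, so you can
see whether the Spark jars actually show up.

  // Print the classloader chain; for URLClassLoaders, also list their URLs.
  def dumpClassLoaders(start: ClassLoader): Unit = {
    var cl = start
    while (cl != null) {
      println(cl)
      cl match {
        case u: java.net.URLClassLoader => u.getURLs.foreach(url => println("  " + url))
        case _                          => // not a URLClassLoader; nothing to list
      }
      cl = cl.getParent
    }
  }

  dumpClassLoaders(Thread.currentThread.getContextClassLoader)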

On Fri, Mar 13, 2015 at 6:03 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 I'm almost certain the problem is the ClassLoader.

 So adding

   fork := true

 solves the problem for test and run.

 The problem is: how can I fork a JVM for the sbt console? fork in console :=
 true doesn't seem to be working...

 Jianshi


 On Fri, Mar 13, 2015 at 4:35 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 I guess it's a ClassLoader issue. But I have no idea how to debug it. Any
 hints?

 Jianshi

 On Fri, Mar 13, 2015 at 3:00 PM, Eric Charles e...@apache.org wrote:

  I have the same issue running Spark SQL code from the Eclipse workspace. If
 you run your code from the command line (with a packaged jar) or from
 IntelliJ, I bet it should work.

 IMHO this is somehow related to the Eclipse environment, but I would love to
 know how to fix it (whether via Eclipse conf, or via a patch in Spark).



 On 03/01/2015 02:32 AM, Michael Armbrust wrote:

 I think it's possible that the problem is that the scala compiler is not
 being loaded by the primordial classloader (but instead by some child
 classloader) and thus the scala reflection mirror is failing to initialize
 when it can't find it. Unfortunately, the only solution that I know of is
 to load all required jars when the JVM starts.

 On Sat, Feb 28, 2015 at 5:26 PM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

  Also, can scala version play any role here?
 I am using Scala 2.11.5 but all Spark packages have a dependency on
 Scala 2.11.2.
  Just wanted to make sure that scala version is not an issue here.

 On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

Hi,
 I wrote a very simple program in Scala to convert an existing RDD to a
 SchemaRDD, but the createSchemaRDD function is throwing an exception:

 Exception in thread "main" scala.ScalaReflectionException: class
 org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial
 classloader with boot classpath [.] not found


 Here's more info on the versions I am using -

 <scala.binary.version>2.11</scala.binary.version>
 <spark.version>1.2.1</spark.version>
 <scala.version>2.11.5</scala.version>

 Please let me know how I can resolve this problem.

  Thanks
  Ashish







-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class

2015-03-13 Thread Jianshi Huang
I guess it's a ClassLoader issue. But I have no idea how to debug it. Any
hints?

Jianshi

On Fri, Mar 13, 2015 at 3:00 PM, Eric Charles e...@apache.org wrote:

  I have the same issue running Spark SQL code from the Eclipse workspace. If
 you run your code from the command line (with a packaged jar) or from
 IntelliJ, I bet it should work.

 IMHO this is somehow related to the Eclipse environment, but I would love to
 know how to fix it (whether via Eclipse conf, or via a patch in Spark).



 On 03/01/2015 02:32 AM, Michael Armbrust wrote:

 I think it's possible that the problem is that the scala compiler is not
 being loaded by the primordial classloader (but instead by some child
 classloader) and thus the scala reflection mirror is failing to initialize
 when it can't find it. Unfortunately, the only solution that I know of is
 to load all required jars when the JVM starts.

 On Sat, Feb 28, 2015 at 5:26 PM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

  Also, can scala version play any role here?
 I am using Scala 2.11.5 but all Spark packages have a dependency on
 Scala 2.11.2.
  Just wanted to make sure that scala version is not an issue here.

 On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

Hi,
 I wrote a very simple program in Scala to convert an existing RDD to a
 SchemaRDD, but the createSchemaRDD function is throwing an exception:

 Exception in thread "main" scala.ScalaReflectionException: class
 org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial
 classloader with boot classpath [.] not found


 Here's more info on the versions I am using -

 <scala.binary.version>2.11</scala.binary.version>
 <spark.version>1.2.1</spark.version>
 <scala.version>2.11.5</scala.version>

 Please let me know how I can resolve this problem.

  Thanks
  Ashish






-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class

2015-03-13 Thread Jianshi Huang
I'm almost certain the problem is the ClassLoader.

So adding

  fork := true

solves the problem for test and run.

The problem is: how can I fork a JVM for the sbt console? fork in console :=
true doesn't seem to be working...

Jianshi
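
A minimal build.sbt sketch of those settings (sbt 0.13 syntax; the javaOptions
values are illustrative, not taken from the thread). As far as I know, console
always runs the REPL inside the sbt JVM, so forking does not apply to it, which
would explain why fork in console := true has no effect.

  // Fork run and test into their own JVMs, so the project classpath is loaded
  // by the standard application classloader instead of sbt's internal ones.
  fork := true
  fork in Test := true

  // Options for the forked JVMs (illustrative values).
  javaOptions ++= Seq("-Xmx4g", "-XX:+PrintGCDetails", "-XX:+PrintGCTimeStamps")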


On Fri, Mar 13, 2015 at 4:35 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 I guess it's a ClassLoader issue. But I have no idea how to debug it. Any
 hints?

 Jianshi

 On Fri, Mar 13, 2015 at 3:00 PM, Eric Charles e...@apache.org wrote:

  I have the same issue running Spark SQL code from the Eclipse workspace. If
 you run your code from the command line (with a packaged jar) or from
 IntelliJ, I bet it should work.

 IMHO this is somehow related to the Eclipse environment, but I would love to
 know how to fix it (whether via Eclipse conf, or via a patch in Spark).



 On 03/01/2015 02:32 AM, Michael Armbrust wrote:

 I think it's possible that the problem is that the scala compiler is not
 being loaded by the primordial classloader (but instead by some child
 classloader) and thus the scala reflection mirror is failing to initialize
 when it can't find it. Unfortunately, the only solution that I know of is
 to load all required jars when the JVM starts.

 On Sat, Feb 28, 2015 at 5:26 PM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

  Also, can scala version play any role here?
 I am using Scala 2.11.5 but all Spark packages have a dependency on
 Scala 2.11.2.
  Just wanted to make sure that scala version is not an issue here.

 On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

Hi,
 I wrote a very simple program in Scala to convert an existing RDD to a
 SchemaRDD, but the createSchemaRDD function is throwing an exception:

 Exception in thread "main" scala.ScalaReflectionException: class
 org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial
 classloader with boot classpath [.] not found


 Here's more info on the versions I am using -

 <scala.binary.version>2.11</scala.binary.version>
 <spark.version>1.2.1</spark.version>
 <scala.version>2.11.5</scala.version>

 Please let me know how I can resolve this problem.

  Thanks
  Ashish






 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class

2015-03-12 Thread Jianshi Huang
BTW, I was running tests from SBT when I got the errors. One test turns a
Seq of case classes into a DataFrame.

I also tried running similar code in the console, but it failed with the same
error.

I tested both Spark 1.3.0-rc2 and 1.2.1, with Scala 2.11.6 and 2.10.4.

Any idea?

Jianshi

On Fri, Mar 13, 2015 at 2:23 AM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Same issue here. But the classloader in my exception is somehow different.

 scala.ScalaReflectionException: class
 org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with
 java.net.URLClassLoader@53298398 of type class java.net.URLClassLoader
 with classpath


 Jianshi

 On Sun, Mar 1, 2015 at 9:32 AM, Michael Armbrust mich...@databricks.com
 wrote:

 I think it's possible that the problem is that the scala compiler is not
 being loaded by the primordial classloader (but instead by some child
 classloader) and thus the scala reflection mirror is failing to initialize
 when it can't find it. Unfortunately, the only solution that I know of is
 to load all required jars when the JVM starts.

 On Sat, Feb 28, 2015 at 5:26 PM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

 Also, can scala version play any role here?
 I am using Scala 2.11.5 but all Spark packages have a dependency on
 Scala 2.11.2.
 Just wanted to make sure that scala version is not an issue here.

 On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

 Hi,
 I wrote a very simple program in Scala to convert an existing RDD to a
 SchemaRDD, but the createSchemaRDD function is throwing an exception:

 Exception in thread "main" scala.ScalaReflectionException: class
 org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial
 classloader with boot classpath [.] not found


 Here's more info on the versions I am using -

 <scala.binary.version>2.11</scala.binary.version>
 <spark.version>1.2.1</spark.version>
 <scala.version>2.11.5</scala.version>

 Please let me know how I can resolve this problem.

 Thanks
 Ashish






 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class

2015-03-12 Thread Jianshi Huang
Same issue here. But the classloader in my exception is somehow different.

scala.ScalaReflectionException: class
org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with
java.net.URLClassLoader@53298398 of type class java.net.URLClassLoader with
classpath


Jianshi

On Sun, Mar 1, 2015 at 9:32 AM, Michael Armbrust mich...@databricks.com
wrote:

 I think it's possible that the problem is that the scala compiler is not
 being loaded by the primordial classloader (but instead by some child
 classloader) and thus the scala reflection mirror is failing to initialize
 when it can't find it. Unfortunately, the only solution that I know of is
 to load all required jars when the JVM starts.

 On Sat, Feb 28, 2015 at 5:26 PM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

 Also, can scala version play any role here?
 I am using Scala 2.11.5 but all Spark packages have a dependency on
 Scala 2.11.2.
 Just wanted to make sure that scala version is not an issue here.

 On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

 Hi,
 I wrote a very simple program in Scala to convert an existing RDD to a
 SchemaRDD, but the createSchemaRDD function is throwing an exception:

 Exception in thread "main" scala.ScalaReflectionException: class
 org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial
 classloader with boot classpath [.] not found


 Here's more info on the versions I am using -

 <scala.binary.version>2.11</scala.binary.version>
 <spark.version>1.2.1</spark.version>
 <scala.version>2.11.5</scala.version>

 Please let me know how I can resolve this problem.

 Thanks
 Ashish






-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: How to set per-user spark.local.dir?

2015-03-11 Thread Jianshi Huang
Unfortunately, the /tmp mount is really small in our environment. I need to
provide a per-user setting as the default value.

I hacked bin/spark-class for a similar effect, and spark-defaults.conf can
still override it. :)

Jianshi
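
For reference, a per-application sketch along the lines of what Patrick
suggests below (each application sets its own directory). The path and app
name are placeholders, and this does not give the cluster-wide per-user
default being asked for; it just computes the per-user path in the application
itself, since spark-defaults.conf does not expand ${user.name}.

  import org.apache.spark.{SparkConf, SparkContext}

  // Resolve the per-user scratch directory at application start-up.
  val conf = new SparkConf()
    .setAppName("per-user-local-dir")
    .set("spark.local.dir", s"/x/home/${sys.props("user.name")}/spark/tmp")
  val sc = new SparkContext(conf)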

On Wed, Mar 11, 2015 at 3:28 PM, Patrick Wendell pwend...@gmail.com wrote:

 We don't support expressions or wildcards in that configuration. For
 each application, the local directories need to be constant. If you
 have users submitting different Spark applications, those can each set
 spark.local.dirs.

 - Patrick

 On Wed, Mar 11, 2015 at 12:14 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:
  Hi,
 
  I need to set per-user spark.local.dir, how can I do that?
 
  I tried both
 
/x/home/${user.name}/spark/tmp
  and
/x/home/${USER}/spark/tmp
 
  And neither worked. Looks like it has to be a constant setting in
  spark-defaults.conf. Right?
 
  Any ideas how to do that?
 
  Thanks,
  --
  Jianshi Huang
 
  LinkedIn: jianshi
  Twitter: @jshuang
  Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: How to set per-user spark.local.dir?

2015-03-11 Thread Jianshi Huang
Thanks Sean. I'll ask our Hadoop admin.

Actually, I didn't find hadoop.tmp.dir in the Hadoop settings... using the
user home was suggested by other users.

Jianshi

On Wed, Mar 11, 2015 at 3:51 PM, Sean Owen so...@cloudera.com wrote:

 You shouldn't use /tmp, but it doesn't mean you should use user home
 directories either. Typically, like in YARN, you would have a number of
 directories (on different disks) mounted and configured for local
 storage for jobs.

 On Wed, Mar 11, 2015 at 7:42 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:
  Unfortunately /tmp mount is really small in our environment. I need to
  provide a per-user setting as the default value.
 
  I hacked bin/spark-class for the similar effect. And spark-defaults.conf
 can
  override it. :)
 
  Jianshi
 
  On Wed, Mar 11, 2015 at 3:28 PM, Patrick Wendell pwend...@gmail.com
 wrote:
 
  We don't support expressions or wildcards in that configuration. For
  each application, the local directories need to be constant. If you
  have users submitting different Spark applications, those can each set
  spark.local.dirs.
 
  - Patrick
 
  On Wed, Mar 11, 2015 at 12:14 AM, Jianshi Huang 
 jianshi.hu...@gmail.com
  wrote:
   Hi,
  
   I need to set per-user spark.local.dir, how can I do that?
  
   I tried both
  
 /x/home/${user.name}/spark/tmp
   and
 /x/home/${USER}/spark/tmp
  
   And neither worked. Looks like it has to be a constant setting in
   spark-defaults.conf. Right?
  
   Any ideas how to do that?
  
   Thanks,
   --
   Jianshi Huang
  
   LinkedIn: jianshi
   Twitter: @jshuang
   Github  Blog: http://huangjs.github.com/
 
 
 
 
  --
  Jianshi Huang
 
  LinkedIn: jianshi
  Twitter: @jshuang
  Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


How to set per-user spark.local.dir?

2015-03-11 Thread Jianshi Huang
Hi,

I need to set per-user spark.local.dir, how can I do that?

I tried both

  /x/home/${user.name}/spark/tmp
and
  /x/home/${USER}/spark/tmp

And neither worked. Looks like it has to be a constant setting in
spark-defaults.conf. Right?

Any ideas how to do that?

Thanks,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Having lots of FetchFailedException in join

2015-03-05 Thread Jianshi Huang
Thanks. I was about to submit a ticket for this :)

Also, there's a ticket for sort-merge-based groupByKey:
https://issues.apache.org/jira/browse/SPARK-3461

BTW, any idea why running with netty didn't output OOM error messages? It's
very confusing when troubleshooting.


Jianshi

On Thu, Mar 5, 2015 at 4:01 PM, Shao, Saisai saisai.s...@intel.com wrote:

  I think there’s a lot of JIRA trying to solve this problem (
 https://issues.apache.org/jira/browse/SPARK-5763). Basically sort merge
 join is a good choice.



 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Thursday, March 5, 2015 3:55 PM
 *To:* Shao, Saisai
 *Cc:* Cheng, Hao; user

 *Subject:* Re: Having lots of FetchFailedException in join



 There's some skew. Two task rows from the web UI (the columns appear to be
 Index, Task ID, Attempt, Status, Locality, Executor ID / Host, Launch Time,
 Duration, GC Time, Shuffle Read, Shuffle Spill (Memory), Shuffle Spill (Disk)):

 64  6164  0  SUCCESS  PROCESS_LOCAL  200 /   2015/03/04 23:45:47  1.1 min  6 s  198.6 MB  21.1 GB  240.8 MB
 59  6159  0  SUCCESS  PROCESS_LOCAL  30 /    2015/03/04 23:45:47  44 s     5 s  200.7 MB  4.8 GB   154.0 MB

 But I expect this kind of skewness to be quite common.



 Jianshi





 On Thu, Mar 5, 2015 at 3:48 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

  I see. I'm using core's join. The data might have some skewness
 (checking).



 I understand shuffle can spill data to disk but when consuming it, say in
 cogroup or groupByKey, it still needs to read the whole group elements,
 right? I guess OOM happened there when reading very large groups.



 Jianshi



 On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  I think what you could do is to monitor through web UI to see if there’s
 any skew or other symptoms in shuffle write and read. For GC you could use
 the below configuration as you mentioned.



 From the Spark core side, all the shuffle-related operations can spill the
 data to disk and do not need to read the whole partition into memory. But if
 you use SparkSQL, it depends on how SparkSQL uses these operators.



 CC @hao if he has some thoughts on it.



 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Thursday, March 5, 2015 3:28 PM
 *To:* Shao, Saisai


 *Cc:* user
 *Subject:* Re: Having lots of FetchFailedException in join



 Hi Saisai,



 What's your suggested settings on monitoring shuffle? I've
 enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.



 I found SPARK-3461 (Support external groupByKey using
 repartitionAndSortWithinPartitions) want to make groupByKey using external
 storage. It's still open status. Does that mean now
 groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read
 the group as a whole during consuming?



 How can I deal with the key skewness in joins? Is there a skew-join
 implementation?





 Jianshi







 On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  Hi Jianshi,



 From my understanding, it may not be a problem with NIO or Netty. Looking
 at your stack trace, the OOM occurred in EAOM (ExternalAppendOnlyMap);
 theoretically EAOM can spill the data to disk if memory is not enough,
 but there might be some issues when the join key is skewed or the number of
 keys is small, so you will meet OOM.



 Maybe you could monitor each stage or task’s shuffle and GC status also
 system status to identify the problem.



 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Thursday, March 5, 2015 2:32 PM
 *To:* Aaron Davidson
 *Cc:* user
 *Subject:* Re: Having lots of FetchFailedException in join



 One really interesting is that when I'm using the
 netty-based spark.shuffle.blockTransferService, there's no OOM error
 messages (java.lang.OutOfMemoryError: Java heap space).



 Any idea why it's not here?



 I'm using Spark 1.2.1.



 Jianshi



 On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

  I changed spark.shuffle.blockTransferService to nio and now I'm getting
 OOM errors, I'm doing a big join operation.





 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0
 (TID 6207)

 java.lang.OutOfMemoryError: Java heap space

 at
 org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)

 at
 org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)

 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)

 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)

 at
 org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)

 at
 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32

Re: Having lots of FetchFailedException in join

2015-03-04 Thread Jianshi Huang
One really interesting thing is that when I'm using the
netty-based spark.shuffle.blockTransferService, there are no OOM error
messages (java.lang.OutOfMemoryError: Java heap space).

Any idea why it's not here?

I'm using Spark 1.2.1.

Jianshi

On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 I changed spark.shuffle.blockTransferService to nio and now I'm getting
 OOM errors, I'm doing a big join operation.


 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0
 (TID 6207)
 java.lang.OutOfMemoryError: Java heap space
 at
 org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)
 at
 org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)
 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)
 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)
 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)
 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)
 at
 org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
 at
 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
 at
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at
 org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at
 org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at
 org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)

 Is join/cogroup still memory bound?


 Jianshi



 On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hmm... ok, previous errors are still block fetch errors.

 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning
 fetch of 11 outstanding blocks
 java.io.IOException: Failed to connect to host-/:55597
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
 at
 org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
 at
 org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
 at
 org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149)
 at
 org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:289)
 at
 org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at
 org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at scala.collection.Iterator$$anon$11.hasNext

Re: Having lots of FetchFailedException in join

2015-03-04 Thread Jianshi Huang
There's some skew. Two task rows from the web UI (the columns appear to be
Index, Task ID, Attempt, Status, Locality, Executor ID / Host, Launch Time,
Duration, GC Time, Shuffle Read, Shuffle Spill (Memory), Shuffle Spill (Disk)):

64  6164  0  SUCCESS  PROCESS_LOCAL  200 /   2015/03/04 23:45:47  1.1 min  6 s  198.6 MB  21.1 GB  240.8 MB
59  6159  0  SUCCESS  PROCESS_LOCAL  30 /    2015/03/04 23:45:47  44 s     5 s  200.7 MB  4.8 GB   154.0 MB

But I expect this kind of skewness to be quite common.

Jianshi


On Thu, Mar 5, 2015 at 3:48 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 I see. I'm using core's join. The data might have some skewness
 (checking).

 I understand shuffle can spill data to disk but when consuming it, say in
 cogroup or groupByKey, it still needs to read the whole group elements,
 right? I guess OOM happened there when reading very large groups.

 Jianshi

 On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  I think what you could do is to monitor through web UI to see if
 there’s any skew or other symptoms in shuffle write and read. For GC you
 could use the below configuration as you mentioned.



 From the Spark core side, all the shuffle-related operations can spill the
 data to disk and do not need to read the whole partition into memory. But if
 you use SparkSQL, it depends on how SparkSQL uses these operators.



 CC @hao if he has some thoughts on it.



 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Thursday, March 5, 2015 3:28 PM
 *To:* Shao, Saisai

 *Cc:* user
 *Subject:* Re: Having lots of FetchFailedException in join



 Hi Saisai,



 What's your suggested settings on monitoring shuffle? I've
 enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.



 I found SPARK-3461 (Support external groupByKey using
 repartitionAndSortWithinPartitions) want to make groupByKey using external
 storage. It's still open status. Does that mean now
 groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read
 the group as a whole during consuming?



 How can I deal with the key skewness in joins? Is there a skew-join
 implementation?





 Jianshi







 On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  Hi Jianshi,



 From my understanding, it may not be a problem with NIO or Netty. Looking
 at your stack trace, the OOM occurred in EAOM (ExternalAppendOnlyMap);
 theoretically EAOM can spill the data to disk if memory is not enough,
 but there might be some issues when the join key is skewed or the number of
 keys is small, so you will meet OOM.



 Maybe you could monitor each stage or task’s shuffle and GC status also
 system status to identify the problem.



 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Thursday, March 5, 2015 2:32 PM
 *To:* Aaron Davidson
 *Cc:* user
 *Subject:* Re: Having lots of FetchFailedException in join



 One really interesting is that when I'm using the
 netty-based spark.shuffle.blockTransferService, there's no OOM error
 messages (java.lang.OutOfMemoryError: Java heap space).



 Any idea why it's not here?



 I'm using Spark 1.2.1.



 Jianshi



 On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

  I changed spark.shuffle.blockTransferService to nio and now I'm getting
 OOM errors, I'm doing a big join operation.





 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0
 (TID 6207)

 java.lang.OutOfMemoryError: Java heap space

 at
 org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)

 at
 org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)

 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)

 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)

 at
 org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)

 at
 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)

 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)

 at
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)

 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 at
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)

 at
 org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)

 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)

 at org.apache.spark.rdd.RDD.iterator

Re: Having lots of FetchFailedException in join

2015-03-04 Thread Jianshi Huang
Hi Saisai,

What are your suggested settings for monitoring shuffle? I've
enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.

I found SPARK-3461 (Support external groupByKey using
repartitionAndSortWithinPartitions), which wants to make groupByKey use
external storage. It's still in open status. Does that mean
groupByKey/cogroup/join (implemented as cogroup + flatMap) will still read
each group as a whole when consuming it?


How can I deal with the key skewness in joins? Is there a skew-join
implementation?



Jianshi
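
There is no built-in skew join in Spark core as far as I know; a common
workaround is key salting. A rough sketch (the names and numSalts are
illustrative, and it only covers an inner join where one side is skewed and
the other is comparatively small):

  import scala.reflect.ClassTag
  import scala.util.Random
  import org.apache.spark.SparkContext._   // pair-RDD implicits, needed on Spark 1.2.x
  import org.apache.spark.rdd.RDD

  // Spread each hot key of `skewed` over numSalts sub-keys, and replicate every
  // row of `small` once per sub-key so that matches are still found.
  def saltedJoin[K: ClassTag, V: ClassTag, W: ClassTag](
      skewed: RDD[(K, V)], small: RDD[(K, W)], numSalts: Int): RDD[(K, (V, W))] = {
    val saltedLeft  = skewed.map { case (k, v) => ((k, Random.nextInt(numSalts)), v) }
    val saltedRight = small.flatMap { case (k, w) => (0 until numSalts).map(i => ((k, i), w)) }
    saltedLeft.join(saltedRight).map { case ((k, _), vw) => (k, vw) }
  }

The replication multiplies the smaller side by numSalts, so this only pays off
when one side is much smaller than the other or the skew is severe.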



On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.com wrote:

  Hi Jianshi,



 From my understanding, it may not be a problem with NIO or Netty. Looking
 at your stack trace, the OOM occurred in EAOM (ExternalAppendOnlyMap);
 theoretically EAOM can spill the data to disk if memory is not enough,
 but there might be some issues when the join key is skewed or the number of
 keys is small, so you will meet OOM.



 Maybe you could monitor each stage or task’s shuffle and GC status also
 system status to identify the problem.



 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Thursday, March 5, 2015 2:32 PM
 *To:* Aaron Davidson
 *Cc:* user
 *Subject:* Re: Having lots of FetchFailedException in join



 One really interesting is that when I'm using the
 netty-based spark.shuffle.blockTransferService, there's no OOM error
 messages (java.lang.OutOfMemoryError: Java heap space).



 Any idea why it's not here?



 I'm using Spark 1.2.1.



 Jianshi



 On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

  I changed spark.shuffle.blockTransferService to nio and now I'm getting
 OOM errors, I'm doing a big join operation.





 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0
 (TID 6207)

 java.lang.OutOfMemoryError: Java heap space

 at
 org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)

 at
 org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)

 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)

 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)

 at
 org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)

 at
 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)

 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)

 at
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)

 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 at
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)

 at
 org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)

 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)

 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)

 at
 org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)

 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)

 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)

 at
 org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)

 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)

 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)

 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)

 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)

 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)

 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)

 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)

 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)

 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)

 at org.apache.spark.scheduler.Task.run(Task.scala:56)



 Is join/cogroup still memory bound?





 Jianshi







 On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

  Hmm... ok, previous errors are still block fetch errors.



 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning
 fetch of 11 outstanding blocks

 java.io.IOException: Failed to connect to host-/:55597

Re: Having lots of FetchFailedException in join

2015-03-04 Thread Jianshi Huang
I see. I'm using core's join. The data might have some skewness (checking).

I understand shuffle can spill data to disk, but when consuming it, say in
cogroup or groupByKey, it still needs to read a whole group's elements,
right? I guess the OOM happened there when reading very large groups.

Jianshi
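
That matches the stack traces above: the values for a key are appended into a
single in-memory CompactBuffer even though the map itself can spill. Where a
per-key aggregate is all that is needed, something like the following avoids
materializing the group at all (a sketch; assumes the sc from spark-shell and
illustrative data):

  import org.apache.spark.SparkContext._   // pair-RDD implicits on Spark 1.2.x

  val pairs  = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
  val counts = pairs.aggregateByKey(0L)((acc, _) => acc + 1L, _ + _)   // value count per key
  val sums   = pairs.reduceByKey(_ + _)                                // sum per key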

On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai saisai.s...@intel.com wrote:

  I think what you could do is to monitor through web UI to see if there’s
 any skew or other symptoms in shuffle write and read. For GC you could use
 the below configuration as you mentioned.



 From the Spark core side, all the shuffle-related operations can spill the
 data to disk and do not need to read the whole partition into memory. But if
 you use SparkSQL, it depends on how SparkSQL uses these operators.



 CC @hao if he has some thoughts on it.



 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Thursday, March 5, 2015 3:28 PM
 *To:* Shao, Saisai

 *Cc:* user
 *Subject:* Re: Having lots of FetchFailedException in join



 Hi Saisai,



 What's your suggested settings on monitoring shuffle? I've
 enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.



 I found SPARK-3461 (Support external groupByKey using
 repartitionAndSortWithinPartitions) want to make groupByKey using external
 storage. It's still open status. Does that mean now
 groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read
 the group as a whole during consuming?



 How can I deal with the key skewness in joins? Is there a skew-join
 implementation?





 Jianshi







 On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  Hi Jianshi,



 From my understanding, it may not be a problem with NIO or Netty. Looking
 at your stack trace, the OOM occurred in EAOM (ExternalAppendOnlyMap);
 theoretically EAOM can spill the data to disk if memory is not enough,
 but there might be some issues when the join key is skewed or the number of
 keys is small, so you will meet OOM.



 Maybe you could monitor each stage or task’s shuffle and GC status also
 system status to identify the problem.



 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Thursday, March 5, 2015 2:32 PM
 *To:* Aaron Davidson
 *Cc:* user
 *Subject:* Re: Having lots of FetchFailedException in join



 One really interesting is that when I'm using the
 netty-based spark.shuffle.blockTransferService, there's no OOM error
 messages (java.lang.OutOfMemoryError: Java heap space).



 Any idea why it's not here?



 I'm using Spark 1.2.1.



 Jianshi



 On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

  I changed spark.shuffle.blockTransferService to nio and now I'm getting
 OOM errors, I'm doing a big join operation.





 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0
 (TID 6207)

 java.lang.OutOfMemoryError: Java heap space

 at
 org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)

 at
 org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)

 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)

 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)

 at
 org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)

 at
 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)

 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)

 at
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)

 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 at
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)

 at
 org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)

 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)

 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)

 at
 org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)

 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)

 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)

 at
 org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31

Re: Having lots of FetchFailedException in join

2015-03-04 Thread Jianshi Huang
I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM
errors; I'm doing a big join operation.


15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID
6207)
java.lang.OutOfMemoryError: Java heap space
at
org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)
at
org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)
at
org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
at
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at
org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at
org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)

Is join/cogroup still memory bound?


Jianshi
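
For reference, a sketch of the shuffle transfer-service switch being compared
here, together with the GC-logging flags mentioned in this thread, set on a
SparkConf (Spark 1.2.x property names; the values are illustrative):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    // "netty" is the 1.2.x default transfer service; "nio" is the alternative tried here.
    .set("spark.shuffle.blockTransferService", "netty")
    // Surface GC activity in the executor logs while investigating the OOMs.
    .set("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps")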



On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Hmm... ok, previous errors are still block fetch errors.

 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning
 fetch of 11 outstanding blocks
 java.io.IOException: Failed to connect to host-/:55597
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
 at
 org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
 at
 org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
 at
 org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149)
 at
 org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:289)
 at
 org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at
 org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply

[no subject]

2015-03-03 Thread Jianshi Huang
Hi,

I got this error message:

15/03/03 10:22:41 ERROR OneForOneBlockFetcher: Failed while starting block
fetches
java.lang.RuntimeException: java.io.FileNotFoundException:
/hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
(No such file or directory)
at java.io.FileInputStream.open(Native Method)
at java.io.FileInputStream.init(FileInputStream.java:146)
at
org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
at
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
at
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at
org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)


And then for the same index file and executor, I got the following errors
multiple times

15/03/03 10:22:41 ERROR ShuffleBlockFetcherIterator: Failed to get block(s)
from host-:39534
java.lang.RuntimeException: java.io.FileNotFoundException:
/hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
(No such file or directory)

15/03/03 10:22:41 ERROR RetryingBlockFetcher: Failed to fetch block
shuffle_0_13_1228, and will not retry (0 retries)
java.lang.RuntimeException: java.io.FileNotFoundException:
/hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
(No such file or directory)

...
Caused by: java.net.ConnectException: Connection refused: host-


What's the problem?

BTW, I'm using a Spark 1.2.1-SNAPSHOT I built around Dec. 20. Are there any
bug fixes related to shuffle block fetching or index files after that?


Thanks,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Having lots of FetchFailedException in join

2015-03-03 Thread Jianshi Huang
Sorry that I forgot the subject.

And in the driver, I got many FetchFailedException. The error messages are

15/03/03 10:34:32 WARN TaskSetManager: Lost task 31.0 in stage 2.2 (TID
7943, ): FetchFailed(BlockManagerId(86, , 43070), shuffleId=0,
mapId=24, reduceId=1220, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to
/:43070
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)


Jianshi

On Wed, Mar 4, 2015 at 2:55 AM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Hi,

 I got this error message:

 15/03/03 10:22:41 ERROR OneForOneBlockFetcher: Failed while starting block
 fetches
 java.lang.RuntimeException: java.io.FileNotFoundException:
 /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
 (No such file or directory)
 at java.io.FileInputStream.open(Native Method)
 at java.io.FileInputStream.init(FileInputStream.java:146)
 at
 org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
 at
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)


 And then for the same index file and executor, I got the following errors
 multiple times

 15/03/03 10:22:41 ERROR ShuffleBlockFetcherIterator: Failed to get
 block(s) from host-:39534
 java.lang.RuntimeException: java.io.FileNotFoundException:
 /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
 (No such file or directory)

 15/03/03 10:22:41 ERROR RetryingBlockFetcher: Failed to fetch block
 shuffle_0_13_1228, and will not retry (0 retries)
 java.lang.RuntimeException: java.io.FileNotFoundException:
 /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
 (No such file or directory)

 ...
 Caused by: java.net.ConnectException: Connection refused: host-


 What's the problem?

 BTW, I'm using Spark 1.2.1-SNAPSHOT I built around Dec. 20. Is there any
 bug fixes related to shuffle block fetching or index files after that?


 Thanks,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Having lots of FetchFailedException in join

2015-03-03 Thread Jianshi Huang
Hmm... ok, previous errors are still block fetch errors.

15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning
fetch of 11 outstanding blocks
java.io.IOException: Failed to connect to host-/:55597
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
at
org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:289)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at
org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at
org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
Caused by: java.net.ConnectException: Connection refused:
lvshdc5dn0518.lvs.paypal.com/10.196.244.48:55597
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735)
at
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)

And I checked executor on container host-, everything is good.
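In case these turn out to be transient connection failures, one hedged mitigation sketch (not a diagnosis) is to raise the Netty shuffle fetch retry settings before the SparkContext is created; the app name and values below are only illustrative, and the property names are the standard 1.2-era Netty shuffle settings:

// Sketch only: retry failed shuffle fetches a few more times before giving up.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("join-job")                       // hypothetical app name
  .set("spark.shuffle.io.maxRetries", "10")     // retries per failed fetch
  .set("spark.shuffle.io.retryWait", "15")      // seconds between retries (1.2-era format)
val sc = new SparkContext(conf)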

Jianshi


On Wed, Mar 4, 2015 at 12:28 PM, Aaron Davidson ilike...@gmail.com wrote:

 Drat! That doesn't help. Could you scan from the top to see if there were
 any fatal errors preceding these? Sometimes an OOM will cause this type of
 issue further down.

 On Tue, Mar 3, 2015 at 8:16 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 The failed executor has the following error messages. Any hints?

 15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking
 RpcHandler#receive() on RPC id 5711039715419258699

Re: Having lots of FetchFailedException in join

2015-03-03 Thread Jianshi Huang
(ByteToMessageDecoder.java:163)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at java.lang.Thread.run(Thread.java:724)


Jianshi

On Wed, Mar 4, 2015 at 3:25 AM, Aaron Davidson ilike...@gmail.com wrote:

 Failed to connect implies that the executor at that host died, please
 check its logs as well.

 On Tue, Mar 3, 2015 at 11:03 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Sorry that I forgot the subject.

 And in the driver, I got many FetchFailedException. The error messages are

 15/03/03 10:34:32 WARN TaskSetManager: Lost task 31.0 in stage 2.2 (TID
 7943, ): FetchFailed(BlockManagerId(86, , 43070), shuffleId=0,
 mapId=24, reduceId=1220, message=
 org.apache.spark.shuffle.FetchFailedException: Failed to connect to
 /:43070
 at
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
 at
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
 at
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)


 Jianshi

 On Wed, Mar 4, 2015 at 2:55 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi,

 I got this error message:

 15/03/03 10:22:41 ERROR OneForOneBlockFetcher: Failed while starting
 block fetches
 java.lang.RuntimeException: java.io.FileNotFoundException:
 /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
 (No such file or directory)
 at java.io.FileInputStream.open(Native Method)
 at java.io.FileInputStream.init(FileInputStream.java:146)
 at
 org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
 at
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at
 scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)


 And then for the same index file and executor, I got the following
 errors multiple times

 15/03/03 10:22:41 ERROR ShuffleBlockFetcherIterator: Failed to get
 block(s) from host-:39534
 java.lang.RuntimeException: java.io.FileNotFoundException:
 /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
 (No such file or directory)

 15/03/03 10:22:41 ERROR RetryingBlockFetcher: Failed to fetch block
 shuffle_0_13_1228, and will not retry (0 retries)
 java.lang.RuntimeException: java.io.FileNotFoundException:
 /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
 (No such file or directory)

 ...
 Caused by: java.net.ConnectException: Connection refused: host-


 What's the problem?

 BTW, I'm using Spark 1.2.1-SNAPSHOT I built around Dec. 20. Are there any
 bug fixes related to shuffle block fetching or index files after that?


 Thanks,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/





-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Dynamic partition pattern support

2015-02-15 Thread Jianshi Huang
Hi,

HCatalog allows you to specify the pattern of paths for partitions, which
will be used by dynamic partition loading.


https://cwiki.apache.org/confluence/display/Hive/HCatalog+DynamicPartitions#HCatalogDynamicPartitions-ExternalTables

Can we have similar feature in SparkSQL?

Jira is here: https://issues.apache.org/jira/browse/SPARK-5828
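For context, a minimal sketch of the Hive-style dynamic partition insert that this would extend (the table and column names are made up for illustration); what the JIRA asks for is control over the resulting partition path pattern, which this syntax does not give you:

// Illustrative only; events/staging_events and their columns are hypothetical.
sql("SET hive.exec.dynamic.partition=true")
sql("SET hive.exec.dynamic.partition.mode=nonstrict")
sql("""
  INSERT OVERWRITE TABLE events PARTITION (dt)
  SELECT user_id, action, dt FROM staging_events
""")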

Thanks,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Which version to use for shuffle service if I'm going to run multiple versions of Spark

2015-02-13 Thread Jianshi Huang
Get it. Thanks Reynold and Andrew!

Jianshi

On Thu, Feb 12, 2015 at 12:25 AM, Andrew Or and...@databricks.com wrote:

 Hi Jianshi,

 For YARN, there may be an issue with how a recent patch changes the
 accessibility of the shuffle files by the external shuffle service:
 https://issues.apache.org/jira/browse/SPARK-5655. It is likely that you
 will hit this with 1.2.1, actually. For this reason I would have to
 recommend that you use 1.2.2 when it is released, but for now you should
 use 1.2.0 for this specific use case.

 -Andrew

 2015-02-10 23:38 GMT-08:00 Reynold Xin r...@databricks.com:

 I think we made the binary protocol compatible across all versions, so you
 should be fine with using any one of them. 1.2.1 is probably the best since
 it is the most recent stable release.

 On Tue, Feb 10, 2015 at 8:43 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi,

 I need to use branch-1.2 and sometimes master builds of Spark for my
 project. However the officially supported Spark version by our Hadoop admin
 is only 1.2.0.

 So, my question is which version/build of spark-yarn-shuffle.jar should
 I use that works for all four versions? (1.2.0, 1.2.1, 1.2.2, 1.3.0)

 Thanks,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/






-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Which version to use for shuffle service if I'm going to run multiple versions of Spark

2015-02-10 Thread Jianshi Huang
Hi,

I need to use branch-1.2 and sometimes master builds of Spark for my
project. However the officially supported Spark version by our Hadoop admin
is only 1.2.0.

So, my question is which version/build of spark-yarn-shuffle.jar should I
use that works for all four versions? (1.2.0, 1.2.1, 1.2.2, 1.3.0)
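For reference, the application side only opts in to the service; a minimal sketch, assuming the spark-yarn-shuffle jar is already registered as a YARN auxiliary service on the node managers (dynamic allocation shown only as the typical reason for running the service):

// Sketch only: app-side opt-in; the shuffle service jar itself lives on the NodeManagers.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.enabled", "true")   // typical reason for running the service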

Thanks,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Pig loader in Spark

2015-02-03 Thread Jianshi Huang
Hi,

Has anyone implemented the default Pig Loader in Spark? (loading delimited
text files with a .pig_schema file)
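In case it helps frame the question, a rough sketch of the plain-RDD starting point (the path is hypothetical; parsing the .pig_schema JSON into a real schema is the part that still needs writing):

// Sketch only: PigStorage's default output is tab-delimited text; .pig_schema is
// a small JSON file describing the fields, which is not parsed here.
val raw  = sc.textFile("/path/to/pig/output/part-*")   // hypothetical path
val rows = raw.map(_.split("\t", -1))                  // -1 keeps trailing empty fields
rows.take(5).foreach(r => println(r.mkString(" | ")))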

Thanks,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Hive UDAF percentile_approx says This UDAF does not support the deprecated getEvaluator() method.

2015-01-13 Thread Jianshi Huang
Hi,

The following SQL query

select percentile_approx(variables.var1, 0.95) p95
from model

will throw

ERROR SparkSqlInterpreter: Error
org.apache.hadoop.hive.ql.parse.SemanticException: This UDAF does not
support the deprecated getEvaluator() method.
at
org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver.getEvaluator(AbstractGenericUDAFResolver.java:53)
at
org.apache.spark.sql.hive.HiveGenericUdaf.objectInspector$lzycompute(hiveUdfs.scala:196)
at
org.apache.spark.sql.hive.HiveGenericUdaf.objectInspector(hiveUdfs.scala:195)
at
org.apache.spark.sql.hive.HiveGenericUdaf.dataType(hiveUdfs.scala:203)
at
org.apache.spark.sql.catalyst.expressions.Alias.toAttribute(namedExpressions.scala:105)
at
org.apache.spark.sql.catalyst.plans.logical.Aggregate$$anonfun$output$6.apply(basicOperators.scala:143)
at
org.apache.spark.sql.catalyst.plans.logical.Aggregate$$anonfun$output$6.apply(basicOperators.scala:143)

I'm using latest branch-1.2

I found in a PR that percentile and percentile_approx are supported. Is this a bug?

Thanks,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Hive UDAF percentile_approx says This UDAF does not support the deprecated getEvaluator() method.

2015-01-13 Thread Jianshi Huang
Ah, thx Ted and Yin!

I'll build a new version. :)

Jianshi

On Wed, Jan 14, 2015 at 7:24 AM, Yin Huai yh...@databricks.com wrote:

 Yeah, it's a bug. It has been fixed by
 https://issues.apache.org/jira/browse/SPARK-3891 in master.

 On Tue, Jan 13, 2015 at 2:41 PM, Ted Yu yuzhih...@gmail.com wrote:

 Looking at the source code for AbstractGenericUDAFResolver, the following
 (non-deprecated) method should be called:

   public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info)

 It is called by hiveUdfs.scala (master branch):

 val parameterInfo = new
 SimpleGenericUDAFParameterInfo(inspectors.toArray, false, false)
 resolver.getEvaluator(parameterInfo)

 FYI

 On Tue, Jan 13, 2015 at 1:51 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi,

 The following SQL query

 select percentile_approx(variables.var1, 0.95) p95
 from model

 will throw

 ERROR SparkSqlInterpreter: Error
 org.apache.hadoop.hive.ql.parse.SemanticException: This UDAF does not
 support the deprecated getEvaluator() method.
 at
 org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver.getEvaluator(AbstractGenericUDAFResolver.java:53)
 at
 org.apache.spark.sql.hive.HiveGenericUdaf.objectInspector$lzycompute(hiveUdfs.scala:196)
 at
 org.apache.spark.sql.hive.HiveGenericUdaf.objectInspector(hiveUdfs.scala:195)
 at
 org.apache.spark.sql.hive.HiveGenericUdaf.dataType(hiveUdfs.scala:203)
 at
 org.apache.spark.sql.catalyst.expressions.Alias.toAttribute(namedExpressions.scala:105)
 at
 org.apache.spark.sql.catalyst.plans.logical.Aggregate$$anonfun$output$6.apply(basicOperators.scala:143)
 at
 org.apache.spark.sql.catalyst.plans.logical.Aggregate$$anonfun$output$6.apply(basicOperators.scala:143)

 I'm using latest branch-1.2

 I found in PR that percentile and percentile_approx are supported. A bug?

 Thanks,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/






-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Hive Problem in Pig generated Parquet file schema in CREATE EXTERNAL TABLE (e.g. bag::col1)

2014-12-23 Thread Jianshi Huang
FYI,

Latest hive 0.14/parquet will have column renaming support.

Jianshi

On Wed, Dec 10, 2014 at 3:37 AM, Michael Armbrust mich...@databricks.com
wrote:

 You might also try out the recently added support for views.

 On Mon, Dec 8, 2014 at 9:31 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Ah... I see. Thanks for pointing it out.

 Then it means we cannot mount external table using customized column
 names. hmm...

 Then the only option left is to use a subquery to add a bunch of column
 alias. I'll try it later.

 Thanks,
 Jianshi

 On Tue, Dec 9, 2014 at 3:34 AM, Michael Armbrust mich...@databricks.com
 wrote:

 This is by hive's design.  From the Hive documentation:

 The column change command will only modify Hive's metadata, and will not
 modify data. Users should make sure the actual data layout of the
 table/partition conforms with the metadata definition.



 On Sat, Dec 6, 2014 at 8:28 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Ok, found another possible bug in Hive.

 My current solution is to use ALTER TABLE CHANGE to rename the column
 names.

 The problem is after renaming the column names, the value of the
 columns became all NULL.

 Before renaming:
 scala sql(select `sorted::cre_ts` from pmt limit 1).collect
 res12: Array[org.apache.spark.sql.Row] = Array([12/02/2014 07:38:54])

 Execute renaming:
 scala sql(alter table pmt change `sorted::cre_ts` cre_ts string)
 res13: org.apache.spark.sql.SchemaRDD =
 SchemaRDD[972] at RDD at SchemaRDD.scala:108
 == Query Plan ==
 Native command: executed by Hive

 After renaming:
 scala sql(select cre_ts from pmt limit 1).collect
 res16: Array[org.apache.spark.sql.Row] = Array([null])

 I created a JIRA for it:

   https://issues.apache.org/jira/browse/SPARK-4781


 Jianshi

 On Sun, Dec 7, 2014 at 1:06 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hmm... another issue I found doing this approach is that ANALYZE TABLE
 ... COMPUTE STATISTICS will fail to attach the metadata to the table, and
 later broadcast join and such will fail...

 Any idea how to fix this issue?

 Jianshi

 On Sat, Dec 6, 2014 at 9:10 PM, Jianshi Huang jianshi.hu...@gmail.com
  wrote:

 Very interesting, the line doing drop table will throws an exception.
 After removing it all works.

 Jianshi

 On Sat, Dec 6, 2014 at 9:11 AM, Jianshi Huang 
 jianshi.hu...@gmail.com wrote:

 Here's the solution I got after talking with Liancheng:

 1) using backquote `..` to wrap up all illegal characters

 val rdd = parquetFile(file)
 val schema = rdd.schema.fields.map(f = s`${f.name}`
 ${HiveMetastoreTypes.toMetastoreType(f.dataType)}).mkString(,\n)

 val ddl_13 = s
   |CREATE EXTERNAL TABLE $name (
   |  $schema
   |)
   |STORED AS PARQUET
   |LOCATION '$file'
   .stripMargin

 sql(ddl_13)

 2) create a new Schema and do applySchema to generate a new
 SchemaRDD, had to drop and register table

 val t = table(name)
 val newSchema = StructType(t.schema.fields.map(s = s.copy(name
 = s.name.replaceAll(.*?::, 
 sql(sdrop table $name)
 applySchema(t, newSchema).registerTempTable(name)

 I'm testing it for now.

 Thanks for the help!


 Jianshi

 On Sat, Dec 6, 2014 at 8:41 AM, Jianshi Huang 
 jianshi.hu...@gmail.com wrote:

 Hi,

 I had to use Pig for some preprocessing and to generate Parquet
 files for Spark to consume.

 However, due to Pig's limitation, the generated schema contains
 Pig's identifier

 e.g.
 sorted::id, sorted::cre_ts, ...

 I tried to put the schema inside CREATE EXTERNAL TABLE, e.g.

   create external table pmt (
 sorted::id bigint
   )
   stored as parquet
   location '...'

 Obviously it didn't work, I also tried removing the identifier
 sorted::, but the resulting rows contain only nulls.

 Any idea how to create a table in HiveContext from these Parquet
 files?

 Thanks,
 Jianshi
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/





 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/





-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Hive Problem in Pig generated Parquet file schema in CREATE EXTERNAL TABLE (e.g. bag::col1)

2014-12-08 Thread Jianshi Huang
Ah... I see. Thanks for pointing it out.

Then it means we cannot mount an external table using customized column names.
Hmm...

Then the only option left is to use a subquery to add a bunch of column
aliases. I'll try it later.
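For illustration, such a subquery might look roughly like this (untested sketch; it reuses the pmt table and the sorted:: columns from earlier in the thread):

// Untested sketch: alias the Pig-generated names once, then query the temp table.
sql("""
  SELECT `sorted::id`     AS id,
         `sorted::cre_ts` AS cre_ts
  FROM pmt
""").registerTempTable("pmt_renamed")

sql("select id, cre_ts from pmt_renamed limit 1").collect()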

Thanks,
Jianshi

On Tue, Dec 9, 2014 at 3:34 AM, Michael Armbrust mich...@databricks.com
wrote:

 This is by hive's design.  From the Hive documentation:

 The column change command will only modify Hive's metadata, and will not
 modify data. Users should make sure the actual data layout of the
 table/partition conforms with the metadata definition.



 On Sat, Dec 6, 2014 at 8:28 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Ok, found another possible bug in Hive.

 My current solution is to use ALTER TABLE CHANGE to rename the column
 names.

 The problem is after renaming the column names, the value of the columns
 became all NULL.

 Before renaming:
 scala sql(select `sorted::cre_ts` from pmt limit 1).collect
 res12: Array[org.apache.spark.sql.Row] = Array([12/02/2014 07:38:54])

 Execute renaming:
 scala sql(alter table pmt change `sorted::cre_ts` cre_ts string)
 res13: org.apache.spark.sql.SchemaRDD =
 SchemaRDD[972] at RDD at SchemaRDD.scala:108
 == Query Plan ==
 Native command: executed by Hive

 After renaming:
 scala sql(select cre_ts from pmt limit 1).collect
 res16: Array[org.apache.spark.sql.Row] = Array([null])

 I created a JIRA for it:

   https://issues.apache.org/jira/browse/SPARK-4781


 Jianshi

 On Sun, Dec 7, 2014 at 1:06 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hmm... another issue I found doing this approach is that ANALYZE TABLE
 ... COMPUTE STATISTICS will fail to attach the metadata to the table, and
 later broadcast join and such will fail...

 Any idea how to fix this issue?

 Jianshi

 On Sat, Dec 6, 2014 at 9:10 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Very interesting, the line doing drop table will throws an exception.
 After removing it all works.

 Jianshi

 On Sat, Dec 6, 2014 at 9:11 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Here's the solution I got after talking with Liancheng:

 1) using backquote `..` to wrap up all illegal characters

 val rdd = parquetFile(file)
 val schema = rdd.schema.fields.map(f = s`${f.name}`
 ${HiveMetastoreTypes.toMetastoreType(f.dataType)}).mkString(,\n)

 val ddl_13 = s
   |CREATE EXTERNAL TABLE $name (
   |  $schema
   |)
   |STORED AS PARQUET
   |LOCATION '$file'
   .stripMargin

 sql(ddl_13)

 2) create a new Schema and do applySchema to generate a new SchemaRDD,
 had to drop and register table

 val t = table(name)
 val newSchema = StructType(t.schema.fields.map(s = s.copy(name =
 s.name.replaceAll(.*?::, 
 sql(sdrop table $name)
 applySchema(t, newSchema).registerTempTable(name)

 I'm testing it for now.

 Thanks for the help!


 Jianshi

 On Sat, Dec 6, 2014 at 8:41 AM, Jianshi Huang jianshi.hu...@gmail.com
  wrote:

 Hi,

 I had to use Pig for some preprocessing and to generate Parquet files
 for Spark to consume.

 However, due to Pig's limitation, the generated schema contains Pig's
 identifier

 e.g.
 sorted::id, sorted::cre_ts, ...

 I tried to put the schema inside CREATE EXTERNAL TABLE, e.g.

   create external table pmt (
 sorted::id bigint
   )
   stored as parquet
   location '...'

 Obviously it didn't work, I also tried removing the identifier
 sorted::, but the resulting rows contain only nulls.

 Any idea how to create a table in HiveContext from these Parquet
 files?

 Thanks,
 Jianshi
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/





-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Hive Problem in Pig generated Parquet file schema in CREATE EXTERNAL TABLE (e.g. bag::col1)

2014-12-06 Thread Jianshi Huang
Very interesting, the line doing drop table throws an exception. After
removing it, everything works.

Jianshi

On Sat, Dec 6, 2014 at 9:11 AM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Here's the solution I got after talking with Liancheng:

 1) using backquote `..` to wrap up all illegal characters

 val rdd = parquetFile(file)
 val schema = rdd.schema.fields.map(f = s`${f.name}`
 ${HiveMetastoreTypes.toMetastoreType(f.dataType)}).mkString(,\n)

 val ddl_13 = s
   |CREATE EXTERNAL TABLE $name (
   |  $schema
   |)
   |STORED AS PARQUET
   |LOCATION '$file'
   .stripMargin

 sql(ddl_13)

 2) create a new Schema and do applySchema to generate a new SchemaRDD, had
 to drop and register table

 val t = table(name)
 val newSchema = StructType(t.schema.fields.map(s = s.copy(name =
 s.name.replaceAll(.*?::, 
 sql(sdrop table $name)
 applySchema(t, newSchema).registerTempTable(name)

 I'm testing it for now.

 Thanks for the help!


 Jianshi

 On Sat, Dec 6, 2014 at 8:41 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi,

 I had to use Pig for some preprocessing and to generate Parquet files for
 Spark to consume.

 However, due to Pig's limitation, the generated schema contains Pig's
 identifier

 e.g.
 sorted::id, sorted::cre_ts, ...

 I tried to put the schema inside CREATE EXTERNAL TABLE, e.g.

   create external table pmt (
 sorted::id bigint
   )
   stored as parquet
   location '...'

 Obviously it didn't work, I also tried removing the identifier sorted::,
 but the resulting rows contain only nulls.

 Any idea how to create a table in HiveContext from these Parquet files?

 Thanks,
 Jianshi
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Hive Problem in Pig generated Parquet file schema in CREATE EXTERNAL TABLE (e.g. bag::col1)

2014-12-06 Thread Jianshi Huang
Hmm... another issue I found with this approach is that ANALYZE TABLE ...
COMPUTE STATISTICS fails to attach the metadata to the table, and later
broadcast joins and such will fail...

Any idea how to fix this issue?

Jianshi

On Sat, Dec 6, 2014 at 9:10 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Very interesting, the line doing drop table will throws an exception.
 After removing it all works.

 Jianshi

 On Sat, Dec 6, 2014 at 9:11 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Here's the solution I got after talking with Liancheng:

 1) using backquote `..` to wrap up all illegal characters

 val rdd = parquetFile(file)
 val schema = rdd.schema.fields.map(f = s`${f.name}`
 ${HiveMetastoreTypes.toMetastoreType(f.dataType)}).mkString(,\n)

 val ddl_13 = s
   |CREATE EXTERNAL TABLE $name (
   |  $schema
   |)
   |STORED AS PARQUET
   |LOCATION '$file'
   .stripMargin

 sql(ddl_13)

 2) create a new Schema and do applySchema to generate a new SchemaRDD,
 had to drop and register table

 val t = table(name)
 val newSchema = StructType(t.schema.fields.map(s = s.copy(name =
 s.name.replaceAll(.*?::, 
 sql(sdrop table $name)
 applySchema(t, newSchema).registerTempTable(name)

 I'm testing it for now.

 Thanks for the help!


 Jianshi

 On Sat, Dec 6, 2014 at 8:41 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi,

 I had to use Pig for some preprocessing and to generate Parquet files
 for Spark to consume.

 However, due to Pig's limitation, the generated schema contains Pig's
 identifier

 e.g.
 sorted::id, sorted::cre_ts, ...

 I tried to put the schema inside CREATE EXTERNAL TABLE, e.g.

   create external table pmt (
 sorted::id bigint
   )
   stored as parquet
   location '...'

 Obviously it didn't work, I also tried removing the identifier sorted::,
 but the resulting rows contain only nulls.

 Any idea how to create a table in HiveContext from these Parquet files?

 Thanks,
 Jianshi
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Hive Problem in Pig generated Parquet file schema in CREATE EXTERNAL TABLE (e.g. bag::col1)

2014-12-06 Thread Jianshi Huang
Ok, found another possible bug in Hive.

My current solution is to use ALTER TABLE CHANGE to rename the column names.

The problem is that after renaming the column names, the values of the columns
all became NULL.

Before renaming:
scala> sql("select `sorted::cre_ts` from pmt limit 1").collect
res12: Array[org.apache.spark.sql.Row] = Array([12/02/2014 07:38:54])

Execute renaming:
scala> sql("alter table pmt change `sorted::cre_ts` cre_ts string")
res13: org.apache.spark.sql.SchemaRDD =
SchemaRDD[972] at RDD at SchemaRDD.scala:108
== Query Plan ==
Native command: executed by Hive

After renaming:
scala> sql("select cre_ts from pmt limit 1").collect
res16: Array[org.apache.spark.sql.Row] = Array([null])

I created a JIRA for it:

  https://issues.apache.org/jira/browse/SPARK-4781


Jianshi

On Sun, Dec 7, 2014 at 1:06 AM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Hmm... another issue I found doing this approach is that ANALYZE TABLE ...
 COMPUTE STATISTICS will fail to attach the metadata to the table, and later
 broadcast join and such will fail...

 Any idea how to fix this issue?

 Jianshi

 On Sat, Dec 6, 2014 at 9:10 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Very interesting, the line doing drop table will throws an exception.
 After removing it all works.

 Jianshi

 On Sat, Dec 6, 2014 at 9:11 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Here's the solution I got after talking with Liancheng:

 1) using backquote `..` to wrap up all illegal characters

 val rdd = parquetFile(file)
 val schema = rdd.schema.fields.map(f = s`${f.name}`
 ${HiveMetastoreTypes.toMetastoreType(f.dataType)}).mkString(,\n)

 val ddl_13 = s
   |CREATE EXTERNAL TABLE $name (
   |  $schema
   |)
   |STORED AS PARQUET
   |LOCATION '$file'
   .stripMargin

 sql(ddl_13)

 2) create a new Schema and do applySchema to generate a new SchemaRDD,
 had to drop and register table

 val t = table(name)
 val newSchema = StructType(t.schema.fields.map(s = s.copy(name =
 s.name.replaceAll(.*?::, 
 sql(sdrop table $name)
 applySchema(t, newSchema).registerTempTable(name)

 I'm testing it for now.

 Thanks for the help!


 Jianshi

 On Sat, Dec 6, 2014 at 8:41 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi,

 I had to use Pig for some preprocessing and to generate Parquet files
 for Spark to consume.

 However, due to Pig's limitation, the generated schema contains Pig's
 identifier

 e.g.
 sorted::id, sorted::cre_ts, ...

 I tried to put the schema inside CREATE EXTERNAL TABLE, e.g.

   create external table pmt (
 sorted::id bigint
   )
   stored as parquet
   location '...'

 Obviously it didn't work, I also tried removing the identifier
 sorted::, but the resulting rows contain only nulls.

 Any idea how to create a table in HiveContext from these Parquet files?

 Thanks,
 Jianshi
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Convert RDD[Map[String, Any]] to SchemaRDD

2014-12-06 Thread Jianshi Huang
Hmm..

I've created a JIRA: https://issues.apache.org/jira/browse/SPARK-4782

Jianshi

On Sun, Dec 7, 2014 at 2:32 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Hi,

 What's the best way to convert RDD[Map[String, Any]] to a SchemaRDD?

 I'm currently converting each Map to a JSON String and do
 JsonRDD.inferSchema.
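 A minimal sketch of that workaround, assuming a spark-shell session with sc and
 sqlContext in scope (and only flat maps of simple values; nested structures
 would need a real JSON library):

 // Sketch only: serialize each Map to a JSON string, then let jsonRDD infer the schema.
 import scala.util.parsing.json.JSONObject

 val maps = sc.parallelize(Seq(
   Map[String, Any]("name" -> "a", "score" -> 1),
   Map[String, Any]("name" -> "b", "score" -> 2)))

 val json      = maps.map(m => JSONObject(m).toString())
 val schemaRDD = sqlContext.jsonRDD(json)
 schemaRDD.printSchema()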

 How about adding inferSchema support to Map[String, Any] directly? It
 would be very useful.

 Thanks,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: drop table if exists throws exception

2014-12-05 Thread Jianshi Huang
I see. The resulting SchemaRDD is returned so like Michael said, the
exception does not propogate to user code.

However printing out the following log is confusing :)

scala> sql("drop table if exists abc")
14/12/05 16:27:02 INFO ParseDriver: Parsing command: drop table if exists
abc
14/12/05 16:27:02 INFO ParseDriver: Parse Completed
14/12/05 16:27:02 INFO PerfLogger: PERFLOG method=Driver.run
from=org.apache.hadoop.hive.ql.Driver
14/12/05 16:27:02 INFO PerfLogger: PERFLOG method=TimeToSubmit
from=org.apache.hadoop.hive.ql.Driver
14/12/05 16:27:02 INFO Driver: Concurrency mode is disabled, not creating a
lock manager
14/12/05 16:27:02 INFO PerfLogger: PERFLOG method=compile
from=org.apache.hadoop.hive.ql.Driver
14/12/05 16:27:02 INFO PerfLogger: PERFLOG method=parse
from=org.apache.hadoop.hive.ql.Driver
14/12/05 16:27:02 INFO ParseDriver: Parsing command: DROP TABLE IF EXISTS
abc
14/12/05 16:27:02 INFO ParseDriver: Parse Completed
14/12/05 16:27:02 INFO PerfLogger: /PERFLOG method=parse
start=1417825622650 end=1417825622650 duration=0
from=org.apache.hadoop.hive.ql.Driver
14/12/05 16:27:02 INFO PerfLogger: PERFLOG method=semanticAnalyze
from=org.apache.hadoop.hive.ql.Driver
14/12/05 16:27:02 INFO HiveMetaStore: 0: get_table : db=default tbl=abc
14/12/05 16:27:02 INFO audit: ugi=jianshuang ip=unknown-ip-addr
 cmd=get_table : db=default tbl=abc
14/12/05 16:27:02 INFO Driver: Semantic Analysis Completed
14/12/05 16:27:02 INFO PerfLogger: /PERFLOG method=semanticAnalyze
start=1417825622650 end=1417825622653 duration=3
from=org.apache.hadoop.hive.ql.Driver
14/12/05 16:27:02 INFO Driver: Returning Hive schema:
Schema(fieldSchemas:null, properties:null)
14/12/05 16:27:02 INFO PerfLogger: /PERFLOG method=compile
start=1417825622650 end=1417825622654 duration=4
from=org.apache.hadoop.hive.ql.Driver
14/12/05 16:27:02 INFO PerfLogger: PERFLOG method=Driver.execute
from=org.apache.hadoop.hive.ql.Driver
14/12/05 16:27:02 INFO Driver: Starting command: DROP TABLE IF EXISTS abc
14/12/05 16:27:02 INFO PerfLogger: /PERFLOG method=TimeToSubmit
start=1417825622650 end=1417825622654 duration=4
from=org.apache.hadoop.hive.ql.Driver
14/12/05 16:27:02 INFO PerfLogger: PERFLOG method=runTasks
from=org.apache.hadoop.hive.ql.Driver
14/12/05 16:27:02 INFO PerfLogger: PERFLOG method=task.DDL.Stage-0
from=org.apache.hadoop.hive.ql.Driver
14/12/05 16:27:02 INFO HiveMetaStore: 0: get_table : db=default tbl=abc
14/12/05 16:27:02 INFO audit: ugi=jianshuang ip=unknown-ip-addr
 cmd=get_table : db=default tbl=abc
14/12/05 16:27:02 ERROR Hive: NoSuchObjectException(message:default.abc
table not found)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1560)
at sun.reflect.GeneratedMethodAccessor57.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105)
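
If the stray ERROR line is just noise, one option (sketch only; the logger name
is guessed from the Hive class that emits it and may differ across Hive
versions) is to silence it in the driver:

// Hedged sketch: the NoSuchObjectException above is expected for IF EXISTS and harmless.
import org.apache.log4j.{Level, Logger}

Logger.getLogger("org.apache.hadoop.hive.ql.metadata.Hive").setLevel(Level.FATAL)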

On Sat, Dec 6, 2014 at 3:45 AM, Mark Hamstra m...@clearstorydata.com
wrote:

 And that is no different from how Hive has worked for a long time.

 On Fri, Dec 5, 2014 at 11:42 AM, Michael Armbrust mich...@databricks.com
 wrote:

 The command run fine for me on master.  Note that Hive does print an
 exception in the logs, but that exception does not propogate to user code.

 On Thu, Dec 4, 2014 at 11:31 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

  Hi,
 
  I got exception saying Hive: NoSuchObjectException(message:table table
  not found)
 
  when running DROP TABLE IF EXISTS table
 
  Looks like a new regression in Hive module.
 
  Anyone can confirm this?
 
  Thanks,
  --
  Jianshi Huang
 
  LinkedIn: jianshi
  Twitter: @jshuang
  Github  Blog: http://huangjs.github.com/
 





-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Hive Problem in Pig generated Parquet file schema in CREATE EXTERNAL TABLE (e.g. bag::col1)

2014-12-05 Thread Jianshi Huang
Hi,

I had to use Pig for some preprocessing and to generate Parquet files for
Spark to consume.

However, due to Pig's limitation, the generated schema contains Pig's
identifier

e.g.
sorted::id, sorted::cre_ts, ...

I tried to put the schema inside CREATE EXTERNAL TABLE, e.g.

  create external table pmt (
sorted::id bigint
  )
  stored as parquet
  location '...'

Obviously it didn't work, I also tried removing the identifier sorted::,
but the resulting rows contain only nulls.

Any idea how to create a table in HiveContext from these Parquet files?

Thanks,
Jianshi
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Hive Problem in Pig generated Parquet file schema in CREATE EXTERNAL TABLE (e.g. bag::col1)

2014-12-05 Thread Jianshi Huang
Here's the solution I got after talking with Liancheng:

1) using backquote `..` to wrap up all illegal characters

val rdd = parquetFile(file)
val schema = rdd.schema.fields.map(f =>
  s"`${f.name}` ${HiveMetastoreTypes.toMetastoreType(f.dataType)}").mkString(",\n")

val ddl_13 = s"""
  |CREATE EXTERNAL TABLE $name (
  |  $schema
  |)
  |STORED AS PARQUET
  |LOCATION '$file'
  """.stripMargin

sql(ddl_13)

2) create a new Schema and do applySchema to generate a new SchemaRDD, had
to drop and register table

val t = table(name)
val newSchema = StructType(t.schema.fields.map(s =>
  s.copy(name = s.name.replaceAll(".*?::", ""))))
sql(s"drop table $name")
applySchema(t, newSchema).registerTempTable(name)

I'm testing it for now.

Thanks for the help!


Jianshi

On Sat, Dec 6, 2014 at 8:41 AM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Hi,

 I had to use Pig for some preprocessing and to generate Parquet files for
 Spark to consume.

 However, due to Pig's limitation, the generated schema contains Pig's
 identifier

 e.g.
 sorted::id, sorted::cre_ts, ...

 I tried to put the schema inside CREATE EXTERNAL TABLE, e.g.

   create external table pmt (
 sorted::id bigint
   )
   stored as parquet
   location '...'

 Obviously it didn't work, I also tried removing the identifier sorted::,
 but the resulting rows contain only nulls.

 Any idea how to create a table in HiveContext from these Parquet files?

 Thanks,
 Jianshi
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Exception adding resource files in latest Spark

2014-12-04 Thread Jianshi Huang
I got the following error during Spark startup (Yarn-client mode):

14/12/04 19:33:58 INFO Client: Uploading resource
file:/x/home/jianshuang/spark/spark-latest/lib/datanucleus-api-jdo-3.2.6.jar
-
hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar
java.lang.IllegalArgumentException: Wrong FS:
hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar,
expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643)
at
org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:79)
at
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:506)
at
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724)
at
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501)
at
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397)
at
org.apache.spark.deploy.yarn.ClientDistributedCacheManager.addResource(ClientDistributedCacheManager.scala:67)
at
org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:257)
at
org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:242)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.deploy.yarn.ClientBase$class.prepareLocalResources(ClientBase.scala:242)
at
org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:35)
at
org.apache.spark.deploy.yarn.ClientBase$class.createContainerLaunchContext(ClientBase.scala:350)
at
org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:35)
at
org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:80)
at
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
at
org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140)
at org.apache.spark.SparkContext.init(SparkContext.scala:335)
at
org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:986)
at $iwC$$iwC.init(console:9)
at $iwC.init(console:18)
at init(console:20)
at .init(console:24)

I'm using latest Spark built from master HEAD yesterday. Is this a bug?

-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Exception adding resource files in latest Spark

2014-12-04 Thread Jianshi Huang
Looks like somehow Spark failed to find the core-site.xml in /etc/hadoop/conf

I've already set the following env variables:

export YARN_CONF_DIR=/etc/hadoop/conf
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HBASE_CONF_DIR=/etc/hbase/conf

Should I put $HADOOP_CONF_DIR/* to HADOOP_CLASSPATH?
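Concretely, that change would just be the following (sketch only; putting the conf directory itself on the classpath, rather than conf/*, is the usual form):

export HADOOP_CLASSPATH="$HADOOP_CONF_DIR:$HADOOP_CLASSPATH"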

Jianshi

On Fri, Dec 5, 2014 at 11:37 AM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 I got the following error during Spark startup (Yarn-client mode):

 14/12/04 19:33:58 INFO Client: Uploading resource
 file:/x/home/jianshuang/spark/spark-latest/lib/datanucleus-api-jdo-3.2.6.jar
 -
 hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar
 java.lang.IllegalArgumentException: Wrong FS:
 hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar,
 expected: file:///
 at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643)
 at
 org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:79)
 at
 org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:506)
 at
 org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724)
 at
 org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501)
 at
 org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397)
 at
 org.apache.spark.deploy.yarn.ClientDistributedCacheManager.addResource(ClientDistributedCacheManager.scala:67)
 at
 org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:257)
 at
 org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:242)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.deploy.yarn.ClientBase$class.prepareLocalResources(ClientBase.scala:242)
 at
 org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:35)
 at
 org.apache.spark.deploy.yarn.ClientBase$class.createContainerLaunchContext(ClientBase.scala:350)
 at
 org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:35)
 at
 org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:80)
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140)
 at org.apache.spark.SparkContext.init(SparkContext.scala:335)
 at
 org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:986)
 at $iwC$$iwC.init(console:9)
 at $iwC.init(console:18)
 at init(console:20)
 at .init(console:24)

 I'm using latest Spark built from master HEAD yesterday. Is this a bug?

 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Exception adding resource files in latest Spark

2014-12-04 Thread Jianshi Huang
Actually my HADOOP_CLASSPATH has already been set to include
/etc/hadoop/conf/*

export
HADOOP_CLASSPATH=/etc/hbase/conf/hbase-site.xml:/usr/lib/hbase/lib/hbase-protocol.jar:$(hbase
classpath)

Jianshi

On Fri, Dec 5, 2014 at 11:54 AM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Looks like somehow Spark failed to find the core-site.xml in
 /et/hadoop/conf

 I've already set the following env variables:

 export YARN_CONF_DIR=/etc/hadoop/conf
 export HADOOP_CONF_DIR=/etc/hadoop/conf
 export HBASE_CONF_DIR=/etc/hbase/conf

 Should I put $HADOOP_CONF_DIR/* to HADOOP_CLASSPATH?

 Jianshi

 On Fri, Dec 5, 2014 at 11:37 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 I got the following error during Spark startup (Yarn-client mode):

 14/12/04 19:33:58 INFO Client: Uploading resource
 file:/x/home/jianshuang/spark/spark-latest/lib/datanucleus-api-jdo-3.2.6.jar
 -
 hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar
 java.lang.IllegalArgumentException: Wrong FS:
 hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar,
 expected: file:///
 at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643)
 at
 org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:79)
 at
 org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:506)
 at
 org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724)
 at
 org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501)
 at
 org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397)
 at
 org.apache.spark.deploy.yarn.ClientDistributedCacheManager.addResource(ClientDistributedCacheManager.scala:67)
 at
 org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:257)
 at
 org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:242)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.deploy.yarn.ClientBase$class.prepareLocalResources(ClientBase.scala:242)
 at
 org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:35)
 at
 org.apache.spark.deploy.yarn.ClientBase$class.createContainerLaunchContext(ClientBase.scala:350)
 at
 org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:35)
 at
 org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:80)
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140)
 at org.apache.spark.SparkContext.init(SparkContext.scala:335)
 at
 org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:986)
 at $iwC$$iwC.init(console:9)
 at $iwC.init(console:18)
 at init(console:20)
 at .init(console:24)

 I'm using latest Spark built from master HEAD yesterday. Is this a bug?

 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Exception adding resource files in latest Spark

2014-12-04 Thread Jianshi Huang
Looks like the datanucleus*.jar shouldn't appear in the hdfs path in
Yarn-client mode.

Maybe this patch broke yarn-client.

https://github.com/apache/spark/commit/a975dc32799bb8a14f9e1c76defaaa7cfbaf8b53

Jianshi

On Fri, Dec 5, 2014 at 12:02 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Actually my HADOOP_CLASSPATH has already been set to include
 /etc/hadoop/conf/*

 export
 HADOOP_CLASSPATH=/etc/hbase/conf/hbase-site.xml:/usr/lib/hbase/lib/hbase-protocol.jar:$(hbase
 classpath)

 Jianshi

 On Fri, Dec 5, 2014 at 11:54 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Looks like somehow Spark failed to find the core-site.xml in
 /et/hadoop/conf

 I've already set the following env variables:

 export YARN_CONF_DIR=/etc/hadoop/conf
 export HADOOP_CONF_DIR=/etc/hadoop/conf
 export HBASE_CONF_DIR=/etc/hbase/conf

 Should I put $HADOOP_CONF_DIR/* to HADOOP_CLASSPATH?

 Jianshi

 On Fri, Dec 5, 2014 at 11:37 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 I got the following error during Spark startup (Yarn-client mode):

 14/12/04 19:33:58 INFO Client: Uploading resource
 file:/x/home/jianshuang/spark/spark-latest/lib/datanucleus-api-jdo-3.2.6.jar
 -
 hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar
 java.lang.IllegalArgumentException: Wrong FS:
 hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar,
 expected: file:///
 at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643)
 at
 org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:79)
 at
 org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:506)
 at
 org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724)
 at
 org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501)
 at
 org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397)
 at
 org.apache.spark.deploy.yarn.ClientDistributedCacheManager.addResource(ClientDistributedCacheManager.scala:67)
 at
 org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:257)
 at
 org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:242)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.deploy.yarn.ClientBase$class.prepareLocalResources(ClientBase.scala:242)
 at
 org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:35)
 at
 org.apache.spark.deploy.yarn.ClientBase$class.createContainerLaunchContext(ClientBase.scala:350)
 at
 org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:35)
 at
 org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:80)
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140)
 at org.apache.spark.SparkContext.init(SparkContext.scala:335)
 at
 org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:986)
 at $iwC$$iwC.init(console:9)
 at $iwC.init(console:18)
 at init(console:20)
 at .init(console:24)

 I'm using latest Spark built from master HEAD yesterday. Is this a bug?

 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Exception adding resource files in latest Spark

2014-12-04 Thread Jianshi Huang
Correction:

According to Liancheng, this hotfix might be the root cause:


https://github.com/apache/spark/commit/38cb2c3a36a5c9ead4494cbc3dde008c2f0698ce

Jianshi

On Fri, Dec 5, 2014 at 12:45 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Looks like the datanucleus*.jar shouldn't appear in the hdfs path in
 Yarn-client mode.

 Maybe this patch broke yarn-client.


 https://github.com/apache/spark/commit/a975dc32799bb8a14f9e1c76defaaa7cfbaf8b53

 Jianshi

 On Fri, Dec 5, 2014 at 12:02 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Actually my HADOOP_CLASSPATH has already been set to include
 /etc/hadoop/conf/*

 export
 HADOOP_CLASSPATH=/etc/hbase/conf/hbase-site.xml:/usr/lib/hbase/lib/hbase-protocol.jar:$(hbase
 classpath)

 Jianshi

 On Fri, Dec 5, 2014 at 11:54 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Looks like somehow Spark failed to find the core-site.xml in
 /et/hadoop/conf

 I've already set the following env variables:

 export YARN_CONF_DIR=/etc/hadoop/conf
 export HADOOP_CONF_DIR=/etc/hadoop/conf
 export HBASE_CONF_DIR=/etc/hbase/conf

 Should I put $HADOOP_CONF_DIR/* to HADOOP_CLASSPATH?

 Jianshi

 On Fri, Dec 5, 2014 at 11:37 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 I got the following error during Spark startup (Yarn-client mode):

 14/12/04 19:33:58 INFO Client: Uploading resource
 file:/x/home/jianshuang/spark/spark-latest/lib/datanucleus-api-jdo-3.2.6.jar
 -
 hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar
 java.lang.IllegalArgumentException: Wrong FS:
 hdfs://stampy/user/jianshuang/.sparkStaging/application_1404410683830_531767/datanucleus-api-jdo-3.2.6.jar,
 expected: file:///
 at
 org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:643)
 at
 org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:79)
 at
 org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:506)
 at
 org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724)
 at
 org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501)
 at
 org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397)
 at
 org.apache.spark.deploy.yarn.ClientDistributedCacheManager.addResource(ClientDistributedCacheManager.scala:67)
 at
 org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:257)
 at
 org.apache.spark.deploy.yarn.ClientBase$$anonfun$prepareLocalResources$5.apply(ClientBase.scala:242)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.deploy.yarn.ClientBase$class.prepareLocalResources(ClientBase.scala:242)
 at
 org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:35)
 at
 org.apache.spark.deploy.yarn.ClientBase$class.createContainerLaunchContext(ClientBase.scala:350)
 at
 org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:35)
 at
 org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:80)
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140)
 at org.apache.spark.SparkContext.init(SparkContext.scala:335)
 at
 org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:986)
 at $iwC$$iwC.init(console:9)
 at $iwC.init(console:18)
 at init(console:20)
 at .init(console:24)

 I'm using latest Spark built from master HEAD yesterday. Is this a bug?

 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Auto BroadcastJoin optimization failed in latest Spark

2014-12-04 Thread Jianshi Huang
Sorry for the late follow-up.

I used Hao's DESC EXTENDED command and found some clue:

new (broadcast broken Spark build):
parameters:{numFiles=0, EXTERNAL=TRUE, transient_lastDdlTime=1417763892,
COLUMN_STATS_ACCURATE=false, totalSize=0, numRows=-1, rawDataSize=-1}

old (broadcast working Spark build):
parameters:{EXTERNAL=TRUE, transient_lastDdlTime=1417763591,
totalSize=56166}

Looks like the table size computation failed in the latest version.

I've run the analyze command:

  ANALYZE TABLE $table COMPUTE STATISTICS noscan

And the tables are created from Parquet files:

e.g.
CREATE EXTERNAL TABLE table1 (
  code int,
  desc string
)
STORED AS PARQUET
LOCATION '/user/jianshuang/data/dim_tables/table1.parquet'


Anyone knows what went wrong?


Thanks,
Jianshi



On Fri, Nov 28, 2014 at 1:24 PM, Cheng, Hao hao.ch...@intel.com wrote:

  Hi Jianshi,

 I couldn’t reproduce that with latest MASTER, and I can always get the
  BroadcastHashJoin for managed tables (in .csv files) in my testing. Are
  there any external tables in your case?



 In general probably couple of things you can try first (with HiveContext):

 1)  ANALYZE TABLE xxx COMPUTE STATISTICS NOSCAN; (apply that to all
 of the tables);

 2)  SET spark.sql.autoBroadcastJoinThreshold=xxx; (Set the threshold
 as a greater value, it is 1024*1024*10 by default, just make sure the
 maximum dimension tables size (in bytes) is less than this)

 3)  Always put the main table(the biggest table) in the left-most
 among the inner joins;



 DESC EXTENDED tablename; -- this will print the detail information for the
 statistic table size (the field “totalSize”)

 EXPLAIN EXTENDED query; -- this will print the detail physical plan.



 Let me know if you still have problem.



 Hao



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Thursday, November 27, 2014 10:24 PM
 *To:* Cheng, Hao
 *Cc:* user
 *Subject:* Re: Auto BroadcastJoin optimization failed in latest Spark



 Hi Hao,



 I'm using inner join as Broadcast join didn't work for left joins (thanks
 for the links for the latest improvements).



 And I'm using HiveContext and it worked in a previous build (10/12) when
 joining 15 dimension tables.



 Jianshi



 On Thu, Nov 27, 2014 at 8:35 AM, Cheng, Hao hao.ch...@intel.com wrote:

  Are all of your join keys the same? And I guess the join types are all
 “Left” joins; https://github.com/apache/spark/pull/3362 is probably what
 you need.



 And, SparkSQL doesn’t support the multiway-join (and multiway-broadcast
 join) currently,  https://github.com/apache/spark/pull/3270 should be
 another optimization for this.





 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Wednesday, November 26, 2014 4:36 PM
 *To:* user
 *Subject:* Auto BroadcastJoin optimization failed in latest Spark



 Hi,



 I've confirmed that the latest Spark with either Hive 0.12 or 0.13.1 fails
 optimizing auto broadcast join in my query. I have a query that joins a
 huge fact table with 15 tiny dimension tables.



 I'm currently using an older version of Spark which was built on Oct. 12.



 Has anyone else met a similar situation?



 --

 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/





 --

 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Auto BroadcastJoin optimization failed in latest Spark

2014-12-04 Thread Jianshi Huang
If I run ANALYZE without NOSCAN, then Hive can successfully get the size:

parameters:{numFiles=0, EXTERNAL=TRUE, transient_lastDdlTime=1417764589,
COLUMN_STATS_ACCURATE=true, totalSize=0, numRows=1156, rawDataSize=76296}
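
For reference, the two ANALYZE variants side by side, run from a HiveContext
(hc and the table name are placeholders):

  hc.sql("ANALYZE TABLE table1 COMPUTE STATISTICS noscan")   // metadata only
  hc.sql("ANALYZE TABLE table1 COMPUTE STATISTICS")          // full scan, fills numRows/rawDataSize
  hc.sql("DESC EXTENDED table1").collect().foreach(println)  // inspect the parameters map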

Is Hive's PARQUET support broken?

Jianshi


On Fri, Dec 5, 2014 at 3:30 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Sorry for the late follow-up.

 I used Hao's DESC EXTENDED command and found some clue:

 new (broadcast broken Spark build):
 parameters:{numFiles=0, EXTERNAL=TRUE, transient_lastDdlTime=1417763892,
 COLUMN_STATS_ACCURATE=false, totalSize=0, numRows=-1, rawDataSize=-1}

 old (broadcast working Spark build):
 parameters:{EXTERNAL=TRUE, transient_lastDdlTime=1417763591,
 totalSize=56166}

 Looks like the table size computation failed in the latest version.

 I've run the analyze command:

   ANALYZE TABLE $table COMPUTE STATISTICS noscan

 And the tables are created from Parquet files:

 e.g.
 CREATE EXTERNAL TABLE table1 (
   code int,
   desc string
 )
 STORED AS PARQUET
 LOCATION '/user/jianshuang/data/dim_tables/table1.parquet'


 Does anyone know what went wrong?


 Thanks,
 Jianshi



 On Fri, Nov 28, 2014 at 1:24 PM, Cheng, Hao hao.ch...@intel.com wrote:

  Hi Jianshi,

 I couldn’t reproduce that with latest MASTER, and I can always get the
 BroadcastHashJoin for managed tables (in .csv file) in my testing, are
 there any external tables in your case?



 In general probably couple of things you can try first (with HiveContext):

 1)  ANALYZE TABLE xxx COMPUTE STATISTICS NOSCAN; (apply that to all
 of the tables);

 2)  SET spark.sql.autoBroadcastJoinThreshold=xxx; (Set the threshold
 as a greater value, it is 1024*1024*10 by default, just make sure the
 maximum dimension tables size (in bytes) is less than this)

 3)  Always put the main table(the biggest table) in the left-most
 among the inner joins;



 DESC EXTENDED tablename; -- this will print the detail information for
 the statistic table size (the field “totalSize”)

 EXPLAIN EXTENDED query; -- this will print the detail physical plan.



 Let me know if you still have problem.



 Hao



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Thursday, November 27, 2014 10:24 PM
 *To:* Cheng, Hao
 *Cc:* user
 *Subject:* Re: Auto BroadcastJoin optimization failed in latest Spark



 Hi Hao,



 I'm using inner join as Broadcast join didn't work for left joins (thanks
 for the links for the latest improvements).



 And I'm using HiveContext and it worked in a previous build (10/12) when
 joining 15 dimension tables.



 Jianshi



 On Thu, Nov 27, 2014 at 8:35 AM, Cheng, Hao hao.ch...@intel.com wrote:

  Are all of your join keys the same? And I guess the join types are all
 “Left” joins; https://github.com/apache/spark/pull/3362 is probably what
 you need.



 And, SparkSQL doesn’t support the multiway-join (and multiway-broadcast
 join) currently,  https://github.com/apache/spark/pull/3270 should be
 another optimization for this.





 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Wednesday, November 26, 2014 4:36 PM
 *To:* user
 *Subject:* Auto BroadcastJoin optimization failed in latest Spark



 Hi,



 I've confirmed that the latest Spark with either Hive 0.12 or 0.13.1
 fails optimizing auto broadcast join in my query. I have a query that joins
 a huge fact table with 15 tiny dimension tables.



 I'm currently using an older version of Spark which was built on Oct. 12.



 Has anyone else met a similar situation?



 --

 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/





 --

 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Auto BroadcastJoin optimization failed in latest Spark

2014-12-04 Thread Jianshi Huang
With Liancheng's suggestion, I've tried setting

 spark.sql.hive.convertMetastoreParquet  false

but ANALYZE with NOSCAN still returns -1 for rawDataSize
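
For reference, a sketch of how that setting would be applied, assuming hc is a
HiveContext:

  hc.setConf("spark.sql.hive.convertMetastoreParquet", "false")
  // or, equivalently, through SQL:
  hc.sql("SET spark.sql.hive.convertMetastoreParquet=false")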

Jianshi


On Fri, Dec 5, 2014 at 3:33 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 If I run ANALYZE without NOSCAN, then Hive can successfully get the size:

 parameters:{numFiles=0, EXTERNAL=TRUE, transient_lastDdlTime=1417764589,
 COLUMN_STATS_ACCURATE=true, totalSize=0, numRows=1156, rawDataSize=76296}

 Is Hive's PARQUET support broken?

 Jianshi


 On Fri, Dec 5, 2014 at 3:30 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Sorry for the late follow-up.

 I used Hao's DESC EXTENDED command and found some clue:

 new (broadcast broken Spark build):
 parameters:{numFiles=0, EXTERNAL=TRUE, transient_lastDdlTime=1417763892,
 COLUMN_STATS_ACCURATE=false, totalSize=0, numRows=-1, rawDataSize=-1}

 old (broadcast working Spark build):
 parameters:{EXTERNAL=TRUE, transient_lastDdlTime=1417763591,
 totalSize=56166}

 Looks like the table size computation failed in the latest version.

 I've run the analyze command:

   ANALYZE TABLE $table COMPUTE STATISTICS noscan

 And the tables are created from Parquet files:

 e.g.
 CREATE EXTERNAL TABLE table1 (
   code int,
   desc string
 )
 STORED AS PARQUET
 LOCATION '/user/jianshuang/data/dim_tables/table1.parquet'


 Does anyone know what went wrong?


 Thanks,
 Jianshi



 On Fri, Nov 28, 2014 at 1:24 PM, Cheng, Hao hao.ch...@intel.com wrote:

  Hi Jianshi,

 I couldn’t reproduce that with latest MASTER, and I can always get the
 BroadcastHashJoin for managed tables (in .csv file) in my testing, are
 there any external tables in your case?



 In general probably couple of things you can try first (with
 HiveContext):

 1)  ANALYZE TABLE xxx COMPUTE STATISTICS NOSCAN; (apply that to all
 of the tables);

 2)  SET spark.sql.autoBroadcastJoinThreshold=xxx; (Set the
 threshold as a greater value, it is 1024*1024*10 by default, just make sure
 the maximum dimension tables size (in bytes) is less than this)

 3)  Always put the main table(the biggest table) in the left-most
 among the inner joins;



 DESC EXTENDED tablename; -- this will print the detail information for
 the statistic table size (the field “totalSize”)

 EXPLAIN EXTENDED query; -- this will print the detail physical plan.



 Let me know if you still have problem.



 Hao



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Thursday, November 27, 2014 10:24 PM
 *To:* Cheng, Hao
 *Cc:* user
 *Subject:* Re: Auto BroadcastJoin optimization failed in latest Spark



 Hi Hao,



 I'm using inner join as Broadcast join didn't work for left joins
 (thanks for the links for the latest improvements).



 And I'm using HiveContext and it worked in a previous build (10/12) when
 joining 15 dimension tables.



 Jianshi



 On Thu, Nov 27, 2014 at 8:35 AM, Cheng, Hao hao.ch...@intel.com wrote:

  Are all of your join keys the same? And I guess the join types are all
 “Left” joins; https://github.com/apache/spark/pull/3362 is probably what
 you need.



 And, SparkSQL doesn’t support the multiway-join (and multiway-broadcast
 join) currently,  https://github.com/apache/spark/pull/3270 should be
 another optimization for this.





 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Wednesday, November 26, 2014 4:36 PM
 *To:* user
 *Subject:* Auto BroadcastJoin optimization failed in latest Spark



 Hi,



 I've confirmed that the latest Spark with either Hive 0.12 or 0.13.1
 fails optimizing auto broadcast join in my query. I have a query that joins
 a huge fact table with 15 tiny dimension tables.



 I'm currently using an older version of Spark which was built on Oct. 12.



 Has anyone else met a similar situation?



 --

 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/





 --

 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Auto BroadcastJoin optimization failed in latest Spark

2014-11-27 Thread Jianshi Huang
Hi Hao,

I'm using inner join as Broadcast join didn't work for left joins (thanks
for the links for the latest improvements).

And I'm using HiveContext and it worked in a previous build (10/12) when
joining 15 dimension tables.

Jianshi

On Thu, Nov 27, 2014 at 8:35 AM, Cheng, Hao hao.ch...@intel.com wrote:

  Are all of your join keys the same? And I guess the join types are all
 “Left” joins; https://github.com/apache/spark/pull/3362 is probably what
 you need.



 And, SparkSQL doesn’t support the multiway-join (and multiway-broadcast
 join) currently,  https://github.com/apache/spark/pull/3270 should be
 another optimization for this.





 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Wednesday, November 26, 2014 4:36 PM
 *To:* user
 *Subject:* Auto BroadcastJoin optimization failed in latest Spark



 Hi,



 I've confirmed that the latest Spark with either Hive 0.12 or 0.13.1 fails
 optimizing auto broadcast join in my query. I have a query that joins a
 huge fact table with 15 tiny dimension tables.



 I'm currently using an older version of Spark which was built on Oct. 12.



 Has anyone else met a similar situation?



 --

 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Auto BroadcastJoin optimization failed in latest Spark

2014-11-26 Thread Jianshi Huang
Hi,

I've confirmed that the latest Spark with either Hive 0.12 or 0.13.1 fails
optimizing auto broadcast join in my query. I have a query that joins a
huge fact table with 15 tiny dimension tables.

I'm currently using an older version of Spark which was built on Oct. 12.

Has anyone else met a similar situation?

-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: How to do broadcast join in SparkSQL

2014-11-25 Thread Jianshi Huang
Hi,

Looks like the latest SparkSQL with Hive 0.12 has a bug in Parquet support.
I got the following exceptions:

org.apache.hadoop.hive.ql.parse.SemanticException: Output Format must
implement HiveOutputFormat, otherwise it should be either
IgnoreKeyTextOutputFormat or SequenceFileOutputFormat
at
org.apache.hadoop.hive.ql.plan.CreateTableDesc.validate(CreateTableDesc.java:431)
at
org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:9964)
at
org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:9180)
at
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:327)

Using the same DDL and Analyze script above.

Jianshi


On Sat, Oct 11, 2014 at 2:18 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 It works fine, thanks for the help Michael.

 Liancheng also told me a trick, using a subquery with LIMIT n. It works in
 latest 1.2.0

 BTW, looks like the broadcast optimization won't be recognized if I do a
 left join instead of an inner join. Is that true? How can I make it work for
 left joins?

 Cheers,
 Jianshi

 On Thu, Oct 9, 2014 at 3:10 AM, Michael Armbrust mich...@databricks.com
 wrote:

 Thanks for the input.  We purposefully made sure that the config option
 did not make it into a release as it is not something that we are willing
 to support long term.  That said we'll try and make this easier in the
 future either through hints or better support for statistics.

 In this particular case you can get what you want by registering the
 tables as external tables and setting a flag.  Here's a helper function to
 do what you need.

 /**
  * Sugar for creating a Hive external table from a parquet path.
  */
 def createParquetTable(name: String, file: String): Unit = {
   import org.apache.spark.sql.hive.HiveMetastoreTypes

   val rdd = parquetFile(file)
   val schema = rdd.schema.fields.map(f =>
     s"${f.name} ${HiveMetastoreTypes.toMetastoreType(f.dataType)}").mkString(",\n")
   val ddl = s"""
     |CREATE EXTERNAL TABLE $name (
     |  $schema
     |)
     |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
     |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
     |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
     |LOCATION '$file'""".stripMargin
   sql(ddl)
   setConf("spark.sql.hive.convertMetastoreParquet", "true")
 }

 You'll also need to run this to populate the statistics:

 ANALYZE TABLE  tableName COMPUTE STATISTICS noscan;
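
 A hypothetical usage of the helper above (table name and path are placeholders):

   createParquetTable("dim_table1", "/path/to/dim_table1.parquet")
   sql("ANALYZE TABLE dim_table1 COMPUTE STATISTICS noscan")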


 On Wed, Oct 8, 2014 at 1:44 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Ok, currently there's cost-based optimization; however, Parquet statistics
 are not implemented...

 What's a good way to join a big fact table with several tiny
 dimension tables in Spark SQL (1.1)?

 I wish we could allow user hints for the join.

 Jianshi

 On Wed, Oct 8, 2014 at 2:18 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not
 merged into master?

 I cannot find spark.sql.hints.broadcastTables in latest master, but
 it's in the following patch.


 https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5


 Jianshi


 On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang jianshi.hu...@gmail.com
  wrote:

 Yes, looks like it can only be controlled by the
 parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird
 to me.

 How am I supposed to know the exact size of a table in bytes? I think
 letting me specify the join algorithm would be preferable.
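
 For what it's worth, one way to get the size in bytes is to ask HDFS directly
 for the table's storage location (the path below is a placeholder):

   import org.apache.hadoop.fs.{FileSystem, Path}

   val fs = FileSystem.get(sc.hadoopConfiguration)
   val bytes = fs.getContentSummary(new Path("/path/to/dim_table1.parquet")).getLength
   println(s"dim_table1 is $bytes bytes on disk")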

 Jianshi

 On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu yuzhih...@gmail.com wrote:

 Have you looked at SPARK-1800 ?

 e.g. see sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
 Cheers

 On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang 
 jianshi.hu...@gmail.com wrote:

 I cannot find it in the documentation. And I have a dozen dimension
 tables to (left) join...


 Cheers,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/





 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/





 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: How to do broadcast join in SparkSQL

2014-11-25 Thread Jianshi Huang
Oh, I found an explanation from
http://cmenguy.github.io/blog/2013/10/30/using-hive-with-parquet-format-in-cdh-4-dot-3/

The error here is a bit misleading, what it really means is that the class
parquet.hive.DeprecatedParquetOutputFormat isn’t in the classpath for Hive.
Sure enough, doing a ls /usr/lib/hive/lib doesn’t show any of the parquet
jars, but ls /usr/lib/impala/lib shows the jar we’re looking for as
parquet-hive-1.0.jar
Is it removed from latest Spark?

Jianshi


On Wed, Nov 26, 2014 at 2:13 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Hi,

 Looks like the latest SparkSQL with Hive 0.12 has a bug in Parquet
 support. I got the following exceptions:

 org.apache.hadoop.hive.ql.parse.SemanticException: Output Format must
 implement HiveOutputFormat, otherwise it should be either
 IgnoreKeyTextOutputFormat or SequenceFileOutputFormat
 at
 org.apache.hadoop.hive.ql.plan.CreateTableDesc.validate(CreateTableDesc.java:431)
 at
 org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:9964)
 at
 org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:9180)
 at
 org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:327)

 Using the same DDL and Analyze script above.

 Jianshi


 On Sat, Oct 11, 2014 at 2:18 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 It works fine, thanks for the help Michael.

 Liancheng also told me a trick, using a subquery with LIMIT n. It works
 in latest 1.2.0

 BTW, looks like the broadcast optimization won't be recognized if I do a
 left join instead of an inner join. Is that true? How can I make it work for
 left joins?

 Cheers,
 Jianshi

 On Thu, Oct 9, 2014 at 3:10 AM, Michael Armbrust mich...@databricks.com
 wrote:

 Thanks for the input.  We purposefully made sure that the config option
 did not make it into a release as it is not something that we are willing
 to support long term.  That said we'll try and make this easier in the
 future either through hints or better support for statistics.

 In this particular case you can get what you want by registering the
 tables as external tables and setting a flag.  Here's a helper function to
 do what you need.

 /**
  * Sugar for creating a Hive external table from a parquet path.
  */
 def createParquetTable(name: String, file: String): Unit = {
   import org.apache.spark.sql.hive.HiveMetastoreTypes

   val rdd = parquetFile(file)
   val schema = rdd.schema.fields.map(f =>
     s"${f.name} ${HiveMetastoreTypes.toMetastoreType(f.dataType)}").mkString(",\n")
   val ddl = s"""
     |CREATE EXTERNAL TABLE $name (
     |  $schema
     |)
     |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
     |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
     |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
     |LOCATION '$file'""".stripMargin
   sql(ddl)
   setConf("spark.sql.hive.convertMetastoreParquet", "true")
 }

 You'll also need to run this to populate the statistics:

 ANALYZE TABLE  tableName COMPUTE STATISTICS noscan;


 On Wed, Oct 8, 2014 at 1:44 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Ok, currently there's cost-based optimization; however, Parquet
 statistics are not implemented...

 What's a good way to join a big fact table with several
 tiny dimension tables in Spark SQL (1.1)?

 I wish we could allow user hints for the join.

 Jianshi

 On Wed, Oct 8, 2014 at 2:18 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Looks like https://issues.apache.org/jira/browse/SPARK-1800 is not
 merged into master?

 I cannot find spark.sql.hints.broadcastTables in latest master, but
 it's in the following patch.


 https://github.com/apache/spark/commit/76ca4341036b95f71763f631049fdae033990ab5


 Jianshi


 On Mon, Sep 29, 2014 at 1:24 AM, Jianshi Huang 
 jianshi.hu...@gmail.com wrote:

 Yes, looks like it can only be controlled by the
 parameter spark.sql.autoBroadcastJoinThreshold, which is a little bit weird
 to me.

 How am I supposed to know the exact size of a table in bytes? I think
 letting me specify the join algorithm would be preferable.

 Jianshi

 On Sun, Sep 28, 2014 at 11:57 PM, Ted Yu yuzhih...@gmail.com wrote:

 Have you looked at SPARK-1800 ?

 e.g. see
 sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
 Cheers

 On Sun, Sep 28, 2014 at 1:55 AM, Jianshi Huang 
 jianshi.hu...@gmail.com wrote:

 I cannot find it in the documentation. And I have a dozen dimension
 tables to (left) join...


 Cheers,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/





 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/





 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter

How to deal with BigInt in my case class for RDD = SchemaRDD convertion

2014-11-21 Thread Jianshi Huang
Hi,

I got an error during rdd.registerTempTable(...) saying scala.MatchError:
scala.BigInt

Looks like BigInt cannot be used in SchemaRDD, is that correct?

So what would you recommend to deal with it?

Thanks,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: How to deal with BigInt in my case class for RDD = SchemaRDD convertion

2014-11-21 Thread Jianshi Huang
Ah yes. I found it too in the manual. Thanks for the help anyway!

Since BigDecimal is just a wrapper around BigInt, let's also convert
BigInt to Decimal.

I created a ticket. https://issues.apache.org/jira/browse/SPARK-4549
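
A minimal sketch of the workaround in the meantime (the case class and values
are hypothetical): scala.BigInt has no Spark SQL type, but scala.BigDecimal
maps to DecimalType, so wrap the BigInt value before registering the table.

  case class Account(id: Long, balance: BigDecimal)

  val accounts = sc.parallelize(Seq(
    Account(1L, BigDecimal(BigInt("12345678901234567890")))))

  import sqlContext.createSchemaRDD  // implicit RDD -> SchemaRDD conversion (Spark 1.1/1.2 style)
  accounts.registerTempTable("accounts")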

Jianshi

On Fri, Nov 21, 2014 at 11:30 PM, Yin Huai huaiyin@gmail.com wrote:

 Hello Jianshi,

 The reason of that error is that we do not have a Spark SQL data type for
 Scala BigInt. You can use Decimal for your case.

 Thanks,

 Yin

 On Fri, Nov 21, 2014 at 5:11 AM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi,

 I got an error during rdd.registerTempTable(...) saying scala.MatchError:
 scala.BigInt

 Looks like BigInt cannot be used in SchemaRDD, is that correct?

 So what would you recommend to deal with it?

 Thanks,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/





-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Is it safe to use Scala 2.11 for Spark build?

2014-11-18 Thread Jianshi Huang
$1.apply(Ivy.scala:147)
 at sbt.IvySbt$$anonfun$withIvy$1.apply(Ivy.scala:124)
 at sbt.IvySbt.sbt$IvySbt$$action$1(Ivy.scala:56)
 at sbt.IvySbt$$anon$3.call(Ivy.scala:64)
 at xsbt.boot.Locks$GlobalLock.withChannel$1(Locks.scala:93)
 at
 xsbt.boot.Locks$GlobalLock.xsbt$boot$Locks$GlobalLock$$withChannelRetries$1(Locks.scala:78)
 at xsbt.boot.Locks$GlobalLock$$anonfun$withFileLock$1.apply(Locks.scala:97)
 at xsbt.boot.Using$.withResource(Using.scala:10)
 at xsbt.boot.Using$.apply(Using.scala:9)
 at xsbt.boot.Locks$GlobalLock.ignoringDeadlockAvoided(Locks.scala:58)
 at xsbt.boot.Locks$GlobalLock.withLock(Locks.scala:48)
 commit c6e0c2ab1c29c184a9302d23ad75e4ccd8060242
 at xsbt.boot.Locks$.apply0(Locks.scala:31)
 at xsbt.boot.Locks$.apply(Locks.scala:28)
 at sbt.IvySbt.withDefaultLogger(Ivy.scala:64)
 at sbt.IvySbt.withIvy(Ivy.scala:119)
 at sbt.IvySbt.withIvy(Ivy.scala:116)
 at sbt.IvySbt$Module.withModule(Ivy.scala:147)
 at sbt.IvyActions$.updateEither(IvyActions.scala:156)
 at
 sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala:1282)
 at
 sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala:1279)
 at
 sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$84.apply(Defaults.scala:1309)
 at
 sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$84.apply(Defaults.scala:1307)
 at sbt.Tracked$$anonfun$lastOutput$1.apply(Tracked.scala:35)
 at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:1312)
 at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:1306)
 at sbt.Tracked$$anonfun$inputChanged$1.apply(Tracked.scala:45)
 at sbt.Classpaths$.cachedUpdate(Defaults.scala:1324)
 at sbt.Classpaths$$anonfun$updateTask$1.apply(Defaults.scala:1264)
 at sbt.Classpaths$$anonfun$updateTask$1.apply(Defaults.scala:1242)
 at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
 at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40)
 at sbt.std.Transform$$anon$4.work(System.scala:63)
 at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226)
 at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226)
 at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
 at sbt.Execute.work(Execute.scala:235)
 at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
 at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
 at
 sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159)
 at sbt.CompletionService$$anon$2.call(CompletionService.scala:28)
 at java.util.concurrent.FutureTask.run(FutureTask.java:262)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
 at java.util.concurrent.FutureTask.run(FutureTask.java:262)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 [error] (streaming-kafka/*:update) sbt.ResolveException: unresolved
 dependency: org.apache.kafka#kafka_2.11;0.8.0: not found
 [error] (catalyst/*:update) sbt.ResolveException: unresolved dependency:
 org.scalamacros#quasiquotes_2.11;2.0.1: not found

 --
 Ye Xianjin
 Sent with Sparrow http://www.sparrowmailapp.com/?sig

 On Tuesday, November 18, 2014 at 3:27 PM, Prashant Sharma wrote:

 It is safe in the sense we would help you with the fix if you run into
 issues. I have used it, but since I worked on the patch the opinion can be
 biased. I am using scala 2.11 for day to day development. You should
 checkout the build instructions here :
 https://github.com/ScrapCodes/spark-1/blob/patch-3/docs/building-spark.md

 Prashant Sharma



 On Tue, Nov 18, 2014 at 12:19 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Any notable issues for using Scala 2.11? Is it stable now?

 Or can I use Scala 2.11 in my spark application and use Spark dist build
 with 2.10 ?

 I'm looking forward to migrate to 2.11 for some quasiquote features.
 Couldn't make it run in 2.10...

 Cheers,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/






-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Is there setup and cleanup function in spark?

2014-11-17 Thread Jianshi Huang
I see. Agree that lazy eval is not suitable for proper setup and teardown.

We also abandoned it due to an inherent incompatibility between implicit and
lazy. It was fun to come up with this trick though.

Jianshi

On Tue, Nov 18, 2014 at 10:28 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 On Fri, Nov 14, 2014 at 2:49 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Ok, then we need another trick.

 let's have an *implicit lazy var connection/context* around our code.
 And setup() will trigger the eval and initialization.


 Due to lazy evaluation, I think having setup/teardown is a bit tricky. In
 particular teardown, because it is not easy to execute code after all
 computation is done. You can check
 http://apache-spark-user-list.1001560.n3.nabble.com/Keep-state-inside-map-function-tp10968p11009.html
 for an example of what worked for me.

 Tobias




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Is it safe to use Scala 2.11 for Spark build?

2014-11-17 Thread Jianshi Huang
Any notable issues with using Scala 2.11? Is it stable now?

Or can I use Scala 2.11 in my Spark application with a Spark dist built
with 2.10?

I'm looking forward to migrating to 2.11 for some quasiquote features.
Couldn't make it run in 2.10...

Cheers,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Compiling Spark master HEAD failed.

2014-11-14 Thread Jianshi Huang
The mvn build command is

  mvn clean install -Pyarn -Phive -Phive-0.13.1 -Phadoop-2.4
-Djava.version=1.7 -DskipTests

I'm getting this error message:

  [ERROR] Failed to execute goal
net.alchim31.maven:scala-maven-plugin:3.2.0:compile (scala-compile-first)
on project spark-hive_2.10: wrap: scala.r
eflect.internal.MissingRequirementError: object scala.runtime in compiler
mirror not found. - [Help 1]


Does anyone know what the problem is?

I'm building it on OS X. I didn't have this problem a month ago.


-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Is there setup and cleanup function in spark?

2014-11-13 Thread Jianshi Huang
So can I write it like this?

rdd.mapPartitions { i => setup(); i }.map(...).mapPartitions { i => cleanup(); i }

That way I don't need to mess up the logic and can still use map, filter and
other transformations on the RDD.
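
For comparison, a self-contained sketch of the plain mapPartitions pattern from
the Cloudera post, assuming hypothetical openConnection()/process() helpers;
setup and cleanup both happen once per partition, and the connection is visible
to the processing code:

  val processed = rdd.mapPartitions { iter =>
    val conn = openConnection()                          // setup
    val out = iter.map(x => process(conn, x)).toVector   // force evaluation before closing
    conn.close()                                         // cleanup
    out.iterator
  }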

Jianshi

On Fri, Nov 14, 2014 at 12:20 PM, Cheng Lian lian.cs@gmail.com wrote:

  If you’re looking for executor side setup and cleanup functions, there
 ain’t any yet, but you can achieve the same semantics via
 RDD.mapPartitions.

 Please check the “setup() and cleanup” section of this blog from Cloudera
 for details:
 http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/

 On 11/14/14 10:44 AM, Dai, Kevin wrote:

   HI, all



 Is there setup and cleanup function as in hadoop mapreduce in spark which
 does some initialization and cleanup work?



 Best Regards,

 Kevin.

   ​




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Is there setup and cleanup function in spark?

2014-11-13 Thread Jianshi Huang
Ok, then we need another trick.

let's have an *implicit lazy var connection/context* around our code. And
setup() will trigger the eval and initialization.

The implicit lazy val/var trick was actually invented by Kevin. :)
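
In its simplest form the idea looks like the sketch below (openConnection() and
process() are hypothetical helpers): a lazy val inside an object is initialized
at most once per executor JVM, the first time it is touched.

  object Lazily {
    lazy val conn = openConnection()     // evaluated on first use, once per JVM
    sys.addShutdownHook(conn.close())    // best-effort cleanup when the executor exits
  }

  rdd.map(x => process(Lazily.conn, x))  // setup happens implicitly on first use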

Jianshi

On Fri, Nov 14, 2014 at 1:41 PM, Cheng Lian lian.cs@gmail.com wrote:

  If you’re just relying on the side effect of setup() and cleanup() then
 I think this trick is OK and pretty cleaner.

 But if setup() returns, say, a DB connection, then the map(...) part and
 cleanup() can’t get the connection object.

 On 11/14/14 1:20 PM, Jianshi Huang wrote:

   So can I write it like this?

   rdd.mapPartitions { i => setup(); i }.map(...).mapPartitions { i => cleanup(); i }

   That way I don't need to mess up the logic and can still use map, filter and
  other transformations on the RDD.

  Jianshi

 On Fri, Nov 14, 2014 at 12:20 PM, Cheng Lian lian.cs@gmail.com
 wrote:

  If you’re looking for executor side setup and cleanup functions, there
 ain’t any yet, but you can achieve the same semantics via
 RDD.mapPartitions.

 Please check the “setup() and cleanup” section of this blog from Cloudera
 for details:
 http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/

 On 11/14/14 10:44 AM, Dai, Kevin wrote:

  HI, all



 Is there setup and cleanup function as in hadoop mapreduce in spark which
 does some initialization and cleanup work?



 Best Regards,

 Kevin.

  ​




  --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/

   ​




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: RDD to DStream

2014-11-12 Thread Jianshi Huang
I also discussed this with Liancheng two weeks ago. He suggested using
toLocalIterator to collect the partitions of the RDD to the driver (in the same
order if the RDD is sorted), and then turning each partition into an RDD and
putting them in the queue.

So: To turn RDD[(timestamp, value)] to DStream

1) Group by timestamp/windowSize
2) Sort RDD by (group, timestamp)
3) Use toLocalIterator to collect each group/partition
4) Turn each group/partition to RDD and put them in a Queue
5) Use StreamingContext.queueStream to consume the Queue[RDD] as a
DStream
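
A rough sketch of steps 1-5, assuming sc/ssc are in scope; because the RDD is
sorted by (window, timestamp), records for the same window arrive consecutively
from toLocalIterator and can be chunked on the fly:

  import scala.collection.mutable
  import scala.reflect.ClassTag
  import org.apache.spark.SparkContext._
  import org.apache.spark.rdd.RDD
  import org.apache.spark.streaming.StreamingContext
  import org.apache.spark.streaming.dstream.DStream

  def rddToDStreamViaLocalIterator[T: ClassTag](data: RDD[(Long, T)],
                                                windowSize: Long,
                                                ssc: StreamingContext): DStream[T] = {
    val sc = ssc.sparkContext
    val sorted = data
      .map { case (ts, v) => ((ts / windowSize, ts), v) }   // 1) bucket by window
      .sortByKey()                                          // 2) sort by (window, timestamp)
      .map { case ((w, _), v) => (w, v) }
    val queue = new mutable.SynchronizedQueue[RDD[T]]()
    val it = sorted.toLocalIterator.buffered                // 3) pull partitions one at a time
    while (it.hasNext) {
      val w = it.head._1
      val chunk = mutable.ArrayBuffer[T]()
      while (it.hasNext && it.head._1 == w) chunk += it.next()._2
      queue += sc.parallelize(chunk)                        // 4) one RDD per window
    }
    ssc.queueStream(queue)                                  // 5)
  }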

Looks good to me, will try it today. The downside is that all the data needs to
be collected to the driver; is there a way to avoid this?

Thanks
Jianshi






On Mon, Oct 27, 2014 at 4:57 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Sure, let's still focus on the streaming simulation use case. It's a very
 useful problem to solve.

 If we're going to use the same Spark-streaming core for the simulation,
 the most simple way is to have a globally sorted RDDs and use
 ssc.queueStream. Thus collecting the Key part to driver is probably
 necessary.

 I've done offline simulation in Pig and it's absolutely non-trivial and
 error prone, also I had to have multiple copies of data due to overlaps of
 window.

 Therefore I would prefer using the existing streaming implementation for
 the simulation rather than a special DStream.


 Jianshi



 On Mon, Oct 27, 2014 at 4:44 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  Yes, I understand what you want, but maybe hard to achieve without
 collecting back to driver node.



 Besides, can we just think of another way to do it.



 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Monday, October 27, 2014 4:07 PM

 *To:* Shao, Saisai
 *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com)
 *Subject:* Re: RDD to DStream



 Yeah, you're absolutely right Saisai.



 My point is we should allow this kind of logic in RDD, let's say
 transforming type RDD[(Key, Iterable[T])] to Seq[(Key, RDD[T])].



 Make sense?



 Jianshi



 On Mon, Oct 27, 2014 at 3:56 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  I think what you want is to make each bucket as a new RDD as what you
 mentioned in Pig syntax.



 gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded
 timestamp for each bucket



 From my understanding, currently in Spark there’s no such kind of API to
 achieve this, maybe you have to create a customized RDD by yourself.



 For the code why cannot executed,



   .map(sc.parallelize(_._2.sortBy(_._1)))  // nested RDD, hmm...



 This “sc.parallelize(_._2.sortBy(_._1))”will be serialized as a closure
 to execute in remote side, which obviously do not has SparkContext, I think
 Spark cannot support nested RDD in closure.



 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Monday, October 27, 2014 3:30 PM


 *To:* Shao, Saisai
 *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com)
 *Subject:* Re: RDD to DStream



 Ok, back to Scala code, I'm wondering why I cannot do this:



 data.groupBy(timestamp / window)

   .sortByKey()  // no sort method available here

   .map(sc.parallelize(_._2.sortBy(_._1)))  // nested RDD, hmm...

   .collect() // returns Seq[RDD[(Timestamp, T)]]





 Jianshi



 On Mon, Oct 27, 2014 at 3:24 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

  You're absolutely right, it's not 'scalable' as I'm using collect().



 However, it's important to have the RDDs ordered by the timestamp of the
 time window (groupBy puts data to corresponding timewindow).



 It's fairly easy to do in Pig, but somehow I have no idea how to express
 it in RDD...



 Something like (in Pig, pseudo code :):



 g = GROUP data BY (timestamp / windowSize)  // group data into buckets in
 the same time window

 gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded
 timestamp for each bucket

 stream = FOREACH gs GENERATE toRDD(g)



 No idea how to do the order by part in RDD.



 Jianshi





 On Mon, Oct 27, 2014 at 3:07 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  I think your solution may not be scalable if the data size keeps
 increasing, since you have to collect all your data back to the driver node, so
 driver memory usage will be a problem.



 why not filter out specific time-range data as a rdd, after filtering the
 whole time-range, you will get a series of RDD with timestamp divided, and
 then feed into queue. Still it is not an efficient way, but  it is not
 limited by driver memory.



 Also there may have some other solutions like shuffle to arrange data,
 but you cannot avoid scanning the whole data. Basically we need to avoid
 fetching large amount of data back to driver.





 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Monday, October 27, 2014 2:39 PM
 *To:* Shao, Saisai
 *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com)


 *Subject:* Re: RDD

Re: Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext

2014-11-10 Thread Jianshi Huang
Hi Srinivas,

Here's the versions I'm using.

<spark.version>1.2.0-SNAPSHOT</spark.version>
<spray.version>1.3.2</spray.version>
<spray.json.version>1.3.0</spray.json.version>
<akka.group>org.spark-project.akka</akka.group>
<akka.version>2.3.4-spark</akka.version>

I'm using Spark built from master. so it's 1.2.0-SNAPSHOT.

Jianshi



On Tue, Nov 11, 2014 at 4:06 AM, Srinivas Chamarthi 
srinivas.chamar...@gmail.com wrote:

 I am trying to use spark with spray and I have the  dependency problem
 with quasiquotes. The issue comes up only when I include spark
 dependencies. I am not sure how this one can be excluded.

 Jianshi: can you let me know what version of spray + akka + spark are you
 using ?

 [error]org.scalamacros:quasiquotes _2.10, _2.10.3
 [trace] Stack trace suppressed: run last *:update for the full output.
 [error] (*:update) Conflicting cross-version suffixes in:
 org.scalamacros:quasiq
 uotes


 On Thu, Oct 30, 2014 at 9:50 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi Preshant, Chester, Mohammed,

 I switched to Spark's Akka and now it works well. Thanks for the help!

 (Need to exclude Akka from Spray dependencies, or specify it as provided)


 Jianshi


 On Thu, Oct 30, 2014 at 3:17 AM, Mohammed Guller moham...@glassbeam.com
 wrote:

  I am not sure about that.



 Can you try a Spray version built with 2.2.x along with Spark 1.1 and
 include the Akka dependencies in your project’s sbt file?



 Mohammed



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Tuesday, October 28, 2014 8:58 PM
 *To:* Mohammed Guller
 *Cc:* user
 *Subject:* Re: Spray client reports Exception:
 akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext



 I'm using Spark built from HEAD, I think it uses modified Akka 2.3.4,
 right?



 Jianshi



 On Wed, Oct 29, 2014 at 5:53 AM, Mohammed Guller moham...@glassbeam.com
 wrote:

 Try a version built with Akka 2.2.x



 Mohammed



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Tuesday, October 28, 2014 3:03 AM
 *To:* user
 *Subject:* Spray client reports Exception:
 akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext



 Hi,



 I got the following exceptions when using Spray client to write to
 OpenTSDB using its REST API.



   Exception in thread pool-10-thread-2 java.lang.NoSuchMethodError:
 akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext;



 It worked locally in my Intellij but failed when I launch it from
 Spark-submit.



 Google suggested it's a compatibility issue in Akka. And I'm using
 latest Spark built from the HEAD, so the Akka used in Spark-submit is
 2.3.4-spark.



 I tried both Spray 1.3.2 (built for Akka 2.3.6) and 1.3.1 (built for
 2.3.4). Both failed with the same exception.



 Anyone has idea what went wrong? Need help!



 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/





 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/





-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext

2014-10-30 Thread Jianshi Huang
Hi Preshant, Chester, Mohammed,

I switched to Spark's Akka and now it works well. Thanks for the help!

(Need to exclude Akka from Spray dependencies, or specify it as provided)
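
A build.sbt sketch of what that exclusion can look like (versions taken from
this thread; adjust to your own build):

  libraryDependencies ++= Seq(
    "org.apache.spark"       %% "spark-core"   % "1.2.0-SNAPSHOT" % "provided",
    "org.spark-project.akka" %% "akka-actor"   % "2.3.4-spark",
    "io.spray"               %% "spray-client" % "1.3.2"
      excludeAll ExclusionRule(organization = "com.typesafe.akka")
  )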


Jianshi


On Thu, Oct 30, 2014 at 3:17 AM, Mohammed Guller moham...@glassbeam.com
wrote:

  I am not sure about that.



 Can you try a Spray version built with 2.2.x along with Spark 1.1 and
 include the Akka dependencies in your project’s sbt file?



 Mohammed



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Tuesday, October 28, 2014 8:58 PM
 *To:* Mohammed Guller
 *Cc:* user
 *Subject:* Re: Spray client reports Exception:
 akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext



 I'm using Spark built from HEAD, I think it uses modified Akka 2.3.4,
 right?



 Jianshi



 On Wed, Oct 29, 2014 at 5:53 AM, Mohammed Guller moham...@glassbeam.com
 wrote:

 Try a version built with Akka 2.2.x



 Mohammed



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Tuesday, October 28, 2014 3:03 AM
 *To:* user
 *Subject:* Spray client reports Exception:
 akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext



 Hi,



 I got the following exceptions when using Spray client to write to
 OpenTSDB using its REST API.



   Exception in thread pool-10-thread-2 java.lang.NoSuchMethodError:
 akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext;



 It worked locally in my Intellij but failed when I launch it from
 Spark-submit.



 Google suggested it's a compatibility issue in Akka. And I'm using latest
 Spark built from the HEAD, so the Akka used in Spark-submit is 2.3.4-spark.



 I tried both Spray 1.3.2 (built for Akka 2.3.6) and 1.3.1 (built for
 2.3.4). Both failed with the same exception.



 Anyone has idea what went wrong? Need help!



 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/





 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext

2014-10-28 Thread Jianshi Huang
Hi,

I got the following exceptions when using Spray client to write to OpenTSDB
using its REST API.

  Exception in thread pool-10-thread-2 java.lang.NoSuchMethodError:
akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext;

It worked locally in my Intellij but failed when I launch it from
Spark-submit.

Google suggested it's a compatibility issue in Akka. And I'm using latest
Spark built from the HEAD, so the Akka used in Spark-submit is 2.3.4-spark.

I tried both Spray 1.3.2 (built for Akka 2.3.6) and 1.3.1 (built for
2.3.4). Both failed with the same exception.

Does anyone have an idea what went wrong? Need help!

-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext

2014-10-28 Thread Jianshi Huang
Here's the answer I got from Akka's user ML.


This looks like a binary incompatibility issue. As far as I know Spark is
using a custom built Akka and Scala for various reasons.

You should ask this on the Spark mailing list, Akka is binary compatible
between major versions (2.3.6 is compatible with 2.3.4) but Spark uses its
own build which I guess does not guarantee binary compatibility.


*Has anyone used Spray or other Akka-dependent libraries? Any binary
compatibility issues?*

Does that mean that if I build Spray with Spark's Akka version

  <akka.group>org.spark-project.akka</akka.group>
  <akka.version>2.3.4-spark</akka.version>

it should solve the problem. Makes sense? I'll give it a shot when I have time;
for now I'll probably just not use the Spray client...

Cheers,
Jianshi



On Tue, Oct 28, 2014 at 6:02 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Hi,

 I got the following exceptions when using Spray client to write to
 OpenTSDB using its REST API.

   Exception in thread pool-10-thread-2 java.lang.NoSuchMethodError:
 akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext;

 It worked locally in my Intellij but failed when I launch it from
 Spark-submit.

 Google suggested it's a compatibility issue in Akka. And I'm using latest
 Spark built from the HEAD, so the Akka used in Spark-submit is 2.3.4-spark.

 I tried both Spray 1.3.2 (built for Akka 2.3.6) and 1.3.1 (built for
 2.3.4). Both failed with the same exception.

 Anyone has idea what went wrong? Need help!

 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: Spray client reports Exception: akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext

2014-10-28 Thread Jianshi Huang
I'm using Spark built from HEAD, I think it uses modified Akka 2.3.4, right?

Jianshi

On Wed, Oct 29, 2014 at 5:53 AM, Mohammed Guller moham...@glassbeam.com
wrote:

  Try a version built with Akka 2.2.x



 Mohammed



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Tuesday, October 28, 2014 3:03 AM
 *To:* user
 *Subject:* Spray client reports Exception:
 akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext



 Hi,



 I got the following exceptions when using Spray client to write to
 OpenTSDB using its REST API.



   Exception in thread pool-10-thread-2 java.lang.NoSuchMethodError:
 akka.actor.ActorSystem.dispatcher()Lscala/concurrent/ExecutionContext;



 It worked locally in my Intellij but failed when I launch it from
 Spark-submit.



 Google suggested it's a compatibility issue in Akka. And I'm using latest
 Spark built from the HEAD, so the Akka used in Spark-submit is 2.3.4-spark.



 I tried both Spray 1.3.2 (built for Akka 2.3.6) and 1.3.1 (built for
 2.3.4). Both failed with the same exception.



 Anyone has idea what went wrong? Need help!



 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Spark streaming update/restart gracefully

2014-10-27 Thread Jianshi Huang
Looks like the current solution to update Spark Streaming jars/configurations
is to

1) save current Kafka offsets somewhere (say zookeeper)
2) shutdown the cluster and restart
3) connect to Kafka with previously saved offset
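
For step 2, the shutdown itself can at least be made graceful (a sketch,
assuming ssc is the running StreamingContext): let in-flight batches finish
before the JVM exits.

  sys.addShutdownHook {
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }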

Assuming we're reading from Kafka, which provides nice persistence and a
read-from-offset feature.

Is there a better solution? Is it possible to reload jars dynamically?
Resize cluster size dynamically (SPARK-3174)?

A reasonable ask is to have a strategy to update or restart executors
gracefully. Something like https://issues.apache.org/jira/browse/STORM-167

Any suggestion?

-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


Re: RDD to DStream

2014-10-27 Thread Jianshi Huang
Hi Saisai,

I understand it's non-trivial, but the requirement of simulating offline
data as stream is also fair. :)

I just wrote a prototype, however, I need to do a collect and a bunch of
parallelize...

  // RDD of (timestamp, value)
  def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long,
ssc: StreamingContext): DStream[T] = {
val sc = ssc.sparkContext
val d = data.groupBy(_._1 / timeWindow)
.map(e => (e._1, e._2.toSeq.sortBy(_._1).map(_._2)))
.collect()
.map(e => (e._1, sc.parallelize(e._2)))
.sortBy(_._1)
val queue = new mutable.SynchronizedQueue[RDD[T]]

queue ++= d.map(_._2)

ssc.queueStream(queue)
  }
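
Hypothetical usage of the prototype above (assuming sc and ssc are already
created): 10-second windows over (epochMillis, value) records.

  val data = sc.parallelize(Seq(
    (1414400000000L, "a"), (1414400003000L, "b"), (1414400012000L, "c")))
  val stream = rddToDStream(data, 10000L, ssc)  // 10-second windows
  stream.print()
  ssc.start()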

Any way to get a list of RDDs sorted by group key just after groupBy?

Jianshi

On Mon, Oct 27, 2014 at 2:00 PM, Shao, Saisai saisai.s...@intel.com wrote:

  Hi Jianshi,



 For simulation purpose, I think you can try ConstantInputDStream and
 QueueInputDStream to convert one RDD or series of RDD into DStream, the
 first one output the same RDD in each batch duration, and the second one
 just output a RDD in a queue in each batch duration. You can take a look at
 it.



 For your case, I think TD’s comment are quite meaningful, it’s not trivial
 to do so, often requires a job to scan all the records, it’s also not the
 design purpose of Spark Streaming, I guess it’s hard to achieve what you
 want.





 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Monday, October 27, 2014 1:42 PM
 *To:* Tathagata Das
 *Cc:* Aniket Bhatnagar; user@spark.apache.org
 *Subject:* Re: RDD to DStream



 I have a similar requirement. But instead of grouping it by chunkSize, I
 would have the timeStamp be part of the data. So the function I want has
 the following signature:



   // RDD of (timestamp, value)

   def rddToDStream[T](data: RDD[(Long, T)], timeWindow: Long)(implicit
 ssc: StreamingContext): DStream[T]



 And DStream should respect the timestamp part. This is important for
 simulation, right?



 Do you have any good solution for this?



 Jianshi





 On Thu, Aug 7, 2014 at 9:32 AM, Tathagata Das tathagata.das1...@gmail.com
 wrote:

  Hey Aniket,



 Great thoughts! I understand the usecase. But as you have realized
 yourself it is not trivial to cleanly stream a RDD as a DStream. Since RDD
 operations are defined to be scan based, it is not efficient to define RDD
 based on slices of data within a partition of another RDD, using pure RDD
 transformations. What you have done is a decent, and probably the only
 feasible solution, with its limitations.



 Also the requirements of converting a batch of data to a stream of data
 can be pretty diverse. What rate, what # of events per batch, how many
 batches, is it efficient? Hence, it is not trivial to define a good, clean
 public API for that. If any one has any thoughts, ideas, etc on this, you
 are more than welcome to share them.



 TD



 On Mon, Aug 4, 2014 at 12:43 AM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

  The use case for converting RDD into DStream is that I want to simulate
 a stream from an already persisted data for testing analytics. It is
 trivial to create a RDD from any persisted data but not so much for
 DStream. Therefore, my idea to create DStream from RDD. For example, lets
 say you are trying to implement analytics on time series data using Lambda
 architecture. This means you would have to implement the same analytics on
 streaming data (in streaming mode) as well as persisted data (in batch
 mode). The workflow for implementing the anlytics would be to first
 implement it in batch mode using RDD operations and then simulate stream to
 test the analytics in stream mode. The simulated stream should produce the
 elements at a specified rate. So the solution maybe to read data in a RDD,
 split (chunk) it into multiple RDDs with each RDD having the size of
 elements that need to be streamed per time unit and then finally stream
 each RDD using the compute function.



 The problem with using QueueInputDStream is that it will stream data as
 per the batch duration specified in the streaming context and one cannot
 specify a custom slide duration. Moreover, the class QueueInputDStream is
 private to streaming package, so I can't really use it/extend it from an
 external package. Also, I could not find a good solution split a RDD into
 equal sized smaller RDDs that can be fed into an extended version of
 QueueInputDStream.



 Finally, here is what I came up with:



 class RDDExtension[T: ClassTag](rdd: RDD[T]) {

   def toStream(streamingContext: StreamingContext, chunkSize: Int,
 slideDurationMilli: Option[Long] = None): DStream[T] = {

 new InputDStream[T](streamingContext) {



   private val iterator = rdd.toLocalIterator // WARNING: each
 partition much fit in RAM of local machine.

   private val grouped = iterator.grouped(chunkSize)



   override def start(): Unit

Re: RDD to DStream

2014-10-27 Thread Jianshi Huang
You're absolutely right, it's not 'scalable' as I'm using collect().

However, it's important to have the RDDs ordered by the timestamp of the
time window (groupBy puts data to corresponding timewindow).

It's fairly easy to do in Pig, but somehow I have no idea how to express it
in RDD...

Something like (in Pig, pseudo code :):

g = GROUP data BY (timestamp / windowSize)  // group data into buckets in
the same time window
gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded
timestamp for each bucket
stream = FOREACH gs GENERATE toRDD(g)

No idea how to do the order by part in RDD.

Jianshi


On Mon, Oct 27, 2014 at 3:07 PM, Shao, Saisai saisai.s...@intel.com wrote:

  I think your solution may not be scalable if the data size keeps
 increasing, since you have to collect all your data back to the driver node, so
 driver memory usage will be a problem.



 why not filter out specific time-range data as a rdd, after filtering the
 whole time-range, you will get a series of RDD with timestamp divided, and
 then feed into queue. Still it is not an efficient way, but  it is not
 limited by driver memory.



 Also there may have some other solutions like shuffle to arrange data, but
 you cannot avoid scanning the whole data. Basically we need to avoid
 fetching large amount of data back to driver.





 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Monday, October 27, 2014 2:39 PM
 *To:* Shao, Saisai
 *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com)

 *Subject:* Re: RDD to DStream



 Hi Saisai,



 I understand it's non-trivial, but the requirement of simulating offline
 data as stream is also fair. :)



 I just wrote a prototype, however, I need to do a collect and a bunch of
 parallelize...



   // RDD of (timestamp, value)

   def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long,
 ssc: StreamingContext): DStream[T] = {

 val sc = ssc.sparkContext

 val d = data.groupBy(_._1 / timeWindow)

 .map(e => (e._1, e._2.toSeq.sortBy(_._1).map(_._2)))

 .collect()

 .map(e => (e._1, sc.parallelize(e._2)))

 .sortBy(_._1)

 val queue = new mutable.SynchronizedQueue[RDD[T]]



 queue ++= d.map(_._2)



 ssc.queueStream(queue)

   }



 Any way to get a list of RDDs sorted by group key just after groupBy?



 Jianshi



 On Mon, Oct 27, 2014 at 2:00 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  Hi Jianshi,



 For simulation purpose, I think you can try ConstantInputDStream and
 QueueInputDStream to convert one RDD or series of RDD into DStream, the
 first one output the same RDD in each batch duration, and the second one
 just output a RDD in a queue in each batch duration. You can take a look at
 it.



 For your case, I think TD’s comment are quite meaningful, it’s not trivial
 to do so, often requires a job to scan all the records, it’s also not the
 design purpose of Spark Streaming, I guess it’s hard to achieve what you
 want.





 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Monday, October 27, 2014 1:42 PM
 *To:* Tathagata Das
 *Cc:* Aniket Bhatnagar; user@spark.apache.org
 *Subject:* Re: RDD to DStream



 I have a similar requirement. But instead of grouping it by chunkSize, I
 would have the timeStamp be part of the data. So the function I want has
 the following signature:



   // RDD of (timestamp, value)

   def rddToDStream[T](data: RDD[(Long, T)], timeWindow: Long)(implicit
 ssc: StreamingContext): DStream[T]



 And DStream should respect the timestamp part. This is important for
 simulation, right?



 Do you have any good solution for this?



 Jianshi





 On Thu, Aug 7, 2014 at 9:32 AM, Tathagata Das tathagata.das1...@gmail.com
 wrote:

  Hey Aniket,



 Great thoughts! I understand the usecase. But as you have realized
 yourself it is not trivial to cleanly stream a RDD as a DStream. Since RDD
 operations are defined to be scan based, it is not efficient to define RDD
 based on slices of data within a partition of another RDD, using pure RDD
 transformations. What you have done is a decent, and probably the only
 feasible solution, with its limitations.



 Also the requirements of converting a batch of data to a stream of data
 can be pretty diverse. What rate, what # of events per batch, how many
 batches, is it efficient? Hence, it is not trivial to define a good, clean
 public API for that. If anyone has any thoughts, ideas, etc. on this, you
 are more than welcome to share them.



 TD



 On Mon, Aug 4, 2014 at 12:43 AM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

  The use case for converting an RDD into a DStream is that I want to
 simulate a stream from already persisted data for testing analytics. It is
 trivial to create an RDD from any persisted data, but not so much a
 DStream. Hence my idea to create a DStream from an RDD. For example, let's
 say you are trying

Re: RDD to DStream

2014-10-27 Thread Jianshi Huang
Ok, back to Scala code, I'm wondering why I cannot do this:

data.groupBy(timestamp / window)
  .sortByKey()  // no sort method available here
  .map(sc.parallelize(_._2.sortBy(_._1)))  // nested RDD, hmm...
  .collect() // returns Seq[RDD[(Timestamp, T)]]
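
A hedged guess at the sortByKey part: in Spark 1.x the pair-RDD and
ordered-RDD implicits come from SparkContext._, which may be why no sort
method shows up after groupBy. A minimal sketch (this only addresses the
ordering; the nested sc.parallelize still won't work):

  import scala.reflect.ClassTag
  import org.apache.spark.SparkContext._
  import org.apache.spark.rdd.RDD

  def sortedWindows[T: ClassTag](data: RDD[(Long, T)], window: Long): RDD[(Long, Seq[T])] =
    data.groupBy(_._1 / window)                    // RDD[(Long, Iterable[(Long, T)])]
      .mapValues(_.toSeq.sortBy(_._1).map(_._2))   // order records inside each window
      .sortByKey()                                 // order the windows themselves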


Jianshi

On Mon, Oct 27, 2014 at 3:24 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 You're absolutely right, it's not 'scalable' as I'm using collect().

 However, it's important to have the RDDs ordered by the timestamp of the
 time window (groupBy puts the data into its corresponding time window).

 It's fairly easy to do in Pig, but somehow I have no idea how to express
 it in RDD...

 Something like (in Pig, pseudo code :):

 g = GROUP data BY (timestamp / windowSize)  // group data into buckets in
 the same time window
 gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded
 timestamp for each bucket
 stream = FOREACH gs GENERATE toRDD(g)

 No idea how to do the order by part in RDD.

 Jianshi


 On Mon, Oct 27, 2014 at 3:07 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  I think your solution may not be scalable if the data size keeps
 increasing, since you have to collect all your data back to the driver
 node, so driver memory usage will be a problem.



 Why not filter out each specific time range as an RDD? After filtering
 over the whole time range, you will get a series of RDDs divided by
 timestamp, which you can then feed into the queue. It is still not an
 efficient way, but it is not limited by driver memory.



 There may also be other solutions, like using a shuffle to arrange the
 data, but you cannot avoid scanning the whole dataset. Basically, we need
 to avoid fetching a large amount of data back to the driver.





 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Monday, October 27, 2014 2:39 PM
 *To:* Shao, Saisai
 *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com)

 *Subject:* Re: RDD to DStream



 Hi Saisai,



 I understand it's non-trivial, but the requirement of simulating offline
 data as a stream is also a fair one. :)



 I just wrote a prototype; however, it needs a collect and a bunch of
 parallelize calls...



   // RDD of (timestamp, value)

   def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long,
 ssc: StreamingContext): DStream[T] = {

 val sc = ssc.sparkContext

 val d = data.groupBy(_._1 / timeWindow)

 .map(e => (e._1, e._2.toSeq.sortBy(_._1).map(_._2)))

 .collect()

 .map(e => (e._1, sc.parallelize(e._2)))

 .sortBy(_._1)

 val queue = new mutable.SynchronizedQueue[RDD[T]]



 queue ++= d.map(_._2)



 ssc.queueStream(queue)

   }



 Any way to get a list of RDDs sorted by group key just after groupBy?



 Jianshi



 On Mon, Oct 27, 2014 at 2:00 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  Hi Jianshi,



  For simulation purposes, I think you can try ConstantInputDStream and
 QueueInputDStream to convert one RDD or a series of RDDs into a DStream:
 the first one outputs the same RDD in each batch duration, and the second
 one outputs one RDD from a queue in each batch duration. You can take a
 look at them.



 For your case, I think TD’s comments are quite meaningful: it’s not trivial
 to do so, it often requires a job to scan all the records, and it’s also
 not the design purpose of Spark Streaming, so I guess it’s hard to achieve
 what you want.





 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Monday, October 27, 2014 1:42 PM
 *To:* Tathagata Das
 *Cc:* Aniket Bhatnagar; user@spark.apache.org
 *Subject:* Re: RDD to DStream



 I have a similar requirement. But instead of grouping it by chunkSize, I
 would have the timeStamp be part of the data. So the function I want has
 the following signature:



   // RDD of (timestamp, value)

   def rddToDStream[T](data: RDD[(Long, T)], timeWindow: Long)(implicit
 ssc: StreamingContext): DStream[T]



 And DStream should respect the timestamp part. This is important for
 simulation, right?



 Do you have any good solution for this?



 Jianshi





 On Thu, Aug 7, 2014 at 9:32 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

  Hey Aniket,



 Great thoughts! I understand the usecase. But as you have realized
 yourself it is not trivial to cleanly stream a RDD as a DStream. Since RDD
 operations are defined to be scan based, it is not efficient to define RDD
 based on slices of data within a partition of another RDD, using pure RDD
 transformations. What you have done is a decent, and probably the only
 feasible solution, with its limitations.



 Also the requirements of converting a batch of data to a stream of data
 can be pretty diverse. What rate, what # of events per batch, how many
 batches, is it efficient? Hence, it is not trivial to define a good, clean
 public API for that. If anyone has any thoughts, ideas, etc. on this, you
 are more than welcome to share them.



 TD



 On Mon, Aug 4, 2014 at 12:43 AM

Re: RDD to DStream

2014-10-27 Thread Jianshi Huang
Yeah, you're absolutely right Saisai.

My point is that we should allow this kind of logic on RDDs, i.e.
transforming an RDD[(Key, Iterable[T])] into a Seq[(Key, RDD[T])].

Make sense?

Jianshi

On Mon, Oct 27, 2014 at 3:56 PM, Shao, Saisai saisai.s...@intel.com wrote:

  I think what you want is to make each bucket a new RDD, as you described
 in Pig syntax.



 gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded
 timestamp for each bucket



  From my understanding, there is currently no such API in Spark to achieve
 this; maybe you have to create a customized RDD by yourself.



 As for why the code cannot be executed:



   .map(sc.parallelize(_._2.sortBy(_._1)))  // nested RDD, hmm...



 This “sc.parallelize(_._2.sortBy(_._1))” will be serialized as a closure
 and executed on the remote side, which obviously does not have a
 SparkContext; I think Spark cannot support nested RDDs in closures.
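
 A tiny hedged illustration of that point (grouped, sc, and the types are
 assumed to follow the prototype earlier in the thread):

   // Runs on executors: the closure would need the SparkContext, so it fails.
   // grouped.map { case (k, vs) => sc.parallelize(vs.toSeq) }

   // Runs on the driver after a collect(), so it works, at the cost of memory:
   val perWindow = grouped.collect().map { case (k, vs) => (k, sc.parallelize(vs.toSeq)) }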



 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Monday, October 27, 2014 3:30 PM

 *To:* Shao, Saisai
 *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com)
 *Subject:* Re: RDD to DStream



 Ok, back to Scala code, I'm wondering why I cannot do this:



 data.groupBy(timestamp / window)

   .sortByKey()  // no sort method available here

   .map(sc.parallelize(_._2.sortBy(_._1)))  // nested RDD, hmm...

   .collect() // returns Seq[RDD[(Timestamp, T)]]





 Jianshi



 On Mon, Oct 27, 2014 at 3:24 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

  You're absolutely right, it's not 'scalable' as I'm using collect().



 However, it's important to have the RDDs ordered by the timestamp of the
 time window (groupBy puts the data into its corresponding time window).



 It's fairly easy to do in Pig, but somehow I have no idea how to express
 it in RDD...



 Something like (in Pig, pseudo code :):



 g = GROUP data BY (timestamp / windowSize)  // group data into buckets in
 the same time window

 gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded
 timestamp for each bucket

 stream = FOREACH gs GENERATE toRDD(g)



 No idea how to do the order by part in RDD.



 Jianshi





 On Mon, Oct 27, 2014 at 3:07 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  I think your solution may not be scalable if the data size keeps
 increasing, since you have to collect all your data back to the driver
 node, so driver memory usage will be a problem.



 Why not filter out each specific time range as an RDD? After filtering
 over the whole time range, you will get a series of RDDs divided by
 timestamp, which you can then feed into the queue. It is still not an
 efficient way, but it is not limited by driver memory.



 There may also be other solutions, like using a shuffle to arrange the
 data, but you cannot avoid scanning the whole dataset. Basically, we need
 to avoid fetching a large amount of data back to the driver.





 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Monday, October 27, 2014 2:39 PM
 *To:* Shao, Saisai
 *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com)


 *Subject:* Re: RDD to DStream



 Hi Saisai,



 I understand it's non-trivial, but the requirement of simulating offline
 data as a stream is also a fair one. :)



 I just wrote a prototype; however, it needs a collect and a bunch of
 parallelize calls...



   // RDD of (timestamp, value)

   def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long,
 ssc: StreamingContext): DStream[T] = {

 val sc = ssc.sparkContext

 val d = data.groupBy(_._1 / timeWindow)

 .map(e => (e._1, e._2.toSeq.sortBy(_._1).map(_._2)))

 .collect()

 .map(e => (e._1, sc.parallelize(e._2)))

 .sortBy(_._1)

 val queue = new mutable.SynchronizedQueue[RDD[T]]



 queue ++= d.map(_._2)



 ssc.queueStream(queue)

   }



 Any way to get a list of RDDs sorted by group key just after groupBy?



 Jianshi



 On Mon, Oct 27, 2014 at 2:00 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  Hi Jianshi,



  For simulation purposes, I think you can try ConstantInputDStream and
 QueueInputDStream to convert one RDD or a series of RDDs into a DStream:
 the first one outputs the same RDD in each batch duration, and the second
 one outputs one RDD from a queue in each batch duration. You can take a
 look at them.



 For your case, I think TD’s comments are quite meaningful: it’s not trivial
 to do so, it often requires a job to scan all the records, and it’s also
 not the design purpose of Spark Streaming, so I guess it’s hard to achieve
 what you want.





 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Monday, October 27, 2014 1:42 PM
 *To:* Tathagata Das
 *Cc:* Aniket Bhatnagar; user@spark.apache.org
 *Subject:* Re: RDD to DStream



 I have a similar requirement. But instead of grouping it by chunkSize, I
 would have the timeStamp be part of the data. So the function I want has
 the following

Ephemeral Hive metastore for HiveContext?

2014-10-27 Thread Jianshi Huang
There's an annoying small usability issue in HiveContext.

By default, it creates a local metastore, which prevents other processes
that use HiveContext from being launched from the same directory.

How can I make the metastore local to each HiveContext? Is there an
in-memory metastore configuration? Using /tmp/ temp folders is one solution,
but it's not elegant and I still need to clean up the files...

I can add hive-site.xml and use a shared metastore, but then all contexts
will still operate in the same catalog space.

(Simple) SQLContext by default uses an in-memory catalog which is bound to
each context. Since HiveContext is a subclass, we should give it the same
default semantics. Make sense?

Spark is very much functional and shared-nothing; these are wonderful
features. Let's not have something global as a dependency.


Cheers,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: RDD to DStream

2014-10-27 Thread Jianshi Huang
Sure, let's still focus on the streaming simulation use case. It's a very
useful problem to solve.

If we're going to use the same Spark Streaming core for the simulation, the
simplest way is to have globally sorted RDDs and use ssc.queueStream. Thus,
collecting the key part to the driver is probably necessary.

I've done offline simulation in Pig and it's absolutely non-trivial and
error-prone; I also had to keep multiple copies of the data due to
overlapping windows.

Therefore I would prefer using the existing streaming implementation for
the simulation rather than a special DStream.


Jianshi



On Mon, Oct 27, 2014 at 4:44 PM, Shao, Saisai saisai.s...@intel.com wrote:

  Yes, I understand what you want, but it may be hard to achieve without
 collecting back to the driver node.



 Besides, can we think of another way to do it?



 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Monday, October 27, 2014 4:07 PM

 *To:* Shao, Saisai
 *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com)
 *Subject:* Re: RDD to DStream



 Yeah, you're absolutely right Saisai.



 My point is that we should allow this kind of logic on RDDs, i.e.
 transforming an RDD[(Key, Iterable[T])] into a Seq[(Key, RDD[T])].



 Make sense?



 Jianshi



 On Mon, Oct 27, 2014 at 3:56 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  I think what you want is to make each bucket a new RDD, as you described
 in Pig syntax.



 gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded
 timestamp for each bucket



  From my understanding, there is currently no such API in Spark to achieve
 this; maybe you have to create a customized RDD by yourself.



 As for why the code cannot be executed:



   .map(sc.parallelize(_._2.sortBy(_._1)))  // nested RDD, hmm...



 This “sc.parallelize(_._2.sortBy(_._1))” will be serialized as a closure
 and executed on the remote side, which obviously does not have a
 SparkContext; I think Spark cannot support nested RDDs in closures.



 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Monday, October 27, 2014 3:30 PM


 *To:* Shao, Saisai
 *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com)
 *Subject:* Re: RDD to DStream



 Ok, back to Scala code, I'm wondering why I cannot do this:



 data.groupBy(timestamp / window)

   .sortByKey()  // no sort method available here

   .map(sc.parallelize(_._2.sortBy(_._1)))  // nested RDD, hmm...

   .collect() // returns Seq[RDD[(Timestamp, T)]]





 Jianshi



 On Mon, Oct 27, 2014 at 3:24 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

  You're absolutely right, it's not 'scalable' as I'm using collect().



 However, it's important to have the RDDs ordered by the timestamp of the
 time window (groupBy puts the data into its corresponding time window).



 It's fairly easy to do in Pig, but somehow I have no idea how to express
 it in RDD...



 Something like (in Pig, pseudo code :):



 g = GROUP data BY (timestamp / windowSize)  // group data into buckets in
 the same time window

 gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded
 timestamp for each bucket

 stream = FOREACH gs GENERATE toRDD(g)



 No idea how to do the order by part in RDD.



 Jianshi





 On Mon, Oct 27, 2014 at 3:07 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  I think your solution may not be scalable if the data size keeps
 increasing, since you have to collect all your data back to the driver
 node, so driver memory usage will be a problem.



 Why not filter out each specific time range as an RDD? After filtering
 over the whole time range, you will get a series of RDDs divided by
 timestamp, which you can then feed into the queue. It is still not an
 efficient way, but it is not limited by driver memory.



 There may also be other solutions, like using a shuffle to arrange the
 data, but you cannot avoid scanning the whole dataset. Basically, we need
 to avoid fetching a large amount of data back to the driver.





 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Monday, October 27, 2014 2:39 PM
 *To:* Shao, Saisai
 *Cc:* user@spark.apache.org; Tathagata Das (t...@databricks.com)


 *Subject:* Re: RDD to DStream



 Hi Saisai,



 I understand it's non-trivial, but the requirement of simulating offline
 data as a stream is also a fair one. :)



 I just wrote a prototype; however, it needs a collect and a bunch of
 parallelize calls...



   // RDD of (timestamp, value)

   def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long,
 ssc: StreamingContext): DStream[T] = {

 val sc = ssc.sparkContext

 val d = data.groupBy(_._1 / timeWindow)

 .map(e => (e._1, e._2.toSeq.sortBy(_._1).map(_._2)))

 .collect()

 .map(e => (e._1, sc.parallelize(e._2)))

 .sortBy(_._1)

 val queue = new mutable.SynchronizedQueue[RDD[T]]



 queue ++= d.map(_._2)



 ssc.queueStream(queue)

   }



 Any way

Re: Which is better? One spark app listening to 10 topics vs. 10 spark apps each listening to 1 topic

2014-10-27 Thread Jianshi Huang
Any suggestion? :)

Jianshi

On Thu, Oct 23, 2014 at 3:49 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 The Kafka stream has 10 topics and the data rate is quite high (~ 100K/s
 per topic).

 Which configuration do you recommend?
 - 1 Spark app consuming all Kafka topics
 - 10 separate Spark apps, each consuming one topic

 Assuming they have the same resource pool.

 Cheers,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github & Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Ephemeral Hive metastore for HiveContext?

2014-10-27 Thread Jianshi Huang
Thanks Ted and Cheng for the in-memory Derby solution. I'll check it out. :)

And to me, using an in-memory metastore by default makes sense; if a user
wants a shared metastore, it should be specified explicitly. An 'embedded'
local metastore in the working directory barely has a use case.

Jianshi



On Mon, Oct 27, 2014 at 9:57 PM, Cheng Lian lian.cs@gmail.com wrote:

  Thanks Ted, this is exactly what Spark SQL LocalHiveContext does. To make
 an embedded metastore local to a single HiveContext, we must allocate
 different Derby database directories for each HiveContext, and Jianshi is
 also trying to avoid that.


 On 10/27/14 9:44 PM, Ted Yu wrote:

 Please see
 https://cwiki.apache.org/confluence/display/Hive/AdminManual+MetastoreAdmin#AdminManualMetastoreAdmin-EmbeddedMetastore

  Cheers

 On Oct 27, 2014, at 6:20 AM, Cheng Lian lian.cs@gmail.com wrote:

  I have never tried this yet, but maybe you can use an in-memory Derby
 database as the metastore:
 https://db.apache.org/derby/docs/10.7/devguide/cdevdvlpinmemdb.html

 I'll investigate this when I'm free; I guess we can use it for Spark SQL
 Hive support testing.
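
 A hedged sketch of what that might look like (untested; the property names
 are standard Hive metastore and Derby settings, and whether setConf is
 applied early enough depends on when the metastore client is initialized,
 so putting the same values in hive-site.xml may be the safer route):

   import org.apache.spark.sql.hive.HiveContext

   val hiveContext = new HiveContext(sc)  // sc: an existing SparkContext
   hiveContext.setConf("javax.jdo.option.ConnectionURL",
     "jdbc:derby:memory:metastore_db;create=true")
   hiveContext.setConf("javax.jdo.option.ConnectionDriverName",
     "org.apache.derby.jdbc.EmbeddedDriver")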

 On 10/27/14 4:38 PM, Jianshi Huang wrote:

 There's an annoying small usability issue in HiveContext.


  By default, it creates a local metastore, which prevents other processes
 that use HiveContext from being launched from the same directory.


  How can I make the metastore local to each HiveContext? Is there an
 in-memory metastore configuration? Using /tmp/ temp folders is one solution,
 but it's not elegant and I still need to clean up the files...


  I can add hive-site.xml and use a shared metastore, but then all contexts
 will still operate in the same catalog space.


  (Simple) SQLContext by default uses an in-memory catalog which is bound to
 each context. Since HiveContext is a subclass, we should give it the same
 default semantics. Make sense?


  Spark is very much functional and shared-nothing; these are wonderful
 features. Let's not have something global as a dependency.



  Cheers,

 --

 Jianshi Huang


  LinkedIn: jianshi

 Twitter: @jshuang

  Github & Blog: http://huangjs.github.com/



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: RDD to DStream

2014-10-26 Thread Jianshi Huang
 2014 13:55, Aniket Bhatnagar aniket.bhatna...@gmail.com
 wrote:

 Sometimes it is useful to convert an RDD into a DStream for testing
 purposes (generating DStreams from historical data, etc.). Is there an easy
 way to do this?

 I could come up with the following inefficient way, but I'm not sure if
 there is a better way to achieve this. Thoughts?

 class RDDExtension[T: ClassTag](rdd: RDD[T]) {

   def chunked(chunkSize: Int): RDD[Seq[T]] = {
 rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
   }

   def skipFirst(): RDD[T] = {
 rdd.zipWithIndex().filter(tuple => tuple._2 > 0).map(_._1)
   }

   def toStream(streamingContext: StreamingContext, chunkSize: Int,
 slideDurationMilli: Option[Long] = None): DStream[T] = {
 new InputDStream[T](streamingContext) {

   @volatile private var currentRDD: RDD[Seq[T]] =
 rdd.chunked(chunkSize)

   override def start(): Unit = {}

   override def stop(): Unit = {}

   override def compute(validTime: Time): Option[RDD[T]] = {
 val chunk = currentRDD.take(1)
 currentRDD = currentRDD.skipFirst()
 Some(rdd.sparkContext.parallelize(chunk.flatten))  // chunk is Array[Seq[T]]
   }

   override def slideDuration = {
 slideDurationMilli.map(duration => new Duration(duration)).
   getOrElse(super.slideDuration)
   }
 }

 }
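
 (Hedged note on the sketch above: for rdd.chunked(...) and
 currentRDD.skipFirst() to resolve, it assumes the usual Spark Streaming
 imports plus an implicit conversion along these lines in scope.)

   implicit def toRDDExtension[T: ClassTag](rdd: RDD[T]): RDDExtension[T] =
     new RDDExtension(rdd)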








-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Dynamically loaded Spark-stream consumer

2014-10-23 Thread Jianshi Huang
I have a use case where I need to continuously ingest data from a Kafka
stream. However, apart from ingestion (into HBase), I also need to compute
some metrics (e.g. the average over the last minute, etc.).

The problem is that it's very likely I'll keep adding more metrics, and
I don't want to restart my Spark program every time.

Is there a mechanism by which Spark Streaming can load and plug in code at
runtime without restarting?

Any solutions or suggestions?

Thanks,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Which is better? One spark app listening to 10 topics vs. 10 spark apps each listening to 1 topic

2014-10-23 Thread Jianshi Huang
The Kafka stream has 10 topics and the data rate is quite high (~ 100K/s
per topic).

Which configuration do you recommend?
- 1 Spark app consuming all Kafka topics
- 10 separate Spark apps, each consuming one topic

Assuming they have the same resource pool.

Cheers,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Multitenancy in Spark - within/across spark context

2014-10-23 Thread Jianshi Huang
Upvote for the multitenancy requirement.

I'm also building a data analytics platform, and there will be multiple users
running queries and computations simultaneously. One of the pain points is
controlling resource size. Users don't really know how many nodes they need,
so they always use as much as possible... The result is a lot of wasted
resources in our YARN cluster.

A way to 1) allow multiple Spark contexts to share the same resources or 2)
add dynamic resource management for YARN mode is very much wanted.

Jianshi

On Thu, Oct 23, 2014 at 5:36 AM, Marcelo Vanzin van...@cloudera.com wrote:

 On Wed, Oct 22, 2014 at 2:17 PM, Ashwin Shankar
 ashwinshanka...@gmail.com wrote:
  That's not something you might want to do usually. In general, a
  SparkContext maps to a user application
 
  My question was basically this. In this page in the official doc, under
  Scheduling within an application section, it talks about multiuser and
  fair sharing within an app. How does multiuser within an application
  work(how users connect to an app,run their stuff) ? When would I want to
 use
  this ?

 I see. The way I read that page is that Spark supports all those
 scheduling options; but Spark doesn't give you the means to actually
 be able to submit jobs from different users to a running SparkContext
 hosted on a different process. For that, you'll need something like
 the job server that I referenced before, or write your own framework
 for supporting that.

 Personally, I'd use the information on that page when dealing with
 concurrent jobs in the same SparkContext, but still restricted to the
 same user. I'd avoid trying to create any application where a single
 SparkContext is trying to be shared by multiple users in any way.

  As far as I understand, this will cause executors to be killed, which
  means that Spark will start retrying tasks to rebuild the data that
  was held by those executors when needed.
 
  I basically wanted to find out if there were any gotchas related to
  preemption on Spark. Things like say half of an application's executors
 got
  preempted say while doing reduceByKey, will the application progress with
  the remaining resources/fair share ?

 Jobs should still make progress as long as at least one executor is
 available. The gotcha would be the one I mentioned, where Spark will
 fail your job after x executors failed, which might be a common
 occurrence when preemption is enabled. That being said, it's a
 configurable option, so you can set x to a very large value and your
 job should keep on chugging along.

 The options you'd want to take a look at are: spark.task.maxFailures
 and spark.yarn.max.executor.failures
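
 For reference, a hedged sketch of bumping those two settings (the values
 here are purely illustrative, not recommendations):

   val conf = new org.apache.spark.SparkConf()
     .set("spark.task.maxFailures", "16")
     .set("spark.yarn.max.executor.failures", "64")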

 --
 Marcelo

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


SPARK-3106 fixed?

2014-10-13 Thread Jianshi Huang
https://issues.apache.org/jira/browse/SPARK-3106

I'm having the same errors described in SPARK-3106 (no other types of
errors confirmed) while running a bunch of SQL queries on Spark 1.2.0 built
from the latest master HEAD.

Any updates on this issue?

My main task is to join a huge fact table with a dozen dim tables (using
HiveContext) and then map it to my class object. It failed a couple of
times, so I cached the intermediate table and currently it seems to be
working fine... I had no idea why until I found SPARK-3106.

Cheers,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: SPARK-3106 fixed?

2014-10-13 Thread Jianshi Huang
Hmm... it failed again, just lasted a little bit longer.

Jianshi

On Mon, Oct 13, 2014 at 4:15 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 https://issues.apache.org/jira/browse/SPARK-3106

 I'm having the same errors described in SPARK-3106 (no other types of
 errors confirmed) while running a bunch of SQL queries on Spark 1.2.0 built
 from the latest master HEAD.

 Any updates on this issue?

 My main task is to join a huge fact table with a dozen dim tables (using
 HiveContext) and then map it to my class object. It failed a couple of
 times, so I cached the intermediate table and currently it seems to be
 working fine... I had no idea why until I found SPARK-3106.

 Cheers,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
  Github & Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: SPARK-3106 fixed?

2014-10-13 Thread Jianshi Huang
It turned out to be caused by this issue:
https://issues.apache.org/jira/browse/SPARK-3923

Setting spark.akka.heartbeat.interval to 100 solved it.
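
(For reference, a hedged example of where that setting can go; the value 100
is simply the one mentioned above, not a general recommendation:)

  val conf = new org.apache.spark.SparkConf()
    .set("spark.akka.heartbeat.interval", "100")
  // or at submit time: spark-submit --conf spark.akka.heartbeat.interval=100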

Jianshi

On Mon, Oct 13, 2014 at 4:24 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Hmm... it failed again, just lasted a little bit longer.

 Jianshi

 On Mon, Oct 13, 2014 at 4:15 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 https://issues.apache.org/jira/browse/SPARK-3106

  I'm having the same errors described in SPARK-3106 (no other types of
 errors confirmed) while running a bunch of SQL queries on Spark 1.2.0 built
 from the latest master HEAD.

 Any updates on this issue?

 My main task is to join a huge fact table with a dozen dim tables (using
 HiveContext) and then map it to my class object. It failed a couple of
 times, so I cached the intermediate table and currently it seems to be
 working fine... I had no idea why until I found SPARK-3106.

 Cheers,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
  Github & Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
  Github & Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: SPARK-3106 fixed?

2014-10-13 Thread Jianshi Huang
One thing that made me very confused during debugging is the error message.
The important one

  WARN ReliableDeliverySupervisor: Association with remote system
[akka.tcp://sparkDriver@xxx:50278] has failed, address is now gated for
[5000] ms. Reason is: [Disassociated].

is only at log level WARN.

Jianshi


On Tue, Oct 14, 2014 at 4:36 AM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 It turned out to be caused by this issue:
 https://issues.apache.org/jira/browse/SPARK-3923

 Setting spark.akka.heartbeat.interval to 100 solved it.

 Jianshi

 On Mon, Oct 13, 2014 at 4:24 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hmm... it failed again, just lasted a little bit longer.

 Jianshi

 On Mon, Oct 13, 2014 at 4:15 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 https://issues.apache.org/jira/browse/SPARK-3106

  I'm having the same errors described in SPARK-3106 (no other types of
 errors confirmed) while running a bunch of SQL queries on Spark 1.2.0 built
 from the latest master HEAD.

 Any updates on this issue?

 My main task is to join a huge fact table with a dozen dim tables (using
 HiveContext) and then map it to my class object. It failed a couple of
 times, so I cached the intermediate table and currently it seems to be
 working fine... I had no idea why until I found SPARK-3106.

 Cheers,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
  Github & Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
  Github & Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
  Github & Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/

