Re: Is there a way to configure a default query LIMIT on STS?

2016-08-30 Thread Chen Song
I tried both of the following with STS but neither works for me.

Starting STS with --hiveconf hive.limit.optimize.fetch.max=50

and

Setting common.max_count in Zeppelin

Without setting such limits, a query that outputs lots of rows could cause
the driver to OOM and make STS unusable. Any workarounds or thoughts?


On Tue, Aug 2, 2016 at 7:29 AM Mich Talebzadeh 
wrote:

> I don't think it really works and it is vague. Is it rows, blocks, network?
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 2 August 2016 at 12:09, Chanh Le  wrote:
>
>> Hi Ayan,
>> You mean
>> common.max_count = 1000
>> Maximum number of SQL results to display, to prevent browser overload.
>> This is a common property for all connections.
>>
>>
>>
>>
>> It's already set by default in Zeppelin, but I think it doesn't work with Hive.
>>
>>
>> DOC: http://zeppelin.apache.org/docs/0.7.0-SNAPSHOT/interpreter/jdbc.html
>>
>>
>> On Aug 2, 2016, at 6:03 PM, ayan guha  wrote:
>>
>> Zeppelin already has a param for jdbc
>> On 2 Aug 2016 19:50, "Mich Talebzadeh"  wrote:
>>
>>> Ok I have already set up mine
>>>
>>> <property>
>>>   <name>hive.limit.optimize.fetch.max</name>
>>>   <value>50000</value>
>>>   <description>
>>>     Maximum number of rows allowed for a smaller subset of data for
>>>     simple LIMIT, if it is a fetch query.
>>>     Insert queries are not restricted by this limit.
>>>   </description>
>>> </property>
>>>
>>> I am surprised that yours was missing. What did you set it up to?
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>>
>>>
>>> On 2 August 2016 at 10:18, Chanh Le  wrote:
>>>
 I tried and it works perfectly.

 Regards,
 Chanh


 On Aug 2, 2016, at 3:33 PM, Mich Talebzadeh 
 wrote:

 OK

 Try that

 Another tedious way is to create views in Hive based on the tables and put
 a LIMIT in those views.

 But first try that parameter, to see if it does anything.

 HTH


 Dr Mich Talebzadeh





 On 2 August 2016 at 09:13, Chanh Le  wrote:

> Hi Mich,
> I use Spark Thrift Server; basically it acts like Hive.
>
> I see that there is a property in Hive.
>
> hive.limit.optimize.fetch.max
>
>    - Default Value: 50000
>- Added In: Hive 0.8.0
>
> Maximum number of rows allowed for a smaller subset of data for simple
> LIMIT, if it is a fetch query. Insert queries are not restricted by this
> limit.
>
>
> Is that related to the problem?
>
>
>
>
> On Aug 2, 2016, at 2:55 PM, Mich Talebzadeh 
> wrote:
>
> This is a classic problem on any RDBMS
>
> Set a limit on the number of rows returned, e.g. a maximum of 50K rows,
> through JDBC.
>
> What is your JDBC connection going to? Meaning which RDBMS if any?
>
> HTH
>
> Dr Mich Talebzadeh
>
>

Re: spark classloader question

2016-07-07 Thread Chen Song
Thanks Prajwal.

I tried these options and they make no difference.

On Thu, Jul 7, 2016 at 12:20 PM Prajwal Tuladhar <p...@infynyxx.com> wrote:

> You can try to play with the experimental flags [1]
> `spark.executor.userClassPathFirst`
> and `spark.driver.userClassPathFirst`. But these can also potentially
> break other things (e.g. dependencies that Spark itself requires at
> initialization being overridden by your app's versions), so you will need to verify.
>
> [1] https://spark.apache.org/docs/latest/configuration.html
>
> On Thu, Jul 7, 2016 at 4:05 PM, Chen Song <chen.song...@gmail.com> wrote:
>
>> Sorry to spam people who are not interested. Greatly appreciate it if
>> anyone who is familiar with this can share some insights.
>>
>> On Wed, Jul 6, 2016 at 2:28 PM Chen Song <chen.song...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> I ran into problems using the class loader in Spark. In my code (run within
>>> an executor), I explicitly load classes using the context class loader, as below.
>>>
>>> Thread.currentThread().getContextClassLoader()
>>>
>>> The jar containing the classes to be loaded is added via the --jars
>>> option in spark-shell/spark-submit.
>>>
>>> I always get a ClassNotFoundException. However, it seems to work if
>>> I compile these classes into the main jar for the job (the jar containing the
>>> main job class).
>>>
>>> I know Spark implements its own class loaders in a particular way. Is
>>> there a way to work around this? In other words, what is the proper way to
>>> programmatically load classes in other jars added via --jars in Spark?
>>>
>>>
>
>
> --
> --
> Cheers,
> Praj
>


Re: spark classloader question

2016-07-07 Thread Chen Song
Thanks Marco

The code snippet has something like below.

ClassLoader cl = Thread.currentThread().getContextClassLoader();
String packagePath = "com.xxx.xxx";
final Enumeration<URL> resources = cl.getResources(packagePath);

The resources collection is always empty, indicating no classes are found.

As I mentioned in my original email, it works when I include those classes
in the fat jar. Our use case is that each team will create their own jar
including their own protobuf schema classes. I cannot really create a fat
jar including every class in every environment.

Chen
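A side note on the snippet above: `ClassLoader.getResources` takes a slash-separated resource path ("com/xxx/xxx"), not a dotted package name, and it only lists a package when the jar carries a directory entry for it, so an empty enumeration does not by itself prove the classes are unloadable. Loading a known class by name is a more direct check. A minimal, Spark-free sketch (the class name is a stdlib stand-in for one of the protobuf schema classes):

```java
public class Main {
    public static void main(String[] args) throws Exception {
        // Resolve the context classloader, as in the snippet above; on a
        // Spark executor this is the loader that should see --jars entries.
        ClassLoader cl = Thread.currentThread().getContextClassLoader();

        // Load a class explicitly by its fully qualified name through that
        // loader. java.util.ArrayList stands in for a real schema class.
        Class<?> c = Class.forName("java.util.ArrayList", true, cl);
        System.out.println("loaded: " + c.getName());
    }
}
```

If `Class.forName` against the context classloader still fails on the executor while the same name loads from the main jar, that narrows the problem to which loader the --jars entries end up on.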


On Thu, Jul 7, 2016 at 12:18 PM Marco Mistroni <mmistr...@gmail.com> wrote:

> Hi Chen
>  pls post
> 1 . snippet code
> 2. exception
>
> any particular reason why you need to load classes in other jars
> programmatically?
>
> Have you tried to build a fat jar with all the dependencies ?
>
> hth
> marco
>
> On Thu, Jul 7, 2016 at 5:05 PM, Chen Song <chen.song...@gmail.com> wrote:
>
>> Sorry to spam people who are not interested. Greatly appreciate it if
>> anyone who is familiar with this can share some insights.
>>
>> On Wed, Jul 6, 2016 at 2:28 PM Chen Song <chen.song...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> I ran into problems using the class loader in Spark. In my code (run within
>>> an executor), I explicitly load classes using the context class loader, as below.
>>>
>>> Thread.currentThread().getContextClassLoader()
>>>
>>> The jar containing the classes to be loaded is added via the --jars
>>> option in spark-shell/spark-submit.
>>>
>>> I always get a ClassNotFoundException. However, it seems to work if
>>> I compile these classes into the main jar for the job (the jar containing the
>>> main job class).
>>>
>>> I know Spark implements its own class loaders in a particular way. Is
>>> there a way to work around this? In other words, what is the proper way to
>>> programmatically load classes in other jars added via --jars in Spark?
>>>
>>>
>


Re: spark classloader question

2016-07-07 Thread Chen Song
Sorry to spam people who are not interested. Greatly appreciate it if
anyone who is familiar with this can share some insights.

On Wed, Jul 6, 2016 at 2:28 PM Chen Song <chen.song...@gmail.com> wrote:

> Hi
>
> I ran into problems using the class loader in Spark. In my code (run within
> an executor), I explicitly load classes using the context class loader, as below.
>
> Thread.currentThread().getContextClassLoader()
>
> The jar containing the classes to be loaded is added via the --jars option
> in spark-shell/spark-submit.
>
> I always get a ClassNotFoundException. However, it seems to work if I
> compile these classes into the main jar for the job (the jar containing the main
> job class).
>
> I know Spark implements its own class loaders in a particular way. Is
> there a way to work around this? In other words, what is the proper way to
> programmatically load classes in other jars added via --jars in Spark?
>
>


spark classloader question

2016-07-06 Thread Chen Song
Hi

I ran into problems using the class loader in Spark. In my code (run within
an executor), I explicitly load classes using the context class loader, as below.

Thread.currentThread().getContextClassLoader()

The jar containing the classes to be loaded is added via the --jars option
in spark-shell/spark-submit.

I always get a ClassNotFoundException. However, it seems to work if I
compile these classes into the main jar for the job (the jar containing the main
job class).

I know Spark implements its own class loaders in a particular way. Is there
a way to work around this? In other words, what is the proper way to
programmatically load classes in other jars added via --jars in Spark?


Access batch statistics in Spark Streaming

2016-02-08 Thread Chen Song
Apologies in advance if someone has already asked and addressed this
question.

In Spark Streaming, how can I programmatically get batch statistics
like scheduling delay, total delay, and processing time (they are shown in
the Streaming tab of the job UI)? I need such information to raise alerts in
some circumstances, for example when scheduling is delayed by more than a
threshold.

Thanks,
Chen


question on spark.streaming.kafka.maxRetries

2016-02-02 Thread Chen Song
For Kafka direct stream, is there a way to set the time between successive
retries? From my testing, it looks like it is 200ms. Any way I can increase
the time?


Re: kerberos question

2015-11-04 Thread Chen Song
After a bit more investigation, I found that it could be related to
impersonation on a kerberized cluster.

Our job is started with the following command.

/usr/lib/spark/bin/spark-submit --master yarn-client --principal
[principal] --keytab [keytab] --proxy-user [proxied_user] ...


In application master's log,

At start up,

2015-11-03 16:03:41,602 INFO  [main] yarn.AMDelegationTokenRenewer
(Logging.scala:logInfo(59)) - Scheduling login from keytab in 64789744
millis.

Later on, when the delegation token renewer thread kicks in, it tries to
re-login with the specified principal to obtain new credentials, and then
tries to write the new credentials to the directory where the current
user's credentials are stored. However, with impersonation, because the
current user is a different user from the principal user, it fails with a
permission error.

2015-11-04 10:03:31,366 INFO  [Delegation Token Refresh Thread-0]
yarn.AMDelegationTokenRenewer (Logging.scala:logInfo(59)) - Attempting
to login to KDC using principal: principal/host@domain
2015-11-04 10:03:31,665 INFO  [Delegation Token Refresh Thread-0]
yarn.AMDelegationTokenRenewer (Logging.scala:logInfo(59)) -
Successfully logged into KDC.
2015-11-04 10:03:31,702 INFO  [Delegation Token Refresh Thread-0]
yarn.YarnSparkHadoopUtil (Logging.scala:logInfo(59)) - getting token
for namenode: 
hdfs://hadoop_abc/user/proxied_user/.sparkStaging/application_1443481003186_0
2015-11-04 10:03:31,904 INFO  [Delegation Token Refresh Thread-0]
hdfs.DFSClient (DFSClient.java:getDelegationToken(1025)) - Created
HDFS_DELEGATION_TOKEN token 389283 for principal on ha-hdfs:hadoop_abc
2015-11-04 10:03:31,905 ERROR [Delegation Token Refresh Thread-0]
hdfs.KeyProviderCache (KeyProviderCache.java:createKeyProviderURI(87))
- Could not find uri with key [dfs.encryption.key.provider.uri] to
create a keyProvider !!
2015-11-04 10:03:31,944 WARN  [Delegation Token Refresh Thread-0]
security.UserGroupInformation (UserGroupInformation.java:doAs(1674)) -
PriviledgedActionException as:proxy-user (auth:SIMPLE)
cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
Operation category READ is not supported in state standby
2015-11-04 10:03:31,945 WARN  [Delegation Token Refresh Thread-0]
ipc.Client (Client.java:run(675)) - Exception encountered while
connecting to the server :
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
Operation category READ is not supported in state standby
2015-11-04 10:03:31,945 WARN  [Delegation Token Refresh Thread-0]
security.UserGroupInformation (UserGroupInformation.java:doAs(1674)) -
PriviledgedActionException as:proxy-user (auth:SIMPLE)
cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
Operation category READ is not supported in state standby
2015-11-04 10:03:31,963 WARN  [Delegation Token Refresh Thread-0]
yarn.YarnSparkHadoopUtil (Logging.scala:logWarning(92)) - Error while
attempting to list files from application staging dir
org.apache.hadoop.security.AccessControlException: Permission denied:
user=principal, access=READ_EXECUTE,
inode="/user/proxy-user/.sparkStaging/application_1443481003186_0":proxy-user:proxy-user:drwx--


Can someone confirm my understanding is right? The relevant class is below:
https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala

Chen

On Tue, Nov 3, 2015 at 11:57 AM, Chen Song <chen.song...@gmail.com> wrote:

> We saw the following error happening in Spark Streaming job. Our job is
> running on YARN with kerberos enabled.
>
> First, the warnings below were printed out. I only pasted a few, but the
> following was repeated hundreds/thousands of times.
>
> 15/11/03 14:43:07 WARN UserGroupInformation: PriviledgedActionException
> as:[kerberos principle] (auth:KERBEROS)
> cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
> Operation category READ is not supported in state standby
> 15/11/03 14:43:07 WARN Client: Exception encountered while connecting to
> the server :
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
> Operation category READ is not supported in state standby
> 15/11/03 14:43:07 WARN UserGroupInformation: PriviledgedActionException
> as:[kerberos principle] (auth:KERBEROS)
> cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
> Operation category READ is not supported in state standby
> 15/11/03 14:43:07 WARN UserGroupInformation: PriviledgedActionException
> as:[kerberos principle] (auth:KERBEROS)
> cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
> Operation category READ is not supported in state standby
> 15/11/03 14:43:07 WARN Client: Exception encountered while connecting to
> the server :
> org.apache.hadoop.ipc.RemoteEx

kerberos question

2015-11-03 Thread Chen Song
We saw the following error happening in Spark Streaming job. Our job is
running on YARN with kerberos enabled.

First, the warnings below were printed out. I only pasted a few, but the
following was repeated hundreds/thousands of times.

15/11/03 14:43:07 WARN UserGroupInformation: PriviledgedActionException
as:[kerberos principle] (auth:KERBEROS)
cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
Operation category READ is not supported in state standby
15/11/03 14:43:07 WARN Client: Exception encountered while connecting to
the server :
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
Operation category READ is not supported in state standby
15/11/03 14:43:07 WARN UserGroupInformation: PriviledgedActionException
as:[kerberos principle] (auth:KERBEROS)
cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
Operation category READ is not supported in state standby
15/11/03 14:43:07 WARN UserGroupInformation: PriviledgedActionException
as:[kerberos principle] (auth:KERBEROS)
cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
Operation category READ is not supported in state standby
15/11/03 14:43:07 WARN Client: Exception encountered while connecting to
the server :
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
Operation category READ is not supported in state standby


It seems to have something to do with renewal of the token, and it tried to
connect to a standby namenode.

Then the following error was thrown out.

15/11/03 14:43:20 ERROR Utils: Uncaught exception in thread Delegation
Token Refresh Thread-0
java.lang.StackOverflowError
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at
org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater.updateCredentialsIfRequired(ExecutorDelegationTokenUpdater.scala:89)
at
org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorDelegationTokenUpdater.scala:49)
at
org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply(ExecutorDelegationTokenUpdater.scala:49)
at
org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply(ExecutorDelegationTokenUpdater.scala:49)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
at
org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1.run(ExecutorDelegationTokenUpdater.scala:49)
at
org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater.updateCredentialsIfRequired(ExecutorDelegationTokenUpdater.scala:79)
at
org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorDelegationTokenUpdater.scala:49)
at
org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply(ExecutorDelegationTokenUpdater.scala:49)
at
org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply(ExecutorDelegationTokenUpdater.scala:49)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
at
org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1.run(ExecutorDelegationTokenUpdater.scala:49)
at
org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater.updateCredentialsIfRequired(ExecutorDelegationTokenUpdater.scala:79)
at
org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorDelegationTokenUpdater.scala:49)
at
org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply(ExecutorDelegationTokenUpdater.scala:49)
at
org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater$$anon$1$$anonfun$run$1.apply(ExecutorDelegationTokenUpdater.scala:49)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)


Again, the above stack trace was repeated hundreds/thousands of times, which
explains why a StackOverflowError was produced.
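The repeating frames suggest the refresh thread's retry path re-enters updateCredentialsIfRequired on the same stack instead of scheduling a fresh task, so every failed attempt deepens the stack until it overflows. A toy illustration of that failure mode (names made up; this is not Spark's actual code):

```java
public class Main {
    static long attempts = 0;

    // Stand-in for updateCredentialsIfRequired: on "failure" it retries by
    // calling straight back into itself rather than scheduling a new task,
    // so every retry deepens the call stack.
    static void updateIfRequired() {
        attempts++;
        retry();
    }

    static void retry() {
        updateIfRequired(); // direct re-entry: the stack never unwinds
    }

    public static void main(String[] args) {
        try {
            updateIfRequired();
        } catch (StackOverflowError e) {
            System.out.println("StackOverflowError after " + attempts + " nested retries");
        }
    }
}
```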

My question is:

* If the active HDFS namenode fails over during the job, will the client,
the next time token renewal is needed, always try to connect to the same
namenode on which the token was created? Is that true and expected? If so,
how should namenode failover be handled for a streaming job in Spark?

Thanks for your feedback in advance.

-- 
Chen Song


Re: question on make multiple external calls within each partition

2015-10-07 Thread Chen Song
Thanks TD and Ashish.

On Mon, Oct 5, 2015 at 9:14 PM, Tathagata Das <t...@databricks.com> wrote:

> You could create a thread pool on demand within the foreachPartition
> function, then hand off the REST calls to that thread pool, get back the
> futures, and wait for them to finish. Should be pretty straightforward. Make
> sure that your foreachPartition function cleans up the thread pool before
> finishing. Alternatively, you can create an on-demand singleton thread pool
> that is reused across batches, which will reduce the cost of creating thread
> pools every time.
>
> On Mon, Oct 5, 2015 at 6:07 PM, Ashish Soni <asoni.le...@gmail.com> wrote:
>
>> Need more details but you might want to filter the data first ( create
>> multiple RDD) and then process.
>>
>>
>> > On Oct 5, 2015, at 8:35 PM, Chen Song <chen.song...@gmail.com> wrote:
>> >
>> > We have a use case with the following design in Spark Streaming.
>> >
>> > Within each batch,
>> > * data is read and partitioned by some key
>> > * forEachPartition is used to process the entire partition
>> > * within each partition, there are several REST clients created to
>> connect to different REST services
>> > * for the list of records within each partition, it will call these
>> services; each service call is independent of the others. Records are just
>> pre-partitioned to make these calls more efficient.
>> >
>> > I have a question:
>> > * Since each call is time-consuming, and to prevent the calls from being
>> executed sequentially, how can I parallelize the service calls within the
>> processing of each partition? Can I just use Scala Futures within
>> foreachPartition (or mapPartitions)?
>> >
>> > Any suggestions greatly appreciated.
>> >
>> > Chen
>> >
>> >
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-- 
Chen Song


question on make multiple external calls within each partition

2015-10-05 Thread Chen Song
We have a use case with the following design in Spark Streaming.

Within each batch,
* data is read and partitioned by some key
* foreachPartition is used to process the entire partition
* within each partition, there are several REST clients created to connect
to different REST services
* for the list of records within each partition, it will call these
services; each service call is independent of the others. Records are just
pre-partitioned to make these calls more efficient.

I have a question:
* Since each call is time-consuming, and to prevent the calls from being
executed sequentially, how can I parallelize the service calls within the
processing of each partition? Can I just use Scala Futures within
foreachPartition (or mapPartitions)?

Any suggestions greatly appreciated.

Chen
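The approach suggested in the replies (hand each call off to a pool created inside foreachPartition, then wait on the futures) can be sketched without Spark in plain Java; callRestService is a made-up placeholder for the real client call:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
    // Stand-in for the body of foreachPartition: submit every record's
    // service call to a pool, then block on the futures so the partition
    // does not finish before the calls do.
    static List<String> processPartition(Iterator<String> records) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> futures = new ArrayList<>();
            while (records.hasNext()) {
                String rec = records.next();
                futures.add(pool.submit(() -> callRestService(rec)));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) {
                results.add(f.get()); // wait for each call to finish
            }
            return results;
        } finally {
            pool.shutdown(); // clean up before the partition function returns
        }
    }

    // Placeholder for an actual REST client invocation.
    static String callRestService(String record) {
        return "ok:" + record;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(processPartition(Arrays.asList("a", "b", "c").iterator()));
    }
}
```

The same shape carries over to Scala Futures; the key points are bounding the pool size and waiting for all futures before the partition function returns.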


Re: Notification on Spark Streaming job failure

2015-09-28 Thread Chen Song
I am also interested, specifically in monitoring and alerting on Spark
Streaming jobs. It would be helpful to get some general guidelines or advice
on this from people who have implemented anything in this area.

On Fri, Sep 18, 2015 at 2:35 AM, Krzysztof Zarzycki <k.zarzy...@gmail.com>
wrote:

> Hi there Spark Community,
> I would like to ask you for an advice: I'm running Spark Streaming jobs in
> production. Sometimes these jobs fail and I would like to get email
> notification about it. Do you know how I can set up Spark to notify me by
> email if my job fails? Or do I have to use external monitoring tool?
> I'm thinking of the following options:
> 1. As I'm running those jobs on YARN, monitor somehow YARN jobs. Looked
> for it as well but couldn't find any YARN feature to do it.
> 2. Run Spark Streaming job in some scheduler, like Oozie, Azkaban, Luigi.
> Those are created rather for batch jobs, not streaming, but could work. Has
> anyone tried that?
> 3. Run the job driver under the "monit" tool, catch the failure, and send an
> email about it. Currently I'm deploying in yarn-cluster mode, and I would
> need to give that up to run under monit.
> 4. Implement monitoring tool (like Graphite, Ganglia, Prometheus) and use
> Spark metrics. And then implement alerting in those. Can I get information
> of failed jobs in Spark metrics?
> 5. As 4. but implement my own custom job metrics and monitor them.
>
> What's your opinion about my options? How do you people solve this
> problem? Anything Spark specific?
> I'll be grateful for any advice in this subject.
> Thanks!
> Krzysiek
>
>


-- 
Chen Song


Kafka Direct Stream join without data shuffle

2015-09-02 Thread Chen Song
I have a stream obtained from Kafka with the direct approach, say,
inputStream, and I need to:

1. Create another DStream derivedStream with map or mapPartitions (with
some data enrichment from a reference table) on inputStream
2. Join derivedStream with inputStream

In my use case, I don't need to shuffle data. Each partition in
derivedStream only needs to be joined with the corresponding partition in
the original parent inputStream it is generated from.

My question is

1. Is there a Partitioner defined in KafkaRDD at all?
2. How would I preserve the partitioning scheme and avoid data shuffle?

-- 
Chen Song


Re: application logs for long lived job on YARN

2015-08-27 Thread Chen Song
Does anyone have a similar problem or thoughts on this?

On Wed, Aug 26, 2015 at 10:37 AM, Chen Song chen.song...@gmail.com wrote:

 When running a long-lived job on YARN, like Spark Streaming, I found that
 container logs are gone after a few days on executor nodes, although the job
 itself is still running.


 I am using cdh5.4.0 and have aggregated logs enabled. Because the local
 logs are gone on executor nodes, I don't see any aggregated logs on hdfs
 after the job is killed or failed.

 Is there a YARN config to keep the logs from being deleted for a long-lived
 streaming job?

 --
 Chen Song




-- 
Chen Song


application logs for long lived job on YARN

2015-08-26 Thread Chen Song
When running a long-lived job on YARN, like Spark Streaming, I found that
container logs are gone after a few days on executor nodes, although the job
itself is still running.


I am using cdh5.4.0 and have aggregated logs enabled. Because the local
logs are gone on executor nodes, I don't see any aggregated logs on hdfs
after the job is killed or failed.

Is there a YARN config to keep the logs from being deleted for a long-lived
streaming job?

-- 
Chen Song


Re: JDBC Streams

2015-08-26 Thread Chen Song
Thanks Cody.

Are you suggesting putting the cache in a global context in each executor JVM,
in a Scala object for example, and then having a scheduled task refresh the
cache (or having refresh triggered by expiry, if using Guava)?

Chen
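One way to realize the per-JVM cache Cody describes, without Guava, is a static holder whose contents are reloaded only once a TTL has elapsed, roughly mimicking expireAfterWrite. A sketch with a fake loader standing in for the MySQL read:

```java
import java.util.HashMap;
import java.util.Map;

public class Main {
    static final long TTL_MS = 60_000; // refresh at most once a minute
    static Map<String, String> cache;  // one copy per executor JVM (static)
    static long loadedAt = 0;
    static int loads = 0;

    // Returns the cached reference data, reloading it only when stale.
    static synchronized Map<String, String> get() {
        long now = System.currentTimeMillis();
        if (cache == null || now - loadedAt > TTL_MS) {
            cache = loadFromDb(); // stand-in for the real MySQL query
            loadedAt = now;
        }
        return cache;
    }

    // Placeholder for reading the reference table out of MySQL.
    static Map<String, String> loadFromDb() {
        loads++;
        Map<String, String> m = new HashMap<>();
        m.put("country:US", "United States");
        return m;
    }

    public static void main(String[] args) {
        get();
        get(); // a second lookup inside the TTL reuses the cached copy
        System.out.println("loads=" + loads + " value=" + get().get("country:US"));
    }
}
```

Held in a Scala object on the executors, this gives each JVM at most one DB read per TTL window, regardless of how many batches run in between.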

On Wed, Aug 26, 2015 at 10:51 AM, Cody Koeninger c...@koeninger.org wrote:

 If your data only changes every few days, why not restart the job every
 few days, and just broadcast the data?

 Or you can keep a local per-jvm cache with an expiry (e.g. guava cache) to
 avoid many mysql reads

 On Wed, Aug 26, 2015 at 9:46 AM, Chen Song chen.song...@gmail.com wrote:

 Piggyback on this question.

 I have a similar use case but a bit different. My job is consuming a
 stream from Kafka and I need to join the Kafka stream with some reference
 table from MySQL (kind of data validation and enrichment). I need to
 process this stream every 1 min. The data in MySQL is not changed very
 often, maybe once every few days.

 So my requirement is:

 * I cannot easily use a broadcast variable because the data does change,
 although not very often.
 * I am not sure if it is good practice to read data from MySQL in every
 batch (in my case, 1 min).

 Anyone has done this before, any suggestions and feedback is appreciated.

 Chen


 On Sun, Jul 5, 2015 at 11:50 AM, Ashic Mahtab as...@live.com wrote:

 If it is indeed a reactive use case, then Spark Streaming would be a
 good choice.

 One approach worth considering: is it possible to receive a message via
 Kafka (or some other queue)? That wouldn't need any polling, and you could use
 standard consumers. If polling isn't an issue, then writing a custom
 receiver will work fine. The way a receiver works is this:

 * Your receiver has a receive() function, where you'd typically start a
 loop. In your loop, you'd fetch items, and call store(entry).
 * You control everything in the receiver. If you're listening on a
 queue, you receive messages, store() and ack your queue. If you're polling,
 it's up to you to ensure delays between db calls.
 * The things you store() go on to make up the rdds in your DStream. So,
 intervals, windowing, etc. apply to those. The receiver is the boundary
 between your data source and the DStream RDDs. In other words, if your
 interval is 15 seconds with no windowing, then the things that went to
 store() every 15 seconds are bunched up into an RDD of your DStream. That's
 kind of a simplification, but should give you the idea that your db
 polling interval and streaming interval are not tied together.

 -Ashic.

 --
 Date: Mon, 6 Jul 2015 01:12:34 +1000
 Subject: Re: JDBC Streams
 From: guha.a...@gmail.com
 To: as...@live.com
 CC: ak...@sigmoidanalytics.com; user@spark.apache.org


 Hi

 Thanks for the reply. Here is my situation: I have a DB which enables
 synchronous CDC; think of this as a DB trigger which writes to a table with
 the changed values as soon as something changes in a production table. My job
 will need to pick up the data as soon as it arrives, which can be at every
 1-minute interval. Ideally it will pick up the changes, transform them into
 JSON, and put them to Kinesis. In short, I am emulating a Kinesis producer
 with a DB source (don't even ask why; let's say these are the constraints :) )

 Please advise: (a) is Spark a good choice here? (b) What's your suggestion
 either way?

 I understand I can easily do it using a simple Java/Python app, but I am a
 little worried about managing scaling/fault tolerance, and that's where my
 concern is.

 TIA
 Ayan

 On Mon, Jul 6, 2015 at 12:51 AM, Ashic Mahtab as...@live.com wrote:

 Hi Ayan,
 How continuous is your workload? As Akhil points out, with streaming,
 you'll give up at least one core for receiving, will need at most one more
 core for processing. Unless you're running on something like Mesos, this
 means that those cores are dedicated to your app, and can't be leveraged by
 other apps / jobs.

 If it's something periodic (once an hour, once every 15 minutes, etc.),
 then I'd simply write a normal spark application, and trigger it
 periodically. There are many things that can take care of that - sometimes
 a simple cronjob is enough!

 --
 Date: Sun, 5 Jul 2015 22:48:37 +1000
 Subject: Re: JDBC Streams
 From: guha.a...@gmail.com
 To: ak...@sigmoidanalytics.com
 CC: user@spark.apache.org


 Thanks Akhil. In case I go with Spark Streaming, I guess I have to
 implement a custom receiver, and Spark Streaming will call this receiver
 every batch interval; is that correct? Any gotchas you see in this plan?
 TIA... Best, Ayan

 On Sun, Jul 5, 2015 at 5:40 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 If you want a long-running application, then go with Spark Streaming
 (which kind of blocks your resources). On the other hand, if you use a job
 server then you can actually use the resources (CPUs) for other jobs
 when your DB job is not using them.

 Thanks
 Best Regards

 On Sun, Jul 5, 2015 at 5:28 AM, ayan guha guha.a...@gmail.com wrote:

 Hi All

Re: build spark 1.4.1 with JDK 1.6

2015-08-21 Thread Chen Song
Thanks Sean.

So how is PySpark supported? I thought PySpark needed JDK 1.6.

Chen

On Fri, Aug 21, 2015 at 11:16 AM, Sean Owen so...@cloudera.com wrote:

 Spark 1.4 requires Java 7.

 On Fri, Aug 21, 2015, 3:12 PM Chen Song chen.song...@gmail.com wrote:

 I tried to build Spark 1.4.1 on cdh 5.4.0. Because we need to support
 PySpark, I used JDK 1.6.

 I got the following error,

 [INFO] --- scala-maven-plugin:3.2.0:testCompile
 (scala-test-compile-first) @ spark-streaming_2.10 ---

 java.lang.UnsupportedClassVersionError: org/apache/hadoop/io/LongWritable
 : Unsupported major.minor version 51.0
 at java.lang.ClassLoader.defineClass1(Native Method)
 at java.lang.ClassLoader.defineClassCond(ClassLoader.java:637)
 at java.lang.ClassLoader.defineClass(ClassLoader.java:621)
 at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)

 I know that is because the Hadoop jar for CDH 5.4.0 is built with JDK 7.
 Has anyone done this before?

 Thanks,

 --
 Chen Song




-- 
Chen Song


build spark 1.4.1 with JDK 1.6

2015-08-21 Thread Chen Song
I tried to build Spark 1.4.1 on cdh 5.4.0. Because we need to support
PySpark, I used JDK 1.6.

I got the following error,

[INFO] --- scala-maven-plugin:3.2.0:testCompile (scala-test-compile-first)
@ spark-streaming_2.10 ---

java.lang.UnsupportedClassVersionError: org/apache/hadoop/io/LongWritable :
Unsupported major.minor version 51.0
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClassCond(ClassLoader.java:637)
at java.lang.ClassLoader.defineClass(ClassLoader.java:621)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)

I know that is because the Hadoop jar for CDH 5.4.0 is built with JDK 7.
Has anyone done this before?

Thanks,

-- 
Chen Song


NotSerializableException in spark 1.4.0

2015-07-15 Thread Chen Song
The streaming job has been running ok in 1.2 and 1.3. After I upgraded to
1.4, I started seeing the error below. It appears that it fails in the
validate method in StreamingContext. Did anything change in 1.4.0 w.r.t.
DStream checkpointing?

Detailed error from driver:

15/07/15 18:00:39 ERROR yarn.ApplicationMaster: User class threw
exception: *java.io.NotSerializableException:
DStream checkpointing has been enabled but the DStreams with their
functions are not serializable*
Serialization stack:

java.io.NotSerializableException: DStream checkpointing has been enabled
but the DStreams with their functions are not serializable
Serialization stack:

at
org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:550)
at
org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
at
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)

-- 
Chen Song


Re: NotSerializableException in spark 1.4.0

2015-07-15 Thread Chen Song
Ah, cool. Thanks.

On Wed, Jul 15, 2015 at 5:58 PM, Tathagata Das t...@databricks.com wrote:

 Spark 1.4.1 just got released! So just download that. Yay for timing.

 On Wed, Jul 15, 2015 at 2:47 PM, Ted Yu yuzhih...@gmail.com wrote:

 Should be this one:
 [SPARK-7180] [SPARK-8090] [SPARK-8091] Fix a number of
 SerializationDebugger bugs and limitations
 ...
 Closes #6625 from tdas/SPARK-7180 and squashes the following commits:

 On Wed, Jul 15, 2015 at 2:37 PM, Chen Song chen.song...@gmail.com
 wrote:

 Thanks

 Can you point me to the patch to fix the serialization stack? Maybe I
 can pull it in and rerun my job.

 Chen

 On Wed, Jul 15, 2015 at 4:40 PM, Tathagata Das t...@databricks.com
 wrote:

 Your streaming job may have been seemingly running ok, but the DStream
 checkpointing must have been failing in the background. It would have been
 visible in the log4j logs. In 1.4.0, we enabled fast failure for that so
 that checkpointing failures don't get hidden in the background.

 The fact that the serialization stack is not being shown correctly is
 a known bug in Spark 1.4.0, but it is fixed in 1.4.1, about to come out in
 the next couple of days. That should help you narrow down the culprit
 preventing serialization.

 On Wed, Jul 15, 2015 at 1:12 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you show us your function(s) ?

 Thanks

 On Wed, Jul 15, 2015 at 12:46 PM, Chen Song chen.song...@gmail.com
 wrote:

 The streaming job has been running ok in 1.2 and 1.3. After I
 upgraded to 1.4, I started seeing the error below. It appears that it
 fails in the validate method in StreamingContext. Did anything change in
 1.4.0 w.r.t. DStream checkpointing?

 Detailed error from driver:

 15/07/15 18:00:39 ERROR yarn.ApplicationMaster: User class threw
 exception: *java.io.NotSerializableException: DStream checkpointing
 has been enabled but the DStreams with their functions are not 
 serializable*
 Serialization stack:

 java.io.NotSerializableException: DStream checkpointing has been
 enabled but the DStreams with their functions are not serializable
 Serialization stack:

 at
 org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:550)
 at
 org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
 at
 org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)

 --
 Chen Song






 --
 Chen Song






-- 
Chen Song


Re: NotSerializableException in spark 1.4.0

2015-07-15 Thread Chen Song
Thanks

Can you point me to the patch to fix the serialization stack? Maybe I can
pull it in and rerun my job.

Chen

On Wed, Jul 15, 2015 at 4:40 PM, Tathagata Das t...@databricks.com wrote:

 Your streaming job may have been seemingly running ok, but the DStream
 checkpointing must have been failing in the background. It would have been
 visible in the log4j logs. In 1.4.0, we enabled fast failure for that so
 that checkpointing failures don't get hidden in the background.

 The fact that the serialization stack is not being shown correctly is a
 known bug in Spark 1.4.0, but it is fixed in 1.4.1, about to come out in
 the next couple of days. That should help you narrow down the culprit
 preventing serialization.

 On Wed, Jul 15, 2015 at 1:12 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you show us your function(s) ?

 Thanks

 On Wed, Jul 15, 2015 at 12:46 PM, Chen Song chen.song...@gmail.com
 wrote:

 The streaming job has been running ok in 1.2 and 1.3. After I upgraded
 to 1.4, I started seeing the error below. It appears that it fails in the
 validate method in StreamingContext. Did anything change in 1.4.0 w.r.t.
 DStream checkpointing?

 Detailed error from driver:

 15/07/15 18:00:39 ERROR yarn.ApplicationMaster: User class threw
 exception: *java.io.NotSerializableException: DStream checkpointing has
 been enabled but the DStreams with their functions are not serializable*
 Serialization stack:

 java.io.NotSerializableException: DStream checkpointing has been enabled
 but the DStreams with their functions are not serializable
 Serialization stack:

 at
 org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:550)
 at
 org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
 at
 org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)

 --
 Chen Song






-- 
Chen Song


Re: spark streaming with kafka reset offset

2015-07-14 Thread Chen Song
 spark streaming 1.2.

 If the processing executors crash, will the receiver reset the
 offset back to the last processed offset?

 If the receiver itself crashes, is there a way to reset the
 offset without restarting the streaming application, other than smallest
 or largest?

 Is Spark Streaming 1.3, which uses the low-level consumer API,
 stable? And which is recommended for handling data loss, 1.2 or 1.3?














 --
 Best Regards,
 Ayan Guha







-- 
Chen Song


Re: rest on streaming

2015-07-14 Thread Chen Song
Thanks TD, that is very useful.

On Tue, Jul 14, 2015 at 10:19 PM, Tathagata Das t...@databricks.com wrote:

 You can do this.

 // global variables to keep track of the latest stuff
 var latestTime: Time = _
 var latestRDD: RDD[_] = _

 dstream.foreachRDD((rdd: RDD[_], time: Time) => {
   latestTime = time
   latestRDD = rdd
 })

 Now you can asynchronously access the latest RDD. However, if you are going
 to run jobs on the latest RDD, you must tell the streaming subsystem to
 keep the necessary data around for longer; otherwise it will get deleted
 even before the asynchronous query has completed. Use this:

 streamingContext.remember(expected max duration of your async query on
 latest RDD)
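 Concretely, if the asynchronous queries are expected to finish within,
 say, five minutes (the duration here is illustrative, not a recommendation):

```scala
import org.apache.spark.streaming.Minutes

// Keep generated RDDs around long enough for async jobs on the latest RDD.
streamingContext.remember(Minutes(5))
```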


 On Tue, Jul 14, 2015 at 6:57 PM, Chen Song chen.song...@gmail.com wrote:

 I have been POCing adding a REST service in a Spark Streaming job. Say I
 create a stateful DStream X by using updateStateByKey; each time there
 is an HTTP request, I want to apply some transformations/actions on the
 latest RDD of X and collect the results immediately, not on the schedule
 of the streaming batch interval.

 * Is that even possible?
 * The reason I think of this is that a user can get a list of RDDs via
 DStream.window.slice, but I cannot find a way to get the most recent RDD in
 the DStream.


 --
 Chen Song





-- 
Chen Song


Re: spark streaming with kafka reset offset

2015-07-14 Thread Chen Song
Thanks TD and Cody. I saw that.

1. By doing that (foreachRDD), does the KafkaDStream checkpoint its offsets
on HDFS at the end of each batch interval?
2. In the code, if I first apply transformations and actions on the
directKafkaStream and then use foreachRDD on the original KafkaDStream to
commit offsets myself, will offset commits always happen after the
transformations and actions?

Chen

On Tue, Jul 14, 2015 at 6:43 PM, Tathagata Das t...@databricks.com wrote:

 Relevant documentation -
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html,
 towards the end.

 directKafkaStream.foreachRDD { rdd =>
   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   // offsetRanges.length = # of Kafka partitions being consumed
   ...
 }
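 A slightly fuller sketch of that pattern; the process and saveOffsets
 helpers are hypothetical stand-ins for your own logic and offset store:

```scala
import org.apache.spark.streaming.kafka.HasOffsetRanges

directKafkaStream.foreachRDD { rdd =>
  // Capture the offset ranges before any transformation loses the type.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Process the batch first...
  rdd.foreachPartition(_.foreach(record => process(record)))
  // ...then persist how far we got (ZooKeeper, a DB, Kafka's offset manager).
  offsetRanges.foreach { o =>
    saveOffsets(o.topic, o.partition, o.fromOffset, o.untilOffset)
  }
}
```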


 On Tue, Jul 14, 2015 at 3:17 PM, Cody Koeninger c...@koeninger.org
 wrote:

 You have access to the offset ranges for a given rdd in the stream by
 typecasting to HasOffsetRanges.  You can then store the offsets wherever
 you need to.

 On Tue, Jul 14, 2015 at 5:00 PM, Chen Song chen.song...@gmail.com
 wrote:

 A follow up question.

When using the createDirectStream approach, the offsets are checkpointed to
HDFS and understood by the Spark Streaming job. Is there a way to
expose the offsets via a REST API to end users? Or alternatively, is there
a way to have offsets committed to the Kafka Offset Manager so users can
query them from a consumer programmatically?

 Essentially, all I need to do is monitor the progress of data
 consumption of the Kafka topic.


 On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger c...@koeninger.org
 wrote:

 You can't use different versions of spark in your application vs your
 cluster.

 For the direct stream, it's not 60 partitions per executor, it's 300
 partitions, and executors work on them as they are scheduled.  Yes, if you
 have no messages you will get an empty partition.  It's up to you whether
 it's worthwhile to call coalesce or not.

 On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Is this 3 the number of parallel consumer threads per receiver, meaning
 in total we have 2*3=6 consumers in the same consumer group consuming from
 all 300 partitions?
 Or is 3 just parallelism on the same receiver, and the recommendation to
 use 1 per receiver because consuming from Kafka is not CPU bound but NIC
 (network) bound, so increasing consumer threads on one receiver won't make
 it parallel in the ideal sense?

 In the non-receiver-based consumer in Spark 1.3, if I use 5 executors
 and the Kafka topic has 300 partitions, will the kafkaRDD created on the 5
 executors have 60 partitions per executor (300 total, one-to-one mapping)?
 And if some Kafka partitions are empty, say the offset from the last
 checkpoint to current is the same for partitions P123, will it still create
 empty partitions in the kafkaRDD? So should we call coalesce on the
 kafkaRDD?


 And is there any incompatibility issue when I include
 spark-streaming_2.10 (version 1.3) and spark-core_2.10 (version 1.3) in my
 application but my cluster has Spark version 1.2?






 On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 1. Here you are basically creating 2 receivers and asking each of
 them to consume 3 kafka partitions each.

 - In 1.2 we have high-level consumers, so how can we restrict the number
 of Kafka partitions to consume from? Say I have 300 Kafka partitions in a
 topic and, as above, I gave 2 receivers and 3 Kafka partitions each. Does
 that mean I will read from only 6 of the 300 partitions, and the data in
 the other 294 partitions is lost?


 2. One more doubt: in Spark Streaming, how is it decided which part of
 the driver's main function will run at each batch interval? Since the
 whole code is written in one function (the main function in the driver),
 how is it determined that the Kafka stream receivers are not re-registered
 in each batch, and that only the processing is re-run?






 On Mon, Jun 29, 2015 at 7:35 PM, ayan guha guha.a...@gmail.com
 wrote:

 Hi

 Let me take a shot at your questions. (I am sure people like Cody and
 TD will correct me if I am wrong.)

 0. This is an exact copy from a similar question in a mail thread from
 Akhil D:
 Since you set local[4] you will have 4 threads for your computation,
 and since you are having 2 receivers, you are left with 2 threads
 to process ((0 + 2) -- This 2 is your 2 threads.) And the other /2
 means you are having 2 tasks in that stage (with id 0).

 1. Here you are basically creating 2 receivers and asking each of
 them to consume 3 kafka partitions each.
 2. How does that matter? It depends on how many receivers you have
 created to consume that data and whether you have repartitioned it.
 Remember, Spark is lazy and executors are related to the context.
 3. I think in Java the factory method is fixed. You just pass around
 the contextFactory object. (I love Python :) See, the signature is so much
 cleaner :) )
 4. Yes, if you use Spark checkpointing. You can use your own custom
 checkpointing too.
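 Point 1 can be sketched with the 1.2 receiver-based API (the ZooKeeper
 quorum, group id, and topic name below are hypothetical). Note the Map
 value is consumer threads per receiver, not a partition filter; the
 high-level consumer group still balances all topic partitions across the
 running consumers:

```scala
import org.apache.spark.streaming.kafka.KafkaUtils

// Two receivers, each with 3 consumer threads on topic "events".
val streams = (1 to 2).map { _ =>
  KafkaUtils.createStream(ssc, "zk1:2181", "my-consumer-group", Map("events" -> 3))
}
// Union them into a single DStream for downstream processing.
val unified = ssc.union(streams)
```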

 Best
 Ayan



 On Mon, Jun 29, 2015 at 4:02 AM

Re: spark streaming with kafka reset offset

2015-07-14 Thread Chen Song
Thanks TD.

As for 1), if the timing is not guaranteed, how are exactly-once semantics
supported? It feels like exactly-once receiving is not necessarily
exactly-once processing.

Chen

On Tue, Jul 14, 2015 at 10:16 PM, Tathagata Das t...@databricks.com wrote:



 On Tue, Jul 14, 2015 at 6:42 PM, Chen Song chen.song...@gmail.com wrote:

 Thanks TD and Cody. I saw that.

 1. By doing that (foreachRDD), does the KafkaDStream checkpoint its
 offsets on HDFS at the end of each batch interval?


 The timing is not guaranteed.


 2. In the code, if I first apply transformations and actions on the
 directKafkaStream and then use foreachRDD on the original KafkaDStream to
 commit offsets myself, will offset commits always happen after the
 transformations and actions?

 What do you mean by the original KafkaDStream? Did you mean the
 directKafkaStream? If yes, then yes: output operations like foreachRDD are
 executed in each batch in the same order as they are defined.

 dstream1.foreachRDD { rdd => func1(rdd) }
 dstream2.foreachRDD { rdd => func2(rdd) }

 In every batch interval, func1 will be executed before func2.




 Chen

 On Tue, Jul 14, 2015 at 6:43 PM, Tathagata Das t...@databricks.com
 wrote:

 Relevant documentation -
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html,
 towards the end.

 directKafkaStream.foreachRDD { rdd =>
   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   // offsetRanges.length = # of Kafka partitions being consumed
   ...
 }


 On Tue, Jul 14, 2015 at 3:17 PM, Cody Koeninger c...@koeninger.org
 wrote:

 You have access to the offset ranges for a given rdd in the stream by
 typecasting to HasOffsetRanges.  You can then store the offsets wherever
 you need to.

 On Tue, Jul 14, 2015 at 5:00 PM, Chen Song chen.song...@gmail.com
 wrote:

 A follow up question.

 When using the createDirectStream approach, the offsets are checkpointed
 to HDFS and understood by the Spark Streaming job. Is there a way to
 expose the offsets via a REST API to end users? Or alternatively, is there
 a way to have offsets committed to the Kafka Offset Manager so users can
 query them from a consumer programmatically?

 Essentially, all I need to do is monitor the progress of data
 consumption of the Kafka topic.


 On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger c...@koeninger.org
 wrote:

 You can't use different versions of spark in your application vs your
 cluster.

 For the direct stream, it's not 60 partitions per executor, it's 300
 partitions, and executors work on them as they are scheduled.  Yes, if 
 you
 have no messages you will get an empty partition.  It's up to you whether
 it's worthwhile to call coalesce or not.

 On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Is this 3 the number of parallel consumer threads per receiver, meaning
 in total we have 2*3=6 consumers in the same consumer group consuming from
 all 300 partitions?
 Or is 3 just parallelism on the same receiver, and the recommendation to
 use 1 per receiver because consuming from Kafka is not CPU bound but NIC
 (network) bound, so increasing consumer threads on one receiver won't make
 it parallel in the ideal sense?

 In the non-receiver-based consumer in Spark 1.3, if I use 5 executors
 and the Kafka topic has 300 partitions, will the kafkaRDD created on the 5
 executors have 60 partitions per executor (300 total, one-to-one mapping)?
 And if some Kafka partitions are empty, say the offset from the last
 checkpoint to current is the same for partitions P123, will it still create
 empty partitions in the kafkaRDD? So should we call coalesce on the
 kafkaRDD?


 And is there any incompatibility issue when I include
 spark-streaming_2.10 (version 1.3) and spark-core_2.10 (version 1.3) in my
 application but my cluster has Spark version 1.2?






 On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 1. Here you are basically creating 2 receivers and asking each of
 them to consume 3 kafka partitions each.

 - In 1.2 we have high-level consumers, so how can we restrict the number
 of Kafka partitions to consume from? Say I have 300 Kafka partitions in a
 topic and, as above, I gave 2 receivers and 3 Kafka partitions each. Does
 that mean I will read from only 6 of the 300 partitions, and the data in
 the other 294 partitions is lost?


 2. One more doubt: in Spark Streaming, how is it decided which part of
 the driver's main function will run at each batch interval? Since the
 whole code is written in one function (the main function in the driver),
 how is it determined that the Kafka stream receivers are not re-registered
 in each batch, and that only the processing is re-run?






 On Mon, Jun 29, 2015 at 7:35 PM, ayan guha guha.a...@gmail.com
 wrote:

 Hi

 Let me take a shot at your questions. (I am sure people like Cody
 and TD will correct me if I am wrong.)

 0. This is an exact copy from a similar question in a mail thread
 from Akhil D:
 Since you set local[4] you will have 4 threads for your
 computation, and since you are having 2 receivers, you

rest on streaming

2015-07-14 Thread Chen Song
I have been POCing adding a REST service in a Spark Streaming job. Say I
create a stateful DStream X by using updateStateByKey; each time there
is an HTTP request, I want to apply some transformations/actions on the
latest RDD of X and collect the results immediately, not on the schedule
of the streaming batch interval.

* Is that even possible?
* The reason I think of this is that a user can get a list of RDDs via
DStream.window.slice, but I cannot find a way to get the most recent RDD in
the DStream.


-- 
Chen Song


Re: Spark serialization in closure

2015-07-09 Thread Chen Song
Thanks Andrew.

I tried with your suggestions and (2) works for me. (1) still doesn't work.

Chen

On Thu, Jul 9, 2015 at 4:58 PM, Andrew Or and...@databricks.com wrote:

 Hi Chen,

 I believe the issue is that `object foo` is a member of `object testing`,
 so the only way to access `object foo` is to first pull `object testing`
 into the closure, then access a pointer to get to `object foo`. There are
 two workarounds that I'm aware of:

 (1) Move `object foo` outside of `object testing`. This is only a problem
 because of the nested objects. Also, by design it's simpler to reason about
 but that's a separate discussion.

 (2) Create a local variable for `foo.v`. If all your closure cares about
 is the integer, then it makes sense to add a `val v = foo.v` inside `func`
 and use this in your closure instead. This avoids pulling in $outer
 pointers into your closure at all since it only references local variables.
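 A sketch of workaround (2) applied to the snippet from this thread:

```scala
object testing {
  object foo { val v = 42 }
  val list = List(1, 2, 3)
  val rdd = sc.parallelize(list)
  def func = {
    // Copy the value into a local val first: the closure then captures
    // only an Int, not an $outer pointer back to `testing`.
    val v = foo.v
    rdd.foreachPartition { it => println(v) }
  }
}
```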

 As others have commented, I think this is more of a Scala problem than a
 Spark one.

 Let me know if these work,
 -Andrew

 2015-07-09 13:36 GMT-07:00 Richard Marscher rmarsc...@localytics.com:

 Reading that article and applying it to your observations of what happens
 at runtime:

 shouldn't the closure require serializing testing? The foo singleton
 object is a member of testing, and then you call this foo value in the
 closure func and further in the foreachPartition closure. So following by
 that article, Scala will attempt to serialize the containing object/class
 testing to get the foo instance.

 On Thu, Jul 9, 2015 at 4:11 PM, Chen Song chen.song...@gmail.com wrote:

 Repost the code example,

 object testing extends Serializable {
 object foo {
   val v = 42
 }
 val list = List(1,2,3)
 val rdd = sc.parallelize(list)
 def func = {
   val after = rdd.foreachPartition {
 it => println(foo.v)
   }
 }
   }

 On Thu, Jul 9, 2015 at 4:09 PM, Chen Song chen.song...@gmail.com
 wrote:

 Thanks Erik. I saw the document too. That is why I am confused: as per
 the article, it should be fine as long as *foo* is
 serializable. However, what I have seen is that it works if
 *testing* is serializable, even when foo is not serializable, as shown
 below. I don't know if there is something specific to Spark.

 For example, the code example below works.

 object testing extends Serializable {
   object foo {
     val v = 42
   }
   val list = List(1,2,3)
   val rdd = sc.parallelize(list)
   def func = {
     val after = rdd.foreachPartition {
       it => println(foo.v)
     }
   }
 }

 On Thu, Jul 9, 2015 at 12:11 PM, Erik Erlandson e...@redhat.com wrote:

 I think you have stumbled across this idiosyncrasy:


 http://erikerlandson.github.io/blog/2015/03/31/hygienic-closures-for-scala-function-serialization/




 - Original Message -
  I am not sure this is more of a question for Spark or just Scala but
 I am
  posting my question here.
 
  The code snippet below shows an example of passing a reference to a
 closure
  in rdd.foreachPartition method.
 
  ```
  object testing {
  object foo extends Serializable {
val v = 42
  }
  val list = List(1,2,3)
  val rdd = sc.parallelize(list)
  def func = {
val after = rdd.foreachPartition {
  it => println(foo.v)
}
  }
}
  ```
  When running this code, I got an exception
 
  ```
  Caused by: java.io.NotSerializableException:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$
  Serialization stack:
  - object not serializable (class:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$, value:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$@10b7e824)
  - field (class:
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
  name: $outer, type: class
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$)
  - object (class
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
  function1)
  ```
 
  It looks like Spark needs to serialize `testing` object. Why is it
  serializing testing even though I only pass foo (another serializable
  object) in the closure?
 
  A more general question is, how can I prevent Spark from serializing
 the
  parent class where RDD is defined, with still support of passing in
  function defined in other classes?
 
  --
  Chen Song
 




 --
 Chen Song




 --
 Chen Song




 --
 --
 *Richard Marscher*
 Software Engineer
 Localytics
 Localytics.com http://localytics.com/ | Our Blog
 http://localytics.com/blog | Twitter http://twitter.com/localytics |
 Facebook http://facebook.com/localytics | LinkedIn
 http://www.linkedin.com/company/1148792?trk=tyah





-- 
Chen Song


Re: Spark serialization in closure

2015-07-09 Thread Chen Song
Thanks Erik. I saw the document too. That is why I am confused: as per
the article, it should be fine as long as *foo* is serializable.
However, what I have seen is that it works if *testing* is
serializable, even when foo is not serializable, as shown below. I don't
know if there is something specific to Spark.

For example, the code example below works.

object testing extends Serializable {
  object foo {
    val v = 42
  }
  val list = List(1,2,3)
  val rdd = sc.parallelize(list)
  def func = {
    val after = rdd.foreachPartition {
      it => println(foo.v)
    }
  }
}

On Thu, Jul 9, 2015 at 12:11 PM, Erik Erlandson e...@redhat.com wrote:

 I think you have stumbled across this idiosyncrasy:


 http://erikerlandson.github.io/blog/2015/03/31/hygienic-closures-for-scala-function-serialization/




 - Original Message -
  I am not sure this is more of a question for Spark or just Scala but I am
  posting my question here.
 
  The code snippet below shows an example of passing a reference to a
 closure
  in rdd.foreachPartition method.
 
  ```
  object testing {
  object foo extends Serializable {
val v = 42
  }
  val list = List(1,2,3)
  val rdd = sc.parallelize(list)
  def func = {
val after = rdd.foreachPartition {
   it => println(foo.v)
}
  }
}
  ```
  When running this code, I got an exception
 
  ```
  Caused by: java.io.NotSerializableException:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$
  Serialization stack:
  - object not serializable (class:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$, value:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$@10b7e824)
  - field (class:
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
  name: $outer, type: class
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$)
  - object (class
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
  function1)
  ```
 
  It looks like Spark needs to serialize `testing` object. Why is it
  serializing testing even though I only pass foo (another serializable
  object) in the closure?
 
  A more general question is, how can I prevent Spark from serializing the
  parent class where RDD is defined, with still support of passing in
  function defined in other classes?
 
  --
  Chen Song
 




-- 
Chen Song


Re: Spark serialization in closure

2015-07-09 Thread Chen Song
Repost the code example,

object testing extends Serializable {
object foo {
  val v = 42
}
val list = List(1,2,3)
val rdd = sc.parallelize(list)
def func = {
  val after = rdd.foreachPartition {
it => println(foo.v)
  }
}
  }

On Thu, Jul 9, 2015 at 4:09 PM, Chen Song chen.song...@gmail.com wrote:

 Thanks Erik. I saw the document too. That is why I am confused: as per
 the article, it should be fine as long as *foo* is serializable.
 However, what I have seen is that it works if *testing* is
 serializable, even when foo is not serializable, as shown below. I don't
 know if there is something specific to Spark.

 For example, the code example below works.

 object testing extends Serializable {
   object foo {
     val v = 42
   }
   val list = List(1,2,3)
   val rdd = sc.parallelize(list)
   def func = {
     val after = rdd.foreachPartition {
       it => println(foo.v)
     }
   }
 }

 On Thu, Jul 9, 2015 at 12:11 PM, Erik Erlandson e...@redhat.com wrote:

 I think you have stumbled across this idiosyncrasy:


 http://erikerlandson.github.io/blog/2015/03/31/hygienic-closures-for-scala-function-serialization/




 - Original Message -
  I am not sure this is more of a question for Spark or just Scala but I
 am
  posting my question here.
 
  The code snippet below shows an example of passing a reference to a
 closure
  in rdd.foreachPartition method.
 
  ```
  object testing {
  object foo extends Serializable {
val v = 42
  }
  val list = List(1,2,3)
  val rdd = sc.parallelize(list)
  def func = {
val after = rdd.foreachPartition {
  it => println(foo.v)
}
  }
}
  ```
  When running this code, I got an exception
 
  ```
  Caused by: java.io.NotSerializableException:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$
  Serialization stack:
  - object not serializable (class:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$, value:
  $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$@10b7e824)
  - field (class:
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
  name: $outer, type: class
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$)
  - object (class
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
  function1)
  ```
 
  It looks like Spark needs to serialize `testing` object. Why is it
  serializing testing even though I only pass foo (another serializable
  object) in the closure?
 
  A more general question is, how can I prevent Spark from serializing the
  parent class where RDD is defined, with still support of passing in
  function defined in other classes?
 
  --
  Chen Song
 




 --
 Chen Song




-- 
Chen Song


Spark serialization in closure

2015-07-09 Thread Chen Song
I am not sure this is more of a question for Spark or just Scala but I am
posting my question here.

The code snippet below shows an example of passing a reference to a closure
in rdd.foreachPartition method.

```
object testing {
object foo extends Serializable {
  val v = 42
}
val list = List(1,2,3)
val rdd = sc.parallelize(list)
def func = {
  val after = rdd.foreachPartition {
it => println(foo.v)
  }
}
  }
```
When running this code, I got an exception

```
Caused by: java.io.NotSerializableException:
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$
Serialization stack:
- object not serializable (class:
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$, value:
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$@10b7e824)
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$)
- object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
function1)
```

It looks like Spark needs to serialize `testing` object. Why is it
serializing testing even though I only pass foo (another serializable
object) in the closure?

A more general question is, how can I prevent Spark from serializing the
parent class where RDD is defined, with still support of passing in
function defined in other classes?

-- 
Chen Song


(de)serialize DStream

2015-07-07 Thread Chen Song
In Spark Streaming, when using updateStateByKey, it requires the generated
DStream to be checkpointed.

It seems that it always uses JavaSerializer, no matter what I set for
spark.serializer. Can I use KryoSerializer for checkpointing? If not, I
assume the key and value types have to be Serializable?
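For context, a minimal updateStateByKey usage that triggers the checkpoint
requirement (the stream, path, and state type below are illustrative, not
from this thread); making the state a case class keeps it
java.io.Serializable in any case:

```scala
case class Counter(total: Long)  // case classes are Serializable by default

// keyedStream: DStream[(String, Long)], assumed to exist already
val updateFunc = (values: Seq[Long], state: Option[Counter]) =>
  Some(Counter(state.map(_.total).getOrElse(0L) + values.sum))

ssc.checkpoint("hdfs:///tmp/checkpoints")  // required by updateStateByKey
val counts = keyedStream.updateStateByKey(updateFunc)
```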

Chen


Re: Spark1.3.1 build issue with CDH5.4.0 getUnknownFields

2015-05-29 Thread Chen Song
Regarding the build itself, hadoop-2.6 is not even a valid profile.

I got the following WARNING for my build.

[WARNING] The requested profile hadoop-2.6 could not be activated because
it does not exist.

Chen

On Fri, May 29, 2015 at 2:38 AM, trackissue121 trackissue...@gmail.com
wrote:

 I had already tested the query in the Hive CLI and it works fine. The
 same query shows an error in Spark SQL.
 On May 29, 2015 4:14 AM, ayan guha guha.a...@gmail.com wrote:

 Probably a naive question: can you try the same in the Hive CLI and see
 if your SQL works? Looks like a Hive thing to me, as Spark is faithfully
 delegating the query to Hive.
 On 29 May 2015 03:22, Abhishek Tripathi trackissue...@gmail.com wrote:

 Hi,
 I'm using the CDH 5.4.0 quick start VM and tried to build Spark with Hive
 compatibility so that I can run Spark SQL and access temp tables remotely.

 I used the command below to build Spark. The build succeeded, but when I
 tried to access Hive data from Spark SQL, I got an error.

 Thanks,
 Abhi

 ---
 *mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0-cdh5.4.0 -Phive
 -Phive-thriftserver -DskipTests clean package*

 [cloudera@quickstart spark-1.3.1]$export HADOOP_CONF_DIR=/etc/hive/conf/
 [cloudera@quickstart spark-1.3.1]$ ./bin/spark-sql
 SET spark.sql.hive.version=0.13.1
 spark-sql show tables;
 sample_07 false
 t1 false
 Time taken: 3.901 seconds, Fetched 2 row(s)
 spark-sql select * from t1;
 15/05/19 23:48:46 ERROR SparkSQLDriver: Failed in [select * from t1]
 java.lang.VerifyError: class
 org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$SetOwnerRequestProto
 overrides final method *getUnknownFields*
 .()Lcom/google/protobuf/UnknownFieldSet;
 at java.lang.ClassLoader.defineClass1(Native Method)
 at java.lang.ClassLoader.defineClass(ClassLoader.java:800)






-- 
Chen Song


spark streaming driver hang

2015-03-27 Thread Chen Song
I ran a spark streaming job.

100 executors
30G heap per executor
4 cores per executor

The version I used is 1.3.0-cdh5.1.0.

The job is reading from a directory on HDFS (with files incoming
continuously) and does some join on the data. I set batch interval to be 15
minutes and the job worked fine in the first few batches.

However, it just stalled after 7-8 batches. Below are some symptoms.

* In Spark UI, every tab worked fine except the Streaming tab. When I clicked
on it, it just hung forever.
* I did not see any GC activity on the driver.
* Nothing was printed to the driver log.

Anyone has seen this before?

-- 
Chen Song


Re: shuffle write size

2015-03-26 Thread Chen Song
Anyone can shed some light on this?

On Tue, Mar 17, 2015 at 5:23 PM, Chen Song chen.song...@gmail.com wrote:

 I have a map reduce job that reads from three logs and joins them on some
 key column. The underlying data is protobuf messages in sequence
 files. Between mappers and reducers, the underlying raw byte arrays for
 protobuf messages are shuffled. Roughly, for 1G of input from HDFS, there is
 2G of data output from the map phase.

 I am testing Spark jobs (v1.3.0) on the same input. I found that the shuffle
 write is 3-4 times the input size. I tried passing protobuf Message objects
 and Array[Byte], but neither gives good shuffle write output.

 Is there a good practice for shuffling:

 * protobuf messages
 * raw byte array

 Chen




-- 
Chen Song


FetchFailedException during shuffle

2015-03-26 Thread Chen Song
Using Spark 1.3.0 on CDH 5.1.0, I ran into a fetch failed exception.

I searched this mailing list but did not find anything like it reported.
What could be the reason for this error?

org.apache.spark.shuffle.FetchFailedException: [EMPTY_INPUT] Cannot
decompress empty stream
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:130)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:127)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 

shuffle write size

2015-03-17 Thread Chen Song
I have a map reduce job that reads from three logs and joins them on some
key column. The underlying data is protobuf messages in sequence
files. Between mappers and reducers, the underlying raw byte arrays for
protobuf messages are shuffled. Roughly, for 1G of input from HDFS, there is
2G of data output from the map phase.

I am testing Spark jobs (v1.3.0) on the same input. I found that the shuffle
write is 3-4 times the input size. I tried passing protobuf Message objects
and Array[Byte], but neither gives good shuffle write output.

Is there a good practice for shuffling:

* protobuf messages
* raw byte array
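One common workaround, sketched below: serialize to the compact protobuf wire format on the map side and parse back after the shuffle, so the shuffle payload stays close to the on-disk size. `LogMessage` is a hypothetical name for a protobuf-generated class, not one from this job:

```scala
import org.apache.spark.rdd.RDD

// keyed: RDD[(String, LogMessage)], where LogMessage is a hypothetical
// protobuf-generated class exposing toByteArray / parseFrom.
def shuffleAsBytes(keyed: RDD[(String, LogMessage)]): RDD[(String, Iterable[LogMessage])] =
  keyed
    .mapValues(_.toByteArray)                 // compact wire format crosses the shuffle
    .groupByKey()
    .mapValues(_.map(LogMessage.parseFrom))   // deserialize after the shuffle
```

The design point: the generated Message objects carry JVM overhead (field caches, unknown-field sets) that a generic serializer writes out, whereas `toByteArray` emits only the wire encoding.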

Chen


org.apache.spark.SparkException Error sending message

2015-03-13 Thread Chen Song
When I ran a Spark SQL query (a simple group-by query) via Hive support, I
saw lots of failures in the map phase.

I am not sure if that is specific to Spark SQL or more general.

Has anyone seen such errors before?

java.io.IOException: org.apache.spark.SparkException: Error sending message
[message = GetLocations(broadcast_9_piece0)]

java.io.IOException: org.apache.spark.SparkException: Error sending
message [message = GetLocations(broadcast_9_piece0)]
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1093)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Error sending message
[message = GetLocations(broadcast_9_piece0)]
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:201)
at 
org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
at 
org.apache.spark.storage.BlockManagerMaster.getLocations(BlockManagerMaster.scala:70)
at 
org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:592)
at 
org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:587)
at 
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.org$apache$spark$broadcast$TorrentBroadcast$$anonfun$$getRemote$1(TorrentBroadcast.scala:126)
at 
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$1.apply(TorrentBroadcast.scala:136)
at 
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$1.apply(TorrentBroadcast.scala:136)
at scala.Option.orElse(Option.scala:257)
at 
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
at 
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
at 
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
at scala.collection.immutable.List.foreach(List.scala:318)
at 
org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
at 
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1090)
... 12 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out
after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
... 27 more


-- 
Chen Song


Spark Streaming action not triggered with Kafka inputs

2015-01-23 Thread Chen Song
I am running into some problems with Spark Streaming when reading from
Kafka. I used Spark 1.2.0 built on CDH5.
The example is based on:
https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
* It works with the default implementation.

val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
topicMap).map(_._2)

* However, when I changed it to parallel receiving, like shown below

val topicMap = topics.split(",").map((_, 1)).toMap
val parallelInputs = (1 to numThreads.toInt) map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
}

ssc.union(parallelInputs)
After the change, the job stage just hangs there and never finishes. It looks
like no action is triggered on the streaming job. When I check the
Streaming tab, it shows the message below:
Batch Processing Statistics

   No statistics have been generated yet.


Am I doing anything wrong on the parallel receiving part?

-- 
Chen Song


Re: serialize protobuf messages

2014-12-30 Thread Chen Song
Anyone has suggestions?

On Tue, Dec 23, 2014 at 3:08 PM, Chen Song chen.song...@gmail.com wrote:

 Silly question, what is the best way to shuffle protobuf messages in Spark
 (Streaming) job? Can I use Kryo on top of protobuf Message type?

 --
 Chen Song




-- 
Chen Song


serialize protobuf messages

2014-12-23 Thread Chen Song
Silly question, what is the best way to shuffle protobuf messages in Spark
(Streaming) job? Can I use Kryo on top of protobuf Message type?
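A sketch of one approach, assuming the twitter chill-protobuf artifact is on the classpath (its `ProtobufSerializer` handles generated `Message` subclasses); `MyProto` is a hypothetical generated class used for illustration:

```scala
import com.esotericsoftware.kryo.Kryo
import com.twitter.chill.protobuf.ProtobufSerializer
import org.apache.spark.serializer.KryoRegistrator

class ProtoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // MyProto is a placeholder for a protobuf-generated class
    kryo.register(classOf[MyProto], new ProtobufSerializer)
  }
}

// enable via the SparkConf:
//   conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//   conf.set("spark.kryo.registrator", "ProtoRegistrator")
```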

-- 
Chen Song


Re: Shuffle files

2014-10-20 Thread Chen Song
My observation is the opposite. When my job runs under the default
spark.shuffle.manager, I don't see this exception. However, when it runs
with the SORT-based manager, I start seeing this error. How is that possible?

I am running my job on YARN, and I noticed that the YARN process limits
(cat /proc/$PID/limits) are not consistent with the system-wide limits (shown
by ulimit -a); I don't know how that happened. Is there a way to let the Spark
driver propagate this setting (ulimit -n number) to the Spark executors
before startup?
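One way to see what a JVM actually inherited, a Linux-only sketch (run the same snippet inside a Spark task to inspect an executor rather than the driver):

```scala
import scala.io.Source

// /proc/self/limits lists the limits of the current process; comparing it
// with `ulimit -a` on the host shows whether the limit was inherited.
val maxOpenFiles: Option[String] =
  Source.fromFile("/proc/self/limits")
    .getLines()
    .find(_.startsWith("Max open files"))

maxOpenFiles.foreach(println)
```

Note that on YARN the executor typically inherits its limits from the NodeManager process, so the limit usually has to be raised in the NodeManager's environment rather than propagated from the driver.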




On Tue, Oct 7, 2014 at 11:53 PM, Andrew Ash and...@andrewash.com wrote:

 You will need to restart your Mesos workers to pick up the new limits as
 well.

 On Tue, Oct 7, 2014 at 4:02 PM, Sunny Khatri sunny.k...@gmail.com wrote:

 @SK:
 Make sure ulimit has taken effect as Todd mentioned. You can verify via
 ulimit -a. Also make sure you have proper kernel parameters set in
 /etc/sysctl.conf (MacOSX)

 On Tue, Oct 7, 2014 at 3:57 PM, Lisonbee, Todd todd.lison...@intel.com
 wrote:


 Are you sure the new ulimit has taken effect?

 How many cores are you using?  How many reducers?

 In general, if a node in your cluster has C assigned cores and you run
 a job with X reducers, then Spark will open C*X files in parallel and
 start writing. Shuffle consolidation will help decrease the total
 number of files created, but the number of file handles open at any
 time doesn't change, so it won't help the ulimit problem.

 Quoted from Patrick at:

 http://apache-spark-user-list.1001560.n3.nabble.com/quot-Too-many-open-files-quot-exception-on-reduceByKey-td2462.html

 Thanks,

 Todd

 -Original Message-
 From: SK [mailto:skrishna...@gmail.com]
 Sent: Tuesday, October 7, 2014 2:12 PM
 To: u...@spark.incubator.apache.org
 Subject: Re: Shuffle files

 - We set ulimit to 50. But I still get the same too many open files
 warning.

 - I tried setting consolidateFiles to True, but that did not help either.

 I am using a Mesos cluster.   Does Mesos have any limit on the number of
 open files?

 thanks






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185p15869.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






-- 
Chen Song


Re: Does SparkSQL work with custom defined SerDe?

2014-10-14 Thread Chen Song
Looks like it may be related to
https://issues.apache.org/jira/browse/SPARK-3807.

I will build from branch 1.1 to see if the issue is resolved.

Chen

On Tue, Oct 14, 2014 at 10:33 AM, Chen Song chen.song...@gmail.com wrote:

 Sorry for bringing this out again, as I have no clue what could have
 caused this.

 I turned on DEBUG logging and did see the jar containing the SerDe class
 was scanned.

 More interestingly, I saw the same exception
 (org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved
 attributes) when running a simple select on both valid and malformed
 column names. This led me to suspect that something breaks syntactically.

 select [valid_column] from table limit 5;
 select [malformed_typo_column] from table limit 5;


 On Mon, Oct 13, 2014 at 6:04 PM, Chen Song chen.song...@gmail.com wrote:

 In Hive, the table was created with a custom SerDe, in the following way.

 row format serde 'abc.ProtobufSerDe'

 with serdeproperties ('serialization.class' =
 'abc.protobuf.generated.LogA$log_a')

 When I start spark-sql shell, I always got the following exception, even
 for a simple query.

 select user from log_a limit 25;

 I can desc the table without any problem. When I explain the query, I got
 the same exception.


 14/10/13 22:01:13 INFO impl.AMRMClientImpl: Waiting for application to be
 successfully unregistered.

 Exception in thread Driver java.lang.reflect.InvocationTargetException

 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

 at java.lang.reflect.Method.invoke(Method.java:606)

 at
 org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:162)

 Caused by:
 org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved
 attributes: 'user, tree:

 Project ['user]

  Filter (dh#4 = 2014-10-13 05)

   LowerCaseSchema

MetastoreRelation test, log_a, None


 at
 org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:72)

 at
 org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:70)

 at
 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165)

 at
 org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:183)

 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

 at scala.collection.Iterator$class.foreach(Iterator.scala:727)

 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

 at
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)

 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)

 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)

 at scala.collection.TraversableOnce$class.to
 (TraversableOnce.scala:273)

 at scala.collection.AbstractIterator.to(Iterator.scala:1157)

 at
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)

 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)

 at
 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)

 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)

 at
 org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:212)

 at
 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:168)

 at
 org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:156)

 at
 org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:70)

 at
 org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:68)

 at
 org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)

 at
 org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)

 at
 scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)

 at
 scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)

 at
 scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)

 at
 org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)

 at
 org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)

 at scala.collection.immutable.List.foreach(List.scala:318)

 at
 org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)

 at
 org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute

Does SparkSQL work with custom defined SerDe?

2014-10-13 Thread Chen Song


-- 
Chen Song


Re: Spark SQL and Hive tables

2014-09-30 Thread Chen Song
I have run into the same issue. I understand that with the new assembly built
with -Phive, I can run a Spark job in yarn-cluster mode. But is there a way
for me to run spark-shell with Hive support?

I tried to add the new assembly jar with --driver-library-path
and --driver-class-path, but neither works. I kept seeing the same exception.

object hive is not a member of package org.apache.spark.sql
   val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
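For reference, once the shell is launched from an assembly built with -Phive (and with hive-site.xml on the classpath, e.g. in conf/), the usual pattern is the one below; this is a sketch of the intended usage, not a fix for the classpath problem that produces the exception above:

```scala
import org.apache.spark.sql.hive.HiveContext

// sc is the SparkContext the shell provides; the hive package only
// resolves if the assembly on the classpath was built with -Phive.
val hiveContext = new HiveContext(sc)
hiveContext.sql("SHOW TABLES").collect().foreach(println)
```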

On Fri, Jul 25, 2014 at 6:25 PM, sstilak ssti...@live.com wrote:

  Thanks!  Will do.


  Sent via the Samsung GALAXY S®4, an ATT 4G LTE smartphone


  Original message 
 From: Michael Armbrust
 Date:07/25/2014 3:24 PM (GMT-08:00)
 To: user@spark.apache.org
 Subject: Re: Spark SQL and Hive tables

   [S]ince Hive has a large number of dependencies, it is not included in
 the default Spark assembly. In order to use Hive you must first run 
 ‘SPARK_HIVE=true
 sbt/sbt assembly/assembly’ (or use -Phive for maven). This command builds
 a new assembly jar that includes Hive. Note that this Hive assembly jar
 must also be present on all of the worker nodes, as they will need access
 to the Hive serialization and deserialization libraries (SerDes) in order
 to acccess data stored in Hive.



 On Fri, Jul 25, 2014 at 3:20 PM, Sameer Tilak ssti...@live.com wrote:

  Hi Jerry,

  I am having trouble with this. May be something wrong with my import or
 version etc.

 scala> import org.apache.spark.sql._;
 import org.apache.spark.sql._

 scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
 <console>:24: error: object hive is not a member of package
 org.apache.spark.sql
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
   ^
 Here is what I see for autocompletion:

 scala> org.apache.spark.sql.
 Row SQLContext  SchemaRDD   SchemaRDDLike   api
 catalystcolumnarexecution   package parquet
 test


  --
 Date: Fri, 25 Jul 2014 17:48:27 -0400

 Subject: Re: Spark SQL and Hive tables
  From: chiling...@gmail.com
 To: user@spark.apache.org


 Hi Sameer,

  The blog post you referred to is about Spark SQL. I don't think the
 intent of the article is meant to guide you how to read data from Hive via
 Spark SQL. So don't worry too much about the blog post.

  The programming guide I referred to demonstrate how to read data from
 Hive using Spark SQL. It is a good starting point.

  Best Regards,

  Jerry


 On Fri, Jul 25, 2014 at 5:38 PM, Sameer Tilak ssti...@live.com wrote:

  Hi Michael,
 Thanks. I am not creating HiveContext, I am creating SQLContext. I am
 using CDH 5.1. Can you please let me know which conf/ directory you are
 talking about?

  --
 From: mich...@databricks.com
 Date: Fri, 25 Jul 2014 14:34:53 -0700

 Subject: Re: Spark SQL and Hive tables
  To: user@spark.apache.org


 In particular, have you put your hive-site.xml in the conf/ directory?
 Also, are you creating a HiveContext instead of a SQLContext?


 On Fri, Jul 25, 2014 at 2:27 PM, Jerry Lam chiling...@gmail.com wrote:

 Hi Sameer,

  Maybe this page will help you:
 https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables

  Best Regards,

  Jerry



 On Fri, Jul 25, 2014 at 5:25 PM, Sameer Tilak ssti...@live.com wrote:

  Hi All,
 I am trying to load data from Hive tables using Spark SQL. I am using
 spark-shell. Here is what I see:

 val trainingDataTable = sql("""SELECT prod.prod_num,
 demographics.gender, demographics.birth_year, demographics.income_group
  FROM prod p JOIN demographics d ON d.user_id = p.user_id""")

  14/07/25 14:18:46 INFO Analyzer: Max iterations (2) reached for batch
 MultiInstanceRelations
 14/07/25 14:18:46 INFO Analyzer: Max iterations (2) reached for batch
 CaseInsensitiveAttributeReferences
 java.lang.RuntimeException: Table Not Found: prod.

  I have these tables in hive. I used show tables command to confirm this.
 Can someone please let me know how do I make them accessible here?








-- 
Chen Song


Re: spark time out

2014-09-23 Thread Chen Song
I am running the job with 500 executors, each with 8G and 1 core.

I see lots of fetch failures in the reduce stage when running a simple
reduceByKey.

map tasks - 4000
reduce tasks - 200



On Mon, Sep 22, 2014 at 12:22 PM, Chen Song chen.song...@gmail.com wrote:

 I am using Spark 1.1.0 and have seen a lot of Fetch Failures due to the
 following exception.

 java.io.IOException: sendMessageReliably failed because ack was not
 received within 60 sec
 at
 org.apache.spark.network.ConnectionManager$$anon$5$$anonfun$run$15.apply(ConnectionManager.scala:854)
 at
 org.apache.spark.network.ConnectionManager$$anon$5$$anonfun$run$15.apply(ConnectionManager.scala:852)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.network.ConnectionManager$$anon$5.run(ConnectionManager.scala:852)
 at java.util.TimerThread.mainLoop(Timer.java:555)
 at java.util.TimerThread.run(Timer.java:505)

 I have increased spark.core.connection.ack.wait.timeout to 120 seconds.
 The situation improved somewhat, but not by much. I am fairly confident it
 was not due to GC on the executors. What could be the reason for this?

 Chen




-- 
Chen Song


spark time out

2014-09-22 Thread Chen Song
I am using Spark 1.1.0 and have seen a lot of Fetch Failures due to the
following exception.

java.io.IOException: sendMessageReliably failed because ack was not
received within 60 sec
at
org.apache.spark.network.ConnectionManager$$anon$5$$anonfun$run$15.apply(ConnectionManager.scala:854)
at
org.apache.spark.network.ConnectionManager$$anon$5$$anonfun$run$15.apply(ConnectionManager.scala:852)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.network.ConnectionManager$$anon$5.run(ConnectionManager.scala:852)
at java.util.TimerThread.mainLoop(Timer.java:555)
at java.util.TimerThread.run(Timer.java:505)

I have increased spark.core.connection.ack.wait.timeout to 120 seconds.
The situation improved somewhat, but not by much. I am fairly confident it
was not due to GC on the executors. What could be the reason for this?
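For reference, a sketch of the change described above (property name as used by the Spark 1.x connection manager; the value is in seconds, and the app name is a placeholder):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("ack-timeout-sketch")
  // default was 60s; raising it masks slow acks but does not remove
  // whatever is delaying the shuffle fetch responses
  .set("spark.core.connection.ack.wait.timeout", "120")
```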

Chen


Re: saveAsTextFiles file not found exception

2014-08-12 Thread Chen Song
Thanks for putting this together, Andrew.


On Tue, Aug 12, 2014 at 2:11 AM, Andrew Ash and...@andrewash.com wrote:

 Hi Chen,

 Please see the bug I filed at
 https://issues.apache.org/jira/browse/SPARK-2984 with the
 FileNotFoundException on _temporary directory issue.

 Andrew


 On Mon, Aug 11, 2014 at 10:50 PM, Andrew Ash and...@andrewash.com wrote:

  Not sure which stalled HDFS client issue you're referring to, but there
 was one fixed in Spark 1.0.2 that could help you out --
 https://github.com/apache/spark/pull/1409.  I've still seen one related
 to Configuration objects not being threadsafe though so you'd still need to
 keep speculation on to fix that (SPARK-2546)

 As it stands now, I can:

 A) have speculation off, in which case I get random hangs for a variety
 of reasons (your HDFS stall, my Configuration safety issue)

 or

 B) have speculation on, in which case I get random failures related to
 LeaseExpiredExceptions and .../_temporary/... file doesn't exist exceptions.


 Kind of a catch-22 -- there's no reliable way to run large jobs on Spark
 right now!

 I'm going to file a bug for the _temporary and LeaseExpiredExceptions as
 I think these are widespread enough that we need a place to track a
 resolution.


 On Mon, Aug 11, 2014 at 9:08 AM, Chen Song chen.song...@gmail.com
 wrote:

 Andrew that is a good finding.

  Yes, I have speculative execution turned on, because I saw tasks stall
  on the HDFS client.

 If I turned off speculative execution, is there a way to circumvent the
 hanging task issue?



 On Mon, Aug 11, 2014 at 11:13 AM, Andrew Ash and...@andrewash.com
 wrote:

 I've also been seeing similar stacktraces on Spark core (not streaming)
 and have a theory it's related to spark.speculation being turned on.  Do
 you have that enabled by chance?


 On Mon, Aug 11, 2014 at 8:10 AM, Chen Song chen.song...@gmail.com
 wrote:

 Bill

 Did you get this resolved somehow? Anyone has any insight into this
 problem?

 Chen


 On Mon, Aug 11, 2014 at 10:30 AM, Chen Song chen.song...@gmail.com
 wrote:

 The exception was thrown out in application master(spark streaming
 driver) and the job shut down after this exception.


 On Mon, Aug 11, 2014 at 10:29 AM, Chen Song chen.song...@gmail.com
 wrote:

  I got the same exception after the streaming job runs for a while. The
  ERROR message was complaining about a temp file not being found in the
  output folder.

 14/08/11 08:05:08 ERROR JobScheduler: Error running job streaming
 job 140774430 ms.0
 java.io.FileNotFoundException: File
 hdfs://hadoopc/user/csong/output/human_bot/-140774430.out/_temporary/0/task_201408110805__m_07
 does not exist.
 at
 org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:654)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:102)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:712)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:708)
 at
 org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:708)
 at
 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
 at
 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
 at
 org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
 at
 org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126)
 at
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:841)
 at
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:724)
 at
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:643)
 at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1068)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$8.apply(DStream.scala:773)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$8.apply(DStream.scala:771)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
 at
 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run

Re: saveAsTextFiles file not found exception

2014-08-11 Thread Chen Song
)
   at 
 org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
   at 
 org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
   at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
   at 
 org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:361)
   at 
 org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1439)
   at 
 org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1261)
   at 
 org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:525)
 14/07/25 14:45:12 WARN CheckpointWriter: Error in attempt 1 of writing 
 checkpoint to 
 hdfs://gnosis-01-01-01.crl.samsung.com/apps/data/vddil/real-time/checkpoint/checkpoint-140632470



 All my jobs use the same parameter for the checkpoint() call. Is that the
 reason for the error?

 I will post the stack trace of the other error after it appears again.
 Thanks!


 Bill
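[Editor's note: if several streaming jobs pass the same directory to checkpoint(), they will collide on the same temp files under that path, which matches the "checkpoint/temp does not exist" error above. Giving each job its own subdirectory is the usual fix; a trivial sketch of deriving per-job directories (the base path and job name here are hypothetical):]

```python
def per_job_checkpoint_dir(base: str, job_name: str) -> str:
    """Derive a unique checkpoint directory for each streaming job."""
    return f"{base.rstrip('/')}/{job_name}"

# Each job then calls ssc.checkpoint(...) with its own directory.
print(per_job_checkpoint_dir("hdfs://namenode/apps/checkpoints/", "real-time-agg"))
# hdfs://namenode/apps/checkpoints/real-time-agg
```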


 On Fri, Jul 25, 2014 at 2:57 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Can you give a stack trace and logs of the exception? Its hard to say
 anything without any associated stack trace and logs.

 TD


 On Fri, Jul 25, 2014 at 1:32 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi,

 I am running a Spark Streaming job that uses saveAsTextFiles to save
 results into hdfs files. However, it has an exception after 20 batches


 result-140631234/_temporary/0/task_201407251119__m_03 does not 
 exist.


 When the job is running, I do not change any file in the folder. Does
 anyone know why the file cannot be found?

 Thanks!

 Bill






-- 
Chen Song


Re: saveAsTextFiles file not found exception

2014-08-11 Thread Chen Song
The exception was thrown in the application master (the Spark Streaming
driver), and the job shut down after this exception.


On Mon, Aug 11, 2014 at 10:29 AM, Chen Song chen.song...@gmail.com wrote:

 I got the same exception after the streaming job had run for a while. The
 ERROR message complained about a temp file not being found in the output
 folder.

 14/08/11 08:05:08 ERROR JobScheduler: Error running job streaming job
 140774430 ms.0
 java.io.FileNotFoundException: File
 hdfs://hadoopc/user/csong/output/human_bot/-140774430.out/_temporary/0/task_201408110805__m_07
 does not exist.
 at
 org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:654)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:102)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:712)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:708)
 at
 org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:708)
 at
 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
 at
 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
 at
 org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
 at
 org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126)
 at
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:841)
 at
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:724)
 at
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:643)
 at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1068)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$8.apply(DStream.scala:773)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$8.apply(DStream.scala:771)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
 at
 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)


 On Fri, Jul 25, 2014 at 7:04 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 I just saw another error after my job was run for 2 hours:

 org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
  No lease on /apps/data/vddil/real-time/checkpoint/temp: File does not 
 exist. Holder DFSClient_NONMAPREDUCE_327993456_13 does not have any open 
 files.
  at 
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2946)
  at 
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:2766)
  at 
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2674)
  at 
 org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:584)
  at 
 org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:440)
  at 
 org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
  at 
 org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
  at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
  at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)
  at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:415)
  at 
 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1557)
  at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)

  at org.apache.hadoop.ipc.Client.call(Client.java:1410)
  at org.apache.hadoop.ipc.Client.call(Client.java:1363)
  at 
 org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
  at com.sun.proxy.$Proxy14.addBlock(Unknown Source

Re: saveAsTextFiles file not found exception

2014-08-11 Thread Chen Song
Bill

Did you get this resolved somehow? Does anyone have any insight into this
problem?

Chen



Re: saveAsTextFiles file not found exception

2014-08-11 Thread Chen Song
Andrew that is a good finding.

Yes, I have speculative execution turned on, because I saw tasks stalled on
the HDFS client.

If I turn off speculative execution, is there a way to circumvent the
hanging-task issue?
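[Editor's note: speculative execution in this era of Spark is governed by a handful of properties, so if hanging tasks are the real problem, raising the thresholds may be an alternative to disabling it outright. A sketch of the relevant settings (values illustrative, not defaults):]

```
# spark-defaults.conf (values illustrative)
spark.speculation            true    # set to false to disable speculative execution
spark.speculation.interval   100     # ms between checks for speculatable tasks
spark.speculation.quantile   0.90    # fraction of tasks that must finish before speculating
spark.speculation.multiplier 1.5     # how many times slower than median before speculating
```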



On Mon, Aug 11, 2014 at 11:13 AM, Andrew Ash and...@andrewash.com wrote:

 I've also been seeing similar stacktraces on Spark core (not streaming)
 and have a theory it's related to spark.speculation being turned on.  Do
 you have that enabled by chance?



increase parallelism of reading from hdfs

2014-08-08 Thread Chen Song
In Spark Streaming, StreamingContext.fileStream gives a FileInputDStream.
Within each batch interval, it launches map tasks for the new files
detected during that interval. It appears that the way Spark computes the
number of map tasks is based on the block size of the files.

Below is the quote from Spark documentation.

 Spark automatically sets the number of “map” tasks to run on each file
according to its size (though you can control it through optional
parameters to SparkContext.textFile, etc)

In my testing, if files are loaded as 512M blocks, each map task seems to
process a 512M chunk of data, no matter what value I set for dfs.blocksize
on the driver/executor. I am wondering if there is a way to increase
parallelism, say, let each map task read 128M of data and thereby increase
the number of map tasks?
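[Editor's note: Spark delegates split computation to Hadoop's FileInputFormat, which (roughly, ignoring the small slop factor) uses splitSize = max(minSize, min(maxSize, blockSize)). So for files already written with 512M blocks, lowering the max split size (e.g. mapred.max.split.size) or passing a larger minPartitions to textFile is what increases the task count; changing dfs.blocksize only affects newly written files. A rough sketch of the arithmetic:]

```python
import math
from typing import Optional

def compute_split_size(block_size: int, min_size: int = 1,
                       max_size: Optional[int] = None) -> int:
    # Roughly FileInputFormat.computeSplitSize:
    #   splitSize = max(minSize, min(maxSize, blockSize))
    effective_max = block_size if max_size is None else max_size
    return max(min_size, min(effective_max, block_size))

def num_map_tasks(file_size: int, block_size: int,
                  max_split_size: Optional[int] = None) -> int:
    split = compute_split_size(block_size, max_size=max_split_size)
    return math.ceil(file_size / split)

MB = 1024 ** 2
# A 2048 MB file in 512 MB blocks: 4 tasks by default...
print(num_map_tasks(2048 * MB, 512 * MB))                           # 4
# ...but capping the split size at 128 MB yields 16 tasks.
print(num_map_tasks(2048 * MB, 512 * MB, max_split_size=128 * MB))  # 16
```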


-- 
Chen Song


spark streaming multiple file output paths

2014-08-07 Thread Chen Song
In Spark Streaming, is there a way to write output to different paths based
on the partition key? The saveAsTextFiles method writes all output into the
same directory.

For example, if the partition key has an hour/day column, I want to
separate the DStream output into different directories by hour/day.
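[Editor's note: there is no built-in per-key saveAsTextFiles in the DStream API of this era. A common workaround is, inside foreachRDD, to issue one filtered save per derived path, or to use a custom MultipleTextOutputFormat with saveAsHadoopFile. The path-derivation step can be sketched in plain Python (the record shape and field names are hypothetical):]

```python
from collections import defaultdict

def group_by_output_path(records, base_dir):
    """Bucket records under per-day/hour output directories.

    Assumes (hypothetically) each record is a (key, value) pair whose key
    looks like "2014-08-07/13" (day/hour).
    """
    buckets = defaultdict(list)
    for key, value in records:
        day, hour = key.split("/")
        buckets[f"{base_dir}/day={day}/hour={hour}"].append(value)
    return dict(buckets)

records = [("2014-08-07/13", "a"), ("2014-08-07/14", "b"), ("2014-08-07/13", "c")]
for path, values in sorted(group_by_output_path(records, "hdfs:///out").items()):
    print(path, values)
```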

-- 
Chen Song


Re: how to pass extra Java opts to workers for spark streaming jobs

2014-07-18 Thread Chen Song
Thanks Andrew, I tried and it works.


On Fri, Jul 18, 2014 at 12:53 AM, Andrew Or and...@databricks.com wrote:

 You will need to include that in the SPARK_JAVA_OPTS environment variable,
 so add the following line to spark-env.sh:

 export SPARK_JAVA_OPTS="-XX:+UseConcMarkSweepGC"

 This should propagate to the executors. (Though you should double check,
 since 0.9 is a little old and I could be forgetting something) If you wish
 to add spark options in addition to this, simply append them to the
 environment variable:

 export SPARK_JAVA_OPTS="-XX:+UseConcMarkSweepGC -Dspark.config.one=value -Dspark.config.two=value"

 (Please note that this is only for Spark 0.9. The part where we set Spark
 options within SPARK_JAVA_OPTS is deprecated as of 1.0)


 2014-07-17 21:08 GMT-07:00 Chen Song chen.song...@gmail.com:

 Thanks Andrew.

 Say that I want to turn on CMS gc for each worker.

 All I need to do is add the following line to conf/spark-env.sh on node
 where I submit the application.

 -XX:+UseConcMarkSweepGC

 Is that correct?

 Will this option be populated to each worker in yarn?



 On Thu, Jul 17, 2014 at 9:26 PM, Andrew Or and...@databricks.com wrote:

 Hi Chen,

 spark.executor.extraJavaOptions was introduced in Spark 1.0, not in Spark
 0.9. You need to

 export SPARK_JAVA_OPTS="-Dspark.config1=value1 -Dspark.config2=value2"

 in conf/spark-env.sh.

 Let me know if that works.
 Andrew


 2014-07-17 18:15 GMT-07:00 Tathagata Das tathagata.das1...@gmail.com:

 Can you check in the environment tab of Spark web ui to see whether this
 configuration parameter is in effect?

 TD


 On Thu, Jul 17, 2014 at 2:05 PM, Chen Song chen.song...@gmail.com
 wrote:

 I am using spark 0.9.0 and I am able to submit job to YARN,
 https://spark.apache.org/docs/0.9.0/running-on-yarn.html.

 I am trying to turn on gc logging on executors but could not find a
 way to set extra Java opts for workers.

 I tried to set spark.executor.extraJavaOptions but that did not work.

 Any idea on how I should do this?

 --
 Chen Song






 --
 Chen Song





-- 
Chen Song


Re: spark streaming rate limiting from kafka

2014-07-18 Thread Chen Song
Thanks Tathagata,

It would be awesome if Spark Streaming could support limiting the receiving
rate in general. I tried to explore the link you provided but could not find
a specific JIRA related to this. Do you have the JIRA number?



On Thu, Jul 17, 2014 at 9:21 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 You can create multiple Kafka streams to partition your topics across them,
 which will run multiple receivers on multiple executors. This is covered in
 the Spark Streaming guide.
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

 And for the purpose of this thread, to answer the original question, we now
 have the ability
 https://issues.apache.org/jira/browse/SPARK-1854?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20Streaming%20ORDER%20BY%20priority%20DESC
 to limit the receiving rate. It's in the master branch, and will be
 available in Spark 1.1. It basically sets the limit at the receiver level
 (so it applies to all sources) on the max records per second that will be
 received by the receiver.

 TD
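[Editor's note: the receiver-level limit TD describes landed as a configuration property in Spark 1.1; assuming the property name that shipped with SPARK-1854, it is set like this:]

```
# spark-defaults.conf (Spark 1.1+)
# Max records per second each receiver will accept; unset means unlimited.
spark.streaming.receiver.maxRate  10000
```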


 On Thu, Jul 17, 2014 at 6:15 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Bill,

 are you saying, after repartition(400), you have 400 partitions on one
 host and the other hosts receive nothing of the data?

 Tobias


 On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 I also have an issue consuming from Kafka. When I consume from Kafka,
 there are always a single executor working on this job. Even I use
 repartition, it seems that there is still a single executor. Does anyone
 has an idea how to add parallelism to this job?



 On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com
 wrote:

 Thanks Luis and Tobias.


 On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Hi,

 On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com
 wrote:

 * Is there a way to control how far Kafka Dstream can read on
 topic-partition (via offset for example). By setting this to a small
 number, it will force DStream to read less data initially.


 Please see the post at

 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E
 Kafka's auto.offset.reset parameter may be what you are looking for.

 Tobias




 --
 Chen Song







-- 
Chen Song


Re: Distribute data from Kafka evenly on cluster

2014-07-18 Thread Chen Song
Speaking of this, I have another related question.

In my Spark Streaming job, I set up multiple consumers to receive data from
Kafka, with each worker reading from one partition.

Initially, Spark is intelligent enough to associate each worker with a
partition, so data consumption is distributed. After running for a while,
the consumers rebalance themselves and some workers start reading partitions
that were assigned to others. This leads to a situation where some workers
read from multiple partitions and some don't read at all. Because of the
data volume, this causes heap pressure on some workers.

Any thoughts on why the rebalance is triggered, and how to monitor for or
avoid it?




On Fri, Jul 4, 2014 at 11:11 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 unfortunately, when I go with the above approach, I run into this problem:

 http://mail-archives.apache.org/mod_mbox/kafka-users/201401.mbox/%3ccabtfevyxvtaqvnmvwmh7yscfgxpw5kmrnw_gnq72cy4oa1b...@mail.gmail.com%3E
 That is, a NoNode error in ZooKeeper when rebalancing. The Kafka receiver
 will retry again and again, but will eventually fail, leading to
 unprocessed data and, worse, the task never terminating. There is nothing
 exotic about my setup (one ZooKeeper node, one Kafka broker), so I am
 wondering if other people have seen this error before and, more importantly,
 how to fix it. When I don't use the approach of multiple kafkaStreams, I
 don't get this error, but then work is also never distributed across my
 cluster...

 Thanks
 Tobias


 On Thu, Jul 3, 2014 at 11:58 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 Thank you very much for the link, that was very helpful!

 So, apparently the `topics: Map[String, Int]` parameter controls the
 number of partitions that the data is initially added to; the number N in

   val kafkaInputs = (1 to N).map { _ =>
     ssc.kafkaStream(zkQuorum, groupId, Map(topic -> 1))
   }
   val union = ssc.union(kafkaInputs)

 controls how many connections are made to Kafka. Note that the number of
 Kafka partitions for that topic must be at least N for this to work.

 Thanks
 Tobias





-- 
Chen Song


how to pass extra Java opts to workers for spark streaming jobs

2014-07-17 Thread Chen Song
I am using Spark 0.9.0 and I am able to submit jobs to YARN,
https://spark.apache.org/docs/0.9.0/running-on-yarn.html.

I am trying to turn on gc logging on executors but could not find a way to
set extra Java opts for workers.

I tried to set spark.executor.extraJavaOptions but that did not work.

Any idea on how I should do this?
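[Editor's note: per the replies elsewhere in this thread, on Spark 0.9 executor JVM flags ride along in SPARK_JAVA_OPTS (set in conf/spark-env.sh on the submitting node), while Spark 1.0+ replaces this with the spark.executor.extraJavaOptions property. A sketch with illustrative GC-logging flags:]

```shell
# Spark 0.9: executor JVM flags via conf/spark-env.sh on the submitting node
export SPARK_JAVA_OPTS="-verbose:gc -XX:+PrintGCDetails"

# Spark 1.0+ equivalent (run instead of the export above):
#   spark-submit --conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails" ...
```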

-- 
Chen Song


Re: spark streaming rate limiting from kafka

2014-07-17 Thread Chen Song
Thanks Luis and Tobias.


On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com wrote:

 * Is there a way to control how far Kafka Dstream can read on
 topic-partition (via offset for example). By setting this to a small
 number, it will force DStream to read less data initially.


 Please see the post at

 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E
 Kafka's auto.offset.reset parameter may be what you are looking for.

 Tobias




-- 
Chen Song


Re: how to pass extra Java opts to workers for spark streaming jobs

2014-07-17 Thread Chen Song
Thanks Andrew.

Say that I want to turn on CMS gc for each worker.

All I need to do is add the following line to conf/spark-env.sh on node
where I submit the application.

-XX:+UseConcMarkSweepGC

Is that correct?

Will this option be populated to each worker in yarn?



On Thu, Jul 17, 2014 at 9:26 PM, Andrew Or and...@databricks.com wrote:

 Hi Chen,

 spark.executor.extraJavaOptions was introduced in Spark 1.0, not in Spark
 0.9. You need to

 export SPARK_JAVA_OPTS="-Dspark.config1=value1 -Dspark.config2=value2"

 in conf/spark-env.sh.

 Let me know if that works.
 Andrew


 2014-07-17 18:15 GMT-07:00 Tathagata Das tathagata.das1...@gmail.com:

 Can you check in the environment tab of Spark web ui to see whether this
 configuration parameter is in effect?

 TD


 On Thu, Jul 17, 2014 at 2:05 PM, Chen Song chen.song...@gmail.com
 wrote:

 I am using spark 0.9.0 and I am able to submit job to YARN,
 https://spark.apache.org/docs/0.9.0/running-on-yarn.html.

 I am trying to turn on gc logging on executors but could not find a way
 to set extra Java opts for workers.

 I tried to set spark.executor.extraJavaOptions but that did not work.

 Any idea on how I should do this?

 --
 Chen Song






-- 
Chen Song


Re: spark streaming counter metrics

2014-07-02 Thread Chen Song
Thanks Mayur. I will take a look at StreamingListener.

Is there any example you have handy?


On Wed, Jul 2, 2014 at 2:32 AM, Mayur Rustagi mayur.rust...@gmail.com
wrote:

 You may be able to mix StreamingListener and SparkListener to get meaningful
 information about your task. However, you need to connect a lot of pieces to
 make sense of the flow.

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Mon, Jun 30, 2014 at 9:58 PM, Chen Song chen.song...@gmail.com wrote:

 I am new to spark streaming and wondering if spark streaming tracks
 counters (e.g., how many rows in each consumer, how many rows routed to an
 individual reduce task, etc.) in any form so I can get an idea of how data
 is skewed? I checked spark job page but don't seem to find any.



 --
 Chen Song





-- 
Chen Song


spark streaming rate limiting from kafka

2014-07-01 Thread Chen Song
In my use case, if I need to stop Spark Streaming for a while, data
accumulates on the Kafka topic-partitions. After I restart the Spark
Streaming job, the worker's heap goes out of memory on the fetch of the
first batch.

I am wondering if

* Is there a way to throttle reading from Kafka in Spark Streaming jobs?
* Is there a way to control how far the Kafka DStream can read on a
topic-partition (via an offset, for example)? Setting this to a small
number would force the DStream to read less data initially.
* Is there a way to limit the consumption rate on the Kafka side? (This one
is not actually about Spark Streaming and doesn't seem to be a question for
this group, but I am raising it here anyway.)

I have looked at the code example below, but this doesn't seem to be
supported:

KafkaUtils.createStream ...

Thanks, all
-- 
Chen Song


spark streaming counter metrics

2014-06-30 Thread Chen Song
I am new to Spark Streaming and am wondering whether it tracks counters
(e.g., how many rows in each consumer, how many rows routed to an
individual reduce task, etc.) in any form, so I can get an idea of how the
data is skewed. I checked the Spark job page but can't seem to find any.
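[Editor's note: absent built-in counters, one low-tech way to gauge skew is to count rows per partition (in Spark, via mapPartitions logic that emits each partition's size) and compare the max against the mean. A plain-Python sketch of that check:]

```python
def partition_skew(partition_counts):
    """Max/mean ratio of per-partition row counts; values well above 1.0 mean skew."""
    mean = sum(partition_counts) / len(partition_counts)
    return max(partition_counts) / mean if mean else 0.0

# In Spark, the counts could come from something like
#   rdd.mapPartitions(it => Iterator(it.size)).collect()
print(partition_skew([100, 110, 90, 900]))  # 3.0 -- one hot partition
```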



-- 
Chen Song


Re: spark streaming questions

2014-06-25 Thread Chen Song
Thanks Anwar.


On Tue, Jun 17, 2014 at 11:54 AM, Anwar Rizal anriza...@gmail.com wrote:


 On Tue, Jun 17, 2014 at 5:39 PM, Chen Song chen.song...@gmail.com wrote:

 Hey

 I am new to spark streaming and apologize if these questions have been
 asked.

 * In StreamingContext, reduceByKey() seems to only work on the RDDs of
 the current batch interval, not including RDDs of previous batches. Is my
 understanding correct?


 It's correct.



 * If the above statement is correct, what functions to use if one wants
 to do processing on the continuous stream batches of data? I see 2
 functions, reduceByKeyAndWindow and updateStateByKey which serve this
 purpose.


 I presume that you need to keep a state that goes beyond one batch, so
 multiple batches. In this case, yes, updateStateByKey is the one you will
 use. Basically, updateStateByKey wraps a state into an RDD.





 My use case is an aggregation and doesn't fit a windowing scenario.

 * As for updateStateByKey, I have a few questions.
 ** Over time, will spark stage original data somewhere to replay in case
 of failures? Say the Spark job run for weeks, I am wondering how that
 sustains?
 ** Say my reduce key space is partitioned by some date field and I would
 like to stop processing old dates after a period time (this is not a simply
 windowing scenario as which date the data belongs to is not the same thing
 when the data arrives). How can I handle this to tell spark to discard data
 for old dates?


 You will need to call checkpoint (see
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#rdd-checkpointing),
 which persists the RDD metadata that would otherwise consume memory (and
 pile up the execution lineage). You can set a checkpointing interval that
 suits your needs.

 Now, if you want to also reset your state after some time, there is no
 immediate way I can think of, but you can do it through updateStateByKey,
 maybe by book-keeping the timestamp.
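[Editor's note: the timestamp book-keeping mentioned above can be sketched as an update function that returns None (dropping the key's state) once the state is older than a cutoff. The state shape and names are illustrative; in Spark this would be the function passed to updateStateByKey:]

```python
from typing import List, Optional, Tuple

# Per-key state: (running_sum, last_updated_at). Field choice is illustrative.
State = Tuple[int, float]

def make_update_func(max_age: float, now: float):
    def update(new_values: List[int], state: Optional[State]) -> Optional[State]:
        if new_values:                       # fresh data: fold it in, refresh timestamp
            prev = state[0] if state else 0
            return (prev + sum(new_values), now)
        if state and now - state[1] > max_age:
            return None                      # returning None drops this key's state
        return state
    return update

update = make_update_func(max_age=3600.0, now=10_000.0)
print(update([1, 2], (5, 9_000.0)))  # (8, 10000.0)
print(update([], (5, 1_000.0)))      # None -- stale key expired
```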




 Thank you,

 Best
 Chen






-- 
Chen Song


semi join spark streaming

2014-06-25 Thread Chen Song
Is there an easy way to do a semi join in Spark Streaming?

Here is my problem, briefly: I have a DStream that generates a set of
values, and I would like to check membership in this set from other
DStreams.

Is there an easy and standard way to model this problem? If not, can I
write the Spark Streaming job to load the set of values from disk and cache
it on each worker?
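[Editor's note: a common way to model this is to collect the key set from the first DStream (or load it from disk), broadcast it to the workers, and filter the other streams against it, which is effectively a broadcast semi join. The membership step can be sketched as (names hypothetical):]

```python
def semi_join(batch, key_set, key_of=lambda record: record[0]):
    """Keep only records whose key appears in the (broadcast) key set."""
    return [r for r in batch if key_of(r) in key_set]

keys = {"user1", "user3"}   # in Spark, sc.broadcast(keys) would ship this to workers
batch = [("user1", 10), ("user2", 20), ("user3", 30)]
print(semi_join(batch, keys))  # [('user1', 10), ('user3', 30)]
```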

-- 
Chen Song


spark streaming questions

2014-06-17 Thread Chen Song
Hey

I am new to spark streaming and apologize if these questions have been
asked.

* In StreamingContext, reduceByKey() seems to only work on the RDDs of the
current batch interval, not including RDDs of previous batches. Is my
understanding correct?

* If the above statement is correct, which functions should one use to
process data across continuous stream batches? I see two functions,
reduceByKeyAndWindow and updateStateByKey, which serve this purpose.

My use case is an aggregation and doesn't fit a windowing scenario.

* As for updateStateByKey, I have a few questions.
** Over time, will Spark stage the original data somewhere to replay in case
of failures? Say the Spark job runs for weeks; I am wondering how that is
sustained.
** Say my reduce key space is partitioned by some date field, and I would
like to stop processing old dates after a period of time (this is not simply
a windowing scenario, as which date the data belongs to is not the same as
when the data arrives). How can I tell Spark to discard data for old dates?

Thank you,

Best
Chen

