Sharding vs. Per-Timeframe Tables

2015-09-29 Thread Jan Algermissen
Hi,

I am using Spark and the Cassandra-connector to save customer events for later 
batch analysis.

Primary access pattern later on will be by time-slice

One way to save the events would be to create a C* row per day, for example, 
and within that row store the events in decreasing time order.

However, this will cause a hot spot in the cluster for each day.

The other two options I see would be sharding (e.g. create 100 rows per day) or 
use a new table for every day.

I prefer the last option, but am not sure whether that is a good pattern with 
the C* connector.

Can anyone provide insights to guide that decision?

Jan
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Where are logs for Spark Kafka Yarn on Cloudera

2015-09-29 Thread Rachana Srivastava
Hello all,

I am trying to test JavaKafkaWordCount on Yarn, to make sure Yarn is working 
fine I am saving the output to hdfs.  The example works fine in local mode but 
not on yarn mode.  I cannot see any output logged when I changed the mode to 
yarn-client or yarn-cluster or cannot find any errors logged.  For my 
application id I was looking for logs under /var/log/hadoop-yarn/containers 
(e.g 
/var/log/hadoop-yarn/containers/application_1439517792099_0010/container_1439517792099_0010_01_03/stderr)
 but I cannot find anything useful information.   Is it the only location where 
application logs are logged.

Also tried setting log output to spark.yarn.app.container.log.dir but got 
access denied error.

Question:  Do we need to have some special setup to run spark streaming on 
Yarn?  How do we debug?  Where to find more details to test streaming on Yarn.

Thanks,

Rachana


unsubscribe

2015-09-29 Thread sukesh kumar
unsubscribe

-- 
Thanks & Best Regards
Sukesh Kumar


OOM error in Spark worker

2015-09-29 Thread varun sharma
My workers are going OOM over time. I am running a streaming job in spark
1.4.0.
Here is the heap dump of workers.

/16,802 instances of "org.apache.spark.deploy.worker.ExecutorRunner", loaded
by "sun.misc.Launcher$AppClassLoader @ 0xdff94088" occupy 488,249,688
(95.80%) bytes. These instances are referenced from one instance of
"java.lang.Object[]", loaded by ""

Keywords
org.apache.spark.deploy.worker.ExecutorRunner
java.lang.Object[]
sun.misc.Launcher$AppClassLoader @ 0xdff94088
/

I am getting below error continuously if one of the worker/executor dies on
any node in my spark cluster.
If I start the worker also, error doesn't go. I have to force_kill my
streaming job and restart to fix the issue. Is it some bug?
I am using Spark 1.4.0.

MY_IP in logs is IP of worker node which failed.

/15/09/03 11:29:11 WARN BlockManagerMaster: Failed to remove RDD 194218 -
Ask timed out on
[Actor[akka.tcp://sparkExecutor@MY_IP:38223/user/BlockManagerEndpoint1#656884654]]
after [12 ms]}
akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://sparkExecutor@MY_IP:38223/user/BlockManagerEndpoint1#656884654]]
after [12 ms]
at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
at
scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
at 
akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
at java.lang.Thread.run(Thread.java:745)
15/09/03 11:29:11 WARN BlockManagerMaster: Failed to remove RDD 194217 - Ask
timed out on
[Actor[akka.tcp://sparkExecutor@MY_IP:38223/user/BlockManagerEndpoint1#656884654]]
after [12 ms]}
akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://sparkExecutor@MY_IP:38223/user/BlockManagerEndpoint1#656884654]]
after [12 ms]
at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
at
scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
at 
akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
at java.lang.Thread.run(Thread.java:745)
15/09/03 11:29:11 ERROR SparkDeploySchedulerBackend: Asked to remove
non-existent executor 16723
15/09/03 11:29:11 WARN BlockManagerMaster: Failed to remove RDD 194216 - Ask
timed out on
[Actor[akka.tcp://sparkExecutor@MY_IP:38223/user/BlockManagerEndpoint1#656884654]]
after [12 ms]}
/
It is easily reproducible if I manually stop a worker on one of my node.
/15/09/03 23:52:18 ERROR SparkDeploySchedulerBackend: Asked to remove
non-existent executor 329
15/09/03 23:52:18 ERROR SparkDeploySchedulerBackend: Asked to remove
non-existent executor 333
15/09/03 23:52:18 ERROR SparkDeploySchedulerBackend: Asked to remove
non-existent executor 334
/
It doesn't go even if I start the worker again.

Follow up question: If my streaming job has consumed some events from Kafka
topic and are pending to be scheduled because of delay in processing... Will
my force killing the streaming job lose that data which is not yet
scheduled?

Please help ASAP.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/OOM-error-in-Spark-worker-tp24856.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: nested collection object query

2015-09-29 Thread Tridib Samanta
Well I figure out a way to use explode. But it returns two rows if there is two 
match in nested array objects.
 
select id from department LATERAL VIEW explode(employee) dummy_table as emp 
where emp.name = 'employee0'
 
I was looking for an operator that loops through the array and return true if 
it matches the condition and returns the parent object.
From: tridib.sama...@live.com
To: java8...@hotmail.com; user@spark.apache.org
Subject: RE: nested collection object query
Date: Mon, 28 Sep 2015 22:26:46 -0700




Thanks for you response Yong! Array syntax works fine. But I am not sure how to 
use explode. Should I use as follows?
select id from department where explode(employee).name = 'employee0
 
This query gives me java.lang.UnsupportedOperationException . I am using 
HiveContext.
 
From: java8...@hotmail.com
To: tridib.sama...@live.com; user@spark.apache.org
Subject: RE: nested collection object query
Date: Mon, 28 Sep 2015 20:42:11 -0400




Your employee in fact is an array of struct, not just struct.
If you are using HiveSQLContext, then you can refer it like following:
select id from member where employee[0].name = 'employee0'
The employee[0] is pointing to the 1st element of the array. 
If you want to query all the elements in the array, then you have to use 
"explode" in the Hive. 
See Hive document for this:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-explode
Yong

> Date: Mon, 28 Sep 2015 16:37:23 -0700
> From: tridib.sama...@live.com
> To: user@spark.apache.org
> Subject: nested collection object query
> 
> Hi Friends,
> What is the right syntax to query on collection of nested object? I have a
> following schema and SQL. But it does not return anything. Is the syntax
> correct?
> 
> root
>  |-- id: string (nullable = false)
>  |-- employee: array (nullable = false)
>  ||-- element: struct (containsNull = true)
>  |||-- id: string (nullable = false)
>  |||-- name: string (nullable = false)
>  |||-- speciality: string (nullable = false)
> 
> 
> select id from member where employee.name = 'employee0'
> 
> Uploaded a test if some one want to try it out. NestedObjectTest.java
> 
>   
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/nested-collection-object-query-tp24853.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

  

Hive alter table is failing

2015-09-29 Thread Ophir Cohen
Hi,

I'm using Spark on top of Hive.
As I want to keep old tables I store the DataFrame into tmp table in hive
and when finished successfully I rename the table.

In last few days I've upgrade to use Spark 1.4.1, and as I'm using aws emr
I got Hive 1.0.
Now when I try to rename the table I get the following error:

Caused by: InvalidOperationException(message:Unable to access old location
hdfs://ip-10-140-189-94.ec2.internal:8020/user/hive/warehouse/_29092015_111704_tmp
for table default._29092015_111704_tmp)
at
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_table_with_environment_context_result$alter_table_with_environment_context_resultStandardScheme.read(ThriftHiveMetastore.java:34066)
at
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_table_with_environment_context_result$alter_table_with_environment_context_resultStandardScheme.read(ThriftHiveMetastore.java:34052)
at
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_table_with_environment_context_result.read(ThriftHiveMetastore.java:33994)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
at
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_alter_table_with_environment_context(ThriftHiveMetastore.java:1163)
at
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.alter_table_with_environment_context(ThriftHiveMetastore.java:1147)


I suspect that this is the bug:
https://issues.apache.org/jira/browse/HIVE-10719 but it strange cause it
works from Hive CLI.

Did anyone encounter that?
Do we have any workaround?

Thanks,
Ophir


Re: Monitoring tools for spark streaming

2015-09-29 Thread Adrian Tanase
You can also use the REST api introduced in 1.4 – although it’s harder to parse:

  *   jobs from the same batch are not grouped together
  *   You only get total delay, not scheduling delay

From: Hari Shreedharan
Date: Tuesday, September 29, 2015 at 5:27 AM
To: Shixiong Zhu
Cc: Siva, "user@spark.apache.org"
Subject: Re: Monitoring tools for spark streaming

+1. The Streaming UI should give you more than enough information.

Thanks,
Hari



On Mon, Sep 28, 2015 at 9:55 PM, Shixiong Zhu 
> wrote:

Which version are you using? Could you take a look at the new Streaming UI in 
1.4.0?


Best Regards,

Shixiong Zhu

2015-09-29 7:52 GMT+08:00 Siva 
>:
Hi,

Could someone recommend the monitoring tools for spark streaming?

By extending StreamingListener we can dump the delay in processing of batches 
and some alert messages.

But are there any Web UI tools where we can monitor failures, see delays in 
processing, error messages and setup alerts etc.

Thanks





Re: Kafka error "partitions don't have a leader" / LeaderNotAvailableException

2015-09-29 Thread Adrian Tanase
The error message is very explicit (partition is under replicated), I don’t 
think it’s related to networking issues.

Try to run /home/kafka/bin/kafka-topics.sh —zookeeper localhost/kafka —describe 
topic_name and see which brokers are missing from the replica assignment.
(replace home, zk-quorum etc with your own set-up)

Lastly, has this ever worked? Maybe you’ve accidentally created the topic with 
more partitions and replicas than available brokers… try to recreate with fewer 
partitions/replicas, see if it works.

-adrian

From: Dmitry Goldenberg
Date: Tuesday, September 29, 2015 at 3:37 PM
To: Adrian Tanase
Cc: "user@spark.apache.org"
Subject: Re: Kafka error "partitions don't have a leader" / 
LeaderNotAvailableException

Adrian,

Thanks for your response. I just looked at both machines we're testing on and 
on both the Kafka server process looks OK. Anything specific I can check 
otherwise?

From googling around, I see some posts where folks suggest to check the DNS 
settings (those appear fine) and to set the 
advertised.host.name in Kafka's server.properties. 
Yay/nay?

Thanks again.

On Tue, Sep 29, 2015 at 8:31 AM, Adrian Tanase 
> wrote:
I believe some of the brokers in your cluster died and there are a number of 
partitions that nobody is currently managing.

-adrian

From: Dmitry Goldenberg
Date: Tuesday, September 29, 2015 at 3:26 PM
To: "user@spark.apache.org"
Subject: Kafka error "partitions don't have a leader" / 
LeaderNotAvailableException

I apologize for posting this Kafka related issue into the Spark list. Have 
gotten no responses on the Kafka list and was hoping someone on this list could 
shed some light on the below.

---

We're running into this issue in a clustered environment where we're trying to 
send messages to Kafka and are getting the below error.

Can someone explain what might be causing it and what the error message means 
(Failed to send data since partitions [,8] don't have a leader) ?

---

WARN kafka.producer.BrokerPartitionInfo: Error while fetching metadata 
partition 10 leader: none replicas: isr: isUnderReplicated: false for topic 
partition [,10]: [class kafka.common.LeaderNotAvailableException]

ERROR kafka.producer.async.DefaultEventHandler: Failed to send requests for 
topics  with correlation ids in [2398792,2398801]

ERROR com.acme.core.messaging.kafka.KafkaMessageProducer: Error while sending a 
message to the message store. kafka.common.FailedToSendMessageException: Failed 
to send messages after 3 tries.
at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) 
~[kafka_2.10-0.8.2.0.jar:?]
at kafka.producer.Producer.send(Producer.scala:77) ~[kafka_2.10-0.8.2.0.jar:?]
at kafka.javaapi.producer.Producer.send(Producer.scala:33) 
~[kafka_2.10-0.8.2.0.jar:?]

WARN kafka.producer.async.DefaultEventHandler: Failed to send data since 
partitions [,8] don't have a leader

What do these errors and warnings mean and how do we get around them?

---

The code for sending messages is basically as follows:

public class KafkaMessageProducer {
private Producer producer;

.

public void sendMessage(String topic, String key, String message) throws 
IOException, MessagingException {
KeyedMessage data = new KeyedMessage(topic, 
key, message);
try {
  producer.send(data);
} catch (Exception ex) {
  throw new MessagingException("Error while sending a message to the 
message store.", ex);
}
}

Is it possible that the producer gets "stale" and needs to be re-initialized?  
Do we want to re-create the producer on every message (??) or is it OK to hold 
on to one indefinitely?

---

The following are the producer properties that are being set into the producer

batch.num.messages => 200
client.id => Acme
compression.codec => none
key.serializer.class => kafka.serializer.StringEncoder
message.send.max.retries => 3
metadata.broker.list => 
data2.acme.com:9092,data3.acme.com:9092
partitioner.class => kafka.producer.DefaultPartitioner
producer.type => sync
queue.buffering.max.messages => 1
queue.buffering.max.ms => 5000
queue.enqueue.timeout.ms => -1
request.required.acks => 1
request.timeout.ms => 1
retry.backoff.ms => 1000
send.buffer.bytes => 102400
serializer.class => kafka.serializer.StringEncoder

Re: Kafka error "partitions don't have a leader" / LeaderNotAvailableException

2015-09-29 Thread Dmitry Goldenberg
"more partitions and replicas than available brokers" -- what would be a
good ratio?

We've been trying to set up 3 topics with 64 partitions.  I'm including the
output of "bin/kafka-topics.sh --zookeeper localhost:2181 --describe
topic1" below.

I think it's symptomatic and confirms your theory, Adrian, that we've got
too many partitions. In fact, for topic 2, only 12 partitions appear to
have been created despite the requested 64.  Does Kafka have the limit of
140 partitions total within a cluster?

The doc doesn't appear to have any prescriptions as to how you go about
calculating an optimal number of partitions.

We'll definitely try with fewer, I'm just looking for a good formula to
calculate how many. And no, Adrian, this hasn't worked yet, so we'll start
with something like 12 partitions.  It'd be good to know how high we can go
with that...

Topic:topic1 PartitionCount:64 ReplicationFactor:1 Configs:

Topic: topic1 Partition: 0 Leader: 1 Replicas: 1 Isr: 1

Topic: topic2 Partition: 1 Leader: 2 Replicas: 2 Isr: 2




Topic: topic3 Partition: 63 Leader: 2 Replicas: 2 Isr: 2

---

Topic:topic2 PartitionCount:12 ReplicationFactor:1 Configs:

Topic: topic2 Partition: 0 Leader: 2 Replicas: 2 Isr: 2

Topic: topic2 Partition: 1 Leader: 1 Replicas: 1 Isr: 1




Topic: topic2 Partition: 11 Leader: 1 Replicas: 1 Isr: 1

---

Topic:topic3 PartitionCount:64 ReplicationFactor:1 Configs:

Topic: topic3 Partition: 0 Leader: 2 Replicas: 2 Isr: 2

Topic: topic3 Partition: 1 Leader: 1 Replicas: 1 Isr: 1




Topic: topic3 Partition: 63 Leader: 1 Replicas: 1 Isr: 1


On Tue, Sep 29, 2015 at 8:47 AM, Adrian Tanase  wrote:

> The error message is very explicit (partition is under replicated), I
> don’t think it’s related to networking issues.
>
> Try to run /home/kafka/bin/kafka-topics.sh —zookeeper localhost/kafka
> —describe topic_name and see which brokers are missing from the replica
> assignment.
> *(replace home, zk-quorum etc with your own set-up)*
>
> Lastly, has this ever worked? Maybe you’ve accidentally created the topic
> with more partitions and replicas than available brokers… try to recreate
> with fewer partitions/replicas, see if it works.
>
> -adrian
>
> From: Dmitry Goldenberg
> Date: Tuesday, September 29, 2015 at 3:37 PM
> To: Adrian Tanase
> Cc: "user@spark.apache.org"
> Subject: Re: Kafka error "partitions don't have a leader" /
> LeaderNotAvailableException
>
> Adrian,
>
> Thanks for your response. I just looked at both machines we're testing on
> and on both the Kafka server process looks OK. Anything specific I can
> check otherwise?
>
> From googling around, I see some posts where folks suggest to check the
> DNS settings (those appear fine) and to set the advertised.host.name in
> Kafka's server.properties. Yay/nay?
>
> Thanks again.
>
> On Tue, Sep 29, 2015 at 8:31 AM, Adrian Tanase  wrote:
>
>> I believe some of the brokers in your cluster died and there are a number
>> of partitions that nobody is currently managing.
>>
>> -adrian
>>
>> From: Dmitry Goldenberg
>> Date: Tuesday, September 29, 2015 at 3:26 PM
>> To: "user@spark.apache.org"
>> Subject: Kafka error "partitions don't have a leader" /
>> LeaderNotAvailableException
>>
>> I apologize for posting this Kafka related issue into the Spark list.
>> Have gotten no responses on the Kafka list and was hoping someone on this
>> list could shed some light on the below.
>>
>> 
>> ---
>>
>> We're running into this issue in a clustered environment where we're
>> trying to send messages to Kafka and are getting the below error.
>>
>> Can someone explain what might be causing it and what the error message
>> means (Failed to send data since partitions [,8] don't have a
>> leader) ?
>>
>>
>> ---
>>
>> WARN kafka.producer.BrokerPartitionInfo: Error while fetching
>> metadata partition 10 leader: none replicas: isr: isUnderReplicated: false
>> for topic partition [,10]: [class
>> kafka.common.LeaderNotAvailableException]
>>
>> ERROR kafka.producer.async.DefaultEventHandler: Failed to send requests
>> for topics  with correlation ids in [2398792,2398801]
>>
>> ERROR com.acme.core.messaging.kafka.KafkaMessageProducer: Error while
>> sending a message to the message
>> store. kafka.common.FailedToSendMessageException: Failed to send messages
>> after 3 tries.
>> at
>> 

Re: Hive alter table is failing

2015-09-29 Thread Ophir Cohen
Nop, I'm checking it out thanks!

On Tue, Sep 29, 2015 at 3:30 PM, Ted Yu  wrote:

> Have you seen this thread ?
> http://search-hadoop.com/m/q3RTtGwP431AQ2B41
>
> Plugin metastore version for your deployment.
>
> Cheers
>
> On Sep 29, 2015, at 5:20 AM, Ophir Cohen  wrote:
>
> Hi,
>
> I'm using Spark on top of Hive.
> As I want to keep old tables I store the DataFrame into tmp table in hive
> and when finished successfully I rename the table.
>
> In last few days I've upgrade to use Spark 1.4.1, and as I'm using aws emr
> I got Hive 1.0.
> Now when I try to rename the table I get the following error:
>
> Caused by: InvalidOperationException(message:Unable to access old location
> hdfs://ip-10-140-189-94.ec2.internal:8020/user/hive/warehouse/_29092015_111704_tmp
> for table default._29092015_111704_tmp)
> at
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_table_with_environment_context_result$alter_table_with_environment_context_resultStandardScheme.read(ThriftHiveMetastore.java:34066)
> at
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_table_with_environment_context_result$alter_table_with_environment_context_resultStandardScheme.read(ThriftHiveMetastore.java:34052)
> at
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_table_with_environment_context_result.read(ThriftHiveMetastore.java:33994)
> at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
> at
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_alter_table_with_environment_context(ThriftHiveMetastore.java:1163)
> at
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.alter_table_with_environment_context(ThriftHiveMetastore.java:1147)
>
>
> I suspect that this is the bug:
> https://issues.apache.org/jira/browse/HIVE-10719 but it strange cause it
> works from Hive CLI.
>
> Did anyone encounter that?
> Do we have any workaround?
>
> Thanks,
> Ophir
>
>


Cant perform full outer join

2015-09-29 Thread Saif.A.Ellafi
Hi all,

So I Have two dataframes, with two columns: DATE and VALUE.

Performing this:
data = data.join(cur_data, data("DATE") === cur_data("DATE"), "outer")

returns
Exception in thread "main" org.apache.spark.sql.AnalysisException: Reference 
'DATE' is ambiguous, could be: DATE#0, DATE#3.;

But if I change one of the column names, I will get two columns and won't 
really merge "DATE" column as I wish. Any ideas without going to non trivial 
procedures?

Thanks,
Saif



Re: Spark Streaming Log4j Inside Eclipse

2015-09-29 Thread Ashish Soni
I am using Java Streaming context and it doesnt have method setLogLevel and
also i have tried by passing VM argument in eclipse and it doesnt work

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
Durations.seconds(2));

Ashish

On Tue, Sep 29, 2015 at 7:23 AM, Adrian Tanase  wrote:

> You should set exta java options for your app via Eclipse project and
> specify something like
>
>  -Dlog4j.configuration=file:/tmp/log4j.properties
>
> Sent from my iPhone
>
> On 28 Sep 2015, at 18:52, Shixiong Zhu  wrote:
>
> You can use JavaSparkContext.setLogLevel to set the log level in your
> codes.
>
> Best Regards,
> Shixiong Zhu
>
> 2015-09-28 22:55 GMT+08:00 Ashish Soni :
>
>> I am not running it using spark submit , i am running locally inside
>> Eclipse IDE , how i set this using JAVA Code
>>
>> Ashish
>>
>> On Mon, Sep 28, 2015 at 10:42 AM, Adrian Tanase 
>> wrote:
>>
>>> You also need to provide it as parameter to spark submit
>>>
>>> http://stackoverflow.com/questions/28840438/how-to-override-sparks-log4j-properties-per-driver
>>>
>>> From: Ashish Soni
>>> Date: Monday, September 28, 2015 at 5:18 PM
>>> To: user
>>> Subject: Spark Streaming Log4j Inside Eclipse
>>>
>>> I need to turn off the verbose logging of Spark Streaming Code when i am
>>> running inside eclipse i tried creating a log4j.properties file and placed
>>> inside /src/main/resources but i do not see it getting any effect , Please
>>> help as not sure what else needs to be done to change the log at DEBUG or
>>> WARN
>>>
>>
>>
>


Re: Kafka error "partitions don't have a leader" / LeaderNotAvailableException

2015-09-29 Thread Dmitry Goldenberg
Adrian,

Thanks for your response. I just looked at both machines we're testing on
and on both the Kafka server process looks OK. Anything specific I can
check otherwise?

>From googling around, I see some posts where folks suggest to check the DNS
settings (those appear fine) and to set the advertised.host.name in Kafka's
server.properties. Yay/nay?

Thanks again.

On Tue, Sep 29, 2015 at 8:31 AM, Adrian Tanase  wrote:

> I believe some of the brokers in your cluster died and there are a number
> of partitions that nobody is currently managing.
>
> -adrian
>
> From: Dmitry Goldenberg
> Date: Tuesday, September 29, 2015 at 3:26 PM
> To: "user@spark.apache.org"
> Subject: Kafka error "partitions don't have a leader" /
> LeaderNotAvailableException
>
> I apologize for posting this Kafka related issue into the Spark list. Have
> gotten no responses on the Kafka list and was hoping someone on this list
> could shed some light on the below.
>
> 
> ---
>
> We're running into this issue in a clustered environment where we're
> trying to send messages to Kafka and are getting the below error.
>
> Can someone explain what might be causing it and what the error message
> means (Failed to send data since partitions [,8] don't have a
> leader) ?
>
>
> ---
>
> WARN kafka.producer.BrokerPartitionInfo: Error while fetching
> metadata partition 10 leader: none replicas: isr: isUnderReplicated: false
> for topic partition [,10]: [class
> kafka.common.LeaderNotAvailableException]
>
> ERROR kafka.producer.async.DefaultEventHandler: Failed to send requests
> for topics  with correlation ids in [2398792,2398801]
>
> ERROR com.acme.core.messaging.kafka.KafkaMessageProducer: Error while
> sending a message to the message
> store. kafka.common.FailedToSendMessageException: Failed to send messages
> after 3 tries.
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> ~[kafka_2.10-0.8.2.0.jar:?]
> at kafka.producer.Producer.send(Producer.scala:77)
> ~[kafka_2.10-0.8.2.0.jar:?]
> at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> ~[kafka_2.10-0.8.2.0.jar:?]
>
> WARN kafka.producer.async.DefaultEventHandler: Failed to send data since
> partitions [,8] don't have a leader
>
> What do these errors and warnings mean and how do we get around them?
>
>
> ---
>
> The code for sending messages is basically as follows:
>
> public class KafkaMessageProducer {
> private Producer producer;
>
> .
>
> public void sendMessage(String topic, String key,
> String message) throws IOException, MessagingException {
> KeyedMessage data = new KeyedMessage String>(topic, key, message);
> try {
>   producer.send(data);
> } catch (Exception ex) {
>   throw new MessagingException("Error while sending a message to the
> message store.", ex);
> }
> }
>
> Is it possible that the producer gets "stale" and needs to be
> re-initialized?  Do we want to re-create the producer on every message (??)
> or is it OK to hold on to one indefinitely?
>
>
> ---
>
> The following are the producer properties that are being set into the
> producer
>
> batch.num.messages => 200
> client.id => Acme
> compression.codec => none
> key.serializer.class => kafka.serializer.StringEncoder
> message.send.max.retries => 3
> metadata.broker.list => data2.acme.com:9092,data3.acme.com:9092
> partitioner.class => kafka.producer.DefaultPartitioner
> producer.type => sync
> queue.buffering.max.messages => 1
> queue.buffering.max.ms => 5000
> queue.enqueue.timeout.ms => -1
> request.required.acks => 1
> request.timeout.ms => 1
> retry.backoff.ms => 1000
> send.buffer.bytes => 102400
> serializer.class => kafka.serializer.StringEncoder
> topic.metadata.refresh.interval.ms => 60
>
>
> Thanks.
>


RE: Setting executors per worker - Standalone

2015-09-29 Thread java8964
I don't think you can do that in the Standalone mode before 1.5.
The best you can do is to have multi workers per box. One worker can and will 
only start one executor, before Spark 1.5.
What you can do is to set "SPARK_WORKER_INSTANCES", which control how many 
worker instances you can start per box.
Yong 

Date: Mon, 28 Sep 2015 20:47:18 -0700
Subject: Re: Setting executors per worker - Standalone
From: james.p...@gmail.com
To: zjf...@gmail.com
CC: user@spark.apache.org

Thanks for your reply.
Setting it as 
--conf spark.executor.cores=1 
when I start spark-shell (as an example application) indeed sets the number of 
cores per executor as 1 (which is 4 before), but I still have 1 executor per 
worker. What I am really looking for is having 1 worker with 4 executor (each 
with one core) per machine when I run my application. Based one the 
documentation it seems it is feasible, but it is not clear as how.
Thanks.
On Mon, Sep 28, 2015 at 8:46 PM, Jeff Zhang  wrote:
use "--executor-cores 1" you will get 4 executors per worker since you have 4 
cores per worker


On Tue, Sep 29, 2015 at 8:24 AM, James Pirz  wrote:
Hi,
I am using speak 1.5 (standalone mode) on a cluster with 10 nodes while each 
machine has 12GB of RAM and 4 cores. On each machine I have one worker which is 
running one executor that grabs all 4 cores. I am interested to check the 
performance with "one worker but 4 executors per machine - each with one core".
I can see that "running multiple executors per worker in Standalone mode" is 
possible based on the closed issue:
https://issues.apache.org/jira/browse/SPARK-1706

But I can not find a way to do that. "SPARK_EXECUTOR_INSTANCES" is only 
available for the Yarn mode, and in the standalone mode I can just set 
"SPARK_WORKER_INSTANCES" and "SPARK_WORKER_CORES" and "SPARK_WORKER_MEMORY".
Any hint or suggestion would be great.



-- 
Best Regards

Jeff Zhang


  

RE: nested collection object query

2015-09-29 Thread java8964
You have 2 options:
Option 1:
Use lateral view explode, as you did below. But if you want to remove the 
duplicate, then use distinct after that.
For example:
col1, col2, ArrayOf(Struct)
After explode:
col1, col2, employee0col1, col2, employee1col1, col2, employee0
Then select distinct col1, col2 from ... where emp.name='employee0'
Option 2: Implement your own UDF, to do the logic you want to do. In fact, in 
the Hive, there is already one called array_contains(), which check if the 
array contain the data you want. But in  your case, your data in the array is a 
struct, and you only want to compare name of the struct, instead of whole 
struct. You need to override the equals() logic of array_contains() in the 
Hive, so you have to implement that by custom UDF.
See the hive function of array_contains here:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-CollectionFunctions
Yong
From: tridib.sama...@live.com
To: java8...@hotmail.com; user@spark.apache.org
Subject: RE: nested collection object query
Date: Mon, 28 Sep 2015 23:02:41 -0700




Well I figure out a way to use explode. But it returns two rows if there is two 
match in nested array objects.
 
select id from department LATERAL VIEW explode(employee) dummy_table as emp 
where emp.name = 'employee0'
 
I was looking for an operator that loops through the array and return true if 
it matches the condition and returns the parent object.
From: tridib.sama...@live.com
To: java8...@hotmail.com; user@spark.apache.org
Subject: RE: nested collection object query
Date: Mon, 28 Sep 2015 22:26:46 -0700




Thanks for you response Yong! Array syntax works fine. But I am not sure how to 
use explode. Should I use as follows?
select id from department where explode(employee).name = 'employee0
 
This query gives me java.lang.UnsupportedOperationException . I am using 
HiveContext.
 
From: java8...@hotmail.com
To: tridib.sama...@live.com; user@spark.apache.org
Subject: RE: nested collection object query
Date: Mon, 28 Sep 2015 20:42:11 -0400




Your employee in fact is an array of struct, not just struct.
If you are using HiveSQLContext, then you can refer it like following:
select id from member where employee[0].name = 'employee0'
The employee[0] is pointing to the 1st element of the array. 
If you want to query all the elements in the array, then you have to use 
"explode" in the Hive. 
See Hive document for this:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-explode
Yong

> Date: Mon, 28 Sep 2015 16:37:23 -0700
> From: tridib.sama...@live.com
> To: user@spark.apache.org
> Subject: nested collection object query
> 
> Hi Friends,
> What is the right syntax to query on collection of nested object? I have a
> following schema and SQL. But it does not return anything. Is the syntax
> correct?
> 
> root
>  |-- id: string (nullable = false)
>  |-- employee: array (nullable = false)
>  ||-- element: struct (containsNull = true)
>  |||-- id: string (nullable = false)
>  |||-- name: string (nullable = false)
>  |||-- speciality: string (nullable = false)
> 
> 
> select id from member where employee.name = 'employee0'
> 
> Uploaded a test if some one want to try it out. NestedObjectTest.java
> 
>   
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/nested-collection-object-query-tp24853.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 


  

Re: Fetching Date value from spark.sql.row in Spark 1.2.2

2015-09-29 Thread satish chandra j
HI All,
If any alternate solutions to get the Date value from
org.apache.spark.sql.row please suggest me

Regards,
Satish Chandra

On Tue, Sep 29, 2015 at 4:41 PM, satish chandra j 
wrote:

> HI All,
> Currently using Spark 1.2.2, as getDate method is not defined in this
> version hence trying to fetch Date value of a specific coulmn using *get*
> method as specified in docs (ref URL given below:)
>
>
> https://spark.apache.org/docs/1.2.2/api/java/index.html?org/apache/spark/sql/api/java/Row.html
>
> But getting an error: "value get is not a member of
> org.apache.spark.sql.row"
>
> Regards,
> Satish Chandra
>
>
>
>
>
>


Re: Merging two avro RDD/DataFrames

2015-09-29 Thread Adrian Tanase
Seems to me that the obvious candidate is loading both master and delta, using 
join or cogroup then write the new master.

Through some clever sharding and key management you might achieve some 
efficiency gains, but I’d say start here if your numbers are in the hundreds of 
thousands… should run under a minute with the correct resources…

-adrian

From: TEST ONE
Date: Tuesday, September 29, 2015 at 3:00 AM
To: "user@spark.apache.org"
Subject: Merging two avro RDD/DataFrames


I have a daily update of modified users (~100s) output as avro from ETL. I’d 
need to find and merge with existing corresponding members in a master avro 
file (~100,000s) The merge operation involves merging a ‘profiles’ 
Map between the matching records.


What would be the recommended pattern to handle record merging with Spark?


Thanks,

kc


Re: Where are logs for Spark Kafka Yarn on Cloudera

2015-09-29 Thread Marcelo Vanzin
(-dev@)

Try using the "yarn logs" command to read logs for finished
applications. You can also browse the RM UI to find more information
about the applications you ran.

On Mon, Sep 28, 2015 at 11:37 PM, Rachana Srivastava
 wrote:
> Hello all,
>
>
>
> I am trying to test JavaKafkaWordCount on Yarn, to make sure Yarn is working
> fine I am saving the output to hdfs.  The example works fine in local mode
> but not on yarn mode.  I cannot see any output logged when I changed the
> mode to yarn-client or yarn-cluster or cannot find any errors logged.  For
> my application id I was looking for logs under
> /var/log/hadoop-yarn/containers (e.g
> /var/log/hadoop-yarn/containers/application_1439517792099_0010/container_1439517792099_0010_01_03/stderr)
> but I cannot find anything useful information.   Is it the only location
> where application logs are logged.
>
>
>
> Also tried setting log output to spark.yarn.app.container.log.dir but got
> access denied error.
>
>
>
> Question:  Do we need to have some special setup to run spark streaming on
> Yarn?  How do we debug?  Where to find more details to test streaming on
> Yarn.
>
>
>
> Thanks,
>
>
>
> Rachana



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Converting a DStream to schemaRDD

2015-09-29 Thread Daniel Haviv
Hi,
I have a DStream which is a stream of RDD[String].

How can I pass a DStream to sqlContext.jsonRDD and work with it as a DF ?

Thank you.
Daniel


RandomForestClassifer does not recognize number of classes, nor can number of classes be set

2015-09-29 Thread Kristina Rogale Plazonic
Hi,

I'm trying out the ml.classification.RandomForestClassifer() on a simple
dataframe and it returns an exception that number of classes has not been
set in my dataframe. However, I cannot find a function that would set
number of classes, or pass it as an argument anywhere. In mllib, numClasses
is a parameter passed when training the model. In ml, there is an ugly hack
using StringIndexer, but should you really be using the hack?
LogisticRegression and NaiveBayes in ml work without setting the number of
classes.

Thanks for any pointers!
Kristina

My code:

import org.apache.spark.mllib.linalg.{Vector, Vectors}

case class Record(label:Double,
features:org.apache.spark.mllib.linalg.Vector)

val df = sc.parallelize(Seq( Record(0.0, Vectors.dense(1.0, 0.0) ),
Record(0.0, Vectors.dense(1.1, 0.0) ),
Record(0.0, Vectors.dense(1.2, 0.0) ),
Record(1.0, Vectors.dense(0.0, 1.2) ),
Record(1.0, Vectors.dense(0.0, 1.3) ),
Record(1.0, Vectors.dense(0.0, 1.7) ))
   ).toDF()

val rf = new RandomForestClassifier()
val rfmodel = rf.fit(df)

And the error is:

scala> val rfmodel = rf.fit(df)
java.lang.IllegalArgumentException: RandomForestClassifier was given input
with invalid label column label, without the number of classes specified.
See StringIndexer.
at
org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:87)
at
org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:42)
at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:31)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:36)


Kafka error "partitions don't have a leader" / LeaderNotAvailableException

2015-09-29 Thread Dmitry Goldenberg
I apologize for posting this Kafka related issue into the Spark list. Have
gotten no responses on the Kafka list and was hoping someone on this list
could shed some light on the below.


---

We're running into this issue in a clustered environment where we're trying
to send messages to Kafka and are getting the below error.

Can someone explain what might be causing it and what the error message
means (Failed to send data since partitions [,8] don't have a
leader) ?

---

WARN kafka.producer.BrokerPartitionInfo: Error while fetching
metadata partition 10 leader: none replicas: isr: isUnderReplicated: false
for topic partition [,10]: [class
kafka.common.LeaderNotAvailableException]

ERROR kafka.producer.async.DefaultEventHandler: Failed to send requests for
topics  with correlation ids in [2398792,2398801]

ERROR com.acme.core.messaging.kafka.KafkaMessageProducer: Error while
sending a message to the message
store. kafka.common.FailedToSendMessageException: Failed to send messages
after 3 tries.
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
~[kafka_2.10-0.8.2.0.jar:?]
at kafka.producer.Producer.send(Producer.scala:77)
~[kafka_2.10-0.8.2.0.jar:?]
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
~[kafka_2.10-0.8.2.0.jar:?]

WARN kafka.producer.async.DefaultEventHandler: Failed to send data since
partitions [,8] don't have a leader

What do these errors and warnings mean and how do we get around them?

---

The code for sending messages is basically as follows:

public class KafkaMessageProducer {
private Producer producer;

.

public void sendMessage(String topic, String key,
String message) throws IOException, MessagingException {
KeyedMessage data = new KeyedMessage(topic, key, message);
try {
  producer.send(data);
} catch (Exception ex) {
  throw new MessagingException("Error while sending a message to the
message store.", ex);
}
}

Is it possible that the producer gets "stale" and needs to be
re-initialized?  Do we want to re-create the producer on every message (??)
or is it OK to hold on to one indefinitely?

---

The following are the producer properties that are being set into the
producer

batch.num.messages => 200
client.id => Acme
compression.codec => none
key.serializer.class => kafka.serializer.StringEncoder
message.send.max.retries => 3
metadata.broker.list => data2.acme.com:9092,data3.acme.com:9092
partitioner.class => kafka.producer.DefaultPartitioner
producer.type => sync
queue.buffering.max.messages => 1
queue.buffering.max.ms => 5000
queue.enqueue.timeout.ms => -1
request.required.acks => 1
request.timeout.ms => 1
retry.backoff.ms => 1000
send.buffer.bytes => 102400
serializer.class => kafka.serializer.StringEncoder
topic.metadata.refresh.interval.ms => 60


Thanks.


Re: Kafka error "partitions don't have a leader" / LeaderNotAvailableException

2015-09-29 Thread Adrian Tanase
I believe some of the brokers in your cluster died and there are a number of 
partitions that nobody is currently managing.

-adrian

From: Dmitry Goldenberg
Date: Tuesday, September 29, 2015 at 3:26 PM
To: "user@spark.apache.org"
Subject: Kafka error "partitions don't have a leader" / 
LeaderNotAvailableException

I apologize for posting this Kafka related issue into the Spark list. Have 
gotten no responses on the Kafka list and was hoping someone on this list could 
shed some light on the below.

---

We're running into this issue in a clustered environment where we're trying to 
send messages to Kafka and are getting the below error.

Can someone explain what might be causing it and what the error message means 
(Failed to send data since partitions [,8] don't have a leader) ?

---

WARN kafka.producer.BrokerPartitionInfo: Error while fetching metadata 
partition 10 leader: none replicas: isr: isUnderReplicated: false for topic 
partition [,10]: [class kafka.common.LeaderNotAvailableException]

ERROR kafka.producer.async.DefaultEventHandler: Failed to send requests for 
topics  with correlation ids in [2398792,2398801]

ERROR com.acme.core.messaging.kafka.KafkaMessageProducer: Error while sending a 
message to the message store. kafka.common.FailedToSendMessageException: Failed 
to send messages after 3 tries.
at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) 
~[kafka_2.10-0.8.2.0.jar:?]
at kafka.producer.Producer.send(Producer.scala:77) ~[kafka_2.10-0.8.2.0.jar:?]
at kafka.javaapi.producer.Producer.send(Producer.scala:33) 
~[kafka_2.10-0.8.2.0.jar:?]

WARN kafka.producer.async.DefaultEventHandler: Failed to send data since 
partitions [,8] don't have a leader

What do these errors and warnings mean and how do we get around them?

---

The code for sending messages is basically as follows:

public class KafkaMessageProducer {
private Producer producer;

.

public void sendMessage(String topic, String key, String message) throws 
IOException, MessagingException {
KeyedMessage data = new KeyedMessage(topic, 
key, message);
try {
  producer.send(data);
} catch (Exception ex) {
  throw new MessagingException("Error while sending a message to the 
message store.", ex);
}
}

Is it possible that the producer gets "stale" and needs to be re-initialized?  
Do we want to re-create the producer on every message (??) or is it OK to hold 
on to one indefinitely?

---

The following are the producer properties that are being set into the producer

batch.num.messages => 200
client.id => Acme
compression.codec => none
key.serializer.class => kafka.serializer.StringEncoder
message.send.max.retries => 3
metadata.broker.list => 
data2.acme.com:9092,data3.acme.com:9092
partitioner.class => kafka.producer.DefaultPartitioner
producer.type => sync
queue.buffering.max.messages => 1
queue.buffering.max.ms => 5000
queue.enqueue.timeout.ms => -1
request.required.acks => 1
request.timeout.ms => 1
retry.backoff.ms => 1000
send.buffer.bytes => 102400
serializer.class => kafka.serializer.StringEncoder
topic.metadata.refresh.interval.ms 
=> 60


Thanks.


Re: Hive alter table is failing

2015-09-29 Thread Ted Yu
Have you seen this thread ?
http://search-hadoop.com/m/q3RTtGwP431AQ2B41

Plugin metastore version for your deployment. 

Cheers

> On Sep 29, 2015, at 5:20 AM, Ophir Cohen  wrote:
> 
> Hi,
> 
> I'm using Spark on top of Hive.
> As I want to keep old tables I store the DataFrame into tmp table in hive and 
> when finished successfully I rename the table.
> 
> In last few days I've upgrade to use Spark 1.4.1, and as I'm using aws emr I 
> got Hive 1.0.
> Now when I try to rename the table I get the following error:
> 
> Caused by: InvalidOperationException(message:Unable to access old location 
> hdfs://ip-10-140-189-94.ec2.internal:8020/user/hive/warehouse/_29092015_111704_tmp
>  for table default._29092015_111704_tmp)
> at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_table_with_environment_context_result$alter_table_with_environment_context_resultStandardScheme.read(ThriftHiveMetastore.java:34066)
> at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_table_with_environment_context_result$alter_table_with_environment_context_resultStandardScheme.read(ThriftHiveMetastore.java:34052)
> at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_table_with_environment_context_result.read(ThriftHiveMetastore.java:33994)
> at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
> at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_alter_table_with_environment_context(ThriftHiveMetastore.java:1163)
> at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.alter_table_with_environment_context(ThriftHiveMetastore.java:1147)
> 
> 
> I suspect that this is the bug:
> https://issues.apache.org/jira/browse/HIVE-10719 but it strange cause it 
> works from Hive CLI.
> 
> Did anyone encounter that?
> Do we have any workaround?
> 
> Thanks,
> Ophir


Re: Kafka error "partitions don't have a leader" / LeaderNotAvailableException

2015-09-29 Thread Cody Koeninger
Try writing and reading to the topics in question using the kafka command
line tools, to eliminate your code as a variable.


That number of partitions is probably more than sufficient:

https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

Obviously if you ask for more replicas than you have brokers you're going
to have a problem, but that doesn't seem to be the case.



Also, depending on what version of kafka you're using on the broker, you
may want to look through the kafka jira, e.g.

https://issues.apache.org/jira/browse/KAFKA-899


On Tue, Sep 29, 2015 at 8:05 AM, Dmitry Goldenberg  wrote:

> "more partitions and replicas than available brokers" -- what would be a
> good ratio?
>
> We've been trying to set up 3 topics with 64 partitions.  I'm including
> the output of "bin/kafka-topics.sh --zookeeper localhost:2181 --describe
> topic1" below.
>
> I think it's symptomatic and confirms your theory, Adrian, that we've got
> too many partitions. In fact, for topic 2, only 12 partitions appear to
> have been created despite the requested 64.  Does Kafka have the limit of
> 140 partitions total within a cluster?
>
> The doc doesn't appear to have any prescriptions as to how you go about
> calculating an optimal number of partitions.
>
> We'll definitely try with fewer, I'm just looking for a good formula to
> calculate how many. And no, Adrian, this hasn't worked yet, so we'll start
> with something like 12 partitions.  It'd be good to know how high we can go
> with that...
>
> Topic:topic1 PartitionCount:64 ReplicationFactor:1 Configs:
>
> Topic: topic1 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
>
> Topic: topic2 Partition: 1 Leader: 2 Replicas: 2 Isr: 2
>
>
> 
>
> Topic: topic3 Partition: 63 Leader: 2 Replicas: 2 Isr: 2
>
>
> ---
>
> Topic:topic2 PartitionCount:12 ReplicationFactor:1 Configs:
>
> Topic: topic2 Partition: 0 Leader: 2 Replicas: 2 Isr: 2
>
> Topic: topic2 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
>
>
> 
>
> Topic: topic2 Partition: 11 Leader: 1 Replicas: 1 Isr: 1
>
>
> ---
>
> Topic:topic3 PartitionCount:64 ReplicationFactor:1 Configs:
>
> Topic: topic3 Partition: 0 Leader: 2 Replicas: 2 Isr: 2
>
> Topic: topic3 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
>
>
> 
>
> Topic: topic3 Partition: 63 Leader: 1 Replicas: 1 Isr: 1
>
>
> On Tue, Sep 29, 2015 at 8:47 AM, Adrian Tanase  wrote:
>
>> The error message is very explicit (partition is under replicated), I
>> don’t think it’s related to networking issues.
>>
>> Try to run /home/kafka/bin/kafka-topics.sh —zookeeper localhost/kafka
>> —describe topic_name and see which brokers are missing from the replica
>> assignment.
>> *(replace home, zk-quorum etc with your own set-up)*
>>
>> Lastly, has this ever worked? Maybe you’ve accidentally created the topic
>> with more partitions and replicas than available brokers… try to recreate
>> with fewer partitions/replicas, see if it works.
>>
>> -adrian
>>
>> From: Dmitry Goldenberg
>> Date: Tuesday, September 29, 2015 at 3:37 PM
>> To: Adrian Tanase
>> Cc: "user@spark.apache.org"
>> Subject: Re: Kafka error "partitions don't have a leader" /
>> LeaderNotAvailableException
>>
>> Adrian,
>>
>> Thanks for your response. I just looked at both machines we're testing on
>> and on both the Kafka server process looks OK. Anything specific I can
>> check otherwise?
>>
>> From googling around, I see some posts where folks suggest to check the
>> DNS settings (those appear fine) and to set the advertised.host.name in
>> Kafka's server.properties. Yay/nay?
>>
>> Thanks again.
>>
>> On Tue, Sep 29, 2015 at 8:31 AM, Adrian Tanase  wrote:
>>
>>> I believe some of the brokers in your cluster died and there are a
>>> number of partitions that nobody is currently managing.
>>>
>>> -adrian
>>>
>>> From: Dmitry Goldenberg
>>> Date: Tuesday, September 29, 2015 at 3:26 PM
>>> To: "user@spark.apache.org"
>>> Subject: Kafka error "partitions don't have a leader" /
>>> LeaderNotAvailableException
>>>
>>> I apologize for posting this Kafka related issue into the Spark list.
>>> Have gotten no responses on the Kafka list and was hoping someone on this
>>> list could shed some light on the below.
>>>
>>> 
>>> ---
>>>
>>> We're running into this issue in a clustered environment where we're
>>> trying to send messages to Kafka and are getting the below error.

Re: Converting a DStream to schemaRDD

2015-09-29 Thread Adrian Tanase
Also check this out
https://github.com/databricks/reference-apps/blob/master/logs_analyzer/chapter1/scala/src/main/scala/com/databricks/apps/logs/chapter1/LogAnalyzerStreamingSQL.scala

From the data bricks reference app: https://github.com/databricks/reference-apps

From: Ewan Leith
Date: Tuesday, September 29, 2015 at 5:09 PM
To: Daniel Haviv, user
Subject: RE: Converting a DStream to schemaRDD

Something like:

dstream.foreachRDD { rdd =>
  val df =  sqlContext.read.json(rdd)
  df.select(…)
}

https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams


Might be the place to start, it’ll convert each batch of dstream into an RDD 
then let you work it as if it were a standard RDD dataset.

Ewan


From: Daniel Haviv [mailto:daniel.ha...@veracity-group.com]
Sent: 29 September 2015 15:03
To: user >
Subject: Converting a DStream to schemaRDD

Hi,
I have a DStream which is a stream of RDD[String].

How can I pass a DStream to sqlContext.jsonRDD and work with it as a DF ?

Thank you.
Daniel



Re: Kafka error "partitions don't have a leader" / LeaderNotAvailableException

2015-09-29 Thread Dmitry Goldenberg
Thanks, Cody.

Yes we did see that writeup from Jay, it seems to just refer to his test 6
partitions.  I've been looking for more of a recipe of what the possible
max is vs. what the optimal value may be; haven't found such.

KAFKA-899 appears related but it was fixed in Kafka 0.8.2.0 - we're running
0.8.2.1.

I'm more curious about another error message from the logs which is this:

*fetching topic metadata for topics [Set(my-topic-1)] from broker
[ArrayBuffer(id:0,host:data2.acme.com ,port:9092,
id:1,host:data3.acme.com ,port:9092)] failed*

I know that data2 should have broker ID of 1 and data3 should have broker
ID of 2.  So there's some disconnect somewhere as to what these ID's are.
In Zookeeper, ls /brokers/ids lists: [1, 2].  So where could the [0, 1] be
stuck?



On Tue, Sep 29, 2015 at 9:39 AM, Cody Koeninger  wrote:

> Try writing and reading to the topics in question using the kafka command
> line tools, to eliminate your code as a variable.
>
>
> That number of partitions is probably more than sufficient:
>
>
> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
>
> Obviously if you ask for more replicas than you have brokers you're going
> to have a problem, but that doesn't seem to be the case.
>
>
>
> Also, depending on what version of kafka you're using on the broker, you
> may want to look through the kafka jira, e.g.
>
> https://issues.apache.org/jira/browse/KAFKA-899
>
>
> On Tue, Sep 29, 2015 at 8:05 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> "more partitions and replicas than available brokers" -- what would be a
>> good ratio?
>>
>> We've been trying to set up 3 topics with 64 partitions.  I'm including
>> the output of "bin/kafka-topics.sh --zookeeper localhost:2181 --describe
>> topic1" below.
>>
>> I think it's symptomatic and confirms your theory, Adrian, that we've got
>> too many partitions. In fact, for topic 2, only 12 partitions appear to
>> have been created despite the requested 64.  Does Kafka have the limit of
>> 140 partitions total within a cluster?
>>
>> The doc doesn't appear to have any prescriptions as to how you go about
>> calculating an optimal number of partitions.
>>
>> We'll definitely try with fewer, I'm just looking for a good formula to
>> calculate how many. And no, Adrian, this hasn't worked yet, so we'll start
>> with something like 12 partitions.  It'd be good to know how high we can go
>> with that...
>>
>> Topic:topic1 PartitionCount:64 ReplicationFactor:1 Configs:
>>
>> Topic: topic1 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
>>
>> Topic: topic2 Partition: 1 Leader: 2 Replicas: 2 Isr: 2
>>
>>
>> 
>>
>> Topic: topic3 Partition: 63 Leader: 2 Replicas: 2 Isr: 2
>>
>>
>> ---
>>
>> Topic:topic2 PartitionCount:12 ReplicationFactor:1 Configs:
>>
>> Topic: topic2 Partition: 0 Leader: 2 Replicas: 2 Isr: 2
>>
>> Topic: topic2 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
>>
>>
>> 
>>
>> Topic: topic2 Partition: 11 Leader: 1 Replicas: 1 Isr: 1
>>
>>
>> ---
>>
>> Topic:topic3 PartitionCount:64 ReplicationFactor:1 Configs:
>>
>> Topic: topic3 Partition: 0 Leader: 2 Replicas: 2 Isr: 2
>>
>> Topic: topic3 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
>>
>>
>> 
>>
>> Topic: topic3 Partition: 63 Leader: 1 Replicas: 1 Isr: 1
>>
>>
>> On Tue, Sep 29, 2015 at 8:47 AM, Adrian Tanase  wrote:
>>
>>> The error message is very explicit (partition is under replicated), I
>>> don’t think it’s related to networking issues.
>>>
>>> Try to run /home/kafka/bin/kafka-topics.sh —zookeeper localhost/kafka
>>> —describe topic_name and see which brokers are missing from the replica
>>> assignment.
>>> *(replace home, zk-quorum etc with your own set-up)*
>>>
>>> Lastly, has this ever worked? Maybe you’ve accidentally created the
>>> topic with more partitions and replicas than available brokers… try to
>>> recreate with fewer partitions/replicas, see if it works.
>>>
>>> -adrian
>>>
>>> From: Dmitry Goldenberg
>>> Date: Tuesday, September 29, 2015 at 3:37 PM
>>> To: Adrian Tanase
>>> Cc: "user@spark.apache.org"
>>> Subject: Re: Kafka error "partitions don't have a leader" /
>>> LeaderNotAvailableException
>>>
>>> Adrian,
>>>
>>> Thanks for your response. I just looked at both machines we're testing
>>> on and on both the Kafka server process looks OK. Anything specific I can
>>> check otherwise?
>>>
>>> From googling around, I see some posts where folks suggest 

RE: Converting a DStream to schemaRDD

2015-09-29 Thread Ewan Leith
Something like:

dstream.foreachRDD { rdd =>
  val df =  sqlContext.read.json(rdd)
  df.select(…)
}

https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams


Might be the place to start, it’ll convert each batch of dstream into an RDD 
then let you work it as if it were a standard RDD dataset.

Ewan


From: Daniel Haviv [mailto:daniel.ha...@veracity-group.com]
Sent: 29 September 2015 15:03
To: user 
Subject: Converting a DStream to schemaRDD

Hi,
I have a DStream which is a stream of RDD[String].

How can I pass a DStream to sqlContext.jsonRDD and work with it as a DF ?

Thank you.
Daniel



Fetching Date value from spark.sql.row in Spark 1.2.2

2015-09-29 Thread satish chandra j
HI All,
Currently using Spark 1.2.2, as getDate method is not defined in this
version hence trying to fetch Date value of a specific coulmn using *get*
method as specified in docs (ref URL given below:)

https://spark.apache.org/docs/1.2.2/api/java/index.html?org/apache/spark/sql/api/java/Row.html

But getting an error: "value get is not a member of
org.apache.spark.sql.row"

Regards,
Satish Chandra


Change Orc split size

2015-09-29 Thread Renu Yadav
Hi,

I am reading data from hive orc table using spark-sql  which is taking
256mb as split size.

How can i change this size

Thanks,
Renu


Re: Spark Streaming many subscriptions vs many jobs

2015-09-29 Thread Cody Koeninger
There isn't an easy way of ensuring delivery semantics for producing to
kafka (see
https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish).
If there's only one logical consumer of the intermediate state, I wouldn't
write it back to kafka, i'd just keep it in a single spark job.

If the intermediate state is useful in its own right (multiple consumers),
then sure write it to kafka, just be aware of the possibility of duplicate
messages.

On Tue, Sep 29, 2015 at 6:42 AM, Arttii  wrote:

> Hi,
>
> So I am working on a project where we might end up having a bunch of
> decoupled logic components that have to run inside spark streaming. We are
> using KAFKA as the source of streaming data.
> My first question is; is it better to chain these logics together by
> applying transforms to a single rdd or say transforming and writing back to
> KAFKA and consuming this in another stream and applying more logic. The
> benfit of the second approach is that it is more decoupled.
>
> Another question would be is what the best practice to have one huge spark
> streaming job with a bunch of subscriptions and transform chains? Or should
> I group this into a bunch of jobs with some logical paritioning?
>
> Any idea what the performance drawbacks would be in any case? I know this
> is
> a broadish question, but help would be greatly appreciated.
>
> Arti
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-many-subscriptions-vs-many-jobs-tp24862.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Stopping criteria for gradient descent

2015-09-29 Thread Yanbo Liang
Hi Nishanth,

The diff of solution vectors is compared to relative tolerance or absolute
tolerance, you can set convergenceTol which can affect the convergence
criteria of SGD.

2015-09-17 8:31 GMT+08:00 Nishanth P S :

> Hi,
>
> I am running LogisticRegressionWithSGD in spark 1.4.1 and it always takes
> 100 iterations to train (which is the default). It never meets the
> convergence criteria, shouldn't the convergence criteria for SGD be based
> on difference in logloss or the difference in accuracy on a held out test
> set ?
>
> Code for convergence criteria:
>
> https://github.com/apache/spark/blob/c0e9ff1588b4d9313cc6ec6e00e5c7663eb67910/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala#L251
>
> Thanks,
> Nishanth
>


PySpark Checkpoints with Broadcast Variables

2015-09-29 Thread Jason White
I'm having trouble loading a streaming job from a checkpoint when a
broadcast variable is defined. I've seen the solution by TD in Scala (
https://issues.apache.org/jira/browse/SPARK-5206) that uses a singleton to
get/create an accumulator, but I can't seem to get it to work in PySpark
with a broadcast variable.

A simplified code snippet:
broadcastHelper = {}

class StreamingJob(object):
def transform_function(self):
def transform_function_inner(t, rdd):
if 'bar' not in broadcastHelper:
broadcastHelper['bar'] =
rdd.context.broadcast(broadcastHelper['foo'])
return rdd.filter(lambda event: event['id'] not in
broadcastHelper['bar'].value)
return transform_function_inner

def createContext(self):
dstream = self.getKafkaStream()
dstream = dstream.transform(self.transform_function())
dstream.foreachRdd(lambda rdd:
rdd.foreachPartition(self.send_partition))

def run(self):
broadcastHelper['foo'] = {1, 2, 3}
ssc = StreamingContext.getOrCreate(self.checkpoint_path,
self.createContext)
ssc.start()
ssc.awaitTermination()

The error I inevitably get when restoring from the checkpoint is:
Exception: (Exception("Broadcast variable '3' not loaded!",), , (3L,))

Has anyone had any luck checkpointing in PySpark with a broadcast variable?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-Checkpoints-with-Broadcast-Variables-tp24863.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How to find how much data will be train in mllib or how much the spark job is completed ?

2015-09-29 Thread Robineast
This page gives details on the monitoring available
http://spark.apache.org/docs/latest/monitoring.html. You can get a UI
showing Jobs, Stages and Tasks with an indication how far completed the job
is. The UI is usually on port 4040 of the machine where you run the spark
driver program.

The monitoring page also provides details of a REST API for monitoring the
same values



-
Robin East 
Spark GraphX in Action Michael Malak and Robin East 
Manning Publications Co. 
http://www.manning.com/books/spark-graphx-in-action

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-find-how-much-data-will-be-train-in-mllib-or-how-much-the-spark-job-is-completed-tp24858p24859.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming Log4j Inside Eclipse

2015-09-29 Thread Adrian Tanase
You should set exta java options for your app via Eclipse project and specify 
something like

 -Dlog4j.configuration=file:/tmp/log4j.properties

Sent from my iPhone

On 28 Sep 2015, at 18:52, Shixiong Zhu 
> wrote:

You can use JavaSparkContext.setLogLevel to set the log level in your codes.


Best Regards,

Shixiong Zhu

2015-09-28 22:55 GMT+08:00 Ashish Soni 
>:
I am not running it using spark submit , i am running locally inside Eclipse 
IDE , how i set this using JAVA Code

Ashish

On Mon, Sep 28, 2015 at 10:42 AM, Adrian Tanase 
> wrote:
You also need to provide it as parameter to spark submit
http://stackoverflow.com/questions/28840438/how-to-override-sparks-log4j-properties-per-driver

From: Ashish Soni
Date: Monday, September 28, 2015 at 5:18 PM
To: user
Subject: Spark Streaming Log4j Inside Eclipse

I need to turn off the verbose logging of Spark Streaming Code when i am 
running inside eclipse i tried creating a log4j.properties file and placed 
inside /src/main/resources but i do not see it getting any effect , Please help 
as not sure what else needs to be done to change the log at DEBUG or WARN




Re: Adding / Removing worker nodes for Spark Streaming

2015-09-29 Thread Adrian Tanase
Just wanted to make sure one thing is really clear – the kafka offsets are part 
of the actual RDD – in every batch spark is saving the offset ranges for each 
partition – this in theory will make the data in each batch stable across 
recovery.

The other important thing is that with correct checkpointing on the DStreams 
(mandatory on stateful ones) you will rarely (if ever!) need to go back from 
zero. That’s the point of checkpointing data.
If you checkpoint every 10 batches, then you will have to re-process AT MOST 10 
batches back, and the new data will be merged into the state that’s loaded from 
the hdfs checkpoint.

Lastly, there are still issues with adding/removing nodes from a running 
cluster. Most of the time it works, sometimes the job crashes or doesn’t 
re-deploy the executors. That being said, restarting the driver (with no 
dataloss thanks to checkpointing) has always been a workaround that worked for 
me.
In this spirit, you could test (I have it on my list) stopping a driver by 
killing the process or with yarn application –kill and resubmitting with a 
larger number of executors (—executor-cores). In theory it should work as 
expected, I don’t think this is part of the checkpointed metadata in the spark 
context.

-adrian


From: Cody Koeninger
Date: Tuesday, September 29, 2015 at 12:49 AM
To: Sourabh Chandak
Cc: Augustus Hong, "user@spark.apache.org"
Subject: Re: Adding / Removing worker nodes for Spark Streaming

If a node fails, the partition / offset range that it was working on will be 
scheduled to run on another node.  This is generally true of spark, regardless 
of checkpointing.

The offset ranges for a given batch are stored in the checkpoint for that 
batch.  That's relevant if your entire job fails (driver failure, all workers 
fail, etc).

If you really can't afford to run from the smallest offset and can't afford to 
lose data, don't rely on spark checkpoints (because of the conditions under 
which they can't be recovered).  Store the offset ranges yourself.


On Mon, Sep 28, 2015 at 4:34 PM, Sourabh Chandak 
> wrote:
I also have the same use case as Augustus, and have some basic questions about 
recovery from checkpoint. I have a 10 node Kafka cluster and a 30 node Spark 
cluster running streaming job, how is the (topic, partition) data handled in 
checkpointing. The scenario I want to understand is, in case of node failure 
how will a new node know the checkpoint of the failed node?
The amount of data we have is huge and we can't run from the smallest offset.

Thanks,
Sourabh

On Mon, Sep 28, 2015 at 11:43 AM, Augustus Hong 
> wrote:
Got it, thank you!


On Mon, Sep 28, 2015 at 11:37 AM, Cody Koeninger 
> wrote:
Losing worker nodes without stopping is definitely possible.  I haven't had 
much success adding workers to a running job, but I also haven't spent much 
time on it.

If you're restarting with the same jar, you should be able to recover from 
checkpoint without losing data (usual caveats apply, e.g. you need enough kafka 
retention).  Make sure to test it though, as the code paths taken during 
recovery from checkpoint are not the same as on initial startup, and you can 
run into unexpected issues (e.g. authentication).

On Mon, Sep 28, 2015 at 1:27 PM, Augustus Hong 
> wrote:
Hey all,

I'm evaluating using Spark Streaming with Kafka direct streaming, and I have a 
couple of questions:

1.  Would it be possible to add / remove worker nodes without stopping and 
restarting the spark streaming driver?

2.  I understand that we can enable checkpointing to recover from node 
failures, and that it doesn't work across code changes.  What about in the 
event that worker nodes failed due to load -> we added more worker nodes -> 
restart Spark Streaming?  Would this incur data loss as well?


Best,
Augustus

--
[Branch Metrics mobile deep linking] 
[https://app.xink.io/Images/Get/G3/b84.jpg]  Augustus Hong
 Data Analytics | Branch Metrics
 m 650-391-3369 | e 
augus...@branch.io




--
[Branch Metrics mobile deep linking] 
[https://app.xink.io/Images/Get/G3/b84.jpg]  Augustus Hong
 Data Analytics | Branch Metrics
 m 650-391-3369 | e 
augus...@branch.io




Spark Streaming many subscriptions vs many jobs

2015-09-29 Thread Arttii
Hi,

So I am working on a project where we might end up having a bunch of
decoupled logic components that have to run inside spark streaming. We are
using KAFKA as the source of streaming data.
My first question is; is it better to chain these logics together by
applying transforms to a single rdd or say transforming and writing back to
KAFKA and consuming this in another stream and applying more logic. The
benfit of the second approach is that it is more decoupled.

Another question would be is what the best practice to have one huge spark
streaming job with a bunch of subscriptions and transform chains? Or should
I group this into a bunch of jobs with some logical paritioning? 

Any idea what the performance drawbacks would be in any case? I know this is
a broadish question, but help would be greatly appreciated.

Arti



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-many-subscriptions-vs-many-jobs-tp24862.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Does YARN start new executor in place of the failed one?

2015-09-29 Thread Adrian Tanase
In theory, yes  - however in practice it seems that it depends on how they die.

I’ve recently logged an issue for the case when the machine is restarted. If 
the executor process dies it generally comes back gracefully.
https://issues.apache.org/jira/browse/SPARK-10792

Maybe you can vote up the issue if it’s the same use case :)

Also – make sure that you have resources available in YARN, if the cluster is 
shared.

-adrian

From: Alexander Pivovarov
Date: Tuesday, September 29, 2015 at 1:38 AM
To: "user@spark.apache.org"
Subject: Does YARN start new executor in place of the failed one?

Hello Everyone

I use Spark on YARN on EMR-4

The spark program which I run has several jobs/stages and run for about 10 hours
During the execution some executors might fail for some reason.
BUT I do not see that new executor are started in place of the failed ones

So, what I see in spark UI is that at the beginning of my program I have 100 
executors but in 10 hours I see only 67 executors.

I remember that in Standalone mode Spark Worker starts new executor in place of 
failed one automatically.

How to active the same behavior on YARN?

The only non-default YARN setting I use are the following:
yarn.nodemanager.pmem-check-enabled=false
yarn.nodemanager.vmem-check-enabled=false

Thank you
Alex


Re: Strange shuffle behaviour difference between Zeppelin and Spark-shell

2015-09-29 Thread Rick Moritz
I specified JavaSerializer in both cases (and attempted to use Kryo in the
shell, but failed due to SPARK-6520), and still get the vastly differing
performance.
Somehow the shell-compiler must impact either serialization or shuffling,
but at a level other than the standard REPL API, since Zeppelin can use
that to create jobs that perform sufficiently close to a spark-job, that I
wouldn't be able to tell the difference (as you would hope to expect).

I use Hive, left-accumulating lists and leftOuterJoins, amid normal
.map/.reduceByKey in my code, if that's somehow dealt with differently in
the spark-shell.

On Mon, Sep 28, 2015 at 8:47 PM, Kartik Mathur  wrote:

> Ok, that might be possible , to confirm that you can explicitly specify
> the serializer in both cases (by setting this spark.serializer i guess).
> So then you can be sure that same serializers are used and may be then do
> an analysis.
>
> Best,
> Kartik
>
> On Mon, Sep 28, 2015 at 11:38 AM, Rick Moritz  wrote:
>
>> Hi Kartik,
>>
>> Thanks for the input!
>>
>> Sadly, that's not it - I'm using YARN - the configuration looks
>> identical, and the nodes/memory/cores are deployed identically and exactly
>> as specified.
>>
>> My current hunch, is that for some reason different serializers are used
>> in each case, but I can find no documentation on why that could be the
>> case, and the configuration isn't indicative of that either.
>> Nonetheless, the symptom of different shuffle volume for same shuffle
>> number of tuples could well point to that as source of my issue.
>> In fact, a colleague pointed out that HIS (Cloudera) installation was
>> defaulting to kryo for the spark-shell, which had an impact for some jobs.
>> I couldn't find the document he was referring to as a source of this
>> information, but the behavior sounds plausible at least.
>>
>> Best,
>>
>> Rick
>>
>>
>> On Mon, Sep 28, 2015 at 8:24 PM, Kartik Mathur 
>> wrote:
>>
>>> Hey Rick ,
>>> Not sure on this but similar situation happened with me, when starting
>>> spark-shell it was starting a new cluster instead of using the existing
>>> cluster and this new cluster was a single node cluster , that's why jobs
>>> were taking forever to complete from spark-shell and were running much
>>> faster using submit (which reads conf correctly) or zeppelin for that
>>> matter.
>>>
>>> Thanks,
>>> Kartik
>>>
>>> On Sun, Sep 27, 2015 at 11:45 PM, Rick Moritz  wrote:
>>>
 I've finally been able to pick this up again, after upgrading to Spark
 1.4.1, because my code used the HiveContext, which runs fine in the REPL
 (be it via Zeppelin or the shell) but won't work with spark-submit.
 With 1.4.1, I hav actually managed to get a result with the Spark
 shell, but after
 3847,802237 seconds and in particular the last stage took 1320,672
 seconds.
 This was after I used coalesce to balance the workload initiall, since
 a Hive filter I applied normally would make for a skewed distribution of
 the data onto the nodes.
 Nonetheless, the same code (even withouth the coalesce) would work much
 faster in Zeppelin (around 1200 seconds with 1.4.0) and as a spark-submit
 job, the run time was just a tenth at
 446,657534 seconds for the entire job and notably 38,961 seconds for
 the final stage.

 Again, there is a huge difference in the amount of data that gets
 shuffled/spilled (which leads to much earlier OOM-conditions), when using
 spark-shell.
 What could be the reason for this different behaviour using very
 similar configurations and identical data, machines and code (identical
 DAGs and sources) and identical spark binaries? Why would code launched
 from spark-shell generate more shuffled data for the same number of
 shuffled tuples?

 An analysis would be much appreciated.

 Best,

 Rick

 On Wed, Aug 19, 2015 at 2:47 PM, Rick Moritz  wrote:

> oops, forgot to reply-all on this thread.
>
> -- Forwarded message --
> From: Rick Moritz 
> Date: Wed, Aug 19, 2015 at 2:46 PM
> Subject: Re: Strange shuffle behaviour difference between Zeppelin and
> Spark-shell
> To: Igor Berman 
>
>
> Those values are not explicitely set, and attempting to read their
> values results in 'java.util.NoSuchElementException:
> spark.shuffle.spill.compress'.
> What I mean by the volume per element being larger is illustrated in
> my original post: for each case the number of elements is identical, but
> the volume of data required to obtain/manage these elements is many times
> greater.
>
> The only difference used to be that Zeppelin had FAIR scheduling over
> FIFO scheduling for spark-shell. I just verified that spark-shell with 
> FAIR
> scheduling makes 

PySpark Checkpoints with Broadcast Variables

2015-09-29 Thread Jason White
I'm having trouble loading a streaming job from a checkpoint when a
broadcast variable is defined. I've seen the solution by TD in Scala (
https://issues.apache.org/jira/browse/SPARK-5206) that uses a singleton to
get/create an accumulator, but I can't seem to get it to work in PySpark
with a broadcast variable.

A simplified code snippet:
broadcastHelper = {}

class StreamingJob(object):
def transform_function(self):
def transform_function_inner(t, rdd):
if 'bar' not in broadcastHelper:
broadcastHelper['bar'] =
rdd.context.broadcast(broadcastHelper['foo'])
return rdd.filter(lambda event: event['id'] not in
broadcastHelper['bar'].value)
return transform_function_inner

def createContext(self):
dstream = self.getKafkaStream()
dstream = dstream.transform(self.transform_function())
dstream.foreachRdd(lambda rdd:
rdd.foreachPartition(self.send_partition))

def run(self):
broadcastHelper['foo'] = {1, 2, 3}
ssc = StreamingContext.getOrCreate(self.checkpoint_path,
self.createContext)
ssc.start()
ssc.awaitTermination()

The error I inevitably get when restoring from the checkpoint is:
Exception: (Exception("Broadcast variable '3' not loaded!",), , (3L,))

Has anyone had any luck checkpointing in PySpark with a broadcast variable?


Re: Spark-Kafka Connector issue

2015-09-29 Thread Cody Koeninger
Show the output of bin/kafka-topics.sh --list.  Show the actual code with
the topic name hardcoded in the set, not loaded from an external file you
didn't show.  Show the full stacktrace you're getting.

On Mon, Sep 28, 2015 at 10:03 PM, Ratika Prasad 
wrote:

> Yes the queues are created and gets listed as well and I have posted few
> Msges also which I am able to read using Kafka-consumer.sh --from-beginning
> how spark fails with No leader offset for Set.
>
> Tried changing the offset.storage to Kafka from zookeeper.
>
> Kindly help
>
> Sent from Outlook 
>
> _
> From: Cody Koeninger 
> Sent: Tuesday, September 29, 2015 12:33 am
> Subject: Re: Spark-Kafka Connector issue
> To: Ratika Prasad 
> Cc: 
>
>
>
> Did you actually create TestTopic?  See if it shows up using
> bin/kafka-topics.sh --list, and if not, create it using bin/kafka-topics.sh
> --create
>
> On Mon, Sep 28, 2015 at 1:20 PM, Ratika Prasad 
> wrote:
>
>> Thanks for your reply.
>>
>>
>>
>> I invoked my program with the broker ip and host and it triggered as
>> expected but I see the below error
>>
>>
>>
>> ./bin/spark-submit --class
>> org.stream.processing.JavaKafkaStreamEventProcessing --master local
>> spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar
>> 172.28.161.32:9092 TestTopic
>>
>> 15/09/28 17:45:09 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>>
>> 15/09/28 17:45:11 WARN StreamingContext: spark.master should be set as
>> local[n], n > 1 in local mode if you have receivers to get data, otherwise
>> Spark jobs will not get resources to process the received data.
>>
>> Exception in thread "main" org.apache.spark.SparkException:
>> java.nio.channels.ClosedChannelException
>>
>> org.apache.spark.SparkException: Couldn't find leader offsets for Set
>> ([TestTopic,0])
>>
>> at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>
>> at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>
>> at scala.util.Either.fold(Either.scala:97)
>>
>> at
>> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>>
>> at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
>>
>> at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
>>
>> at
>> org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
>>
>> at
>> org.stream.processing.JavaKafkaStreamEventProcessing.main(JavaKafkaStreamEventProcessing.java:52)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>
>> at java.lang.reflect.Method.invoke(Method.java:497)
>>
>> at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
>>
>> at
>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
>>
>> at
>> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
>>
>> at
>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
>>
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>>
>>
>> Whene I ran the below to check the offsets I get this
>>
>>
>>
>> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic
>> TestTopic --group test-consumer-group --zookeeper localhost:2181
>>
>> Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException:
>> KeeperErrorCode = NoNode for
>> /consumers/test-consumer-group/offsets/TestTopic /0.
>>
>>
>>
>> Also I just added this below configs to my
>> kafaka/config/consumer.properties and restarted kafka
>>
>>
>>
>> auto.offset.reset=smallest
>>
>> offsets.storage=zookeeper
>>
>> offsets.channel.backoff.ms=1000
>>
>> offsets.channel.socket.timeout.ms=1
>>
>> offsets.commit.max.retries=5
>>
>> dual.commit.enabled=true
>>
>>
>>
>> *From:* Cody Koeninger [mailto:c...@koeninger.org]
>> *Sent:* Monday, September 28, 2015 7:56 PM
>> *To:* Ratika Prasad 
>> *Cc:* d...@spark.apache.org
>> *Subject:* Re: Spark-Kafka Connector issue
>>
>>
>>
>> This is a user list question not a dev list question.
>>
>>
>>
>> Looks like your driver is having trouble communicating to the kafka
>> brokers.  Make sure the broker host and port is available from the driver
>> host (using nc or telnet); make sure that you're providing the _broker_
>> host and port to createDirectStream, not the zookeeper host; make sure the

Re: "Method json([class java.util.HashMap]) does not exist" when reading JSON

2015-09-29 Thread Ted Yu
sqlContext.read.json() expects Path to the JSON file.

FYI

On Tue, Sep 29, 2015 at 7:23 AM, Fernando Paladini 
wrote:

> Hello guys,
>
> I'm very new to Spark and I'm having some troubles when reading a JSON to
> dataframe on PySpark.
>
> I'm getting a JSON object from an API response and I would like to store
> it in Spark as a DataFrame (I've read that DataFrame is better than RDD,
> that's accurate?). For what I've read
> 
> on documentation, I just need to call the method sqlContext.read.json in
> order to do what I want.
>
> *Following is the code from my test application:*
> json_object = json.loads(response.text)
> sc = SparkContext("local", appName="JSON to RDD")
> sqlContext = SQLContext(sc)
> dataframe = sqlContext.read.json(json_object)
> dataframe.show()
>
> *The problem is that when I run **"spark-submit myExample.py" I got the
> following error:*
> 15/09/29 01:18:54 INFO BlockManagerMasterEndpoint: Registering block
> manager localhost:48634 with 530.0 MB RAM, BlockManagerId(driver,
> localhost, 48634)
> 15/09/29 01:18:54 INFO BlockManagerMaster: Registered BlockManager
> Traceback (most recent call last):
>   File "/home/paladini/ufxc/lisha/learning/spark-api-kairos/test1.py",
> line 35, in 
> dataframe = sqlContext.read.json(json_object)
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line
> 144, in json
>   File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
> line 538, in __call__
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36,
> in deco
>   File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line
> 304, in get_return_value
> py4j.protocol.Py4JError: An error occurred while calling o21.json. Trace:
> py4j.Py4JException: Method json([class java.util.HashMap]) does not exist
> at
> py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
> at
> py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
> at py4j.Gateway.invoke(Gateway.java:252)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
> at java.lang.Thread.run(Thread.java:745)
>
> *What I'm doing wrong? *
> Check out this gist
>  to see the JSON
> I'm trying to load.
>
> Thanks!
>


Re: Spark SQL: Implementing Custom Data Source

2015-09-29 Thread Michael Armbrust
Thats a pretty advanced example that uses experimental APIs.  I'd suggest
looking at https://github.com/databricks/spark-avro as a reference.

On Mon, Sep 28, 2015 at 9:00 PM, Ted Yu  wrote:

> See this thread:
>
> http://search-hadoop.com/m/q3RTttmiYDqGc202
>
> And:
>
> http://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources
>
> On Sep 28, 2015, at 8:22 PM, Jerry Lam  wrote:
>
> Hi spark users and developers,
>
> I'm trying to learn how implement a custom data source for Spark SQL. Is
> there a documentation that I can use as a reference? I'm not sure exactly
> what needs to be extended/implemented. A general workflow will be greatly
> helpful!
>
> Best Regards,
>
> Jerry
>
>


"Method json([class java.util.HashMap]) does not exist" when reading JSON

2015-09-29 Thread Fernando Paladini
Hello guys,

I'm very new to Spark and I'm having some troubles when reading a JSON to
dataframe on PySpark.

I'm getting a JSON object from an API response and I would like to store it
in Spark as a DataFrame (I've read that DataFrame is better than RDD,
that's accurate?). For what I've read

on documentation, I just need to call the method sqlContext.read.json in
order to do what I want.

*Following is the code from my test application:*
json_object = json.loads(response.text)
sc = SparkContext("local", appName="JSON to RDD")
sqlContext = SQLContext(sc)
dataframe = sqlContext.read.json(json_object)
dataframe.show()

*The problem is that when I run **"spark-submit myExample.py" I got the
following error:*
15/09/29 01:18:54 INFO BlockManagerMasterEndpoint: Registering block
manager localhost:48634 with 530.0 MB RAM, BlockManagerId(driver,
localhost, 48634)
15/09/29 01:18:54 INFO BlockManagerMaster: Registered BlockManager
Traceback (most recent call last):
  File "/home/paladini/ufxc/lisha/learning/spark-api-kairos/test1.py", line
35, in 
dataframe = sqlContext.read.json(json_object)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line
144, in json
  File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
line 538, in __call__
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36,
in deco
  File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line
304, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o21.json. Trace:
py4j.Py4JException: Method json([class java.util.HashMap]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)

*What I'm doing wrong? *
Check out this gist 
to see the JSON I'm trying to load.

Thanks!


Spark Job/Stage names

2015-09-29 Thread Nithin Asokan
I'm interested to see if anyone knows of a way to have custom job/stage
name for Spark Application.

I believe I can use *sparkContext.setCallSite(String)* to update job/stage
names but it does not let me update each stage name, setting this value
will set same text for all job and stage names for that application.

Anyone has done something like this before or have thoughts about have
custom stage names?

On a side note, what is the benefit of having a friendly name for RDD (
RDD.name() )

Thanks,
Nithin


Re: Spark Streaming Log4j Inside Eclipse

2015-09-29 Thread Shixiong Zhu
I mean JavaSparkContext.setLogLevel. You can use it like this:

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
Durations.seconds(2));
jssc.sparkContext().setLogLevel(...);



Best Regards,
Shixiong Zhu

2015-09-29 22:07 GMT+08:00 Ashish Soni :

> I am using Java Streaming context and it doesnt have method setLogLevel
> and also i have tried by passing VM argument in eclipse and it doesnt work
>
> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
> Durations.seconds(2));
>
> Ashish
>
> On Tue, Sep 29, 2015 at 7:23 AM, Adrian Tanase  wrote:
>
>> You should set exta java options for your app via Eclipse project and
>> specify something like
>>
>>  -Dlog4j.configuration=file:/tmp/log4j.properties
>>
>> Sent from my iPhone
>>
>> On 28 Sep 2015, at 18:52, Shixiong Zhu  wrote:
>>
>> You can use JavaSparkContext.setLogLevel to set the log level in your
>> codes.
>>
>> Best Regards,
>> Shixiong Zhu
>>
>> 2015-09-28 22:55 GMT+08:00 Ashish Soni :
>>
>>> I am not running it using spark submit , i am running locally inside
>>> Eclipse IDE , how i set this using JAVA Code
>>>
>>> Ashish
>>>
>>> On Mon, Sep 28, 2015 at 10:42 AM, Adrian Tanase 
>>> wrote:
>>>
 You also need to provide it as parameter to spark submit

 http://stackoverflow.com/questions/28840438/how-to-override-sparks-log4j-properties-per-driver

 From: Ashish Soni
 Date: Monday, September 28, 2015 at 5:18 PM
 To: user
 Subject: Spark Streaming Log4j Inside Eclipse

 I need to turn off the verbose logging of Spark Streaming Code when i
 am running inside eclipse i tried creating a log4j.properties file and
 placed inside /src/main/resources but i do not see it getting any effect ,
 Please help as not sure what else needs to be done to change the log at
 DEBUG or WARN

>>>
>>>
>>
>


Re: Executor Lost Failure

2015-09-29 Thread Nithin Asokan
Try increasing memory (--conf spark.executor.memory=3g or
--executor-memory) for executors. Here is something I noted from your logs

15/09/29 06:32:03 WARN MemoryStore: Failed to reserve initial memory
threshold of 1024.0 KB for computing block rdd_2_1813 in memory.
15/09/29 06:32:03 WARN MemoryStore: Not enough space to cache
rdd_2_1813 in memory!
(computed 840.0 B so far)

On Tue, Sep 29, 2015 at 11:02 AM Anup Sawant 
wrote:

> Hi all,
> Any idea why I am getting 'Executor heartbeat timed out' ? I am fairly new
> to Spark so I have less knowledge about the internals of it. The job was
> running for a day or so on 102 Gb of data with 40 workers.
> -Best,
> Anup.
>
> 15/09/29 06:32:03 ERROR TaskSchedulerImpl: Lost executor driver on
> localhost: Executor heartbeat timed out after 395987 ms
> 15/09/29 06:32:03 WARN MemoryStore: Failed to reserve initial memory
> threshold of 1024.0 KB for computing block rdd_2_1813 in memory.
> 15/09/29 06:32:03 WARN MemoryStore: Not enough space to cache rdd_2_1813
> in memory! (computed 840.0 B so far)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1782.0 in stage 2713.0
> (TID 9101184, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 ERROR TaskSetManager: Task 1782 in stage 2713.0 failed 1
> times; aborting job
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1791.0 in stage 2713.0
> (TID 9101193, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1800.0 in stage 2713.0
> (TID 9101202, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1764.0 in stage 2713.0
> (TID 9101166, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1773.0 in stage 2713.0
> (TID 9101175, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1809.0 in stage 2713.0
> (TID 9101211, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1794.0 in stage 2713.0
> (TID 9101196, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1740.0 in stage 2713.0
> (TID 9101142, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1803.0 in stage 2713.0
> (TID 9101205, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1812.0 in stage 2713.0
> (TID 9101214, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1785.0 in stage 2713.0
> (TID 9101187, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1767.0 in stage 2713.0
> (TID 9101169, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1776.0 in stage 2713.0
> (TID 9101178, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1797.0 in stage 2713.0
> (TID 9101199, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1779.0 in stage 2713.0
> (TID 9101181, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1806.0 in stage 2713.0
> (TID 9101208, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1788.0 in stage 2713.0
> (TID 9101190, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1761.0 in stage 2713.0
> (TID 9101163, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1755.0 in stage 2713.0
> (TID 9101157, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1796.0 in stage 2713.0
> (TID 9101198, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1778.0 in stage 2713.0
> (TID 9101180, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1787.0 in stage 2713.0
> (TID 9101189, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1805.0 in stage 2713.0
> (TID 9101207, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1790.0 in stage 2713.0
> (TID 9101192, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1781.0 in stage 2713.0
> (TID 9101183, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1808.0 in stage 2713.0
> (TID 9101210, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost 

Re: Executor Lost Failure

2015-09-29 Thread Ted Yu
Can you list the spark-submit command line you used ?

Thanks

On Tue, Sep 29, 2015 at 9:02 AM, Anup Sawant 
wrote:

> Hi all,
> Any idea why I am getting 'Executor heartbeat timed out' ? I am fairly new
> to Spark so I have less knowledge about the internals of it. The job was
> running for a day or so on 102 Gb of data with 40 workers.
> -Best,
> Anup.
>
> 15/09/29 06:32:03 ERROR TaskSchedulerImpl: Lost executor driver on
> localhost: Executor heartbeat timed out after 395987 ms
> 15/09/29 06:32:03 WARN MemoryStore: Failed to reserve initial memory
> threshold of 1024.0 KB for computing block rdd_2_1813 in memory.
> 15/09/29 06:32:03 WARN MemoryStore: Not enough space to cache rdd_2_1813
> in memory! (computed 840.0 B so far)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1782.0 in stage 2713.0
> (TID 9101184, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 ERROR TaskSetManager: Task 1782 in stage 2713.0 failed 1
> times; aborting job
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1791.0 in stage 2713.0
> (TID 9101193, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1800.0 in stage 2713.0
> (TID 9101202, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1764.0 in stage 2713.0
> (TID 9101166, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1773.0 in stage 2713.0
> (TID 9101175, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1809.0 in stage 2713.0
> (TID 9101211, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1794.0 in stage 2713.0
> (TID 9101196, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1740.0 in stage 2713.0
> (TID 9101142, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1803.0 in stage 2713.0
> (TID 9101205, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1812.0 in stage 2713.0
> (TID 9101214, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1785.0 in stage 2713.0
> (TID 9101187, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1767.0 in stage 2713.0
> (TID 9101169, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1776.0 in stage 2713.0
> (TID 9101178, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1797.0 in stage 2713.0
> (TID 9101199, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1779.0 in stage 2713.0
> (TID 9101181, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1806.0 in stage 2713.0
> (TID 9101208, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1788.0 in stage 2713.0
> (TID 9101190, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1761.0 in stage 2713.0
> (TID 9101163, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1755.0 in stage 2713.0
> (TID 9101157, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1796.0 in stage 2713.0
> (TID 9101198, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1778.0 in stage 2713.0
> (TID 9101180, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1787.0 in stage 2713.0
> (TID 9101189, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1805.0 in stage 2713.0
> (TID 9101207, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1790.0 in stage 2713.0
> (TID 9101192, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1781.0 in stage 2713.0
> (TID 9101183, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1808.0 in stage 2713.0
> (TID 9101210, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1799.0 in stage 2713.0
> (TID 9101201, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1772.0 in stage 2713.0
> (TID 9101174, localhost): ExecutorLostFailure (executor driver lost)
> 15/09/29 06:32:03 WARN TaskSetManager: Lost task 1763.0 in stage 2713.0

Re: SparkContext._active_spark_context returns None

2015-09-29 Thread Ted Yu
bq. the right way to reach JVM in python

Can you tell us more about what you want to achieve ?

If you want to pass some value to workers, you can use broadcast variable.

Cheers

On Mon, Sep 28, 2015 at 10:31 PM, YiZhi Liu  wrote:

> Hi Ted,
>
> Thank you for reply. The sc works at driver, but how can I reach the
> JVM in rdd.map ?
>
> 2015-09-29 11:26 GMT+08:00 Ted Yu :
>  sc._jvm.java.lang.Integer.valueOf("12")
> > 12
> >
> > FYI
> >
> > On Mon, Sep 28, 2015 at 8:08 PM, YiZhi Liu  wrote:
> >>
> >> Hi,
> >>
> >> I'm doing some data processing on pyspark, but I failed to reach JVM
> >> in workers. Here is what I did:
> >>
> >> $ bin/pyspark
> >> >>> data = sc.parallelize(["123", "234"])
> >> >>> numbers = data.map(lambda s:
> >> >>>
> SparkContext._active_spark_context._jvm.java.lang.Integer.valueOf(s.strip()))
> >> >>> numbers.collect()
> >>
> >> I got,
> >>
> >> Caused by: org.apache.spark.api.python.PythonException: Traceback
> >> (most recent call last):
> >>   File
> >>
> "/mnt/hgfs/lewis/Workspace/source-codes/spark/python/lib/pyspark.zip/pyspark/worker.py",
> >> line 111, in main
> >> process()
> >>   File
> >>
> "/mnt/hgfs/lewis/Workspace/source-codes/spark/python/lib/pyspark.zip/pyspark/worker.py",
> >> line 106, in process
> >> serializer.dump_stream(func(split_index, iterator), outfile)
> >>   File
> >>
> "/mnt/hgfs/lewis/Workspace/source-codes/spark/python/lib/pyspark.zip/pyspark/serializers.py",
> >> line 263, in dump_stream
> >> vs = list(itertools.islice(iterator, batch))
> >>   File "", line 1, in 
> >> AttributeError: 'NoneType' object has no attribute '_jvm'
> >>
> >> at
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
> >> at
> >>
> org.apache.spark.api.python.PythonRDD$$anon$1.(PythonRDD.scala:179)
> >> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> >> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> >> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> >> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> >> at
> >>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >> at
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >> ... 1 more
> >>
> >> While _jvm at the driver end looks fine:
> >>
> >> >>>
> >> >>>
> SparkContext._active_spark_context._jvm.java.lang.Integer.valueOf("123".strip())
> >> 123
> >>
> >> The program is trivial, I just wonder what is the right way to reach
> >> JVM in python. Any help would be appreciated.
> >>
> >> Thanks
> >>
> >> --
> >> Yizhi Liu
> >> Senior Software Engineer / Data Mining
> >> www.mvad.com, Shanghai, China
> >>
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> >
>
>
>
> --
> Yizhi Liu
> Senior Software Engineer / Data Mining
> www.mvad.com, Shanghai, China
>


DStream union with different slideDuration

2015-09-29 Thread Goodall, Mark (UK)
Hi, I was wondering if there is a reason for limiting union to only work on 
streams with the same slideDuration.
Looking at UnionDStream.scala, if slideDuration was set to the minimum of the 
parents, and there was a require to enforce that all slideDuration were 
divisible wholly by the minimum, the exception could be removed from the 
compute method.

Is this correct or am I missing something?

This email and any attachments are confidential to the intended
recipient and may also be privileged. If you are not the intended
recipient please delete it from your system and notify the sender.
You should not copy it or use it for any purpose nor disclose or
distribute its contents to any other person.



Spark Job/Stage names

2015-09-29 Thread nasokan
I'm interested to see if anyone knows of a way to have custom job/stage name
for Spark Application. 

I believe I can use sparkContext.setCallSite(String) to update job/stage
names but it does not let me update each stage name, setting this value will
set same text for all job and stage names for that application. 

Anyone has done something like this before or have thoughts about have
custom stage names?

On a side note, what is the benefit of having a friendly name for RDD (
RDD.name() )

Thanks,
Nithin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Job-Stage-names-tp24867.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kafka error "partitions don't have a leader" / LeaderNotAvailableException

2015-09-29 Thread Dmitry Goldenberg
We've got Kafka working generally. Can definitely write to it now.

There was a snag where num.partitions was set to 12 on one node but to 64
on the other.  We fixed this and set num.partitions to 42 and things are
working on that side.






On Tue, Sep 29, 2015 at 9:39 AM, Cody Koeninger  wrote:

> Try writing and reading to the topics in question using the kafka command
> line tools, to eliminate your code as a variable.
>
>
> That number of partitions is probably more than sufficient:
>
>
> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
>
> Obviously if you ask for more replicas than you have brokers you're going
> to have a problem, but that doesn't seem to be the case.
>
>
>
> Also, depending on what version of kafka you're using on the broker, you
> may want to look through the kafka jira, e.g.
>
> https://issues.apache.org/jira/browse/KAFKA-899
>
>
> On Tue, Sep 29, 2015 at 8:05 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> "more partitions and replicas than available brokers" -- what would be a
>> good ratio?
>>
>> We've been trying to set up 3 topics with 64 partitions.  I'm including
>> the output of "bin/kafka-topics.sh --zookeeper localhost:2181 --describe
>> topic1" below.
>>
>> I think it's symptomatic and confirms your theory, Adrian, that we've got
>> too many partitions. In fact, for topic 2, only 12 partitions appear to
>> have been created despite the requested 64.  Does Kafka have the limit of
>> 140 partitions total within a cluster?
>>
>> The doc doesn't appear to have any prescriptions as to how you go about
>> calculating an optimal number of partitions.
>>
>> We'll definitely try with fewer, I'm just looking for a good formula to
>> calculate how many. And no, Adrian, this hasn't worked yet, so we'll start
>> with something like 12 partitions.  It'd be good to know how high we can go
>> with that...
>>
>> Topic:topic1 PartitionCount:64 ReplicationFactor:1 Configs:
>>
>> Topic: topic1 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
>>
>> Topic: topic2 Partition: 1 Leader: 2 Replicas: 2 Isr: 2
>>
>>
>> 
>>
>> Topic: topic3 Partition: 63 Leader: 2 Replicas: 2 Isr: 2
>>
>>
>> ---
>>
>> Topic:topic2 PartitionCount:12 ReplicationFactor:1 Configs:
>>
>> Topic: topic2 Partition: 0 Leader: 2 Replicas: 2 Isr: 2
>>
>> Topic: topic2 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
>>
>>
>> 
>>
>> Topic: topic2 Partition: 11 Leader: 1 Replicas: 1 Isr: 1
>>
>>
>> ---
>>
>> Topic:topic3 PartitionCount:64 ReplicationFactor:1 Configs:
>>
>> Topic: topic3 Partition: 0 Leader: 2 Replicas: 2 Isr: 2
>>
>> Topic: topic3 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
>>
>>
>> 
>>
>> Topic: topic3 Partition: 63 Leader: 1 Replicas: 1 Isr: 1
>>
>>
>> On Tue, Sep 29, 2015 at 8:47 AM, Adrian Tanase  wrote:
>>
>>> The error message is very explicit (partition is under replicated), I
>>> don’t think it’s related to networking issues.
>>>
>>> Try to run /home/kafka/bin/kafka-topics.sh —zookeeper localhost/kafka
>>> —describe topic_name and see which brokers are missing from the replica
>>> assignment.
>>> *(replace home, zk-quorum etc with your own set-up)*
>>>
>>> Lastly, has this ever worked? Maybe you’ve accidentally created the
>>> topic with more partitions and replicas than available brokers… try to
>>> recreate with fewer partitions/replicas, see if it works.
>>>
>>> -adrian
>>>
>>> From: Dmitry Goldenberg
>>> Date: Tuesday, September 29, 2015 at 3:37 PM
>>> To: Adrian Tanase
>>> Cc: "user@spark.apache.org"
>>> Subject: Re: Kafka error "partitions don't have a leader" /
>>> LeaderNotAvailableException
>>>
>>> Adrian,
>>>
>>> Thanks for your response. I just looked at both machines we're testing
>>> on and on both the Kafka server process looks OK. Anything specific I can
>>> check otherwise?
>>>
>>> From googling around, I see some posts where folks suggest to check the
>>> DNS settings (those appear fine) and to set the advertised.host.name in
>>> Kafka's server.properties. Yay/nay?
>>>
>>> Thanks again.
>>>
>>> On Tue, Sep 29, 2015 at 8:31 AM, Adrian Tanase 
>>> wrote:
>>>
 I believe some of the brokers in your cluster died and there are a
 number of partitions that nobody is currently managing.

 -adrian

 From: Dmitry Goldenberg
 Date: Tuesday, September 29, 2015 at 3:26 PM
 To: "user@spark.apache.org"
 Subject: Kafka error "partitions don't have a leader" /
 

Executor Lost Failure

2015-09-29 Thread Anup Sawant
Hi all,
Any idea why I am getting 'Executor heartbeat timed out' ? I am fairly new
to Spark so I have less knowledge about the internals of it. The job was
running for a day or so on 102 Gb of data with 40 workers.
-Best,
Anup.

15/09/29 06:32:03 ERROR TaskSchedulerImpl: Lost executor driver on
localhost: Executor heartbeat timed out after 395987 ms
15/09/29 06:32:03 WARN MemoryStore: Failed to reserve initial memory
threshold of 1024.0 KB for computing block rdd_2_1813 in memory.
15/09/29 06:32:03 WARN MemoryStore: Not enough space to cache rdd_2_1813 in
memory! (computed 840.0 B so far)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1782.0 in stage 2713.0
(TID 9101184, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 ERROR TaskSetManager: Task 1782 in stage 2713.0 failed 1
times; aborting job
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1791.0 in stage 2713.0
(TID 9101193, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1800.0 in stage 2713.0
(TID 9101202, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1764.0 in stage 2713.0
(TID 9101166, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1773.0 in stage 2713.0
(TID 9101175, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1809.0 in stage 2713.0
(TID 9101211, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1794.0 in stage 2713.0
(TID 9101196, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1740.0 in stage 2713.0
(TID 9101142, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1803.0 in stage 2713.0
(TID 9101205, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1812.0 in stage 2713.0
(TID 9101214, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1785.0 in stage 2713.0
(TID 9101187, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1767.0 in stage 2713.0
(TID 9101169, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1776.0 in stage 2713.0
(TID 9101178, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1797.0 in stage 2713.0
(TID 9101199, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1779.0 in stage 2713.0
(TID 9101181, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1806.0 in stage 2713.0
(TID 9101208, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1788.0 in stage 2713.0
(TID 9101190, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1761.0 in stage 2713.0
(TID 9101163, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1755.0 in stage 2713.0
(TID 9101157, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1796.0 in stage 2713.0
(TID 9101198, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1778.0 in stage 2713.0
(TID 9101180, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1787.0 in stage 2713.0
(TID 9101189, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1805.0 in stage 2713.0
(TID 9101207, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1790.0 in stage 2713.0
(TID 9101192, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1781.0 in stage 2713.0
(TID 9101183, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1808.0 in stage 2713.0
(TID 9101210, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1799.0 in stage 2713.0
(TID 9101201, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1772.0 in stage 2713.0
(TID 9101174, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1763.0 in stage 2713.0
(TID 9101165, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1802.0 in stage 2713.0
(TID 9101204, localhost): ExecutorLostFailure (executor driver lost)
15/09/29 06:32:03 WARN TaskSetManager: Lost task 1748.0 in stage 2713.0
(TID 

Re: Does pyspark in cluster mode need python on individual executor nodes ?

2015-09-29 Thread Ted Yu
I think the answer is yes.

Code packaged in pyspark.zip needs python to execute.

On Tue, Sep 29, 2015 at 2:08 PM, Ranjana Rajendran <
ranjana.rajend...@gmail.com> wrote:

> Hi,
>
> Does a python spark program (which makes use of pyspark ) submitted in
> cluster mode need python on the executor nodes ?  Isn't the python program
> interpreted on the client node from where the job is submitted and then the
> executors run in the JVM of each the executor nodes ?
>
> Thank you,
> Ranjana
>


using UDF( defined in Java) in scala through scala

2015-09-29 Thread ogoh
Hello,
I have a udf declared in Java but I'd like to call it from spark-shell which
only supports Scala.
Since I am new to Scala, I couldn't figure out how to call register the Java
UDF using sqlContext.udf.register in scala.
Below is how I tried.
I appreciate any help.
Thanks,

= my UDF in java
public class ArrayStringOfJson implements UDF1 {
public ArrayType call(String input) throws Exception {
..
  }
}
= using in spark-shell
scala> import org.apache.spark.sql.api.java.UDF1
scala> import com.mycom.event.udfs.ArrayStringOfJson
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
scala> val instance: ArrayStringOfJson = new ArrayStringOfJson
scala> sqlContext.udf.register("arraystring", instance,
org.apache.spark.sql.types.ArrayType)
:28: error: overloaded method value register with alternatives:
  (name: String,f: org.apache.spark.sql.api.java.UDF22[_, _, _, _, _, _, _,
_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _],returnType:
org.apache.spark.sql.types.DataType)Unit 
  ...
  (name: String,f: org.apache.spark.sql.api.java.UDF1[_, _],returnType:
org.apache.spark.sql.types.DataType)Unit
 cannot be applied to (String, com.mycom.event.udfs.ArrayStringOfJson,
org.apache.spark.sql.types.ArrayType.type)
  sqlContext.udf.register("arraystring", instance,
org.apache.spark.sql.types.ArrayType)
 ^





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/using-UDF-defined-in-Java-in-scala-through-scala-tp24880.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: ThrowableSerializationWrapper: Task exception could not be deserialized / ClassNotFoundException: org.apache.solr.common.SolrException

2015-09-29 Thread Dmitry Goldenberg
Release of Spark: 1.5.0.

Command line invokation:

ACME_INGEST_HOME=/mnt/acme/acme-ingest
ACME_INGEST_VERSION=0.0.1-SNAPSHOT
ACME_BATCH_DURATION_MILLIS=5000
SPARK_MASTER_URL=spark://data1:7077
JAVA_OPTIONS="-Dspark.streaming.kafka.maxRatePerPartition=1000"
JAVA_OPTIONS="$JAVA_OPTIONS -Dspark.executor.memory=2g"

$SPARK_HOME/bin/spark-submit \
--driver-class-path  $ACME_INGEST_HOME \
--driver-java-options "$JAVA_OPTIONS" \
--class "com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver" \
--master $SPARK_MASTER_URL  \
--conf
"spark.executor.extraClassPath=$ACME_INGEST_HOME/conf:$ACME_INGEST_HOME/lib/hbase-protocol-0.98.9-hadoop2.jar"
\

$ACME_INGEST_HOME/lib/acme-ingest-kafka-spark-$ACME_INGEST_VERSION.jar \
-brokerlist $METADATA_BROKER_LIST \
-topic acme.topic1 \
-autooffsetreset largest \
-batchdurationmillis $ACME_BATCH_DURATION_MILLIS \
-appname Acme.App1 \
-checkpointdir file://$SPARK_HOME/acme/checkpoint-acme-app1
Note that SolrException is definitely in our consumer jar
acme-ingest-kafka-spark-$ACME_INGEST_VERSION.jar which gets deployed to
$ACME_INGEST_HOME.

For the extraClassPath on the executors, we've got additionally
hbase-protocol-0.98.9-hadoop2.jar: we're using Apache Phoenix from the
Spark jobs to communicate with HBase.  The only way to force Phoenix to
successfully communicate with HBase was to have that JAR explicitly added
to the executor classpath regardless of the fact that the contents of the
hbase-protocol hadoop jar get rolled up into the consumer jar at build time.

I'm starting to wonder whether there's some class loading pattern here
where some classes may not get loaded out of the consumer jar and therefore
have to have their respective jars added to the executor extraClassPath?

Or is this a serialization problem for SolrException as Divya Ravichandran
suggested?




On Tue, Sep 29, 2015 at 6:16 PM, Ted Yu  wrote:

> Mind providing a bit more information:
>
> release of Spark
> command line for running Spark job
>
> Cheers
>
> On Tue, Sep 29, 2015 at 1:37 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> We're seeing this occasionally. Granted, this was caused by a wrinkle in
>> the Solr schema but this bubbled up all the way in Spark and caused job
>> failures.
>>
>> I just checked and SolrException class is actually in the consumer job
>> jar we use.  Is there any reason why Spark cannot find the SolrException
>> class?
>>
>> 15/09/29 15:41:58 WARN ThrowableSerializationWrapper: Task exception
>> could not be deserialized
>> java.lang.ClassNotFoundException: org.apache.solr.common.SolrException
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at
>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
>> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>> at
>> org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:163)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>> at
>> 

Re: Hive ORC Malformed while loading into spark data frame

2015-09-29 Thread Hortonworks
You can try to use data frame for both read and write

Thanks

Zhan Zhang


Sent from my iPhone

> On Sep 29, 2015, at 1:56 PM, Umesh Kacha  wrote:
> 
> Hi Zang, thanks for the response. Table is created using Spark 
> hiveContext.sql and data inserted into table also using hiveContext.sql. 
> Insert into partition table. When I try to load orc data into dataframe I am 
> loading particular partition data stored in path say 
> /user/xyz/Hive/xyz.db/sparktable/partition1=abc
> 
> Regards, 
> Umesh
> 
>> On Sep 30, 2015 02:21, "Hortonworks"  wrote:
>> How was the table is generated, by hive or by spark?
>> 
>> If you generate table using have but read it by data frame, it may have some 
>> comparability issue.
>> 
>> Thanks
>> 
>> Zhan Zhang
>> 
>> 
>> Sent from my iPhone
>> 
>> > On Sep 29, 2015, at 1:47 PM, unk1102  wrote:
>> >
>> > Hi I have a spark job which creates hive tables in orc format with
>> > partitions. It works well I can read data back into hive table using hive
>> > console. But if I try further process orc files generated by Spark job by
>> > loading into dataframe  then I get the following exception
>> > Caused by: java.io.IOException: Malformed ORC file
>> > hdfs://localhost:9000/user/hive/warehouse/partorc/part_tiny.txt. Invalid
>> > postscript.
>> >
>> > Dataframe df = hiveContext.read().format("orc").load(to/path);
>> >
>> > Please guide.
>> >
>> >
>> >
>> > --
>> > View this message in context: 
>> > http://apache-spark-user-list.1001560.n3.nabble.com/Hive-ORC-Malformed-while-loading-into-spark-data-frame-tp24876.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>> >
>> 
>> --
>> CONFIDENTIALITY NOTICE
>> NOTICE: This message is intended for the use of the individual or entity to
>> which it is addressed and may contain information that is confidential,
>> privileged and exempt from disclosure under applicable law. If the reader
>> of this message is not the intended recipient, you are hereby notified that
>> any printing, copying, dissemination, distribution, disclosure or
>> forwarding of this communication is strictly prohibited. If you have
>> received this communication in error, please contact the sender immediately
>> and delete it from your system. Thank You.

-- 
CONFIDENTIALITY NOTICE
NOTICE: This message is intended for the use of the individual or entity to 
which it is addressed and may contain information that is confidential, 
privileged and exempt from disclosure under applicable law. If the reader 
of this message is not the intended recipient, you are hereby notified that 
any printing, copying, dissemination, distribution, disclosure or 
forwarding of this communication is strictly prohibited. If you have 
received this communication in error, please contact the sender immediately 
and delete it from your system. Thank You.


Re: ThrowableSerializationWrapper: Task exception could not be deserialized / ClassNotFoundException: org.apache.solr.common.SolrException

2015-09-29 Thread Ted Yu
Mind providing a bit more information:

release of Spark
command line for running Spark job

Cheers

On Tue, Sep 29, 2015 at 1:37 PM, Dmitry Goldenberg  wrote:

> We're seeing this occasionally. Granted, this was caused by a wrinkle in
> the Solr schema but this bubbled up all the way in Spark and caused job
> failures.
>
> I just checked and SolrException class is actually in the consumer job jar
> we use.  Is there any reason why Spark cannot find the SolrException class?
>
> 15/09/29 15:41:58 WARN ThrowableSerializationWrapper: Task exception could
> not be deserialized
> java.lang.ClassNotFoundException: org.apache.solr.common.SolrException
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at
> org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:163)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
> at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>


unintended consequence of using coalesce operation

2015-09-29 Thread Lan Jiang
Hi, there

I ran into an issue when using Spark (v 1.3) to load avro file through Spark 
SQL. The code sample is below

val df = sqlContext.load(“path-to-avro-file","com.databricks.spark.avro”)
val myrdd = df.select(“Key", “Name", “binaryfield").rdd
val results = myrdd.map(...)
val finalResults = results.filter(...)
finalResults.coalesce(1).toDF().saveAsParquetFile(“path-to-parquet”)  

The avro file 645M. The HDFS block size is 128M. Thus the total is 5 HDFS 
blocks, which means there should be 5 partitions. Please note that I use 
coalesce because I expect the previous filter transformation should filter out 
almost all the data and I would like to write to 1 single parquet file. 

YARN cluster has 3 datanodes. I use the below configuration for spark submit

spark-submit —class  —num-executors 3 —executor-cores 2 
—executor-memory 8g —master yarn-client mytest.jar 

I do see 3 executors being created, one on each data/worker node. However, 
there is only one task running within the cluster.  After I remove the 
coalesce(1) call from the codes, I can see 5 tasks generates, spreading across 
3 executors.  I was surprised by the result. coalesce usually is thought to be 
a better choice than repartition operation when reducing the partition numbers. 
However, in the case, it causes performance issue because Spark only creates 
one task because the final partition number was coalesced to 1.  Thus there is 
only one thread reading HDFS files instead of 5. 

Is my understanding correct? In this case, I think repartition is a better 
choice than coalesce. 

Lan





Re: ThrowableSerializationWrapper: Task exception could not be deserialized / ClassNotFoundException: org.apache.solr.common.SolrException

2015-09-29 Thread Ted Yu
Have you tried the following ?
--conf spark.driver.userClassPathFirst=true --conf spark.executor.
userClassPathFirst=true

On Tue, Sep 29, 2015 at 4:38 PM, Dmitry Goldenberg  wrote:

> Release of Spark: 1.5.0.
>
> Command line invokation:
>
> ACME_INGEST_HOME=/mnt/acme/acme-ingest
> ACME_INGEST_VERSION=0.0.1-SNAPSHOT
> ACME_BATCH_DURATION_MILLIS=5000
> SPARK_MASTER_URL=spark://data1:7077
> JAVA_OPTIONS="-Dspark.streaming.kafka.maxRatePerPartition=1000"
> JAVA_OPTIONS="$JAVA_OPTIONS -Dspark.executor.memory=2g"
>
> $SPARK_HOME/bin/spark-submit \
> --driver-class-path  $ACME_INGEST_HOME \
> --driver-java-options "$JAVA_OPTIONS" \
> --class "com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver" \
> --master $SPARK_MASTER_URL  \
> --conf
> "spark.executor.extraClassPath=$ACME_INGEST_HOME/conf:$ACME_INGEST_HOME/lib/hbase-protocol-0.98.9-hadoop2.jar"
> \
>
> $ACME_INGEST_HOME/lib/acme-ingest-kafka-spark-$ACME_INGEST_VERSION.jar \
> -brokerlist $METADATA_BROKER_LIST \
> -topic acme.topic1 \
> -autooffsetreset largest \
> -batchdurationmillis $ACME_BATCH_DURATION_MILLIS \
> -appname Acme.App1 \
> -checkpointdir file://$SPARK_HOME/acme/checkpoint-acme-app1
> Note that SolrException is definitely in our consumer jar
> acme-ingest-kafka-spark-$ACME_INGEST_VERSION.jar which gets deployed to
> $ACME_INGEST_HOME.
>
> For the extraClassPath on the executors, we've got additionally
> hbase-protocol-0.98.9-hadoop2.jar: we're using Apache Phoenix from the
> Spark jobs to communicate with HBase.  The only way to force Phoenix to
> successfully communicate with HBase was to have that JAR explicitly added
> to the executor classpath regardless of the fact that the contents of the
> hbase-protocol hadoop jar get rolled up into the consumer jar at build time.
>
> I'm starting to wonder whether there's some class loading pattern here
> where some classes may not get loaded out of the consumer jar and therefore
> have to have their respective jars added to the executor extraClassPath?
>
> Or is this a serialization problem for SolrException as Divya
> Ravichandran suggested?
>
>
>
>
> On Tue, Sep 29, 2015 at 6:16 PM, Ted Yu  wrote:
>
>> Mind providing a bit more information:
>>
>> release of Spark
>> command line for running Spark job
>>
>> Cheers
>>
>> On Tue, Sep 29, 2015 at 1:37 PM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> We're seeing this occasionally. Granted, this was caused by a wrinkle in
>>> the Solr schema but this bubbled up all the way in Spark and caused job
>>> failures.
>>>
>>> I just checked and SolrException class is actually in the consumer job
>>> jar we use.  Is there any reason why Spark cannot find the SolrException
>>> class?
>>>
>>> 15/09/29 15:41:58 WARN ThrowableSerializationWrapper: Task exception
>>> could not be deserialized
>>> java.lang.ClassNotFoundException: org.apache.solr.common.SolrException
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>> at java.lang.Class.forName0(Native Method)
>>> at java.lang.Class.forName(Class.java:348)
>>> at
>>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
>>> at
>>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>>> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>>> at
>>> org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:163)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:497)
>>> at
>>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>>> 

ThrowableSerializationWrapper: Task exception could not be deserialized / ClassNotFoundException: org.apache.solr.common.SolrException

2015-09-29 Thread Dmitry Goldenberg
We're seeing this occasionally. Granted, this was caused by a wrinkle in
the Solr schema but this bubbled up all the way in Spark and caused job
failures.

I just checked and SolrException class is actually in the consumer job jar
we use.  Is there any reason why Spark cannot find the SolrException class?

15/09/29 15:41:58 WARN ThrowableSerializationWrapper: Task exception could
not be deserialized
java.lang.ClassNotFoundException: org.apache.solr.common.SolrException
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at
org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:163)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Hive ORC Malformed while loading into spark data frame

2015-09-29 Thread unk1102
Hi I have a spark job which creates hive tables in orc format with
partitions. It works well I can read data back into hive table using hive
console. But if I try further process orc files generated by Spark job by
loading into dataframe  then I get the following exception 
Caused by: java.io.IOException: Malformed ORC file
hdfs://localhost:9000/user/hive/warehouse/partorc/part_tiny.txt. Invalid
postscript.

Dataframe df = hiveContext.read().format("orc").load(to/path);  

Please guide. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Hive-ORC-Malformed-while-loading-into-spark-data-frame-tp24876.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: ThrowableSerializationWrapper: Task exception could not be deserialized / ClassNotFoundException: org.apache.solr.common.SolrException

2015-09-29 Thread Divya Ravichandran
This could be because org.apache.solr.common.SolrException doesn't
implement Serializable.

This error shows up when Spark is deserilizing a class which doesn't
implement Serializable.

Thanks
Divya
On Sep 29, 2015 4:37 PM, "Dmitry Goldenberg" 
wrote:

> We're seeing this occasionally. Granted, this was caused by a wrinkle in
> the Solr schema but this bubbled up all the way in Spark and caused job
> failures.
>
> I just checked and SolrException class is actually in the consumer job jar
> we use.  Is there any reason why Spark cannot find the SolrException class?
>
> 15/09/29 15:41:58 WARN ThrowableSerializationWrapper: Task exception could
> not be deserialized
> java.lang.ClassNotFoundException: org.apache.solr.common.SolrException
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at
> org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:163)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
> at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>


Re: Hive ORC Malformed while loading into spark data frame

2015-09-29 Thread Umesh Kacha
Hi Zang, thanks for the response. Table is created using Spark
hiveContext.sql and data inserted into table also using hiveContext.sql.
Insert into partition table. When I try to load orc data into dataframe I
am loading particular partition data stored in path say
/user/xyz/Hive/xyz.db/sparktable/partition1=abc

Regards,
Umesh
On Sep 30, 2015 02:21, "Hortonworks"  wrote:

> How was the table is generated, by hive or by spark?
>
> If you generate table using have but read it by data frame, it may have
> some comparability issue.
>
> Thanks
>
> Zhan Zhang
>
>
> Sent from my iPhone
>
> > On Sep 29, 2015, at 1:47 PM, unk1102  wrote:
> >
> > Hi I have a spark job which creates hive tables in orc format with
> > partitions. It works well I can read data back into hive table using hive
> > console. But if I try further process orc files generated by Spark job by
> > loading into dataframe  then I get the following exception
> > Caused by: java.io.IOException: Malformed ORC file
> > hdfs://localhost:9000/user/hive/warehouse/partorc/part_tiny.txt. Invalid
> > postscript.
> >
> > Dataframe df = hiveContext.read().format("orc").load(to/path);
> >
> > Please guide.
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Hive-ORC-Malformed-while-loading-into-spark-data-frame-tp24876.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
> >
>
> --
> CONFIDENTIALITY NOTICE
> NOTICE: This message is intended for the use of the individual or entity to
> which it is addressed and may contain information that is confidential,
> privileged and exempt from disclosure under applicable law. If the reader
> of this message is not the intended recipient, you are hereby notified that
> any printing, copying, dissemination, distribution, disclosure or
> forwarding of this communication is strictly prohibited. If you have
> received this communication in error, please contact the sender immediately
> and delete it from your system. Thank You.
>


Does pyspark in cluster mode need python on individual executor nodes ?

2015-09-29 Thread Ranjana Rajendran
Hi,

Does a python spark program (which makes use of pyspark ) submitted in
cluster mode need python on the executor nodes ?  Isn't the python program
interpreted on the client node from where the job is submitted and then the
executors run in the JVM of each the executor nodes ?

Thank you,
Ranjana


Self Join reading the HDFS blocks TWICE

2015-09-29 Thread Data Science Education
As part of fairly complex processing, I am executing a self join query
using HiveContext against a Hive table to find the latest Transaction,
oldest Transaction etc: for a given set of Attributes. I am still using
v1.3.1 and so Window functions are not an option. The simplified query
looks like below.

val df = hiveContext.sql("""SELECT TAB1.KEY1 ,TAB1.KEY2

,MAX(CASE WHEN (TAB1.FLD10 = TAB2.min_FLD10) THEN TAB1.FLD11

ELSE -999 END) AS NEW_FLD

FROM TAB1

INNER JOIN

( SELECT KEY1 ,KEY2 ,

MIN(FLD10) AS min_FLD10

FROM TAB1

WHERE partition_key >= '2015-01-01' and partition_key < '2015-07-01'

GROUP BY KEY1 ,KEY2 ) TAB2

ON TAB1.KEY1= TAB2.KEY1AND TAB1.KEY2= TAB2.KEY1

WHERE partition_key >= '2015-01-01' and partition_key < '2015-07-01'

GROUP BY TAB1.KEY1, TAB1.KEY2""")

I see that ~18,000 HDFS blocks are read TWICE and then the Shuffle happens
. Is there a way to avoid reading the same blocks TWICE during the Map
Stage? Is there a way to try to avoid Shuffle? Thank You.


Spark Streaming Standalone 1.5 - Stage cancelled because SparkContext was shut down

2015-09-29 Thread An Tran
Hello All,

I have several Spark Streaming applications running on Standalone mode in
Spark 1.5.  Spark is currently set up for dynamic resource allocation.  The
issue I am seeing is that I can have about 12 Spark Streaming Jobs running
concurrently.  Occasionally I would see more than half where to fail due
to Stage cancelled because SparkContext was shut down.  It would
automatically restart as it runs on supervised mode.  Attached is the
screenshot of one of the jobs that failed.  Anyone have any insight as to
what is going on?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: Does pyspark in cluster mode need python on individual executor nodes ?

2015-09-29 Thread Ranjana Rajendran
Thank you Ted.

I have Python 2.6 on all the nodes including the client node.  I want to
instead use Python 2.7. For the PySpark shell, I was able to do this by
downloading python 2.7.8 and installing it in a root based out of my home
directory and setting PYSPARK_PYTHON to ~/python2.7/bin/python and then
invoking the pyspark shell.

For cluster mode, I guess I would have to do the same for all the executor
nodes and set the PYSPARK_PYTHON path on all those nodes.

I did submit a pyspark program in cluster node, but not sure if it is using
python 2.6 on all the executor nodes.

On Tue, Sep 29, 2015 at 2:17 PM, Ted Yu  wrote:

> I think the answer is yes.
>
> Code packaged in pyspark.zip needs python to execute.
>
> On Tue, Sep 29, 2015 at 2:08 PM, Ranjana Rajendran <
> ranjana.rajend...@gmail.com> wrote:
>
>> Hi,
>>
>> Does a python spark program (which makes use of pyspark ) submitted in
>> cluster mode need python on the executor nodes ?  Isn't the python program
>> interpreted on the client node from where the job is submitted and then the
>> executors run in the JVM of each the executor nodes ?
>>
>> Thank you,
>> Ranjana
>>
>
>


Fwd: Query about checkpointing time

2015-09-29 Thread Jatin Ganhotra
Hi,

I started doing the amp-camp 5 exercises
.
I tried the following 2 scenarios:

*Scenario #1*
val pagecounts = sc.textFile("data/pagecounts")
pagecounts.checkpoint
pagecounts.count

*Scenario #2*
val pagecounts = sc.textFile("data/pagecounts")
pagecounts.checkpoint

The total time show in the Spark shell Application UI was different for
both scenarios. Scenario #1 took 0.5 seconds, while scenario #2 took only
0.2 s.

*Questions:*
1. In scenario #1, checkpoint command does nothing, it's neither a
transformation nor an action. It's saying that once the RDD materializes
after an action, go ahead and save to disk. Am I missing something here?

2. I understand that scenario #1 is taking more time, because the RDD is
check-pointed (written to disk). Is there a way I can know the amount taken
to checkpoint, from the total time?
The Spark shell Application UI shows the following - Scheduler delay, Task
Deserialization time, GC time, Result serialization time, getting result
time. But, doesn't show the breakdown for checkpointing.

3. Is there a way to access the above metrics e.g. scheduler delay, GC time
and save them programmatically? I want to log some of the above metrics for
every action invoked on an RDD.

Please let me know if you need more information.
Thanks
—
Jatin


Spark thrift service and Hive impersonation.

2015-09-29 Thread Jagat Singh
Hi,

I have started the Spark thrift service using spark user.

Does each user needs to start own thrift server to use it?

Using beeline i am able to connect to server and execute show tables;

However when we try to execute some real query it runs as spark user and
HDFS permissions does not allow them to be read.

The query fails with error

0: jdbc:hive2://localhost:1> select count(*) from mytable;
Error: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch
table mytable. java.security.AccessControlException: Permission denied:
user=spark, access=READ, inode="/data/mytable":tdcprdr:tdcprdr:drwxr-x--x
at
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271)

And in thrift server we get log.


In the hive-site.xml we have impersonation enabled.

   
  hive.server2.enable.doAs
  true



  hive.server2.enable.impersonation
  true


Is there any other configuration to be done for it to work like normal hive
thrift server.

Thanks


Re: Hive ORC Malformed while loading into spark data frame

2015-09-29 Thread Hortonworks
How was the table is generated, by hive or by spark? 

If you generate table using have but read it by data frame, it may have some 
comparability issue.

Thanks

Zhan Zhang


Sent from my iPhone

> On Sep 29, 2015, at 1:47 PM, unk1102  wrote:
> 
> Hi I have a spark job which creates hive tables in orc format with
> partitions. It works well I can read data back into hive table using hive
> console. But if I try further process orc files generated by Spark job by
> loading into dataframe  then I get the following exception 
> Caused by: java.io.IOException: Malformed ORC file
> hdfs://localhost:9000/user/hive/warehouse/partorc/part_tiny.txt. Invalid
> postscript.
> 
> Dataframe df = hiveContext.read().format("orc").load(to/path);  
> 
> Please guide. 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Hive-ORC-Malformed-while-loading-into-spark-data-frame-tp24876.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 

-- 
CONFIDENTIALITY NOTICE
NOTICE: This message is intended for the use of the individual or entity to 
which it is addressed and may contain information that is confidential, 
privileged and exempt from disclosure under applicable law. If the reader 
of this message is not the intended recipient, you are hereby notified that 
any printing, copying, dissemination, distribution, disclosure or 
forwarding of this communication is strictly prohibited. If you have 
received this communication in error, please contact the sender immediately 
and delete it from your system. Thank You.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: JobScheduler: Error generating jobs for time for custom InputDStream

2015-09-29 Thread Juan Rodríguez Hortalá
Hi Shixiong,

Thanks for your answer. I will take a lot to your suggestion, maybe my call
to SparkContext.parallelize doesn't work well when there are less records
to parallelize than partitions.

Thanks a lot for your help

Greetings,

Juan

2015-09-24 2:04 GMT-07:00 Shixiong Zhu :

> Looks like you returns a "Some(null)" in "compute". If you don't want to
> create a RDD, it should return None. If you want to return an empty RDD, it
> should return "Some(sc.emptyRDD)".
>
> Best Regards,
> Shixiong Zhu
>
> 2015-09-15 2:51 GMT+08:00 Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com>:
>
>> Hi,
>>
>> I sent this message to the user list a few weeks ago with no luck, so I'm
>> forwarding it to the dev list in case someone could give a hand with this.
>> Thanks a lot in advance
>>
>>
>> I've developed a ScalaCheck property for testing Spark Streaming
>> transformations. To do that I had to develop a custom InputDStream, which
>> is very similar to QueueInputDStream but has a method for adding new test
>> cases for dstreams, which are objects of type Seq[Seq[A]], to the DStream.
>> You can see the code at
>> https://github.com/juanrh/sscheck/blob/32c2bff66aa5500182e0162a24ecca6d47707c42/src/main/scala/org/apache/spark/streaming/dstream/DynSeqQueueInputDStream.scala.
>> I have developed a few properties that run in local mode
>> https://github.com/juanrh/sscheck/blob/32c2bff66aa5500182e0162a24ecca6d47707c42/src/test/scala/es/ucm/fdi/sscheck/spark/streaming/ScalaCheckStreamingTest.scala.
>> The problem is that when the batch interval is too small, and the machine
>> cannot complete the batches fast enough, I get the following exceptions in
>> the Spark log
>>
>> 15/08/26 11:22:02 ERROR JobScheduler: Error generating jobs for time
>> 1440580922500 ms
>> java.lang.NullPointerException
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$count$1$$anonfun$apply$18.apply(DStream.scala:587)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$count$1$$anonfun$apply$18.apply(DStream.scala:587)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:668)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:666)
>> at
>> org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>> at
>> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>> at scala.Option.orElse(Option.scala:257)
>> at
>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>> at
>> org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>> at
>> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>> at scala.Option.orElse(Option.scala:257)
>> at
>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>> at
>> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>> at
>> 

Re: pyspark-Failed to run first

2015-09-29 Thread balajikvijayan
Any updates on this issue? A cursory search shows that others are still
experiencing this issue. I'm seeing this occur on trivial data sets in
pyspark; however they are running successfully in scala.

While this is an acceptable workaround I would like to know if this item is
on the spark roadmap or if I should completely punt on pyspark and use only
scala. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-Failed-to-run-first-tp7691p24879.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Yahoo's Caffe-on-Spark project

2015-09-29 Thread Thomas Dudziak
http://yahoohadoop.tumblr.com/post/129872361846/large-scale-distributed-deep-learning-on-hadoop

I would be curious to learn what the Spark developer's plans are in this
area (NNs, GPUs) and what they think of integration with existing NN
frameworks like Caffe or Torch.

cheers,
Tom


Re: Cant perform full outer join

2015-09-29 Thread Terry Hoo
Saif,

Might be you can rename one of the dataframe to different name first, then
do an outer join and a select like this:

val cur_d = cur_data.toDF("Date_1", "Value_1")
val r = data.join(cur_d, data("DATE" === cur_d("Date_1",
"outer").select($"DATE", $"VALUE", $"Value_1")

Thanks,
Terry

On Tue, Sep 29, 2015 at 9:56 PM,  wrote:

> Hi all,
>
> So I Have two dataframes, with two columns: DATE and VALUE.
>
> Performing this:
> data = data.join(cur_data, data(“DATE”) === cur_data("DATE"), "outer")
>
> returns
> Exception in thread "main" org.apache.spark.sql.AnalysisException:
> Reference 'DATE' is ambiguous, could be: DATE#0, DATE#3.;
>
> But if I change one of the column names, I will get two columns and won’t
> really merge “DATE” column as I wish. Any ideas without going to non
> trivial procedures?
>
> Thanks,
> Saif
>
>


Re: unintended consequence of using coalesce operation

2015-09-29 Thread Michael Armbrust
coalesce is generally to avoid launching too many tasks, on a bunch of
small files.  As a result, the goal is to reduce parallelism (when the
overhead of that parallelism is more costly than the gain).  You are
correct that in you case repartition sounds like a better choice.

On Tue, Sep 29, 2015 at 4:33 PM, Lan Jiang  wrote:

> Hi, there
>
> I ran into an issue when using Spark (v 1.3) to load avro file through
> Spark SQL. The code sample is below
>
> val df = sqlContext.load(“path-to-avro-file","com.databricks.spark.avro”)
> val myrdd = df.select(“Key", “Name", “binaryfield").rdd
> val results = myrdd.map(...)
> val finalResults = results.filter(...)
> finalResults.*coalesce(1)*.toDF().saveAsParquetFile(“path-to-parquet”)
>
> The avro file 645M. The HDFS block size is 128M. Thus the total is 5 HDFS
> blocks, which means there should be 5 partitions. Please note that I use
> coalesce because I expect the previous filter transformation should filter
> out almost all the data and I would like to write to 1 single parquet file.
>
> YARN cluster has 3 datanodes. I use the below configuration for spark
> submit
>
> spark-submit —class  —num-executors 3 —executor-cores 2
> —executor-memory 8g —master yarn-client mytest.jar
>
> I do see 3 executors being created, one on each data/worker node. However,
> there is only one task running within the cluster.  After I remove the
> coalesce(1) call from the codes, I can see 5 tasks generates, spreading
> across 3 executors.  I was surprised by the result. coalesce usually is
> thought to be a better choice than repartition operation when reducing the
> partition numbers. However, in the case, it causes performance issue
> because Spark only creates one task because the final partition number was
> coalesced to 1.  Thus there is only one thread reading HDFS files instead
> of 5.
>
> Is my understanding correct? In this case, I think repartition is a better
> choice than coalesce.
>
> Lan
>
>
>
>


RE: Spark thrift service and Hive impersonation.

2015-09-29 Thread Mohammed Guller
When a user issues a connect command from Beeline, it asks for username and 
password. What happens if you give spark as the user name?

Also, it looks like permission for "/data/mytable” is drwxr-x—x

Have you tried changing the permission to allow other users to read?

Mohammed

From: Jagat Singh [mailto:jagatsi...@gmail.com]
Sent: Tuesday, September 29, 2015 6:32 PM
To: Mohammed Guller
Cc: SparkUser
Subject: Re: Spark thrift service and Hive impersonation.

Hi,

Thanks for your reply.

If you see the log message

Error: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table 
mytable. java.security.AccessControlException: Permission denied: user=spark, 
access=READ, inode="/data/mytable":tdcprdr:tdcprdr:drwxr-x--x

Spark is trying to read as spark user , using which we started thrift server.

Since spark user does not have actual read access we get the error.

However the beeline is used by end user not spark user and throws error.

Thanks,

Jagat Singh



On Wed, Sep 30, 2015 at 11:24 AM, Mohammed Guller 
> wrote:
Does each user needs to start own thrift server to use it?

No. One of the benefits of the Spark Thrift Server is that it allows multiple 
users to share a single SparkContext.

Most likely, you have file permissions issue.

Mohammed

From: Jagat Singh [mailto:jagatsi...@gmail.com]
Sent: Tuesday, September 29, 2015 5:30 PM
To: SparkUser
Subject: Spark thrift service and Hive impersonation.

Hi,

I have started the Spark thrift service using spark user.

Does each user needs to start own thrift server to use it?

Using beeline i am able to connect to server and execute show tables;

However when we try to execute some real query it runs as spark user and HDFS 
permissions does not allow them to be read.

The query fails with error

0: jdbc:hive2://localhost:1> select count(*) from mytable;
Error: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table 
mytable. java.security.AccessControlException: Permission denied: user=spark, 
access=READ, inode="/data/mytable":tdcprdr:tdcprdr:drwxr-x--x
at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271)


And in thrift server we get log.


In the hive-site.xml we have impersonation enabled.

   
  hive.server2.enable.doAs
  true



  hive.server2.enable.impersonation
  true


Is there any other configuration to be done for it to work like normal hive 
thrift server.

Thanks



Re: ThrowableSerializationWrapper: Task exception could not be deserialized / ClassNotFoundException: org.apache.solr.common.SolrException

2015-09-29 Thread Dmitry Goldenberg
I'm actually not sure how either one of these would possibly cause Spark to
find SolrException. Whether the driver or executor class path is first,
should it not matter, if the class is in the consumer job jar?




On Tue, Sep 29, 2015 at 9:12 PM, Dmitry Goldenberg  wrote:

> Ted, I think I have tried these settings with the hbase protocol jar, to
> no avail.
>
> I'm going to see if I can try and use these with this SolrException issue
> though it now may be harder to reproduce it. Thanks for the suggestion.
>
> On Tue, Sep 29, 2015 at 8:03 PM, Ted Yu  wrote:
>
>> Have you tried the following ?
>> --conf spark.driver.userClassPathFirst=true --conf spark.executor.
>> userClassPathFirst=true
>>
>> On Tue, Sep 29, 2015 at 4:38 PM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> Release of Spark: 1.5.0.
>>>
>>> Command line invokation:
>>>
>>> ACME_INGEST_HOME=/mnt/acme/acme-ingest
>>> ACME_INGEST_VERSION=0.0.1-SNAPSHOT
>>> ACME_BATCH_DURATION_MILLIS=5000
>>> SPARK_MASTER_URL=spark://data1:7077
>>> JAVA_OPTIONS="-Dspark.streaming.kafka.maxRatePerPartition=1000"
>>> JAVA_OPTIONS="$JAVA_OPTIONS -Dspark.executor.memory=2g"
>>>
>>> $SPARK_HOME/bin/spark-submit \
>>> --driver-class-path  $ACME_INGEST_HOME \
>>> --driver-java-options "$JAVA_OPTIONS" \
>>> --class
>>> "com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver" \
>>> --master $SPARK_MASTER_URL  \
>>> --conf
>>> "spark.executor.extraClassPath=$ACME_INGEST_HOME/conf:$ACME_INGEST_HOME/lib/hbase-protocol-0.98.9-hadoop2.jar"
>>> \
>>>
>>> $ACME_INGEST_HOME/lib/acme-ingest-kafka-spark-$ACME_INGEST_VERSION.jar \
>>> -brokerlist $METADATA_BROKER_LIST \
>>> -topic acme.topic1 \
>>> -autooffsetreset largest \
>>> -batchdurationmillis $ACME_BATCH_DURATION_MILLIS \
>>> -appname Acme.App1 \
>>> -checkpointdir file://$SPARK_HOME/acme/checkpoint-acme-app1
>>> Note that SolrException is definitely in our consumer jar
>>> acme-ingest-kafka-spark-$ACME_INGEST_VERSION.jar which gets deployed to
>>> $ACME_INGEST_HOME.
>>>
>>> For the extraClassPath on the executors, we've got additionally
>>> hbase-protocol-0.98.9-hadoop2.jar: we're using Apache Phoenix from the
>>> Spark jobs to communicate with HBase.  The only way to force Phoenix to
>>> successfully communicate with HBase was to have that JAR explicitly added
>>> to the executor classpath regardless of the fact that the contents of the
>>> hbase-protocol hadoop jar get rolled up into the consumer jar at build time.
>>>
>>> I'm starting to wonder whether there's some class loading pattern here
>>> where some classes may not get loaded out of the consumer jar and therefore
>>> have to have their respective jars added to the executor extraClassPath?
>>>
>>> Or is this a serialization problem for SolrException as Divya
>>> Ravichandran suggested?
>>>
>>>
>>>
>>>
>>> On Tue, Sep 29, 2015 at 6:16 PM, Ted Yu  wrote:
>>>
 Mind providing a bit more information:

 release of Spark
 command line for running Spark job

 Cheers

 On Tue, Sep 29, 2015 at 1:37 PM, Dmitry Goldenberg <
 dgoldenberg...@gmail.com> wrote:

> We're seeing this occasionally. Granted, this was caused by a wrinkle
> in the Solr schema but this bubbled up all the way in Spark and caused job
> failures.
>
> I just checked and SolrException class is actually in the consumer job
> jar we use.  Is there any reason why Spark cannot find the SolrException
> class?
>
> 15/09/29 15:41:58 WARN ThrowableSerializationWrapper: Task exception
> could not be deserialized
> java.lang.ClassNotFoundException: org.apache.solr.common.SolrException
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at
> org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:163)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> 

Re: Hive ORC Malformed while loading into spark data frame

2015-09-29 Thread Umesh Kacha
Hi I can read/load orc data created by hive table in a dataframe why is it
throwing Malformed ORC exception when I try to load data created by
hiveContext.sql into dataframe?
On Sep 30, 2015 2:37 AM, "Hortonworks"  wrote:

> You can try to use data frame for both read and write
>
> Thanks
>
> Zhan Zhang
>
>
> Sent from my iPhone
>
> On Sep 29, 2015, at 1:56 PM, Umesh Kacha  wrote:
>
> Hi Zang, thanks for the response. Table is created using Spark
> hiveContext.sql and data inserted into table also using hiveContext.sql.
> Insert into partition table. When I try to load orc data into dataframe I
> am loading particular partition data stored in path say
> /user/xyz/Hive/xyz.db/sparktable/partition1=abc
>
> Regards,
> Umesh
> On Sep 30, 2015 02:21, "Hortonworks"  wrote:
>
>> How was the table is generated, by hive or by spark?
>>
>> If you generate table using have but read it by data frame, it may have
>> some comparability issue.
>>
>> Thanks
>>
>> Zhan Zhang
>>
>>
>> Sent from my iPhone
>>
>> > On Sep 29, 2015, at 1:47 PM, unk1102  wrote:
>> >
>> > Hi I have a spark job which creates hive tables in orc format with
>> > partitions. It works well I can read data back into hive table using
>> hive
>> > console. But if I try further process orc files generated by Spark job
>> by
>> > loading into dataframe  then I get the following exception
>> > Caused by: java.io.IOException: Malformed ORC file
>> > hdfs://localhost:9000/user/hive/warehouse/partorc/part_tiny.txt. Invalid
>> > postscript.
>> >
>> > Dataframe df = hiveContext.read().format("orc").load(to/path);
>> >
>> > Please guide.
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Hive-ORC-Malformed-while-loading-into-spark-data-frame-tp24876.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com
>> .
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>> >
>>
>> --
>> CONFIDENTIALITY NOTICE
>> NOTICE: This message is intended for the use of the individual or entity
>> to
>> which it is addressed and may contain information that is confidential,
>> privileged and exempt from disclosure under applicable law. If the reader
>> of this message is not the intended recipient, you are hereby notified
>> that
>> any printing, copying, dissemination, distribution, disclosure or
>> forwarding of this communication is strictly prohibited. If you have
>> received this communication in error, please contact the sender
>> immediately
>> and delete it from your system. Thank You.
>>
>
> CONFIDENTIALITY NOTICE
> NOTICE: This message is intended for the use of the individual or entity
> to which it is addressed and may contain information that is confidential,
> privileged and exempt from disclosure under applicable law. If the reader
> of this message is not the intended recipient, you are hereby notified that
> any printing, copying, dissemination, distribution, disclosure or
> forwarding of this communication is strictly prohibited. If you have
> received this communication in error, please contact the sender immediately
> and delete it from your system. Thank You.


Re: ThrowableSerializationWrapper: Task exception could not be deserialized / ClassNotFoundException: org.apache.solr.common.SolrException

2015-09-29 Thread Dmitry Goldenberg
Ted, I think I have tried these settings with the hbase protocol jar, to no
avail.

I'm going to see if I can try and use these with this SolrException issue
though it now may be harder to reproduce it. Thanks for the suggestion.

On Tue, Sep 29, 2015 at 8:03 PM, Ted Yu  wrote:

> Have you tried the following ?
> --conf spark.driver.userClassPathFirst=true --conf spark.executor.
> userClassPathFirst=true
>
> On Tue, Sep 29, 2015 at 4:38 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Release of Spark: 1.5.0.
>>
>> Command line invokation:
>>
>> ACME_INGEST_HOME=/mnt/acme/acme-ingest
>> ACME_INGEST_VERSION=0.0.1-SNAPSHOT
>> ACME_BATCH_DURATION_MILLIS=5000
>> SPARK_MASTER_URL=spark://data1:7077
>> JAVA_OPTIONS="-Dspark.streaming.kafka.maxRatePerPartition=1000"
>> JAVA_OPTIONS="$JAVA_OPTIONS -Dspark.executor.memory=2g"
>>
>> $SPARK_HOME/bin/spark-submit \
>> --driver-class-path  $ACME_INGEST_HOME \
>> --driver-java-options "$JAVA_OPTIONS" \
>> --class "com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver"
>> \
>> --master $SPARK_MASTER_URL  \
>> --conf
>> "spark.executor.extraClassPath=$ACME_INGEST_HOME/conf:$ACME_INGEST_HOME/lib/hbase-protocol-0.98.9-hadoop2.jar"
>> \
>>
>> $ACME_INGEST_HOME/lib/acme-ingest-kafka-spark-$ACME_INGEST_VERSION.jar \
>> -brokerlist $METADATA_BROKER_LIST \
>> -topic acme.topic1 \
>> -autooffsetreset largest \
>> -batchdurationmillis $ACME_BATCH_DURATION_MILLIS \
>> -appname Acme.App1 \
>> -checkpointdir file://$SPARK_HOME/acme/checkpoint-acme-app1
>> Note that SolrException is definitely in our consumer jar
>> acme-ingest-kafka-spark-$ACME_INGEST_VERSION.jar which gets deployed to
>> $ACME_INGEST_HOME.
>>
>> For the extraClassPath on the executors, we've got additionally
>> hbase-protocol-0.98.9-hadoop2.jar: we're using Apache Phoenix from the
>> Spark jobs to communicate with HBase.  The only way to force Phoenix to
>> successfully communicate with HBase was to have that JAR explicitly added
>> to the executor classpath regardless of the fact that the contents of the
>> hbase-protocol hadoop jar get rolled up into the consumer jar at build time.
>>
>> I'm starting to wonder whether there's some class loading pattern here
>> where some classes may not get loaded out of the consumer jar and therefore
>> have to have their respective jars added to the executor extraClassPath?
>>
>> Or is this a serialization problem for SolrException as Divya
>> Ravichandran suggested?
>>
>>
>>
>>
>> On Tue, Sep 29, 2015 at 6:16 PM, Ted Yu  wrote:
>>
>>> Mind providing a bit more information:
>>>
>>> release of Spark
>>> command line for running Spark job
>>>
>>> Cheers
>>>
>>> On Tue, Sep 29, 2015 at 1:37 PM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
 We're seeing this occasionally. Granted, this was caused by a wrinkle
 in the Solr schema but this bubbled up all the way in Spark and caused job
 failures.

 I just checked and SolrException class is actually in the consumer job
 jar we use.  Is there any reason why Spark cannot find the SolrException
 class?

 15/09/29 15:41:58 WARN ThrowableSerializationWrapper: Task exception
 could not be deserialized
 java.lang.ClassNotFoundException: org.apache.solr.common.SolrException
 at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:348)
 at
 org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
 at
 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
 at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
 at
 org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:163)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:497)
 at
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
 at 

RE: laziness in textFile reading from HDFS?

2015-09-29 Thread Mohammed Guller
1) It is not required to have the same amount of memory as data. 
2) By default the # of partitions are equal to the number of HDFS blocks
3) Yes, the read operation is lazy
4) It is okay to have more number of partitions than number of cores. 

Mohammed

-Original Message-
From: davidkl [mailto:davidkl...@hotmail.com] 
Sent: Monday, September 28, 2015 1:40 AM
To: user@spark.apache.org
Subject: laziness in textFile reading from HDFS?

Hello,

I need to process a significant amount of data every day, about 4TB. This will 
be processed in batches of about 140GB. The cluster this will be running on 
doesn't have enough memory to hold the dataset at once, so I am trying to 
understand how this works internally.

When using textFile to read an HDFS folder (containing multiple files), I 
understand that the number of partitions created are equal to the number of 
HDFS blocks, correct? Are those created in a lazy way? I mean, if the number of 
blocks/partitions is larger than the number of cores/threads the Spark driver 
was launched with (N), are N partitions created initially and then the rest 
when required? Or are all those partitions created up front?

I want to avoid reading the whole data into memory just to spill it out to disk 
if there is no enough memory.

Thanks! 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/laziness-in-textFile-reading-from-HDFS-tp24837.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkContext._active_spark_context returns None

2015-09-29 Thread YiZhi Liu
Hi Ted,

I think I've make a mistake. I refered to python/mllib, callJavaFunc
in mllib/common.py use SparkContext._active_spark_context because it
is called from the driver. So maybe there is no explicit way to  reach
JVM during rdd operations?

What I want to achieve is to take a ThriftWritable object from an
LzoBlockInputFormat and deserialize it to a java object. If I could, I
want to further transform the thrift object to DataFrame.

I think I can implement a custom org.apache.spark.api.python.Converter
and pass it to sc.hadoopFile(...,keyConverterClass,valueConverterClass...).
But, once I get the converted java object, can I call its methods in
python directly, i.e. reach the JVM?

Thanks a lot!

2015-09-30 0:54 GMT+08:00 Ted Yu :
> bq. the right way to reach JVM in python
>
> Can you tell us more about what you want to achieve ?
>
> If you want to pass some value to workers, you can use broadcast variable.
>
> Cheers
>
> On Mon, Sep 28, 2015 at 10:31 PM, YiZhi Liu  wrote:
>>
>> Hi Ted,
>>
>> Thank you for reply. The sc works at driver, but how can I reach the
>> JVM in rdd.map ?
>>
>> 2015-09-29 11:26 GMT+08:00 Ted Yu :
>>  sc._jvm.java.lang.Integer.valueOf("12")
>> > 12
>> >
>> > FYI
>> >
>> > On Mon, Sep 28, 2015 at 8:08 PM, YiZhi Liu  wrote:
>> >>
>> >> Hi,
>> >>
>> >> I'm doing some data processing on pyspark, but I failed to reach JVM
>> >> in workers. Here is what I did:
>> >>
>> >> $ bin/pyspark
>> >> >>> data = sc.parallelize(["123", "234"])
>> >> >>> numbers = data.map(lambda s:
>> >> >>>
>> >> >>> SparkContext._active_spark_context._jvm.java.lang.Integer.valueOf(s.strip()))
>> >> >>> numbers.collect()
>> >>
>> >> I got,
>> >>
>> >> Caused by: org.apache.spark.api.python.PythonException: Traceback
>> >> (most recent call last):
>> >>   File
>> >>
>> >> "/mnt/hgfs/lewis/Workspace/source-codes/spark/python/lib/pyspark.zip/pyspark/worker.py",
>> >> line 111, in main
>> >> process()
>> >>   File
>> >>
>> >> "/mnt/hgfs/lewis/Workspace/source-codes/spark/python/lib/pyspark.zip/pyspark/worker.py",
>> >> line 106, in process
>> >> serializer.dump_stream(func(split_index, iterator), outfile)
>> >>   File
>> >>
>> >> "/mnt/hgfs/lewis/Workspace/source-codes/spark/python/lib/pyspark.zip/pyspark/serializers.py",
>> >> line 263, in dump_stream
>> >> vs = list(itertools.islice(iterator, batch))
>> >>   File "", line 1, in 
>> >> AttributeError: 'NoneType' object has no attribute '_jvm'
>> >>
>> >> at
>> >> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
>> >> at
>> >>
>> >> org.apache.spark.api.python.PythonRDD$$anon$1.(PythonRDD.scala:179)
>> >> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
>> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> >> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>> >> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>> >> at
>> >> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> >> at
>> >>
>> >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> >> at
>> >>
>> >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> >> ... 1 more
>> >>
>> >> While _jvm at the driver end looks fine:
>> >>
>> >> >>>
>> >> >>>
>> >> >>> SparkContext._active_spark_context._jvm.java.lang.Integer.valueOf("123".strip())
>> >> 123
>> >>
>> >> The program is trivial, I just wonder what is the right way to reach
>> >> JVM in python. Any help would be appreciated.
>> >>
>> >> Thanks
>> >>
>> >> --
>> >> Yizhi Liu
>> >> Senior Software Engineer / Data Mining
>> >> www.mvad.com, Shanghai, China
>> >>
>> >> -
>> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >> For additional commands, e-mail: user-h...@spark.apache.org
>> >>
>> >
>>
>>
>>
>> --
>> Yizhi Liu
>> Senior Software Engineer / Data Mining
>> www.mvad.com, Shanghai, China
>
>



-- 
Yizhi Liu
Senior Software Engineer / Data Mining
www.mvad.com, Shanghai, China

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark thrift service and Hive impersonation.

2015-09-29 Thread Mohammed Guller
Does each user needs to start own thrift server to use it?

No. One of the benefits of the Spark Thrift Server is that it allows multiple 
users to share a single SparkContext.

Most likely, you have file permissions issue.

Mohammed

From: Jagat Singh [mailto:jagatsi...@gmail.com]
Sent: Tuesday, September 29, 2015 5:30 PM
To: SparkUser
Subject: Spark thrift service and Hive impersonation.

Hi,

I have started the Spark thrift service using spark user.

Does each user needs to start own thrift server to use it?

Using beeline i am able to connect to server and execute show tables;

However when we try to execute some real query it runs as spark user and HDFS 
permissions does not allow them to be read.

The query fails with error

0: jdbc:hive2://localhost:1> select count(*) from mytable;
Error: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table 
mytable. java.security.AccessControlException: Permission denied: user=spark, 
access=READ, inode="/data/mytable":tdcprdr:tdcprdr:drwxr-x--x
at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271)


And in thrift server we get log.


In the hive-site.xml we have impersonation enabled.

   
  hive.server2.enable.doAs
  true



  hive.server2.enable.impersonation
  true


Is there any other configuration to be done for it to work like normal hive 
thrift server.

Thanks


Re: Monitoring tools for spark streaming

2015-09-29 Thread Otis Gospodnetić
Hi,

There's also SPM for Spark --
http://sematext.com/spm/integrations/spark-monitoring.html

SPM graphs all Spark metrics and gives you alerting, anomaly detection,
etc. and if you ship your Spark and/or other logs to Logsene -
http://sematext.com/logsene - you can correlate metrics, logs, errors,
etc.  I haven't used SPM with Spark after "AppMap" was introduced (see
http://blog.sematext.com/2015/08/06/introducing-appmap/ ) but I imagine it
would be nice to see a map of Spark nodes talking to each other.

Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/


On Mon, Sep 28, 2015 at 7:52 PM, Siva  wrote:

> Hi,
>
> Could someone recommend the monitoring tools for spark streaming?
>
> By extending StreamingListener we can dump the delay in processing of
> batches and some alert messages.
>
> But are there any Web UI tools where we can monitor failures, see delays
> in processing, error messages and setup alerts etc.
>
> Thanks
>
>


Re: Spark thrift service and Hive impersonation.

2015-09-29 Thread Jagat Singh
Hi,

Thanks for your reply.

If you see the log message

Error: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch
table mytable. java.security.AccessControlException: Permission denied:
user=spark, access=READ, inode="/data/mytable":tdcprdr:tdcprdr:drwxr-x--x

Spark is trying to read as spark user , using which we started thrift
server.

Since spark user does not have actual read access we get the error.

However the beeline is used by end user not spark user and throws error.

Thanks,

Jagat Singh



On Wed, Sep 30, 2015 at 11:24 AM, Mohammed Guller 
wrote:

> Does each user needs to start own thrift server to use it?
>
>
>
> No. One of the benefits of the Spark Thrift Server is that it allows
> multiple users to share a single SparkContext.
>
>
>
> Most likely, you have file permissions issue.
>
>
>
> Mohammed
>
>
>
> *From:* Jagat Singh [mailto:jagatsi...@gmail.com]
> *Sent:* Tuesday, September 29, 2015 5:30 PM
> *To:* SparkUser
> *Subject:* Spark thrift service and Hive impersonation.
>
>
>
> Hi,
>
>
>
> I have started the Spark thrift service using spark user.
>
>
>
> Does each user needs to start own thrift server to use it?
>
>
>
> Using beeline i am able to connect to server and execute show tables;
>
>
>
> However when we try to execute some real query it runs as spark user and
> HDFS permissions does not allow them to be read.
>
>
>
> The query fails with error
>
>
>
> 0: jdbc:hive2://localhost:1> select count(*) from mytable;
>
> Error: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch
> table mytable. java.security.AccessControlException: Permission denied:
> user=spark, access=READ, inode="/data/mytable":tdcprdr:tdcprdr:drwxr-x--x
>
> at
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271)
>
>
>
>
>
> And in thrift server we get log.
>
>
>
>
>
> In the hive-site.xml we have impersonation enabled.
>
>
>
>
>
>   hive.server2.enable.doAs
>
>   true
>
> 
>
>
>
> 
>
>   hive.server2.enable.impersonation
>
>   true
>
> 
>
>
>
> Is there any other configuration to be done for it to work like normal
> hive thrift server.
>
>
>
> Thanks
>


Re: Spark SQL: Implementing Custom Data Source

2015-09-29 Thread Michael Armbrust
Yep, we've designed it so that we take care of any translation that needs
to be done for you.

On Tue, Sep 29, 2015 at 10:39 AM, Jerry Lam  wrote:

> Hi Michael and Ted,
>
> Thank you for the reference. Is it true that once I implement a custom
> data source, it can be used in all spark supported language? That is Scala,
> Java, Python and R. :)
> I want to take advantage of the interoperability that is already built in
> spark.
>
> Thanks!
>
> Jerry
>
> On Tue, Sep 29, 2015 at 11:31 AM, Michael Armbrust  > wrote:
>
>> Thats a pretty advanced example that uses experimental APIs.  I'd suggest
>> looking at https://github.com/databricks/spark-avro as a reference.
>>
>> On Mon, Sep 28, 2015 at 9:00 PM, Ted Yu  wrote:
>>
>>> See this thread:
>>>
>>> http://search-hadoop.com/m/q3RTttmiYDqGc202
>>>
>>> And:
>>>
>>>
>>> http://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources
>>>
>>> On Sep 28, 2015, at 8:22 PM, Jerry Lam  wrote:
>>>
>>> Hi spark users and developers,
>>>
>>> I'm trying to learn how implement a custom data source for Spark SQL. Is
>>> there a documentation that I can use as a reference? I'm not sure exactly
>>> what needs to be extended/implemented. A general workflow will be greatly
>>> helpful!
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>>
>>
>


Re: Setting executors per worker - Standalone

2015-09-29 Thread James Pirz
Thanks for your help.
You were correct about the memory settings. Previously I had following
config:

--executor-memory 8g --conf spark.executor.cores=1

Which was really conflicting, as in spark-env.sh I had:

export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=8192m

So the memory budget per worker was not enough to launch several executors.
By switching to:

--executor-memory 2g --conf spark.executor.cores=1

Now I can see that on each machine I have one worker, with 4 executors.

Thanks again for your help.


On Tue, Sep 29, 2015 at 1:30 AM, Robin East  wrote:

> I’m currently testing this exact setup - it work for me using both —conf
> spark.exeuctors.cores=1 and —executor-cores 1. Do you have some memory
> settings that need to be adjusted as well? Or do you accidentally have
> —total-executor-cores set as well? You should be able to tell from looking
> at the environment tab on the Application UI
>
> ---
> Robin East
> *Spark GraphX in Action* Michael Malak and Robin East
> Manning Publications Co.
> http://www.manning.com/books/spark-graphx-in-action
>
>
>
>
>
> On 29 Sep 2015, at 04:47, James Pirz  wrote:
>
> Thanks for your reply.
>
> Setting it as
>
> --conf spark.executor.cores=1
>
> when I start spark-shell (as an example application) indeed sets the
> number of cores per executor as 1 (which is 4 before), but I still have 1
> executor per worker. What I am really looking for is having 1 worker with 4
> executor (each with one core) per machine when I run my application. Based
> one the documentation it seems it is feasible, but it is not clear as how.
>
> Thanks.
>
> On Mon, Sep 28, 2015 at 8:46 PM, Jeff Zhang  wrote:
>
>> use "--executor-cores 1" you will get 4 executors per worker since you
>> have 4 cores per worker
>>
>>
>>
>> On Tue, Sep 29, 2015 at 8:24 AM, James Pirz  wrote:
>>
>>> Hi,
>>>
>>> I am using speak 1.5 (standalone mode) on a cluster with 10 nodes while
>>> each machine has 12GB of RAM and 4 cores. On each machine I have one worker
>>> which is running one executor that grabs all 4 cores. I am interested to
>>> check the performance with "one worker but 4 executors per machine - each
>>> with one core".
>>>
>>> I can see that "running multiple executors per worker in Standalone
>>> mode" is possible based on the closed issue:
>>>
>>> https://issues.apache.org/jira/browse/SPARK-1706
>>>
>>> But I can not find a way to do that. "SPARK_EXECUTOR_INSTANCES" is only
>>> available for the Yarn mode, and in the standalone mode I can just set
>>> "SPARK_WORKER_INSTANCES" and "SPARK_WORKER_CORES" and "SPARK_WORKER_MEMORY".
>>>
>>> Any hint or suggestion would be great.
>>>
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>
>


Re: Spark SQL: Implementing Custom Data Source

2015-09-29 Thread Jerry Lam
Hi Michael and Ted,

Thank you for the reference. Is it true that once I implement a custom data
source, it can be used in all spark supported language? That is Scala,
Java, Python and R. :)
I want to take advantage of the interoperability that is already built in
spark.

Thanks!

Jerry

On Tue, Sep 29, 2015 at 11:31 AM, Michael Armbrust 
wrote:

> Thats a pretty advanced example that uses experimental APIs.  I'd suggest
> looking at https://github.com/databricks/spark-avro as a reference.
>
> On Mon, Sep 28, 2015 at 9:00 PM, Ted Yu  wrote:
>
>> See this thread:
>>
>> http://search-hadoop.com/m/q3RTttmiYDqGc202
>>
>> And:
>>
>>
>> http://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources
>>
>> On Sep 28, 2015, at 8:22 PM, Jerry Lam  wrote:
>>
>> Hi spark users and developers,
>>
>> I'm trying to learn how implement a custom data source for Spark SQL. Is
>> there a documentation that I can use as a reference? I'm not sure exactly
>> what needs to be extended/implemented. A general workflow will be greatly
>> helpful!
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>


Spark SQL deprecating Hive? How will I access Hive metadata in the future?

2015-09-29 Thread YaoPau
I've heard that Spark SQL will be or has already started deprecating HQL.  We
have Spark SQL + Python jobs that currently read from the Hive metastore to
get things like table location and partition values.  

Will we have to re-code these functions in future releases of Spark (maybe
by connecting to Hive directly), or will fetching Hive metastore data be
supported in future releases via regular SQL?

Jon



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-deprecating-Hive-How-will-I-access-Hive-metadata-in-the-future-tp24874.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to set System environment variables in Spark

2015-09-29 Thread swetha

Hi,

How to set System environment variables when submitting a job?  Suppose I
have the environment variable as shown below. I have been trying to specify
--- -Dcom.w1.p1.config.runOnEnv=dev and --conf
-Dcom.w1.p1.config.runOnEnv=dev. But, it does not seem to be working. How to
set environment variable when submitting a job in Spark?


-Dcom.w1.p1.config.runOnEnv=dev

Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-System-environment-variables-in-Spark-tp24875.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to find how much data will be train in mllib or how much the spark job is completed ?

2015-09-29 Thread Robineast
so you could query the rest api in code. E.g. /applications//stages
provides details on the number of active and completed tasks in each stage



-
Robin East 
Spark GraphX in Action Michael Malak and Robin East 
Manning Publications Co. 
http://www.manning.com/books/spark-graphx-in-action

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-find-how-much-data-will-be-train-in-mllib-or-how-much-the-spark-job-is-completed-tp24858p24871.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Best practices to call small spark jobs as part of REST api

2015-09-29 Thread unk1102
Hi I would like to know any best practices to call spark jobs in rest api. My
Spark jobs returns results as json and that json can be used by UI
application.

Should we even have direct HDFS/Spark backend layer in UI for on demand
queries? Please guide. Thanks much.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Best-practices-to-call-small-spark-jobs-as-part-of-REST-api-tp24872.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to set System environment variables in Spark

2015-09-29 Thread Nithin Asokan
--conf is used to pass any spark configuration that starts with *spark.**

You can also use "--driver-java-options" to pass any system properties you
would like to the driver program.

On Tue, Sep 29, 2015 at 2:30 PM swetha  wrote:

>
> Hi,
>
> How to set System environment variables when submitting a job?  Suppose I
> have the environment variable as shown below. I have been trying to specify
> --- -Dcom.w1.p1.config.runOnEnv=dev and --conf
> -Dcom.w1.p1.config.runOnEnv=dev. But, it does not seem to be working. How
> to
> set environment variable when submitting a job in Spark?
>
>
> -Dcom.w1.p1.config.runOnEnv=dev
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-System-environment-variables-in-Spark-tp24875.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


input file from tar.gz

2015-09-29 Thread Peter Rudenko
Hi, i have a huge tar.gz file on dfs. This file contains several files, 
but i want to use only one of them as input. Is it possible to filter 
somehow a tar.gz schema, something like this:

sc.textFile("hdfs:///data/huge.tar.gz#input.txt")

Thanks,
Peter Rudenko


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark mailing list confusion

2015-09-29 Thread Robineast
Does anyone have any idea why some topics on the mailing list end up on
https://www.mail-archive.com/user@spark.apache.org e.g.  this message thread
  , but
not on http://apache-spark-user-list.1001560.n3.nabble.com ? 

Whilst I get notified of all messages when I reply via email they never
appear in either of the archives (I can use the web interface for nabble but
not for mail-archive.



-
Robin East 
Spark GraphX in Action Michael Malak and Robin East 
Manning Publications Co. 
http://www.manning.com/books/spark-graphx-in-action

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-mailing-list-confusion-tp24870.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: input file from tar.gz

2015-09-29 Thread Ted Yu
The syntax using '#' is not supported by hdfs natively.

YARN resource localization supports such notion. See
http://hadoop.apache.org/docs/r2.7.1/hadoop-mapreduce-client/hadoop-mapreduce-client-core/DistributedCacheDeploy.html

Not sure about Spark.

On Tue, Sep 29, 2015 at 11:39 AM, Peter Rudenko 
wrote:

> Hi, i have a huge tar.gz file on dfs. This file contains several files,
> but i want to use only one of them as input. Is it possible to filter
> somehow a tar.gz schema, something like this:
> sc.textFile("hdfs:///data/huge.tar.gz#input.txt")
>
> Thanks,
> Peter Rudenko
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to set System environment variables in Spark

2015-09-29 Thread Ted Yu
Please see 'spark.executorEnv.[EnvironmentVariableName]' in
https://spark.apache.org/docs/latest/configuration.html#runtime-environment

FYI

On Tue, Sep 29, 2015 at 12:29 PM, swetha  wrote:

>
> Hi,
>
> How to set System environment variables when submitting a job?  Suppose I
> have the environment variable as shown below. I have been trying to specify
> --- -Dcom.w1.p1.config.runOnEnv=dev and --conf
> -Dcom.w1.p1.config.runOnEnv=dev. But, it does not seem to be working. How
> to
> set environment variable when submitting a job in Spark?
>
>
> -Dcom.w1.p1.config.runOnEnv=dev
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-System-environment-variables-in-Spark-tp24875.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: "Method json([class java.util.HashMap]) does not exist" when reading JSON

2015-09-29 Thread Fernando Paladini
Thank you for the awesome explained answers! :

Actually I've a data_point (simplifying, a sensor inside a physical room)
and each data_point has its own point_values (the signals generated by the
sensor, including the timestamp of when this signal was generated).

That's what I get when I run "dataframe.show()" [tags and group_by is
unnecessary data generate by KairosDB):

+---+-++-+-+
|   group_by| name|tags
|  values  |
+---+-++-+-+
|[[type,number]]  |  DP_107029  | [WrappedArray(DP_...  |
[WrappedArray(1.4...  |
|[[type,number]]  |  DP_756561  | [WrappedArray(DP_...  |
[WrappedArray(1.4...  |
+---+-++-+-+

Following is a gist that shows you the structure of my JSON:
https://gist.github.com/paladini/1b8de8f10401a77965b5

Did you see something wrong?
Again, thank you very much for the help!

2015-09-29 17:14 GMT-03:00 Fernando Paladini :

> Of course, I didn't saw that Gmail was only sending it for you. Sorry :/
>
> 2015-09-29 17:13 GMT-03:00 Ted Yu :
>
>> For further analysis, can you post your most recent question on mailing
>> list ?
>>
>> Cheers
>>
>> On Tue, Sep 29, 2015 at 1:11 PM, Fernando Paladini 
>> wrote:
>>
>>> Thank you for the awesome explained answers! :
>>>
>>> Actually I've a data_point (simplifying, a sensor inside a physical
>>> room) and each data_point has its own point_values (the signals generated
>>> by the sensor, including the timestamp of when this signal was generated).
>>>
>>> That's what I get when I run "dataframe.show()" [tags and group_by is
>>> unnecessary data generate by KairosDB):
>>>
>>>
>>> +---+-++-+-+
>>> |   group_by| name|
>>> tags|  values  |
>>>
>>> +---+-++-+-+
>>> |[[type,number]]  |  DP_107029  | [WrappedArray(DP_...  |
>>> [WrappedArray(1.4...  |
>>> |[[type,number]]  |  DP_756561  | [WrappedArray(DP_...  |
>>> [WrappedArray(1.4...  |
>>>
>>> +---+-++-+-+
>>>
>>> Following is a gist that shows you the structure of my JSON:
>>> https://gist.github.com/paladini/1b8de8f10401a77965b5
>>>
>>> Did you see something wrong?
>>> Again, thank you very much for the help!
>>>
>>>
>>>
>>>
>>> 2015-09-29 15:20 GMT-03:00 Ted Yu :
>>>
 Spark should be able to read JSON files and generate data
 frames correctly - as long as JSON files are correctly formatted (one
 record on each line).

 Cheers

 On Tue, Sep 29, 2015 at 7:27 AM, Ted Yu  wrote:

> sqlContext.read.json() expects Path to the JSON file.
>
> FYI
>
> On Tue, Sep 29, 2015 at 7:23 AM, Fernando Paladini <
> fnpalad...@gmail.com> wrote:
>
>> Hello guys,
>>
>> I'm very new to Spark and I'm having some troubles when reading a
>> JSON to dataframe on PySpark.
>>
>> I'm getting a JSON object from an API response and I would like to
>> store it in Spark as a DataFrame (I've read that DataFrame is better than
>> RDD, that's accurate?). For what I've read
>> 
>> on documentation, I just need to call the method sqlContext.read.json in
>> order to do what I want.
>>
>> *Following is the code from my test application:*
>> json_object = json.loads(response.text)
>> sc = SparkContext("local", appName="JSON to RDD")
>> sqlContext = SQLContext(sc)
>> dataframe = sqlContext.read.json(json_object)
>> dataframe.show()
>>
>> *The problem is that when I run **"spark-submit myExample.py" I got
>> the following error:*
>> 15/09/29 01:18:54 INFO BlockManagerMasterEndpoint: Registering block
>> manager localhost:48634 with 530.0 MB RAM, BlockManagerId(driver,
>> localhost, 48634)
>> 15/09/29 01:18:54 INFO BlockManagerMaster: Registered BlockManager
>> Traceback (most recent call last):
>>   File
>> "/home/paladini/ufxc/lisha/learning/spark-api-kairos/test1.py", line 35, 
>> in
>> 
>> dataframe = sqlContext.read.json(json_object)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py",
>> line 144, in json
>>   File
>> 

spark distributed linear system with sparse data

2015-09-29 Thread Cameron McBride
Hi All,

Relatively new to spark and scala, here.

I'm trying to solve a simple linear system of A*x = b where A is a sparse
matrix, and x and b are dense vectors. Standard fare, really. But the
solutions I've found from examples and tests don't seem efficient; I think
the use case I need is falling between the cracks (or I'm missing
something).

We have a barely parallel problem of medium sized data: N = 10k-100k
elements
and the A matrix is a sparse NxN matrix (density of 0.001-0.003).
Currently, these can be solved on single processor machines. Ideally, I'm
looking for a reasonably fast method that can handle larger data (N will
grow with time), and scales with more cores / machines.

I've looked at many examples on the web and archives, and apologies if I
missed something. I've already tried the LinearRegressionWithSGD as well as
an SVD approach, and both were slow and required arbitrary tuning
(stepsize, k principle vectors, etc). (This wasn't unexpected, but gave me
a baseline. Both approaches do more work than is necessary in this case.)

I hand rolled a simple conjugate gradient as well:
https://gist.github.com/cmcbride/7b8551400da179f24345
(all comments / headkicks / improvements are welcome)

This is one of several approaches I tried, but utilizes an IndexedRowMatrix
that uses sparse vectors for the rows, and a hand rolled Matrix-vector
multiplication. I'm being a little pedantic using tuples to ensure proper
ordering of the output.

In any case, all of these are waaay slower (10x-100x slower or more) than a
single threaded scala solution (conjugate gradient approach) currently in
use. I'd obviously like to improve that.

Thanks!

Cameron


Re: Dynamic DAG use-case for spark streaming.

2015-09-29 Thread Tathagata Das
A very basic support that is there in DStream is DStream.transform() which
take arbitrary RDD => RDD function. This function can actually choose to do
different computation with time. That may be of help to you.

On Tue, Sep 29, 2015 at 12:06 PM, Archit Thakur 
wrote:

> Hi,
>
>  We are using spark streaming as our processing engine, and as part of
> output we want to push the data to UI. Now there would be multiple users
> accessing the system with there different filters on. Based on the filters
> and other inputs we want to either run a SQL Query on DStream or do a
> custom logic processing. This would need the system to read the
> filters/query and generate the execution graph at runtime. I cant see any
> support in spark streaming for generating the execution graph on the fly.
> I think I can broadcast the query to executors and read the broadcasted
> query at runtime but that would also limit my user to 1 at a time.
>
> Do we not expect the spark streaming to take queries/filters from outside
> world. Does output in spark streaming only means outputting to an external
> source which could then be queried.
>
> Thanks,
> Archit Thakur.
>


Re: Spark SQL deprecating Hive? How will I access Hive metadata in the future?

2015-09-29 Thread Michael Armbrust
We are not deprecating HiveQL, nor the ability to read metadata from the
metastore.

On Tue, Sep 29, 2015 at 12:24 PM, YaoPau  wrote:

> I've heard that Spark SQL will be or has already started deprecating HQL.
> We
> have Spark SQL + Python jobs that currently read from the Hive metastore to
> get things like table location and partition values.
>
> Will we have to re-code these functions in future releases of Spark (maybe
> by connecting to Hive directly), or will fetching Hive metastore data be
> supported in future releases via regular SQL?
>
> Jon
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-deprecating-Hive-How-will-I-access-Hive-metadata-in-the-future-tp24874.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


  1   2   >